diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index 17ec05495..abab556e8 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -181,15 +181,15 @@ c2db614a3ce7dda889152bea8bd6d709e5d8c2b556741fdbfe44469f27ce266b lib/core/enums 5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py 914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py -4fe3ac4c0d354d1ac42ad3f5dc1b308993588f8a249ff880d273f5031d6b52b0 lib/core/optiondict.py -ca3d9185aa5418cdfc79f43beb4ad6f6503496763f349ecef57fff278bcfc8c8 lib/core/option.py +91cc64c3dadf05eae666fcbbb0cd44c8ed8dd60592334b419ec8748cdded5f30 lib/core/optiondict.py +227716f876f3af24e2c5ae4818d1e3b9bc17627f1876d66bcefc4953e660f1af lib/core/option.py 21b2b1745107c211fc7593923a3da7a808d40763c00091c28de5f7c129bcf3bc lib/core/patch.py 49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py 0c36a65b6237732eb001d333f80f0c58c088ff01ae80cf07e4dcc6da2a806364 lib/core/readlineng.py 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -5fa3141353791446463a215a5481048346aa0f1dde08f1fe8fa6834a22aa23c1 lib/core/settings.py +1769800f72aa1e88c885ffb641e6e816d7d569b8c4a554bf7c7de821961a5235 lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 15d36cdac9389d0a54a6c33fbb89f32bb65e303f50de573773dcb6d4618bca64 lib/core/target.py @@ -200,12 +200,13 @@ b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unesc 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -2b1ccf7adab06d64784639ba4db9772cc7bd3de30ad52513d4350fbf798082ed lib/parse/cmdline.py +1a67c8e0c46fb1244535d3961c35300da4aecd1872fd1fe2e3a752a5643875ed lib/parse/cmdline.py 02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py ea9b195e5f5030b96d1993c106c1e13fb5c7faaf6bdc5daacfd06ec984e7f323 lib/parse/html.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/parse/__init__.py +9cb95cc5136d5ac624860578099929fdb335face41026f79f49df4f52da9805d lib/parse/openapi.py d2e771cdacef25ee3fdc0e0355b92e7cd1b68f5edc2756ffc19f75d183ba2c73 lib/parse/payloads.py c2f34e27578742e729c2fa9c1d4f0a0d8f8f7f4cf0fc14c62ec817a260c71dec lib/parse/sitemap.py 1be3da334411657461421b8a26a0f2ff28e1af1e28f1e963c6c92768f9b0847c lib/request/basicauthhandler.py @@ -631,6 +632,7 @@ d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_ide caa06fed7323b2bb6d0f2443ce343de94f75bf8ad012c055d5e07741d908ebad tests/test_misc.py 790b78c600b61eb0bdd6e07e14b1db3eb2ddd5fc5d4edb9e975f85ced38558c7 tests/test_nosql.py 88a8c7ce0ba0ca721dffbcf9351cd07f7e471ad2fe667a10608c18952b09868d tests/test_openapi_drift.py +a0d173bb595ffbd2b49ee7fb1519d9898aefc262f2565923c4fe41bbc06f57e0 tests/test_openapi.py 6e63ed05db0490148d1c8428d785a23b0d5d5a0f566cd397c9c4a8fe8a6ed7dc tests/test_option.py cde0bea1263ae857561f91ed2bd515e972b716743f017d31b1718a8546c72759 tests/test_pagecontent.py 7554a918309cf0f2cd8a63a3bb7659708f13beffbcd5ce498ece9f9167d55c97 tests/test_parse_modules.py diff --git a/lib/core/option.py b/lib/core/option.py index e69067f68..8fd9c491d 100644 --- a/lib/core/option.py +++ b/lib/core/option.py @@ -492,6 +492,65 @@ def _setBulkMultipleTargets(): warnMsg = "no usable links found (with GET parameters)" logger.warning(warnMsg) +def _setOpenApiTargets(): + if not conf.openApiFile: + return + + from lib.parse.openapi import openApiTargets + + if conf.method: + warnMsg = "option '--method' will override the HTTP method(s) derived from the OpenAPI/Swagger specification" + logger.warning(warnMsg) + + origin = None + if re.match(r"(?i)\Ahttps?://", conf.openApiFile): + infoMsg = "fetching OpenAPI/Swagger specification from '%s'" % conf.openApiFile + logger.info(infoMsg) + from lib.request.connect import Connect as Request + content = Request.getPage(url=conf.openApiFile, raise404=True)[0] + match = re.match(r"(?i)(https?://[^/]+)", conf.openApiFile) + origin = match.group(1) if match else None + else: + conf.openApiFile = safeExpandUser(conf.openApiFile) + checkFile(conf.openApiFile) + infoMsg = "parsing OpenAPI/Swagger specification from '%s'" % conf.openApiFile + logger.info(infoMsg) + content = openFile(conf.openApiFile).read() + + try: + targets = openApiTargets(content, origin) + except ValueError as ex: + errMsg = "unable to parse the OpenAPI/Swagger specification ('%s')" % getSafeExString(ex) + raise SqlmapSyntaxException(errMsg) + + if re.search(r"(?i)securitySchemes|securityDefinitions", content) and not any((conf.authType, conf.authCred, conf.authFile)) and not any((_[0] or "").lower() == HTTP_HEADER.AUTHORIZATION.lower() for _ in (conf.httpHeaders or [])): + warnMsg = "the OpenAPI/Swagger specification declares authentication (security schemes) but no credentials were provided. " + warnMsg += "If the API requires authentication, requests are likely to be rejected. Provide credentials with " + warnMsg += "'--auth-type'/'--auth-cred' or a header (e.g. --headers=\"Authorization: Bearer ...\")" + logger.warning(warnMsg) + + before = len(kb.targets) # openapi carries per-target bodies -> no conf.data fallback + mutating = 0 + for url, method, data, headers in targets: + if conf.scope and not re.search(conf.scope, url, re.I): + continue + if method not in ("GET", "HEAD", "OPTIONS"): + mutating += 1 + kb.targets.add((url, method, data, conf.cookie, tuple(headers) if headers else None)) + + added = len(kb.targets) - before + if added: + conf.multipleTargets = True + infoMsg = "derived %d target(s) from the OpenAPI/Swagger specification" % added + logger.info(infoMsg) + if mutating: + warnMsg = "%d of the derived target(s) use state-changing HTTP methods (e.g. POST/PUT/PATCH/DELETE). " % mutating + warnMsg += "Scanning them may create, modify or delete server-side data" + logger.warning(warnMsg) + else: + warnMsg = "no usable targets derived from the OpenAPI/Swagger specification" + logger.warning(warnMsg) + def _findPageForms(): if not conf.forms or conf.crawlDepth: return @@ -1852,7 +1911,7 @@ def _cleanupOptions(): if conf.tmpPath: conf.tmpPath = ntToPosixSlashes(normalizePath(conf.tmpPath)) - if any((conf.googleDork, conf.logFile, conf.bulkFile, conf.forms, conf.crawlDepth, conf.stdinPipe)): + if any((conf.googleDork, conf.logFile, conf.bulkFile, conf.forms, conf.crawlDepth, conf.stdinPipe, conf.openApiFile)): conf.multipleTargets = True if conf.optimize: @@ -2728,8 +2787,8 @@ def _basicOptionValidation(): errMsg += "'SQLMAP_UNSAFE_EVAL=1' to be explicitly set" raise SqlmapSystemException(errMsg) - if conf.chunked and not any((conf.data, conf.requestFile, conf.forms)): - errMsg = "switch '--chunked' requires usage of (POST) options/switches '--data', '-r' or '--forms'" + if conf.chunked and not any((conf.data, conf.requestFile, conf.forms, conf.openApiFile)): + errMsg = "switch '--chunked' requires usage of (POST) options/switches '--data', '-r', '--forms' or '--openapi'" raise SqlmapSyntaxException(errMsg) if conf.api and not conf.configFile: @@ -3022,7 +3081,7 @@ def init(): parseTargetDirect() - if any((conf.url, conf.logFile, conf.bulkFile, conf.requestFile, conf.googleDork, conf.stdinPipe)): + if any((conf.url, conf.logFile, conf.bulkFile, conf.requestFile, conf.googleDork, conf.stdinPipe, conf.openApiFile)): _setHostname() _setHTTPTimeout() _setHTTPExtraHeaders() @@ -3038,6 +3097,7 @@ def init(): _doSearch() _setStdinPipeTargets() _setBulkMultipleTargets() + _setOpenApiTargets() _checkTor() _setCrawler() _findPageForms() diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py index 21c6cfa37..d449259df 100644 --- a/lib/core/optiondict.py +++ b/lib/core/optiondict.py @@ -19,6 +19,7 @@ optDict = { "sessionFile": "string", "googleDork": "string", "configFile": "string", + "openApiFile": "string", }, "Request": { diff --git a/lib/core/settings.py b/lib/core/settings.py index d39b04e52..50535bacb 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.7.16" +VERSION = "1.10.7.17" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index dde875d91..e8ddc2d4f 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -144,6 +144,9 @@ def cmdLineParser(argv=None): target.add_argument("-c", dest="configFile", help="Load options from a configuration INI file") + target.add_argument("--openapi", dest="openApiFile", + help="Derive targets from an OpenAPI/Swagger specification (file or URL)") + # Request options request = parser.add_argument_group("Request", "These options can be used to specify how to connect to the target URL") @@ -1172,7 +1175,7 @@ def cmdLineParser(argv=None): else: args.stdinPipe = None - if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.vulnTest, args.fpTest, args.apiTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)): + if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.openApiFile, args.updateAll, args.smokeTest, args.vulnTest, args.fpTest, args.apiTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)): errMsg = "missing a mandatory option (-d, -u, -l, -m, -r, -g, -c, --wizard, --shell, --update, --purge, --list-tampers or --dependencies). " errMsg += "Use -h for basic and -hh for advanced help\n" parser.error(errMsg) diff --git a/lib/parse/openapi.py b/lib/parse/openapi.py new file mode 100644 index 000000000..996b5ece6 --- /dev/null +++ b/lib/parse/openapi.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import json +import re + +from lib.core.common import getSafeExString +from lib.core.data import logger +from lib.core.enums import HTTP_HEADER +from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR +from thirdparty import six +from thirdparty.six.moves.urllib.parse import quote as _quote + +try: + import yaml # optional (only needed for YAML specs) +except ImportError: + yaml = None + +# Best-effort extraction of concrete request targets from an OpenAPI (v3) / Swagger (v2) document. The +# document is treated as a request generator, NOT a contract to validate: for every operation a single +# concrete request is synthesized (base URL + filled path + example query/body from the schema) and any +# operation that cannot be built is skipped with a warning, so a loose/incomplete spec degrades gracefully. + +MAX_REF_DEPTH = 25 + +def _loadSpec(content): + try: + return json.loads(content) + except ValueError: + if yaml is None: + errMsg = "the provided OpenAPI/Swagger specification is not JSON and the optional " + errMsg += "'pyyaml' module (needed for YAML specifications) is not available" + raise ValueError(errMsg) + try: + return yaml.safe_load(content) + except Exception as ex: + raise ValueError("not valid JSON nor YAML (%s)" % getSafeExString(ex)) + +def _resolve(spec, node, seen=None, depth=0): + seen = seen or set() + if isinstance(node, dict) and "$ref" in node: + ref = node["$ref"] + if not isinstance(ref, six.string_types): # malformed '$ref' (non-string) -> treat as no ref + return {} + if ref in seen or depth > MAX_REF_DEPTH: + return {} + if not ref.startswith("#/"): + logger.warning("skipping external OpenAPI $ref '%s'" % ref) + return {} + seen = seen | set([ref]) + current = spec + for part in ref[2:].split('/'): + part = part.replace("~1", "/").replace("~0", "~") + if not isinstance(current, dict) or part not in current: + logger.warning("skipping dangling OpenAPI $ref '%s'" % ref) + return {} + current = current[part] + return _resolve(spec, current, seen, depth + 1) + return node + +EXAMPLE_MAX_DEPTH = 8 # request examples do not need deep nesting; caps runaway synthesis on large specs + +def _example(spec, schema, seen=None, depth=0, cache=None): + # 'cache' memoizes the synthesized example per $ref across the whole run - big real-world specs + # (Stripe/GitHub/k8s) reuse the same large schemas across thousands of operations, so without this + # the extraction is exponential. 'depth' caps recursion for deeply nested / self-referential schemas. + seen = seen or set() + if cache is None: + cache = {} + if depth > EXAMPLE_MAX_DEPTH: + return "1" + ref = schema.get("$ref") if isinstance(schema, dict) else None + if not isinstance(ref, six.string_types): # only a string $ref is a valid (hashable) cache key + ref = None + if ref is not None and ref in cache: + return cache[ref] + + schema = _resolve(spec, schema or {}, seen, depth) + if not isinstance(schema, dict): + return "1" + + value = None + if "example" in schema: + value = schema["example"] + elif "const" in schema: # JSON Schema 2020-12 (OpenAPI 3.1) + value = schema["const"] + elif "default" in schema: + value = schema["default"] + elif isinstance(schema.get("examples"), list) and schema["examples"]: + value = schema["examples"][0] + elif isinstance(schema.get("enum"), list) and schema["enum"]: + value = schema["enum"][0] + else: + combinator = next((_ for _ in ("allOf", "oneOf", "anyOf") if schema.get(_)), None) + if combinator: + if combinator == "allOf": + merged = {} + for sub in schema[combinator]: + part = _example(spec, sub, seen, depth + 1, cache) + if isinstance(part, dict): + merged.update(part) + value = merged if merged else _example(spec, schema[combinator][0], seen, depth + 1, cache) + else: + value = _example(spec, schema[combinator][0], seen, depth + 1, cache) + else: + _type = schema.get("type") + if isinstance(_type, list): # OpenAPI 3.1 allows a list of types (e.g. ["string", "null"]) + _type = next((_ for _ in _type if _ != "null"), None) + if _type == "object" or ("properties" in schema and not _type): + properties = schema.get("properties") + value = dict((name, _example(spec, sub, seen, depth + 1, cache)) for name, sub in (properties if isinstance(properties, dict) else {}).items()) + elif _type == "array": + value = [_example(spec, schema.get("items") or {}, seen, depth + 1, cache)] + elif _type in ("integer", "number"): + value = 1 + elif _type == "boolean": + value = True + elif _type == "string": + formats = {"uuid": "11111111-1111-1111-1111-111111111111", "date": "2020-01-01", "date-time": "2020-01-01T00:00:00Z", "email": "a@b.co", "byte": "MQ=="} + value = formats.get(schema.get("format"), "1") + else: + value = "1" + + if ref is not None: + cache[ref] = value + return value + +def _scalar(value): + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return str(value) + if isinstance(value, six.string_types): + return value + try: + return json.dumps(value) + except TypeError: # e.g. datetime.date from a YAML 'example: 2020-01-01' + return str(value) + +_NO_EXAMPLE = object() + +def _explicitExample(spec, container): + # a concrete 'example'/'examples' declared on a parameter or media-type object - preferred over a + # schema-synthesized value (real specs carry the canonical, validation-passing sample here). 'examples' + # is a map of name -> {"value": ...} (each entry possibly a $ref). + if not isinstance(container, dict): + return _NO_EXAMPLE + if container.get("example") is not None: # 'null' -> treat as absent, fall back to schema synthesis + return container["example"] + examples = container.get("examples") + if isinstance(examples, dict) and examples: + first = _resolve(spec, next(iter(examples.values()))) + if isinstance(first, dict) and first.get("value") is not None: + return first["value"] + return _NO_EXAMPLE + +def _noMark(text): + # strip any custom injection mark already present in a synthesized value so only the intentionally + # appended mark (if any) survives (avoids a stray/second injection point) + return text.replace(CUSTOM_INJECTION_MARK_CHAR, "") + +def _headerClean(text): + # remove characters that can not legally appear in an HTTP header name/value (CR, LF, NUL and other + # C0 controls) so a spec-supplied header can not inject extra headers or corrupt the request line + return re.sub(r"[\x00-\x1f\x7f]", "", text) + +_HEADER_NAME_RE = re.compile(r"\A[!#$%&'*+.^_`|~0-9A-Za-z-]+\Z") # RFC 7230 header field-name token (no spaces / ':' / separators) + +def _urlSafe(value, safe=""): + # percent-encode a synthesized value/name so it can not break the URL/body structure (spaces, '&', + # '=', '/', '?', '#', ...); py2/py3-safe (py2 urllib.quote needs bytes for non-ASCII). 'safe' keeps + # selected chars unescaped (e.g. "[]" for deep-object parameter names like filter[status]). + try: + return _quote(value.encode("utf-8") if isinstance(value, six.text_type) else str(value), safe=safe) + except Exception: + return value + +def _baseUrl(spec, origin=None, servers=None): + # defensive throughout: a hostile/loose spec must not crash here (this runs outside the per-operation + # try/except, so an exception would abort the whole extraction). 'servers' overrides the spec-level + # 'servers' (used for per-path / per-operation 'servers'). + basePath = spec.get("basePath") if isinstance(spec.get("basePath"), six.string_types) else "" + if basePath and not basePath.startswith("/"): # Swagger v2 basePath is a path -> ensure it is slash-prefixed + basePath = "/" + basePath + servers = servers if servers is not None else spec.get("servers") + if isinstance(servers, list) and servers and isinstance(servers[0], dict): + url = servers[0].get("url") + url = url if isinstance(url, six.string_types) else "" + variables = servers[0].get("variables") + if isinstance(variables, dict): + for name, meta in variables.items(): + default = meta.get("default", "1") if isinstance(meta, dict) else "1" + url = url.replace("{%s}" % name, str(default)) + if re.match(r"(?i)[a-z][a-z0-9+.-]*://", url): # absolute server URL -> used as declared (the host is NOT rewritten to the spec's own origin) + return url.rstrip('/') + return ((origin.rstrip('/') if origin else "") + "/" + url.lstrip('/')).rstrip('/') # relative server URL -> resolved against origin + if spec.get("host"): # Swagger v2 with an explicit host + schemes = spec.get("schemes") + scheme = schemes[0] if isinstance(schemes, list) and schemes else "https" + return "%s://%s%s" % (scheme, spec["host"], basePath.rstrip('/')) + return (origin.rstrip('/') if origin else "") + basePath.rstrip('/') # no servers/host -> spec's own origin + +_METHODS = ("get", "post", "put", "delete", "patch", "options", "head") + +def openApiTargets(content, origin=None): + """ + Returns a list of (url, method, data, headers) request tuples derived from an OpenAPI/Swagger + specification. 'headers' is a list of (name, value) tuples (matching conf.httpHeaders). 'origin' + (scheme://host[:port] of the specification's own location) is used only to resolve RELATIVE 'servers' + entries - absolute server URLs are used as declared. Path parameters and header/cookie values carry + the custom injection mark so they become testable injection points. + """ + + spec = _loadSpec(content) + if not isinstance(spec, dict) or not isinstance(spec.get("paths"), dict) or not spec.get("paths"): + errMsg = "no valid 'paths' object found in the provided OpenAPI/Swagger specification" + raise ValueError(errMsg) + + try: + rootBase = _baseUrl(spec, origin) + except Exception: # never let base-URL synthesis abort the whole run + rootBase = origin.rstrip('/') if isinstance(origin, six.string_types) else "" + isV2 = "swagger" in spec and "openapi" not in spec + retVal = [] + cache = {} # $ref -> synthesized example, shared across all operations (large specs reuse schemas) + + for path, item in (spec.get("paths") or {}).items(): + item = _resolve(spec, item) # a Path Item object may itself be a $ref + if not isinstance(item, dict): + continue + shared = item.get("parameters") or [] # 'or []': a present-but-null 'parameters' must not break concatenation + for method, operation in item.items(): + if str(method).lower() not in _METHODS or not isinstance(operation, dict): # str(): YAML keys can be non-string (e.g. 404, 'on'->bool) + continue + try: + # effective base URL with OpenAPI precedence: operation 'servers' > path-item 'servers' > root + opServers = operation.get("servers") or item.get("servers") + base = rootBase + if opServers: + try: + base = _baseUrl(spec, origin, opServers) + except Exception: + base = rootBase + + # merge path-level + operation-level parameters, de-duplicated by (in, name); operation wins + params, seen = [], {} + for raw in ((shared if isinstance(shared, list) else []) + (operation.get("parameters") or [])): + resolved = _resolve(spec, raw) + if isinstance(resolved, dict) and resolved.get("name"): + key = (resolved.get("in"), resolved.get("name")) + if key in seen: + params[seen[key]] = resolved + continue + seen[key] = len(params) + params.append(resolved) + + urlPath = path if isinstance(path, six.string_types) else str(path) + query, headers, form, cookies = [], [], [], [] + + for param in params: + if not isinstance(param, dict): + continue + location, name = param.get("in"), param.get("name") + if not name: + continue + if not isinstance(name, six.string_types): # YAML can yield a non-string param name (e.g. 5) + name = str(name) + explicit = _explicitExample(spec, param) # parameter-level example/examples wins over schema synthesis + if explicit is not _NO_EXAMPLE: + value = _scalar(explicit) + else: + schema = param.get("schema") or {"type": param.get("type", "string")} + value = _scalar(_example(spec, schema, cache=cache)) + if location == "path": + # mark the filled path segment as a (custom) URI injection point - path parameters are + # prime REST injection targets; the value is encoded first so its own chars add no mark + urlPath = urlPath.replace("{%s}" % name, _urlSafe(value) + CUSTOM_INJECTION_MARK_CHAR) + elif location == "query": + # best-effort: array/object query params are scalarized (single value), NOT expanded per + # OpenAPI style/explode (repeated keys, comma/space/pipe delimited, deepObject) - the goal + # is one testable request per operation, not faithful serialization + query.append("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(value))) + elif location == "header": + # append the custom injection mark so the header value becomes a testable (custom) + # injection point (non-exclusive: query/body params are still auto-tested); skip names + # that are not valid HTTP field-name tokens + headerName = _headerClean(name) + if headerName and _HEADER_NAME_RE.match(headerName): + headers.append((headerName, "%s%s" % (_headerClean(_noMark(value)), CUSTOM_INJECTION_MARK_CHAR))) + elif location == "cookie": + # a cookie name is a token; the value must not contain cookie-structure chars ('; ,' + # and whitespace) or a spec could smuggle extra cookie pairs + cookieName = _headerClean(name) + if cookieName and _HEADER_NAME_RE.match(cookieName): + cookieValue = re.sub(r"[;,\s]", "", _headerClean(_noMark(value))) + cookies.append("%s=%s%s" % (cookieName, cookieValue, CUSTOM_INJECTION_MARK_CHAR)) + elif location == "formData": # Swagger v2 in:"formData" -> urlencoded body field + form.append("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(value))) + + if cookies: # aggregate all cookie params into a single Cookie header + headers.append((HTTP_HEADER.COOKIE, "; ".join(cookies))) + + urlPath = urlPath.replace(" ", "%20").replace("?", "%3F").replace("#", "%23") # keep a literal path key from breaking the URL (filled values are already encoded) + if urlPath and not urlPath.startswith("/"): # OpenAPI path keys start with '/'; harden a loose spec so base+path is not glued (/v1pets) + urlPath = "/" + urlPath + + url = base + urlPath + if query: + url += "?" + "&".join(query) + + url = re.sub(r"\{[^}]+\}", "1", url) # any leftover template var (undefined path OR server variable) -> "1" + + if not re.match(r"(?i)[a-z][a-z0-9+.-]*://", url): # no scheme/host -> unscannable relative URL + logger.warning("skipping OpenAPI operation '%s %s' (unable to resolve an absolute target URL; provide the specification by URL or add a 'servers'/'host' entry)" % (str(method).upper(), path)) + continue + + data = None + body = _resolve(spec, operation.get("requestBody") or {}) + content_ = body.get("content") if isinstance(body, dict) else None + if isinstance(content_, dict) and content_: + mediaTypes = [_ for _ in content_ if isinstance(_, six.string_types)] # media-type keys must be strings + picked = next((_ for _ in mediaTypes if _ == "application/json" or _.endswith("+json") or "json" in _), None) \ + or ("application/x-www-form-urlencoded" if "application/x-www-form-urlencoded" in mediaTypes else None) \ + or (mediaTypes[0] if mediaTypes else None) + if picked: + mediaType = content_[picked] if isinstance(content_[picked], dict) else {} + example = _explicitExample(spec, mediaType) # media-type-level example/examples wins over schema synthesis + if example is _NO_EXAMPLE: + example = _example(spec, mediaType.get("schema") or {}, cache=cache) + if "json" in picked: + data = _noMark(json.dumps(example, default=str)) + headers.append((HTTP_HEADER.CONTENT_TYPE, "application/json")) + elif picked == "application/x-www-form-urlencoded" and isinstance(example, dict): + data = "&".join("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(_scalar(value))) for name, value in example.items()) + headers.append((HTTP_HEADER.CONTENT_TYPE, "application/x-www-form-urlencoded")) + elif isinstance(example, six.string_types): + # raw (text / xml / ...) body -> mark it so the whole body becomes a testable point + data = _noMark(example) + CUSTOM_INJECTION_MARK_CHAR + headers.append((HTTP_HEADER.CONTENT_TYPE, picked)) + else: # e.g. multipart/form-data or a structured non-JSON body (no safe serialization) + logger.debug("not synthesizing a '%s' request body for '%s %s'" % (picked, str(method).upper(), path)) + elif isinstance(operation.get("parameters"), list) or isV2: + for param in params: # Swagger v2 in:"body" + if isinstance(param, dict) and param.get("in") == "body": + example = _example(spec, param.get("schema") or {}, cache=cache) + data = _noMark(json.dumps(example, default=str)) + headers.append((HTTP_HEADER.CONTENT_TYPE, "application/json")) + + if data is None and form: # Swagger v2 in:"formData" fields -> urlencoded body + data = "&".join(form) + headers.append((HTTP_HEADER.CONTENT_TYPE, "application/x-www-form-urlencoded")) + + retVal.append((url, str(method).upper(), data, headers or None)) + except Exception as ex: + logger.warning("skipping OpenAPI operation '%s %s' (%s)" % (str(method).upper(), path, getSafeExString(ex))) + + return retVal diff --git a/tests/test_openapi.py b/tests/test_openapi.py new file mode 100644 index 000000000..40c8cd930 --- /dev/null +++ b/tests/test_openapi.py @@ -0,0 +1,456 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Unit coverage for the OpenAPI/Swagger target extractor (lib/parse/openapi.py): schema example +synthesis, $ref resolution (incl. cycles), base-URL resolution (v2 + v3, relative/templated servers), +request-body handling (JSON / form), parameter->PLACE mapping, and (importantly) graceful handling of +malformed / poorly-defined specifications (a broken spec must never crash or hang the parser). + +stdlib unittest only (no pytest / no pip); works on Python 2.7 and 3.x. +""" + +import json +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +from lib.parse.openapi import openApiTargets, yaml as _yaml + +HAS_YAML = _yaml is not None + + +def _targets(spec, origin="http://h"): + return openApiTargets(json.dumps(spec) if isinstance(spec, dict) else spec, origin) + +def _byMethodPath(targets): + return dict(("%s %s" % (method, url), (method, url, data, headers)) for url, method, data, headers in targets) + + +class TestOpenApi(unittest.TestCase): + def test_v3_query_path_and_base(self): + spec = {"openapi": "3.0.0", "servers": [{"url": "/api"}], + "paths": {"/pet/{id}": {"get": {"parameters": [ + {"name": "id", "in": "path", "schema": {"type": "integer"}}, + {"name": "q", "in": "query", "schema": {"type": "string", "example": "x"}}]}}}} + targets = _targets(spec, "http://host:8080") + self.assertEqual(len(targets), 1) + url, method, data, headers = targets[0] + self.assertEqual(method, "GET") + from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK + self.assertEqual(url, "http://host:8080/api/pet/1%s?q=x" % MARK) # relative server + filled+marked path + query + self.assertIsNone(data) + + def test_v3_json_body_sets_data_and_content_type(self): + spec = {"openapi": "3.0.0", "paths": {"/o": {"post": {"requestBody": {"content": {"application/json": + {"schema": {"type": "object", "properties": {"name": {"type": "string"}, "qty": {"type": "integer"}}}}}}}}}} + url, method, data, headers = _targets(spec)[0] + self.assertEqual(method, "POST") + self.assertEqual(json.loads(data), {"name": "1", "qty": 1}) + self.assertIn(("Content-Type", "application/json"), headers) + + def test_form_urlencoded_body(self): + spec = {"openapi": "3.0.0", "paths": {"/login": {"post": {"requestBody": {"content": + {"application/x-www-form-urlencoded": {"schema": {"type": "object", + "properties": {"u": {"type": "string"}, "p": {"type": "string"}}}}}}}}}} + url, method, data, headers = _targets(spec)[0] + self.assertEqual(sorted(data.split("&")), ["p=1", "u=1"]) + + def test_value_synthesis(self): + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "a", "in": "query", "schema": {"type": "integer"}}, + {"name": "b", "in": "query", "schema": {"type": "boolean"}}, + {"name": "c", "in": "query", "schema": {"type": "string", "enum": ["first", "second"]}}, + {"name": "d", "in": "query", "schema": {"type": "string", "default": "dd"}}, + {"name": "e", "in": "query", "schema": {"type": "string", "format": "uuid"}}]}}}} + url = _targets(spec)[0][0] + self.assertIn("a=1", url) + self.assertIn("b=true", url) + self.assertIn("c=first", url) # enum[0] + self.assertIn("d=dd", url) # default + self.assertIn("e=11111111-1111-1111-1111-111111111111", url) # format uuid + + def test_ref_resolution_and_allof_oneof(self): + spec = {"openapi": "3.0.0", + "components": {"schemas": {"Tag": {"type": "object", "properties": {"n": {"type": "string"}}}}}, + "paths": { + "/ref": {"post": {"requestBody": {"content": {"application/json": {"schema": {"$ref": "#/components/schemas/Tag"}}}}}}, + "/all": {"post": {"requestBody": {"content": {"application/json": {"schema": {"allOf": [ + {"type": "object", "properties": {"x": {"type": "string"}}}, + {"type": "object", "properties": {"y": {"type": "integer"}}}]}}}}}}, + "/one": {"post": {"requestBody": {"content": {"application/json": {"schema": {"oneOf": [ + {"type": "object", "properties": {"only": {"type": "string"}}}, + {"type": "object", "properties": {"other": {"type": "string"}}}]}}}}}}}} + m = _byMethodPath(_targets(spec)) + self.assertEqual(json.loads(m["POST http://h/ref"][2]), {"n": "1"}) + self.assertEqual(json.loads(m["POST http://h/all"][2]), {"x": "1", "y": 1}) # allOf merged + self.assertEqual(json.loads(m["POST http://h/one"][2]), {"only": "1"}) # oneOf -> first + + def test_ref_cycle_terminates(self): + spec = {"openapi": "3.0.0", + "components": {"schemas": {"Node": {"type": "object", "properties": { + "name": {"type": "string"}, "parent": {"$ref": "#/components/schemas/Node"}}}}}, + "paths": {"/n": {"post": {"requestBody": {"content": {"application/json": + {"schema": {"$ref": "#/components/schemas/Node"}}}}}}}} + targets = _targets(spec) # must not hang / recurse forever + self.assertEqual(len(targets), 1) + self.assertTrue(json.loads(targets[0][2]).get("name") == "1") + + def test_swagger_v2_base_and_body(self): + spec = {"swagger": "2.0", "host": "api.example.com", "basePath": "/v2", "schemes": ["https"], + "paths": {"/pet": {"post": {"parameters": [{"name": "b", "in": "body", + "schema": {"type": "object", "properties": {"id": {"type": "integer"}}}}]}}}} + url, method, data, headers = _targets(spec, None)[0] + self.assertEqual(url, "https://api.example.com/v2/pet") + self.assertEqual(json.loads(data), {"id": 1}) + + def test_server_template_variables(self): + spec = {"openapi": "3.0.0", "servers": [{"url": "https://{env}.x.io/{ver}", + "variables": {"env": {"default": "prod"}, "ver": {"default": "v3"}}}], + "paths": {"/p": {"get": {}}}} + self.assertEqual(_targets(spec, None)[0][0], "https://prod.x.io/v3/p") + + def test_headers_are_hashable_tuples(self): + # kb.targets is an OrderedSet, so the emitted headers must be hashable (tuple, not list) + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "h", "in": "header", "schema": {"type": "string"}}]}}}} + headers = _targets(spec)[0][3] + self.assertTrue(headers is None or isinstance(tuple(headers), tuple)) + + def test_header_and_cookie_params_are_injection_marked(self): + # header/cookie params get the custom injection mark ('*') appended so they become testable + # (custom) injection points (query/body params are still auto-tested alongside them) + from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "X-Api", "in": "header", "schema": {"type": "string", "example": "k"}}, + {"name": "sess", "in": "cookie", "schema": {"type": "string", "example": "v"}}]}}}} + headers = dict(_targets(spec)[0][3]) + self.assertEqual(headers["X-Api"], "k" + MARK) + self.assertEqual(headers["Cookie"], "sess=v" + MARK) + + # --- graceful degradation: a broken/poorly-defined spec must never crash the parser --- + + def test_malformed_raises_valueerror(self): + for bad in ("{not json,,,", "[1,2,3]", "{}", '{"openapi":"3.0.0"}', '{"openapi":"3.0.0","paths":[1,2]}'): + self.assertRaises(ValueError, openApiTargets, bad, "http://h") + + def test_malformed_servers_do_not_crash(self): + for servers in ('{"url":"/a"}', '"http://h"', "[]"): + spec = '{"openapi":"3.0.0","servers":%s,"paths":{"/x":{"get":{}}}}' % servers + self.assertEqual(len(openApiTargets(spec, "http://h")), 1) # no crash, still one target + + def test_url_and_body_values_are_encoded(self): + # special characters in synthesized values must be percent-encoded so they can not break the + # URL structure (param smuggling) or the form body + spec = {"openapi": "3.0.0", "paths": { + "/x/{p}": {"get": {"parameters": [ + {"name": "p", "in": "path", "schema": {"type": "string", "example": "a/b"}}, + {"name": "q", "in": "query", "schema": {"type": "string", "example": "a b&c=d"}}]}}, + "/f": {"post": {"requestBody": {"content": {"application/x-www-form-urlencoded": + {"schema": {"type": "object", "properties": {"u": {"type": "string", "example": "a b&x"}}}}}}}}}} + byMethod = dict((method, (url, data)) for url, method, data, headers in _targets(spec)) + getUrl = byMethod["GET"][0] + self.assertIn("/x/a%2Fb", getUrl) # path value '/' encoded (no extra segment) + self.assertIn("q=a%20b%26c%3Dd", getUrl) # query value space/&/= encoded (no smuggling) + self.assertNotIn(" ", getUrl) + self.assertEqual(byMethod["POST"][1], "u=a%20b%26x") + + @unittest.skipUnless(HAS_YAML, "pyyaml not available") + def test_yaml_spec(self): + y = ("openapi: 3.0.0\n" + "paths:\n" + " /y:\n" + " get:\n" + " parameters:\n" + " - name: q\n" + " in: query\n" + " schema: {type: string, example: hi}\n") + targets = openApiTargets(y, "http://h") + self.assertEqual(len(targets), 1) + self.assertEqual(targets[0][0], "http://h/y?q=hi") + + def test_shared_recursive_refs_scale(self): + # a self-referential schema reused across many operations must terminate promptly (depth cap + + # per-$ref memoization); without them this would blow up exponentially and hang the test + schemas = {"Node": {"type": "object", "properties": { + "name": {"type": "string"}, + "child": {"$ref": "#/components/schemas/Node"}, + "list": {"type": "array", "items": {"$ref": "#/components/schemas/Node"}}}}} + paths = dict(("/n%d" % i, {"post": {"requestBody": {"content": {"application/json": + {"schema": {"$ref": "#/components/schemas/Node"}}}}}}) for i in range(60)) + targets = _targets({"openapi": "3.0.0", "components": {"schemas": schemas}, "paths": paths}) + self.assertEqual(len(targets), 60) + self.assertEqual(json.loads(targets[0][2]).get("name"), "1") + + def test_swagger_v2_formdata_body(self): + # in:"formData" params must become a urlencoded body (previously dropped -> empty POST) + spec = {"swagger": "2.0", "host": "h", "paths": {"/l": {"post": {"parameters": [ + {"name": "u", "in": "formData", "type": "string"}, + {"name": "p", "in": "formData", "type": "string"}]}}}} + url, method, data, headers = _targets(spec, None)[0] + self.assertEqual(method, "POST") + self.assertEqual(sorted(data.split("&")), ["p=1", "u=1"]) + + def test_relative_base_is_skipped(self): + # a spec that yields no scheme/host (relative server + no origin) must be skipped, not emitted + spec = {"openapi": "3.0.0", "servers": [{"url": "/api"}], "paths": {"/x": {"get": {}}}} + self.assertEqual(openApiTargets(json.dumps(spec), None), []) # relative -> skipped + self.assertEqual(len(openApiTargets(json.dumps(spec), "http://h")), 1) # absolute with origin -> kept + + def test_unsupported_body_media_type_no_crash(self): + # a structured body under a non-JSON/form media type must not crash and must not fabricate a body, + # but the endpoint URL is still produced + spec = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/xml": + {"schema": {"type": "object", "properties": {"a": {"type": "string"}}}}}}}}}} + url, method, data, headers = _targets(spec)[0] + self.assertEqual((url, method, data), ("http://h/x", "POST", None)) + + def test_injection_mark_char_in_value_is_not_doubled(self): + # an example value already containing the custom injection mark must not create a stray point + from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK + spec = {"openapi": "3.0.0", "paths": {"/x": {"post": { + "parameters": [{"name": "H", "in": "header", "schema": {"type": "string", "example": "a%sb" % MARK}}], + "requestBody": {"content": {"application/json": {"schema": {"type": "object", + "properties": {"n": {"type": "string", "example": "x%sy" % MARK}}}}}}}}}} + url, method, data, headers = _targets(spec)[0] + self.assertEqual(dict(headers)["H"], "ab" + MARK) # single trailing mark only + self.assertEqual(json.loads(data), {"n": "xy"}) # mark stripped from body value + + @unittest.skipUnless(HAS_YAML, "pyyaml not available") + def test_non_string_method_keys_do_not_crash(self): + # YAML path-item keys are not guaranteed to be strings (404 -> int, on -> bool); must not crash + y = ("openapi: 3.0.0\n" + "servers: [{url: 'http://h'}]\n" + "paths:\n" + " /x:\n" + " get: {}\n" + " 404: {}\n" + " on: {}\n") + targets = openApiTargets(y, "http://h") + self.assertEqual(len(targets), 1) # only the real GET operation + self.assertEqual(targets[0][1], "GET") + + def test_hostile_base_url_metadata_does_not_crash(self): + # _baseUrl runs once, OUTSIDE the per-operation try, so malformed server/scheme/basePath metadata + # must not raise (it would abort the entire extraction) + hostile = [ + {"openapi": "3.0.0", "servers": [{"url": "https://{e}.x/", "variables": [1, 2]}], "paths": {"/x": {"get": {}}}}, + {"openapi": "3.0.0", "servers": [{"url": "https://{e}.x/", "variables": {"e": "prod"}}], "paths": {"/x": {"get": {}}}}, + {"openapi": "3.0.0", "servers": [{"url": 123}], "paths": {"/x": {"get": {}}}}, + {"swagger": "2.0", "host": "h", "schemes": {"a": 1}, "paths": {"/x": {"get": {}}}}, + {"swagger": "2.0", "host": "h", "basePath": 123, "paths": {"/x": {"get": {}}}}] + for spec in hostile: + self.assertEqual(len(_targets(spec)), 1) # no crash, still one target + + def test_param_entry_not_a_dict_is_skipped(self): + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": ["oops", {"name": "q", "in": "query"}]}}}} + self.assertIn("q=1", _targets(spec)[0][0]) # bad entry skipped, good one still used + + @unittest.skipUnless(HAS_YAML, "pyyaml not available") + def test_yaml_date_examples_serialize(self): + # unquoted YAML dates parse to datetime.date, which is not JSON-serializable -> must be stringified, + # not silently dropped (dates are pervasive in real specs) + y = ("openapi: 3.0.0\n" + "servers: [{url: 'http://h'}]\n" + "paths:\n" + " /x:\n" + " post:\n" + " requestBody:\n" + " content:\n" + " application/json:\n" + " schema: {type: object, properties: {created: {type: string, example: 2020-01-01}}}\n") + url, method, data, headers = openApiTargets(y, "http://h")[0] + self.assertEqual(json.loads(data), {"created": "2020-01-01"}) + + def test_crlf_in_header_and_cookie_is_stripped(self): + # a spec-supplied header/cookie name or value must not carry CR/LF (header injection / request + # corruption); query/path values are separately percent-encoded + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "X-A", "in": "header", "schema": {"type": "string", "example": "a\r\nX-Evil: 1"}}, + {"name": "X\r\nB", "in": "header", "schema": {"type": "string", "example": "v"}}, + {"name": "sid", "in": "cookie", "schema": {"type": "string", "example": "a\r\nSet: x"}}]}}}} + headers = dict(_targets(spec)[0][3]) + for name, value in headers.items(): + self.assertNotIn("\r", name + value) + self.assertNotIn("\n", name + value) + self.assertIn("X-A", headers) + self.assertIn("XB", headers) # control chars removed from the name + + def test_explicit_examples_preferred_over_schema(self): + # a concrete example/examples on the media-type or parameter object must win over schema synthesis + # (real specs carry the canonical, validation-passing value there) + body = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": { + "schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "example": {"name": "real"}}}}}}}} + self.assertEqual(json.loads(_targets(body)[0][2]), {"name": "real"}) + examples = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": { + "schema": {"type": "object"}, "examples": {"first": {"value": {"k": "v1"}}}}}}}}}} + self.assertEqual(json.loads(_targets(examples)[0][2]), {"k": "v1"}) + param = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "q", "in": "query", "example": "E", "schema": {"type": "string"}}]}}}} + self.assertIn("q=E", _targets(param)[0][0]) + + def test_openapi_31_const_and_type_array(self): + spec = {"openapi": "3.1.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "c", "in": "query", "schema": {"const": "CV"}}, + {"name": "n", "in": "query", "schema": {"type": ["integer", "null"]}}]}}}} + url = _targets(spec)[0][0] + self.assertIn("c=CV", url) # const used + self.assertIn("n=1", url) # ["integer","null"] resolved to integer, not the generic fallback + + def test_parameter_names_are_encoded(self): + # a param NAME with structural chars must be encoded so it can not split/smuggle params or truncate + # at a fragment; deep-object brackets ([]) are preserved + spec = {"openapi": "3.0.0", "paths": { + "/q": {"get": {"parameters": [ + {"name": "a&b=c", "in": "query", "schema": {"type": "string"}}, + {"name": "a#b", "in": "query", "schema": {"type": "string"}}, + {"name": "filter[status]", "in": "query", "schema": {"type": "string"}}]}}, + "/f": {"post": {"requestBody": {"content": {"application/x-www-form-urlencoded": + {"schema": {"type": "object", "properties": {"x&y": {"type": "string"}}}}}}}}}} + byMethod = dict((method, (url, data)) for url, method, data, headers in _targets(spec)) + getUrl = byMethod["GET"][0] + self.assertIn("a%26b%3Dc=1", getUrl) + self.assertIn("a%23b=1", getUrl) + self.assertIn("filter[status]=1", getUrl) # brackets kept (deep-object param names) + self.assertNotIn("#", getUrl) + self.assertEqual(byMethod["POST"][1], "x%26y=1") + + def test_undefined_template_var_does_not_leak(self): + # a server/path template variable with no definition must not leave a literal '{...}' in the URL + spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.x.com/{basePath}/v3"}], + "paths": {"/pets": {"get": {}}}} + url = _targets(spec, "http://h")[0][0] + self.assertNotIn("{", url) + self.assertEqual(url, "https://api.x.com/1/v3/pets") # absolute server used as-is (host not rewritten) + + def test_absolute_server_url_is_not_rewritten_to_origin(self): + # a spec served from one host but declaring an absolute API server on another host must scan the + # DECLARED API host, not the spec's origin + spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.example.com/v1"}], + "paths": {"/pets": {"get": {}}}} + self.assertEqual(_targets(spec, "https://docs.example.com")[0][0], "https://api.example.com/v1/pets") + + def test_path_parameter_is_injection_marked(self): + from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK + spec = {"openapi": "3.0.0", "paths": {"/users/{id}": {"get": {"parameters": [ + {"name": "id", "in": "path", "schema": {"type": "integer"}}]}}}} + self.assertEqual(_targets(spec)[0][0], "http://h/users/1" + MARK) + + def test_form_urlencoded_sets_content_type_and_multipart_skipped(self): + form = {"openapi": "3.0.0", "paths": {"/f": {"post": {"requestBody": {"content": + {"application/x-www-form-urlencoded": {"schema": {"type": "object", "properties": {"u": {"type": "string"}}}}}}}}}} + url, method, data, headers = _targets(form)[0] + self.assertEqual(data, "u=1") + self.assertIn(("Content-Type", "application/x-www-form-urlencoded"), headers) + multipart = {"openapi": "3.0.0", "paths": {"/m": {"post": {"requestBody": {"content": + {"multipart/form-data": {"schema": {"type": "object", "properties": {"u": {"type": "string"}}}}}}}}}} + url, method, data, headers = _targets(multipart)[0] + self.assertIsNone(data) # multipart is skipped, not mis-serialized as urlencoded + + def test_path_item_ref_is_resolved(self): + spec = {"openapi": "3.1.0", + "components": {"pathItems": {"Ping": {"get": {"parameters": [ + {"name": "q", "in": "query", "schema": {"type": "string", "example": "z"}}]}}}}, + "paths": {"/ping": {"$ref": "#/components/pathItems/Ping"}}} + targets = _targets(spec) + self.assertEqual(len(targets), 1) + self.assertIn("q=z", targets[0][0]) + + def test_operation_parameter_overrides_path_level(self): + spec = {"openapi": "3.0.0", "paths": {"/x": { + "parameters": [{"name": "q", "in": "query", "schema": {"type": "string", "example": "shared"}}], + "get": {"parameters": [{"name": "q", "in": "query", "schema": {"type": "string", "example": "op"}}]}}}} + url = _targets(spec)[0][0] + self.assertIn("q=op", url) # operation value wins + self.assertEqual(url.count("q="), 1) # not duplicated + + def test_multiple_cookies_aggregate_into_one_header(self): + from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "a", "in": "cookie", "schema": {"type": "string"}}, + {"name": "b", "in": "cookie", "schema": {"type": "string"}}]}}}} + headers = _targets(spec)[0][3] + cookieHeaders = [v for (k, v) in headers if k == "Cookie"] + self.assertEqual(cookieHeaders, ["a=1%s; b=1%s" % (MARK, MARK)]) # one aggregated Cookie header + + def test_cookie_name_value_cannot_smuggle_pairs(self): + # a cookie name that is not a token is dropped; structural chars in the value ('; ,' / whitespace) + # are stripped so a spec can not inject additional cookie pairs + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "a; injected", "in": "cookie", "schema": {"type": "string"}}, + {"name": "sid", "in": "cookie", "schema": {"type": "string", "example": "v; z=1"}}]}}}} + cookieHeaders = [v for (k, v) in (_targets(spec)[0][3] or []) if k == "Cookie"] + self.assertEqual(len(cookieHeaders), 1) + cookie = cookieHeaders[0] + self.assertNotIn(";", cookie.rstrip("*")) # no interior ';' -> no smuggled pair + self.assertNotIn("injected", cookie) # invalid cookie name dropped + self.assertNotIn(" ", cookie) + + def test_loose_path_without_leading_slash(self): + # a malformed path key missing its leading '/' must not glue onto the base (".../v1pets") + spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.x/v1"}], "paths": {"pets": {"get": {}}}} + self.assertEqual(_targets(spec, None)[0][0], "https://api.x/v1/pets") + + def test_array_query_param_is_best_effort_scalar(self): + # documents current best-effort behavior: an array query param is scalarized+encoded, NOT expanded + # per style/explode. If richer serialization is added later, update this expectation deliberately. + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "ids", "in": "query", "schema": {"type": "array", "items": {"type": "integer"}}}]}}}} + url = _targets(spec)[0][0] + self.assertIn("ids=", url) + self.assertNotIn(" ", url) # whatever the encoding, it must not break the URL + self.assertTrue(url.startswith("http://h/x?ids=")) + + def test_invalid_header_name_is_skipped(self): + spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "Bad Name", "in": "header", "schema": {"type": "string"}}, + {"name": "Also:Bad", "in": "header", "schema": {"type": "string"}}, + {"name": "X-Good", "in": "header", "schema": {"type": "string"}}]}}}} + headers = dict(_targets(spec)[0][3] or []) + self.assertIn("X-Good", headers) + self.assertNotIn("Bad Name", headers) + self.assertNotIn("Also:Bad", headers) + + def test_explicit_null_example_falls_back_to_schema(self): + # 'example: null' must not serialize as null/"null" - fall back to schema synthesis + q = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [ + {"name": "q", "in": "query", "example": None, "schema": {"type": "string", "example": "good"}}]}}}} + self.assertIn("q=good", _targets(q)[0][0]) + b = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": + {"example": None, "schema": {"type": "object", "properties": {"a": {"type": "integer"}}}}}}}}}} + self.assertEqual(json.loads(_targets(b)[0][2]), {"a": 1}) + + def test_degrade_not_skip_on_odd_shapes(self): + # enum-as-dict, non-string param name, and content[type]-as-list must degrade (op preserved) + for spec in ( + {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [{"name": "q", "in": "query", "schema": {"enum": {"a": 1}}}]}}}}, + {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [{"name": 5, "in": "header", "schema": {"type": "string"}}]}}}}, + {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": [1, 2]}}}}}}): + self.assertEqual(len(_targets(spec)), 1) + + def test_malformed_ref_and_properties_degrade_not_skip(self): + # a non-string/unhashable $ref or a non-dict 'properties' must degrade the value (not lose the op) + for schema in ({"$ref": 123}, {"$ref": [1, 2]}, {"type": "object", "properties": [1, 2]}): + spec = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": + {"content": {"application/json": {"schema": schema}}}}}}} + self.assertEqual(len(_targets(spec)), 1) # operation preserved, not skipped + + def test_undefined_bits_are_skipped_not_fatal(self): + spec = {"openapi": "3.0.0", "paths": { + "/a": {"get": {"parameters": [{}]}}, # param with no name + "/b": {"post": {"requestBody": {"content": {"application/json": + {"schema": {"$ref": "#/components/schemas/DoesNotExist"}}}}}}, # dangling $ref + "/c": {"get": {"parameters": [{"name": "p", "in": "query", + "schema": {"$ref": "https://other/x.json#/Y"}}]}}}} # external $ref + targets = _targets(spec) + self.assertEqual(len(targets), 3) # all three still produced + + +if __name__ == "__main__": + unittest.main()