Adding support for --openapi

This commit is contained in:
Miroslav Štampar 2026-07-02 21:12:46 +02:00
parent d6299fc4f5
commit fe69e6bfcc
7 changed files with 893 additions and 10 deletions

View file

@ -181,15 +181,15 @@ c2db614a3ce7dda889152bea8bd6d709e5d8c2b556741fdbfe44469f27ce266b lib/core/enums
5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py
914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py
4fe3ac4c0d354d1ac42ad3f5dc1b308993588f8a249ff880d273f5031d6b52b0 lib/core/optiondict.py
ca3d9185aa5418cdfc79f43beb4ad6f6503496763f349ecef57fff278bcfc8c8 lib/core/option.py
91cc64c3dadf05eae666fcbbb0cd44c8ed8dd60592334b419ec8748cdded5f30 lib/core/optiondict.py
227716f876f3af24e2c5ae4818d1e3b9bc17627f1876d66bcefc4953e660f1af lib/core/option.py
21b2b1745107c211fc7593923a3da7a808d40763c00091c28de5f7c129bcf3bc lib/core/patch.py
49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py
0c36a65b6237732eb001d333f80f0c58c088ff01ae80cf07e4dcc6da2a806364 lib/core/readlineng.py
9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
5fa3141353791446463a215a5481048346aa0f1dde08f1fe8fa6834a22aa23c1 lib/core/settings.py
1769800f72aa1e88c885ffb641e6e816d7d569b8c4a554bf7c7de821961a5235 lib/core/settings.py
c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py
a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py
15d36cdac9389d0a54a6c33fbb89f32bb65e303f50de573773dcb6d4618bca64 lib/core/target.py
@ -200,12 +200,13 @@ b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unesc
2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py
54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py
2b1ccf7adab06d64784639ba4db9772cc7bd3de30ad52513d4350fbf798082ed lib/parse/cmdline.py
1a67c8e0c46fb1244535d3961c35300da4aecd1872fd1fe2e3a752a5643875ed lib/parse/cmdline.py
02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py
c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py
5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py
ea9b195e5f5030b96d1993c106c1e13fb5c7faaf6bdc5daacfd06ec984e7f323 lib/parse/html.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/parse/__init__.py
9cb95cc5136d5ac624860578099929fdb335face41026f79f49df4f52da9805d lib/parse/openapi.py
d2e771cdacef25ee3fdc0e0355b92e7cd1b68f5edc2756ffc19f75d183ba2c73 lib/parse/payloads.py
c2f34e27578742e729c2fa9c1d4f0a0d8f8f7f4cf0fc14c62ec817a260c71dec lib/parse/sitemap.py
1be3da334411657461421b8a26a0f2ff28e1af1e28f1e963c6c92768f9b0847c lib/request/basicauthhandler.py
@ -631,6 +632,7 @@ d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_ide
caa06fed7323b2bb6d0f2443ce343de94f75bf8ad012c055d5e07741d908ebad tests/test_misc.py
790b78c600b61eb0bdd6e07e14b1db3eb2ddd5fc5d4edb9e975f85ced38558c7 tests/test_nosql.py
88a8c7ce0ba0ca721dffbcf9351cd07f7e471ad2fe667a10608c18952b09868d tests/test_openapi_drift.py
a0d173bb595ffbd2b49ee7fb1519d9898aefc262f2565923c4fe41bbc06f57e0 tests/test_openapi.py
6e63ed05db0490148d1c8428d785a23b0d5d5a0f566cd397c9c4a8fe8a6ed7dc tests/test_option.py
cde0bea1263ae857561f91ed2bd515e972b716743f017d31b1718a8546c72759 tests/test_pagecontent.py
7554a918309cf0f2cd8a63a3bb7659708f13beffbcd5ce498ece9f9167d55c97 tests/test_parse_modules.py

View file

@ -492,6 +492,65 @@ def _setBulkMultipleTargets():
warnMsg = "no usable links found (with GET parameters)"
logger.warning(warnMsg)
def _setOpenApiTargets():
if not conf.openApiFile:
return
from lib.parse.openapi import openApiTargets
if conf.method:
warnMsg = "option '--method' will override the HTTP method(s) derived from the OpenAPI/Swagger specification"
logger.warning(warnMsg)
origin = None
if re.match(r"(?i)\Ahttps?://", conf.openApiFile):
infoMsg = "fetching OpenAPI/Swagger specification from '%s'" % conf.openApiFile
logger.info(infoMsg)
from lib.request.connect import Connect as Request
content = Request.getPage(url=conf.openApiFile, raise404=True)[0]
match = re.match(r"(?i)(https?://[^/]+)", conf.openApiFile)
origin = match.group(1) if match else None
else:
conf.openApiFile = safeExpandUser(conf.openApiFile)
checkFile(conf.openApiFile)
infoMsg = "parsing OpenAPI/Swagger specification from '%s'" % conf.openApiFile
logger.info(infoMsg)
content = openFile(conf.openApiFile).read()
try:
targets = openApiTargets(content, origin)
except ValueError as ex:
errMsg = "unable to parse the OpenAPI/Swagger specification ('%s')" % getSafeExString(ex)
raise SqlmapSyntaxException(errMsg)
if re.search(r"(?i)securitySchemes|securityDefinitions", content) and not any((conf.authType, conf.authCred, conf.authFile)) and not any((_[0] or "").lower() == HTTP_HEADER.AUTHORIZATION.lower() for _ in (conf.httpHeaders or [])):
warnMsg = "the OpenAPI/Swagger specification declares authentication (security schemes) but no credentials were provided. "
warnMsg += "If the API requires authentication, requests are likely to be rejected. Provide credentials with "
warnMsg += "'--auth-type'/'--auth-cred' or a header (e.g. --headers=\"Authorization: Bearer ...\")"
logger.warning(warnMsg)
before = len(kb.targets) # openapi carries per-target bodies -> no conf.data fallback
mutating = 0
for url, method, data, headers in targets:
if conf.scope and not re.search(conf.scope, url, re.I):
continue
if method not in ("GET", "HEAD", "OPTIONS"):
mutating += 1
kb.targets.add((url, method, data, conf.cookie, tuple(headers) if headers else None))
added = len(kb.targets) - before
if added:
conf.multipleTargets = True
infoMsg = "derived %d target(s) from the OpenAPI/Swagger specification" % added
logger.info(infoMsg)
if mutating:
warnMsg = "%d of the derived target(s) use state-changing HTTP methods (e.g. POST/PUT/PATCH/DELETE). " % mutating
warnMsg += "Scanning them may create, modify or delete server-side data"
logger.warning(warnMsg)
else:
warnMsg = "no usable targets derived from the OpenAPI/Swagger specification"
logger.warning(warnMsg)
def _findPageForms():
if not conf.forms or conf.crawlDepth:
return
@ -1852,7 +1911,7 @@ def _cleanupOptions():
if conf.tmpPath:
conf.tmpPath = ntToPosixSlashes(normalizePath(conf.tmpPath))
if any((conf.googleDork, conf.logFile, conf.bulkFile, conf.forms, conf.crawlDepth, conf.stdinPipe)):
if any((conf.googleDork, conf.logFile, conf.bulkFile, conf.forms, conf.crawlDepth, conf.stdinPipe, conf.openApiFile)):
conf.multipleTargets = True
if conf.optimize:
@ -2728,8 +2787,8 @@ def _basicOptionValidation():
errMsg += "'SQLMAP_UNSAFE_EVAL=1' to be explicitly set"
raise SqlmapSystemException(errMsg)
if conf.chunked and not any((conf.data, conf.requestFile, conf.forms)):
errMsg = "switch '--chunked' requires usage of (POST) options/switches '--data', '-r' or '--forms'"
if conf.chunked and not any((conf.data, conf.requestFile, conf.forms, conf.openApiFile)):
errMsg = "switch '--chunked' requires usage of (POST) options/switches '--data', '-r', '--forms' or '--openapi'"
raise SqlmapSyntaxException(errMsg)
if conf.api and not conf.configFile:
@ -3022,7 +3081,7 @@ def init():
parseTargetDirect()
if any((conf.url, conf.logFile, conf.bulkFile, conf.requestFile, conf.googleDork, conf.stdinPipe)):
if any((conf.url, conf.logFile, conf.bulkFile, conf.requestFile, conf.googleDork, conf.stdinPipe, conf.openApiFile)):
_setHostname()
_setHTTPTimeout()
_setHTTPExtraHeaders()
@ -3038,6 +3097,7 @@ def init():
_doSearch()
_setStdinPipeTargets()
_setBulkMultipleTargets()
_setOpenApiTargets()
_checkTor()
_setCrawler()
_findPageForms()

View file

@ -19,6 +19,7 @@ optDict = {
"sessionFile": "string",
"googleDork": "string",
"configFile": "string",
"openApiFile": "string",
},
"Request": {

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.7.16"
VERSION = "1.10.7.17"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)

View file

@ -144,6 +144,9 @@ def cmdLineParser(argv=None):
target.add_argument("-c", dest="configFile",
help="Load options from a configuration INI file")
target.add_argument("--openapi", dest="openApiFile",
help="Derive targets from an OpenAPI/Swagger specification (file or URL)")
# Request options
request = parser.add_argument_group("Request", "These options can be used to specify how to connect to the target URL")
@ -1172,7 +1175,7 @@ def cmdLineParser(argv=None):
else:
args.stdinPipe = None
if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.vulnTest, args.fpTest, args.apiTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)):
if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.openApiFile, args.updateAll, args.smokeTest, args.vulnTest, args.fpTest, args.apiTest, args.wizard, args.dependencies, args.purge, args.listTampers, args.hashFile, args.stdinPipe)):
errMsg = "missing a mandatory option (-d, -u, -l, -m, -r, -g, -c, --wizard, --shell, --update, --purge, --list-tampers or --dependencies). "
errMsg += "Use -h for basic and -hh for advanced help\n"
parser.error(errMsg)

361
lib/parse/openapi.py Normal file
View file

@ -0,0 +1,361 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import json
import re
from lib.core.common import getSafeExString
from lib.core.data import logger
from lib.core.enums import HTTP_HEADER
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR
from thirdparty import six
from thirdparty.six.moves.urllib.parse import quote as _quote
try:
import yaml # optional (only needed for YAML specs)
except ImportError:
yaml = None
# Best-effort extraction of concrete request targets from an OpenAPI (v3) / Swagger (v2) document. The
# document is treated as a request generator, NOT a contract to validate: for every operation a single
# concrete request is synthesized (base URL + filled path + example query/body from the schema) and any
# operation that cannot be built is skipped with a warning, so a loose/incomplete spec degrades gracefully.
MAX_REF_DEPTH = 25
def _loadSpec(content):
try:
return json.loads(content)
except ValueError:
if yaml is None:
errMsg = "the provided OpenAPI/Swagger specification is not JSON and the optional "
errMsg += "'pyyaml' module (needed for YAML specifications) is not available"
raise ValueError(errMsg)
try:
return yaml.safe_load(content)
except Exception as ex:
raise ValueError("not valid JSON nor YAML (%s)" % getSafeExString(ex))
def _resolve(spec, node, seen=None, depth=0):
seen = seen or set()
if isinstance(node, dict) and "$ref" in node:
ref = node["$ref"]
if not isinstance(ref, six.string_types): # malformed '$ref' (non-string) -> treat as no ref
return {}
if ref in seen or depth > MAX_REF_DEPTH:
return {}
if not ref.startswith("#/"):
logger.warning("skipping external OpenAPI $ref '%s'" % ref)
return {}
seen = seen | set([ref])
current = spec
for part in ref[2:].split('/'):
part = part.replace("~1", "/").replace("~0", "~")
if not isinstance(current, dict) or part not in current:
logger.warning("skipping dangling OpenAPI $ref '%s'" % ref)
return {}
current = current[part]
return _resolve(spec, current, seen, depth + 1)
return node
EXAMPLE_MAX_DEPTH = 8 # request examples do not need deep nesting; caps runaway synthesis on large specs
def _example(spec, schema, seen=None, depth=0, cache=None):
# 'cache' memoizes the synthesized example per $ref across the whole run - big real-world specs
# (Stripe/GitHub/k8s) reuse the same large schemas across thousands of operations, so without this
# the extraction is exponential. 'depth' caps recursion for deeply nested / self-referential schemas.
seen = seen or set()
if cache is None:
cache = {}
if depth > EXAMPLE_MAX_DEPTH:
return "1"
ref = schema.get("$ref") if isinstance(schema, dict) else None
if not isinstance(ref, six.string_types): # only a string $ref is a valid (hashable) cache key
ref = None
if ref is not None and ref in cache:
return cache[ref]
schema = _resolve(spec, schema or {}, seen, depth)
if not isinstance(schema, dict):
return "1"
value = None
if "example" in schema:
value = schema["example"]
elif "const" in schema: # JSON Schema 2020-12 (OpenAPI 3.1)
value = schema["const"]
elif "default" in schema:
value = schema["default"]
elif isinstance(schema.get("examples"), list) and schema["examples"]:
value = schema["examples"][0]
elif isinstance(schema.get("enum"), list) and schema["enum"]:
value = schema["enum"][0]
else:
combinator = next((_ for _ in ("allOf", "oneOf", "anyOf") if schema.get(_)), None)
if combinator:
if combinator == "allOf":
merged = {}
for sub in schema[combinator]:
part = _example(spec, sub, seen, depth + 1, cache)
if isinstance(part, dict):
merged.update(part)
value = merged if merged else _example(spec, schema[combinator][0], seen, depth + 1, cache)
else:
value = _example(spec, schema[combinator][0], seen, depth + 1, cache)
else:
_type = schema.get("type")
if isinstance(_type, list): # OpenAPI 3.1 allows a list of types (e.g. ["string", "null"])
_type = next((_ for _ in _type if _ != "null"), None)
if _type == "object" or ("properties" in schema and not _type):
properties = schema.get("properties")
value = dict((name, _example(spec, sub, seen, depth + 1, cache)) for name, sub in (properties if isinstance(properties, dict) else {}).items())
elif _type == "array":
value = [_example(spec, schema.get("items") or {}, seen, depth + 1, cache)]
elif _type in ("integer", "number"):
value = 1
elif _type == "boolean":
value = True
elif _type == "string":
formats = {"uuid": "11111111-1111-1111-1111-111111111111", "date": "2020-01-01", "date-time": "2020-01-01T00:00:00Z", "email": "a@b.co", "byte": "MQ=="}
value = formats.get(schema.get("format"), "1")
else:
value = "1"
if ref is not None:
cache[ref] = value
return value
def _scalar(value):
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, six.string_types):
return value
try:
return json.dumps(value)
except TypeError: # e.g. datetime.date from a YAML 'example: 2020-01-01'
return str(value)
_NO_EXAMPLE = object()
def _explicitExample(spec, container):
# a concrete 'example'/'examples' declared on a parameter or media-type object - preferred over a
# schema-synthesized value (real specs carry the canonical, validation-passing sample here). 'examples'
# is a map of name -> {"value": ...} (each entry possibly a $ref).
if not isinstance(container, dict):
return _NO_EXAMPLE
if container.get("example") is not None: # 'null' -> treat as absent, fall back to schema synthesis
return container["example"]
examples = container.get("examples")
if isinstance(examples, dict) and examples:
first = _resolve(spec, next(iter(examples.values())))
if isinstance(first, dict) and first.get("value") is not None:
return first["value"]
return _NO_EXAMPLE
def _noMark(text):
# strip any custom injection mark already present in a synthesized value so only the intentionally
# appended mark (if any) survives (avoids a stray/second injection point)
return text.replace(CUSTOM_INJECTION_MARK_CHAR, "")
def _headerClean(text):
# remove characters that can not legally appear in an HTTP header name/value (CR, LF, NUL and other
# C0 controls) so a spec-supplied header can not inject extra headers or corrupt the request line
return re.sub(r"[\x00-\x1f\x7f]", "", text)
_HEADER_NAME_RE = re.compile(r"\A[!#$%&'*+.^_`|~0-9A-Za-z-]+\Z") # RFC 7230 header field-name token (no spaces / ':' / separators)
def _urlSafe(value, safe=""):
# percent-encode a synthesized value/name so it can not break the URL/body structure (spaces, '&',
# '=', '/', '?', '#', ...); py2/py3-safe (py2 urllib.quote needs bytes for non-ASCII). 'safe' keeps
# selected chars unescaped (e.g. "[]" for deep-object parameter names like filter[status]).
try:
return _quote(value.encode("utf-8") if isinstance(value, six.text_type) else str(value), safe=safe)
except Exception:
return value
def _baseUrl(spec, origin=None, servers=None):
# defensive throughout: a hostile/loose spec must not crash here (this runs outside the per-operation
# try/except, so an exception would abort the whole extraction). 'servers' overrides the spec-level
# 'servers' (used for per-path / per-operation 'servers').
basePath = spec.get("basePath") if isinstance(spec.get("basePath"), six.string_types) else ""
if basePath and not basePath.startswith("/"): # Swagger v2 basePath is a path -> ensure it is slash-prefixed
basePath = "/" + basePath
servers = servers if servers is not None else spec.get("servers")
if isinstance(servers, list) and servers and isinstance(servers[0], dict):
url = servers[0].get("url")
url = url if isinstance(url, six.string_types) else ""
variables = servers[0].get("variables")
if isinstance(variables, dict):
for name, meta in variables.items():
default = meta.get("default", "1") if isinstance(meta, dict) else "1"
url = url.replace("{%s}" % name, str(default))
if re.match(r"(?i)[a-z][a-z0-9+.-]*://", url): # absolute server URL -> used as declared (the host is NOT rewritten to the spec's own origin)
return url.rstrip('/')
return ((origin.rstrip('/') if origin else "") + "/" + url.lstrip('/')).rstrip('/') # relative server URL -> resolved against origin
if spec.get("host"): # Swagger v2 with an explicit host
schemes = spec.get("schemes")
scheme = schemes[0] if isinstance(schemes, list) and schemes else "https"
return "%s://%s%s" % (scheme, spec["host"], basePath.rstrip('/'))
return (origin.rstrip('/') if origin else "") + basePath.rstrip('/') # no servers/host -> spec's own origin
_METHODS = ("get", "post", "put", "delete", "patch", "options", "head")
def openApiTargets(content, origin=None):
"""
Returns a list of (url, method, data, headers) request tuples derived from an OpenAPI/Swagger
specification. 'headers' is a list of (name, value) tuples (matching conf.httpHeaders). 'origin'
(scheme://host[:port] of the specification's own location) is used only to resolve RELATIVE 'servers'
entries - absolute server URLs are used as declared. Path parameters and header/cookie values carry
the custom injection mark so they become testable injection points.
"""
spec = _loadSpec(content)
if not isinstance(spec, dict) or not isinstance(spec.get("paths"), dict) or not spec.get("paths"):
errMsg = "no valid 'paths' object found in the provided OpenAPI/Swagger specification"
raise ValueError(errMsg)
try:
rootBase = _baseUrl(spec, origin)
except Exception: # never let base-URL synthesis abort the whole run
rootBase = origin.rstrip('/') if isinstance(origin, six.string_types) else ""
isV2 = "swagger" in spec and "openapi" not in spec
retVal = []
cache = {} # $ref -> synthesized example, shared across all operations (large specs reuse schemas)
for path, item in (spec.get("paths") or {}).items():
item = _resolve(spec, item) # a Path Item object may itself be a $ref
if not isinstance(item, dict):
continue
shared = item.get("parameters") or [] # 'or []': a present-but-null 'parameters' must not break concatenation
for method, operation in item.items():
if str(method).lower() not in _METHODS or not isinstance(operation, dict): # str(): YAML keys can be non-string (e.g. 404, 'on'->bool)
continue
try:
# effective base URL with OpenAPI precedence: operation 'servers' > path-item 'servers' > root
opServers = operation.get("servers") or item.get("servers")
base = rootBase
if opServers:
try:
base = _baseUrl(spec, origin, opServers)
except Exception:
base = rootBase
# merge path-level + operation-level parameters, de-duplicated by (in, name); operation wins
params, seen = [], {}
for raw in ((shared if isinstance(shared, list) else []) + (operation.get("parameters") or [])):
resolved = _resolve(spec, raw)
if isinstance(resolved, dict) and resolved.get("name"):
key = (resolved.get("in"), resolved.get("name"))
if key in seen:
params[seen[key]] = resolved
continue
seen[key] = len(params)
params.append(resolved)
urlPath = path if isinstance(path, six.string_types) else str(path)
query, headers, form, cookies = [], [], [], []
for param in params:
if not isinstance(param, dict):
continue
location, name = param.get("in"), param.get("name")
if not name:
continue
if not isinstance(name, six.string_types): # YAML can yield a non-string param name (e.g. 5)
name = str(name)
explicit = _explicitExample(spec, param) # parameter-level example/examples wins over schema synthesis
if explicit is not _NO_EXAMPLE:
value = _scalar(explicit)
else:
schema = param.get("schema") or {"type": param.get("type", "string")}
value = _scalar(_example(spec, schema, cache=cache))
if location == "path":
# mark the filled path segment as a (custom) URI injection point - path parameters are
# prime REST injection targets; the value is encoded first so its own chars add no mark
urlPath = urlPath.replace("{%s}" % name, _urlSafe(value) + CUSTOM_INJECTION_MARK_CHAR)
elif location == "query":
# best-effort: array/object query params are scalarized (single value), NOT expanded per
# OpenAPI style/explode (repeated keys, comma/space/pipe delimited, deepObject) - the goal
# is one testable request per operation, not faithful serialization
query.append("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(value)))
elif location == "header":
# append the custom injection mark so the header value becomes a testable (custom)
# injection point (non-exclusive: query/body params are still auto-tested); skip names
# that are not valid HTTP field-name tokens
headerName = _headerClean(name)
if headerName and _HEADER_NAME_RE.match(headerName):
headers.append((headerName, "%s%s" % (_headerClean(_noMark(value)), CUSTOM_INJECTION_MARK_CHAR)))
elif location == "cookie":
# a cookie name is a token; the value must not contain cookie-structure chars ('; ,'
# and whitespace) or a spec could smuggle extra cookie pairs
cookieName = _headerClean(name)
if cookieName and _HEADER_NAME_RE.match(cookieName):
cookieValue = re.sub(r"[;,\s]", "", _headerClean(_noMark(value)))
cookies.append("%s=%s%s" % (cookieName, cookieValue, CUSTOM_INJECTION_MARK_CHAR))
elif location == "formData": # Swagger v2 in:"formData" -> urlencoded body field
form.append("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(value)))
if cookies: # aggregate all cookie params into a single Cookie header
headers.append((HTTP_HEADER.COOKIE, "; ".join(cookies)))
urlPath = urlPath.replace(" ", "%20").replace("?", "%3F").replace("#", "%23") # keep a literal path key from breaking the URL (filled values are already encoded)
if urlPath and not urlPath.startswith("/"): # OpenAPI path keys start with '/'; harden a loose spec so base+path is not glued (/v1pets)
urlPath = "/" + urlPath
url = base + urlPath
if query:
url += "?" + "&".join(query)
url = re.sub(r"\{[^}]+\}", "1", url) # any leftover template var (undefined path OR server variable) -> "1"
if not re.match(r"(?i)[a-z][a-z0-9+.-]*://", url): # no scheme/host -> unscannable relative URL
logger.warning("skipping OpenAPI operation '%s %s' (unable to resolve an absolute target URL; provide the specification by URL or add a 'servers'/'host' entry)" % (str(method).upper(), path))
continue
data = None
body = _resolve(spec, operation.get("requestBody") or {})
content_ = body.get("content") if isinstance(body, dict) else None
if isinstance(content_, dict) and content_:
mediaTypes = [_ for _ in content_ if isinstance(_, six.string_types)] # media-type keys must be strings
picked = next((_ for _ in mediaTypes if _ == "application/json" or _.endswith("+json") or "json" in _), None) \
or ("application/x-www-form-urlencoded" if "application/x-www-form-urlencoded" in mediaTypes else None) \
or (mediaTypes[0] if mediaTypes else None)
if picked:
mediaType = content_[picked] if isinstance(content_[picked], dict) else {}
example = _explicitExample(spec, mediaType) # media-type-level example/examples wins over schema synthesis
if example is _NO_EXAMPLE:
example = _example(spec, mediaType.get("schema") or {}, cache=cache)
if "json" in picked:
data = _noMark(json.dumps(example, default=str))
headers.append((HTTP_HEADER.CONTENT_TYPE, "application/json"))
elif picked == "application/x-www-form-urlencoded" and isinstance(example, dict):
data = "&".join("%s=%s" % (_urlSafe(name, "[]"), _urlSafe(_scalar(value))) for name, value in example.items())
headers.append((HTTP_HEADER.CONTENT_TYPE, "application/x-www-form-urlencoded"))
elif isinstance(example, six.string_types):
# raw (text / xml / ...) body -> mark it so the whole body becomes a testable point
data = _noMark(example) + CUSTOM_INJECTION_MARK_CHAR
headers.append((HTTP_HEADER.CONTENT_TYPE, picked))
else: # e.g. multipart/form-data or a structured non-JSON body (no safe serialization)
logger.debug("not synthesizing a '%s' request body for '%s %s'" % (picked, str(method).upper(), path))
elif isinstance(operation.get("parameters"), list) or isV2:
for param in params: # Swagger v2 in:"body"
if isinstance(param, dict) and param.get("in") == "body":
example = _example(spec, param.get("schema") or {}, cache=cache)
data = _noMark(json.dumps(example, default=str))
headers.append((HTTP_HEADER.CONTENT_TYPE, "application/json"))
if data is None and form: # Swagger v2 in:"formData" fields -> urlencoded body
data = "&".join(form)
headers.append((HTTP_HEADER.CONTENT_TYPE, "application/x-www-form-urlencoded"))
retVal.append((url, str(method).upper(), data, headers or None))
except Exception as ex:
logger.warning("skipping OpenAPI operation '%s %s' (%s)" % (str(method).upper(), path, getSafeExString(ex)))
return retVal

456
tests/test_openapi.py Normal file
View file

@ -0,0 +1,456 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Unit coverage for the OpenAPI/Swagger target extractor (lib/parse/openapi.py): schema example
synthesis, $ref resolution (incl. cycles), base-URL resolution (v2 + v3, relative/templated servers),
request-body handling (JSON / form), parameter->PLACE mapping, and (importantly) graceful handling of
malformed / poorly-defined specifications (a broken spec must never crash or hang the parser).
stdlib unittest only (no pytest / no pip); works on Python 2.7 and 3.x.
"""
import json
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
from lib.parse.openapi import openApiTargets, yaml as _yaml
HAS_YAML = _yaml is not None
def _targets(spec, origin="http://h"):
return openApiTargets(json.dumps(spec) if isinstance(spec, dict) else spec, origin)
def _byMethodPath(targets):
return dict(("%s %s" % (method, url), (method, url, data, headers)) for url, method, data, headers in targets)
class TestOpenApi(unittest.TestCase):
def test_v3_query_path_and_base(self):
spec = {"openapi": "3.0.0", "servers": [{"url": "/api"}],
"paths": {"/pet/{id}": {"get": {"parameters": [
{"name": "id", "in": "path", "schema": {"type": "integer"}},
{"name": "q", "in": "query", "schema": {"type": "string", "example": "x"}}]}}}}
targets = _targets(spec, "http://host:8080")
self.assertEqual(len(targets), 1)
url, method, data, headers = targets[0]
self.assertEqual(method, "GET")
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK
self.assertEqual(url, "http://host:8080/api/pet/1%s?q=x" % MARK) # relative server + filled+marked path + query
self.assertIsNone(data)
def test_v3_json_body_sets_data_and_content_type(self):
spec = {"openapi": "3.0.0", "paths": {"/o": {"post": {"requestBody": {"content": {"application/json":
{"schema": {"type": "object", "properties": {"name": {"type": "string"}, "qty": {"type": "integer"}}}}}}}}}}
url, method, data, headers = _targets(spec)[0]
self.assertEqual(method, "POST")
self.assertEqual(json.loads(data), {"name": "1", "qty": 1})
self.assertIn(("Content-Type", "application/json"), headers)
def test_form_urlencoded_body(self):
spec = {"openapi": "3.0.0", "paths": {"/login": {"post": {"requestBody": {"content":
{"application/x-www-form-urlencoded": {"schema": {"type": "object",
"properties": {"u": {"type": "string"}, "p": {"type": "string"}}}}}}}}}}
url, method, data, headers = _targets(spec)[0]
self.assertEqual(sorted(data.split("&")), ["p=1", "u=1"])
def test_value_synthesis(self):
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "a", "in": "query", "schema": {"type": "integer"}},
{"name": "b", "in": "query", "schema": {"type": "boolean"}},
{"name": "c", "in": "query", "schema": {"type": "string", "enum": ["first", "second"]}},
{"name": "d", "in": "query", "schema": {"type": "string", "default": "dd"}},
{"name": "e", "in": "query", "schema": {"type": "string", "format": "uuid"}}]}}}}
url = _targets(spec)[0][0]
self.assertIn("a=1", url)
self.assertIn("b=true", url)
self.assertIn("c=first", url) # enum[0]
self.assertIn("d=dd", url) # default
self.assertIn("e=11111111-1111-1111-1111-111111111111", url) # format uuid
def test_ref_resolution_and_allof_oneof(self):
spec = {"openapi": "3.0.0",
"components": {"schemas": {"Tag": {"type": "object", "properties": {"n": {"type": "string"}}}}},
"paths": {
"/ref": {"post": {"requestBody": {"content": {"application/json": {"schema": {"$ref": "#/components/schemas/Tag"}}}}}},
"/all": {"post": {"requestBody": {"content": {"application/json": {"schema": {"allOf": [
{"type": "object", "properties": {"x": {"type": "string"}}},
{"type": "object", "properties": {"y": {"type": "integer"}}}]}}}}}},
"/one": {"post": {"requestBody": {"content": {"application/json": {"schema": {"oneOf": [
{"type": "object", "properties": {"only": {"type": "string"}}},
{"type": "object", "properties": {"other": {"type": "string"}}}]}}}}}}}}
m = _byMethodPath(_targets(spec))
self.assertEqual(json.loads(m["POST http://h/ref"][2]), {"n": "1"})
self.assertEqual(json.loads(m["POST http://h/all"][2]), {"x": "1", "y": 1}) # allOf merged
self.assertEqual(json.loads(m["POST http://h/one"][2]), {"only": "1"}) # oneOf -> first
def test_ref_cycle_terminates(self):
spec = {"openapi": "3.0.0",
"components": {"schemas": {"Node": {"type": "object", "properties": {
"name": {"type": "string"}, "parent": {"$ref": "#/components/schemas/Node"}}}}},
"paths": {"/n": {"post": {"requestBody": {"content": {"application/json":
{"schema": {"$ref": "#/components/schemas/Node"}}}}}}}}
targets = _targets(spec) # must not hang / recurse forever
self.assertEqual(len(targets), 1)
self.assertTrue(json.loads(targets[0][2]).get("name") == "1")
def test_swagger_v2_base_and_body(self):
spec = {"swagger": "2.0", "host": "api.example.com", "basePath": "/v2", "schemes": ["https"],
"paths": {"/pet": {"post": {"parameters": [{"name": "b", "in": "body",
"schema": {"type": "object", "properties": {"id": {"type": "integer"}}}}]}}}}
url, method, data, headers = _targets(spec, None)[0]
self.assertEqual(url, "https://api.example.com/v2/pet")
self.assertEqual(json.loads(data), {"id": 1})
def test_server_template_variables(self):
spec = {"openapi": "3.0.0", "servers": [{"url": "https://{env}.x.io/{ver}",
"variables": {"env": {"default": "prod"}, "ver": {"default": "v3"}}}],
"paths": {"/p": {"get": {}}}}
self.assertEqual(_targets(spec, None)[0][0], "https://prod.x.io/v3/p")
def test_headers_are_hashable_tuples(self):
# kb.targets is an OrderedSet, so the emitted headers must be hashable (tuple, not list)
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "h", "in": "header", "schema": {"type": "string"}}]}}}}
headers = _targets(spec)[0][3]
self.assertTrue(headers is None or isinstance(tuple(headers), tuple))
def test_header_and_cookie_params_are_injection_marked(self):
# header/cookie params get the custom injection mark ('*') appended so they become testable
# (custom) injection points (query/body params are still auto-tested alongside them)
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "X-Api", "in": "header", "schema": {"type": "string", "example": "k"}},
{"name": "sess", "in": "cookie", "schema": {"type": "string", "example": "v"}}]}}}}
headers = dict(_targets(spec)[0][3])
self.assertEqual(headers["X-Api"], "k" + MARK)
self.assertEqual(headers["Cookie"], "sess=v" + MARK)
# --- graceful degradation: a broken/poorly-defined spec must never crash the parser ---
def test_malformed_raises_valueerror(self):
for bad in ("{not json,,,", "[1,2,3]", "{}", '{"openapi":"3.0.0"}', '{"openapi":"3.0.0","paths":[1,2]}'):
self.assertRaises(ValueError, openApiTargets, bad, "http://h")
def test_malformed_servers_do_not_crash(self):
for servers in ('{"url":"/a"}', '"http://h"', "[]"):
spec = '{"openapi":"3.0.0","servers":%s,"paths":{"/x":{"get":{}}}}' % servers
self.assertEqual(len(openApiTargets(spec, "http://h")), 1) # no crash, still one target
def test_url_and_body_values_are_encoded(self):
# special characters in synthesized values must be percent-encoded so they can not break the
# URL structure (param smuggling) or the form body
spec = {"openapi": "3.0.0", "paths": {
"/x/{p}": {"get": {"parameters": [
{"name": "p", "in": "path", "schema": {"type": "string", "example": "a/b"}},
{"name": "q", "in": "query", "schema": {"type": "string", "example": "a b&c=d"}}]}},
"/f": {"post": {"requestBody": {"content": {"application/x-www-form-urlencoded":
{"schema": {"type": "object", "properties": {"u": {"type": "string", "example": "a b&x"}}}}}}}}}}
byMethod = dict((method, (url, data)) for url, method, data, headers in _targets(spec))
getUrl = byMethod["GET"][0]
self.assertIn("/x/a%2Fb", getUrl) # path value '/' encoded (no extra segment)
self.assertIn("q=a%20b%26c%3Dd", getUrl) # query value space/&/= encoded (no smuggling)
self.assertNotIn(" ", getUrl)
self.assertEqual(byMethod["POST"][1], "u=a%20b%26x")
@unittest.skipUnless(HAS_YAML, "pyyaml not available")
def test_yaml_spec(self):
y = ("openapi: 3.0.0\n"
"paths:\n"
" /y:\n"
" get:\n"
" parameters:\n"
" - name: q\n"
" in: query\n"
" schema: {type: string, example: hi}\n")
targets = openApiTargets(y, "http://h")
self.assertEqual(len(targets), 1)
self.assertEqual(targets[0][0], "http://h/y?q=hi")
def test_shared_recursive_refs_scale(self):
# a self-referential schema reused across many operations must terminate promptly (depth cap +
# per-$ref memoization); without them this would blow up exponentially and hang the test
schemas = {"Node": {"type": "object", "properties": {
"name": {"type": "string"},
"child": {"$ref": "#/components/schemas/Node"},
"list": {"type": "array", "items": {"$ref": "#/components/schemas/Node"}}}}}
paths = dict(("/n%d" % i, {"post": {"requestBody": {"content": {"application/json":
{"schema": {"$ref": "#/components/schemas/Node"}}}}}}) for i in range(60))
targets = _targets({"openapi": "3.0.0", "components": {"schemas": schemas}, "paths": paths})
self.assertEqual(len(targets), 60)
self.assertEqual(json.loads(targets[0][2]).get("name"), "1")
def test_swagger_v2_formdata_body(self):
# in:"formData" params must become a urlencoded body (previously dropped -> empty POST)
spec = {"swagger": "2.0", "host": "h", "paths": {"/l": {"post": {"parameters": [
{"name": "u", "in": "formData", "type": "string"},
{"name": "p", "in": "formData", "type": "string"}]}}}}
url, method, data, headers = _targets(spec, None)[0]
self.assertEqual(method, "POST")
self.assertEqual(sorted(data.split("&")), ["p=1", "u=1"])
def test_relative_base_is_skipped(self):
# a spec that yields no scheme/host (relative server + no origin) must be skipped, not emitted
spec = {"openapi": "3.0.0", "servers": [{"url": "/api"}], "paths": {"/x": {"get": {}}}}
self.assertEqual(openApiTargets(json.dumps(spec), None), []) # relative -> skipped
self.assertEqual(len(openApiTargets(json.dumps(spec), "http://h")), 1) # absolute with origin -> kept
def test_unsupported_body_media_type_no_crash(self):
# a structured body under a non-JSON/form media type must not crash and must not fabricate a body,
# but the endpoint URL is still produced
spec = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/xml":
{"schema": {"type": "object", "properties": {"a": {"type": "string"}}}}}}}}}}
url, method, data, headers = _targets(spec)[0]
self.assertEqual((url, method, data), ("http://h/x", "POST", None))
def test_injection_mark_char_in_value_is_not_doubled(self):
# an example value already containing the custom injection mark must not create a stray point
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK
spec = {"openapi": "3.0.0", "paths": {"/x": {"post": {
"parameters": [{"name": "H", "in": "header", "schema": {"type": "string", "example": "a%sb" % MARK}}],
"requestBody": {"content": {"application/json": {"schema": {"type": "object",
"properties": {"n": {"type": "string", "example": "x%sy" % MARK}}}}}}}}}}
url, method, data, headers = _targets(spec)[0]
self.assertEqual(dict(headers)["H"], "ab" + MARK) # single trailing mark only
self.assertEqual(json.loads(data), {"n": "xy"}) # mark stripped from body value
@unittest.skipUnless(HAS_YAML, "pyyaml not available")
def test_non_string_method_keys_do_not_crash(self):
# YAML path-item keys are not guaranteed to be strings (404 -> int, on -> bool); must not crash
y = ("openapi: 3.0.0\n"
"servers: [{url: 'http://h'}]\n"
"paths:\n"
" /x:\n"
" get: {}\n"
" 404: {}\n"
" on: {}\n")
targets = openApiTargets(y, "http://h")
self.assertEqual(len(targets), 1) # only the real GET operation
self.assertEqual(targets[0][1], "GET")
def test_hostile_base_url_metadata_does_not_crash(self):
# _baseUrl runs once, OUTSIDE the per-operation try, so malformed server/scheme/basePath metadata
# must not raise (it would abort the entire extraction)
hostile = [
{"openapi": "3.0.0", "servers": [{"url": "https://{e}.x/", "variables": [1, 2]}], "paths": {"/x": {"get": {}}}},
{"openapi": "3.0.0", "servers": [{"url": "https://{e}.x/", "variables": {"e": "prod"}}], "paths": {"/x": {"get": {}}}},
{"openapi": "3.0.0", "servers": [{"url": 123}], "paths": {"/x": {"get": {}}}},
{"swagger": "2.0", "host": "h", "schemes": {"a": 1}, "paths": {"/x": {"get": {}}}},
{"swagger": "2.0", "host": "h", "basePath": 123, "paths": {"/x": {"get": {}}}}]
for spec in hostile:
self.assertEqual(len(_targets(spec)), 1) # no crash, still one target
def test_param_entry_not_a_dict_is_skipped(self):
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": ["oops", {"name": "q", "in": "query"}]}}}}
self.assertIn("q=1", _targets(spec)[0][0]) # bad entry skipped, good one still used
@unittest.skipUnless(HAS_YAML, "pyyaml not available")
def test_yaml_date_examples_serialize(self):
# unquoted YAML dates parse to datetime.date, which is not JSON-serializable -> must be stringified,
# not silently dropped (dates are pervasive in real specs)
y = ("openapi: 3.0.0\n"
"servers: [{url: 'http://h'}]\n"
"paths:\n"
" /x:\n"
" post:\n"
" requestBody:\n"
" content:\n"
" application/json:\n"
" schema: {type: object, properties: {created: {type: string, example: 2020-01-01}}}\n")
url, method, data, headers = openApiTargets(y, "http://h")[0]
self.assertEqual(json.loads(data), {"created": "2020-01-01"})
def test_crlf_in_header_and_cookie_is_stripped(self):
# a spec-supplied header/cookie name or value must not carry CR/LF (header injection / request
# corruption); query/path values are separately percent-encoded
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "X-A", "in": "header", "schema": {"type": "string", "example": "a\r\nX-Evil: 1"}},
{"name": "X\r\nB", "in": "header", "schema": {"type": "string", "example": "v"}},
{"name": "sid", "in": "cookie", "schema": {"type": "string", "example": "a\r\nSet: x"}}]}}}}
headers = dict(_targets(spec)[0][3])
for name, value in headers.items():
self.assertNotIn("\r", name + value)
self.assertNotIn("\n", name + value)
self.assertIn("X-A", headers)
self.assertIn("XB", headers) # control chars removed from the name
def test_explicit_examples_preferred_over_schema(self):
# a concrete example/examples on the media-type or parameter object must win over schema synthesis
# (real specs carry the canonical, validation-passing value there)
body = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": {
"schema": {"type": "object", "properties": {"name": {"type": "string"}}}, "example": {"name": "real"}}}}}}}}
self.assertEqual(json.loads(_targets(body)[0][2]), {"name": "real"})
examples = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": {
"schema": {"type": "object"}, "examples": {"first": {"value": {"k": "v1"}}}}}}}}}}
self.assertEqual(json.loads(_targets(examples)[0][2]), {"k": "v1"})
param = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "q", "in": "query", "example": "E", "schema": {"type": "string"}}]}}}}
self.assertIn("q=E", _targets(param)[0][0])
def test_openapi_31_const_and_type_array(self):
spec = {"openapi": "3.1.0", "paths": {"/x": {"get": {"parameters": [
{"name": "c", "in": "query", "schema": {"const": "CV"}},
{"name": "n", "in": "query", "schema": {"type": ["integer", "null"]}}]}}}}
url = _targets(spec)[0][0]
self.assertIn("c=CV", url) # const used
self.assertIn("n=1", url) # ["integer","null"] resolved to integer, not the generic fallback
def test_parameter_names_are_encoded(self):
# a param NAME with structural chars must be encoded so it can not split/smuggle params or truncate
# at a fragment; deep-object brackets ([]) are preserved
spec = {"openapi": "3.0.0", "paths": {
"/q": {"get": {"parameters": [
{"name": "a&b=c", "in": "query", "schema": {"type": "string"}},
{"name": "a#b", "in": "query", "schema": {"type": "string"}},
{"name": "filter[status]", "in": "query", "schema": {"type": "string"}}]}},
"/f": {"post": {"requestBody": {"content": {"application/x-www-form-urlencoded":
{"schema": {"type": "object", "properties": {"x&y": {"type": "string"}}}}}}}}}}
byMethod = dict((method, (url, data)) for url, method, data, headers in _targets(spec))
getUrl = byMethod["GET"][0]
self.assertIn("a%26b%3Dc=1", getUrl)
self.assertIn("a%23b=1", getUrl)
self.assertIn("filter[status]=1", getUrl) # brackets kept (deep-object param names)
self.assertNotIn("#", getUrl)
self.assertEqual(byMethod["POST"][1], "x%26y=1")
def test_undefined_template_var_does_not_leak(self):
# a server/path template variable with no definition must not leave a literal '{...}' in the URL
spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.x.com/{basePath}/v3"}],
"paths": {"/pets": {"get": {}}}}
url = _targets(spec, "http://h")[0][0]
self.assertNotIn("{", url)
self.assertEqual(url, "https://api.x.com/1/v3/pets") # absolute server used as-is (host not rewritten)
def test_absolute_server_url_is_not_rewritten_to_origin(self):
# a spec served from one host but declaring an absolute API server on another host must scan the
# DECLARED API host, not the spec's origin
spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.example.com/v1"}],
"paths": {"/pets": {"get": {}}}}
self.assertEqual(_targets(spec, "https://docs.example.com")[0][0], "https://api.example.com/v1/pets")
def test_path_parameter_is_injection_marked(self):
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK
spec = {"openapi": "3.0.0", "paths": {"/users/{id}": {"get": {"parameters": [
{"name": "id", "in": "path", "schema": {"type": "integer"}}]}}}}
self.assertEqual(_targets(spec)[0][0], "http://h/users/1" + MARK)
def test_form_urlencoded_sets_content_type_and_multipart_skipped(self):
form = {"openapi": "3.0.0", "paths": {"/f": {"post": {"requestBody": {"content":
{"application/x-www-form-urlencoded": {"schema": {"type": "object", "properties": {"u": {"type": "string"}}}}}}}}}}
url, method, data, headers = _targets(form)[0]
self.assertEqual(data, "u=1")
self.assertIn(("Content-Type", "application/x-www-form-urlencoded"), headers)
multipart = {"openapi": "3.0.0", "paths": {"/m": {"post": {"requestBody": {"content":
{"multipart/form-data": {"schema": {"type": "object", "properties": {"u": {"type": "string"}}}}}}}}}}
url, method, data, headers = _targets(multipart)[0]
self.assertIsNone(data) # multipart is skipped, not mis-serialized as urlencoded
def test_path_item_ref_is_resolved(self):
spec = {"openapi": "3.1.0",
"components": {"pathItems": {"Ping": {"get": {"parameters": [
{"name": "q", "in": "query", "schema": {"type": "string", "example": "z"}}]}}}},
"paths": {"/ping": {"$ref": "#/components/pathItems/Ping"}}}
targets = _targets(spec)
self.assertEqual(len(targets), 1)
self.assertIn("q=z", targets[0][0])
def test_operation_parameter_overrides_path_level(self):
spec = {"openapi": "3.0.0", "paths": {"/x": {
"parameters": [{"name": "q", "in": "query", "schema": {"type": "string", "example": "shared"}}],
"get": {"parameters": [{"name": "q", "in": "query", "schema": {"type": "string", "example": "op"}}]}}}}
url = _targets(spec)[0][0]
self.assertIn("q=op", url) # operation value wins
self.assertEqual(url.count("q="), 1) # not duplicated
def test_multiple_cookies_aggregate_into_one_header(self):
from lib.core.settings import CUSTOM_INJECTION_MARK_CHAR as MARK
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "a", "in": "cookie", "schema": {"type": "string"}},
{"name": "b", "in": "cookie", "schema": {"type": "string"}}]}}}}
headers = _targets(spec)[0][3]
cookieHeaders = [v for (k, v) in headers if k == "Cookie"]
self.assertEqual(cookieHeaders, ["a=1%s; b=1%s" % (MARK, MARK)]) # one aggregated Cookie header
def test_cookie_name_value_cannot_smuggle_pairs(self):
# a cookie name that is not a token is dropped; structural chars in the value ('; ,' / whitespace)
# are stripped so a spec can not inject additional cookie pairs
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "a; injected", "in": "cookie", "schema": {"type": "string"}},
{"name": "sid", "in": "cookie", "schema": {"type": "string", "example": "v; z=1"}}]}}}}
cookieHeaders = [v for (k, v) in (_targets(spec)[0][3] or []) if k == "Cookie"]
self.assertEqual(len(cookieHeaders), 1)
cookie = cookieHeaders[0]
self.assertNotIn(";", cookie.rstrip("*")) # no interior ';' -> no smuggled pair
self.assertNotIn("injected", cookie) # invalid cookie name dropped
self.assertNotIn(" ", cookie)
def test_loose_path_without_leading_slash(self):
# a malformed path key missing its leading '/' must not glue onto the base (".../v1pets")
spec = {"openapi": "3.0.0", "servers": [{"url": "https://api.x/v1"}], "paths": {"pets": {"get": {}}}}
self.assertEqual(_targets(spec, None)[0][0], "https://api.x/v1/pets")
def test_array_query_param_is_best_effort_scalar(self):
# documents current best-effort behavior: an array query param is scalarized+encoded, NOT expanded
# per style/explode. If richer serialization is added later, update this expectation deliberately.
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "ids", "in": "query", "schema": {"type": "array", "items": {"type": "integer"}}}]}}}}
url = _targets(spec)[0][0]
self.assertIn("ids=", url)
self.assertNotIn(" ", url) # whatever the encoding, it must not break the URL
self.assertTrue(url.startswith("http://h/x?ids="))
def test_invalid_header_name_is_skipped(self):
spec = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "Bad Name", "in": "header", "schema": {"type": "string"}},
{"name": "Also:Bad", "in": "header", "schema": {"type": "string"}},
{"name": "X-Good", "in": "header", "schema": {"type": "string"}}]}}}}
headers = dict(_targets(spec)[0][3] or [])
self.assertIn("X-Good", headers)
self.assertNotIn("Bad Name", headers)
self.assertNotIn("Also:Bad", headers)
def test_explicit_null_example_falls_back_to_schema(self):
# 'example: null' must not serialize as null/"null" - fall back to schema synthesis
q = {"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [
{"name": "q", "in": "query", "example": None, "schema": {"type": "string", "example": "good"}}]}}}}
self.assertIn("q=good", _targets(q)[0][0])
b = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json":
{"example": None, "schema": {"type": "object", "properties": {"a": {"type": "integer"}}}}}}}}}}
self.assertEqual(json.loads(_targets(b)[0][2]), {"a": 1})
def test_degrade_not_skip_on_odd_shapes(self):
# enum-as-dict, non-string param name, and content[type]-as-list must degrade (op preserved)
for spec in (
{"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [{"name": "q", "in": "query", "schema": {"enum": {"a": 1}}}]}}}},
{"openapi": "3.0.0", "paths": {"/x": {"get": {"parameters": [{"name": 5, "in": "header", "schema": {"type": "string"}}]}}}},
{"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody": {"content": {"application/json": [1, 2]}}}}}}):
self.assertEqual(len(_targets(spec)), 1)
def test_malformed_ref_and_properties_degrade_not_skip(self):
# a non-string/unhashable $ref or a non-dict 'properties' must degrade the value (not lose the op)
for schema in ({"$ref": 123}, {"$ref": [1, 2]}, {"type": "object", "properties": [1, 2]}):
spec = {"openapi": "3.0.0", "paths": {"/x": {"post": {"requestBody":
{"content": {"application/json": {"schema": schema}}}}}}}
self.assertEqual(len(_targets(spec)), 1) # operation preserved, not skipped
def test_undefined_bits_are_skipped_not_fatal(self):
spec = {"openapi": "3.0.0", "paths": {
"/a": {"get": {"parameters": [{}]}}, # param with no name
"/b": {"post": {"requestBody": {"content": {"application/json":
{"schema": {"$ref": "#/components/schemas/DoesNotExist"}}}}}}, # dangling $ref
"/c": {"get": {"parameters": [{"name": "p", "in": "query",
"schema": {"$ref": "https://other/x.json#/Y"}}]}}}} # external $ref
targets = _targets(spec)
self.assertEqual(len(targets), 3) # all three still produced
if __name__ == "__main__":
unittest.main()