#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Offline, deterministic tests for the NoSQL injection engine. Mock oracles stand in for the
HTTP/back-end layer so detection and blind extraction can be exercised without a live target,
covering each dialect: MongoDB/CouchDB operator injection, Elasticsearch/Solr query_string,
Neo4j Cypher and ArangoDB AQL string break-out.
"""
import re
import unittest
from _testutils import bootstrap
bootstrap()
import lib.techniques.nosql.inject as ni
SECRET = "S3cr3t_9"
MATCH = "
Welcome user; rows: alpha, bravo, charlie"
NOMATCH = "Invalid credentials; no rows"
def _mongo(place, parameter, op, value, isArray=False):
if op == "$ne":
return MATCH
if op == "$in":
return NOMATCH
if op == "$regex":
try:
return MATCH if re.match(value, SECRET) is not None else NOMATCH
except re.error:
return "error: invalid regular expression"
return ""
def _es(place, parameter, value):
if value == "*":
return MATCH
if value == ni.NOSQL_SENTINEL:
return NOMATCH
if value.startswith("/") and value.endswith("/"): # Lucene regexp is full-anchored
try:
return MATCH if re.match("^(?:%s)$" % value[1:-1], SECRET) is not None else NOMATCH
except re.error:
return "error: parse_exception"
return NOMATCH
class TestNoSqlMongo(unittest.TestCase):
def setUp(self):
self._orig = ni._fetch
ni._fetch = _mongo
def tearDown(self):
ni._fetch = self._orig
def test_detect(self):
self.assertTrue(ni._detectMongo("GET", "password"))
def test_extract(self):
template = ni._fetch("GET", "password", "$ne", ni.NOSQL_SENTINEL)
value = ni._extract(template,
lambda v: ni._fetch("GET", "password", "$regex", v),
lambda n: "^.{%d,}$" % n,
lambda known, klass: "^" + re.escape(known) + klass)
self.assertEqual(value, SECRET)
def test_not_injectable(self):
ni._fetch = lambda *args, **kwargs: MATCH
self.assertIsNone(ni._detectMongo("GET", "password"))
class TestNoSqlElasticsearch(unittest.TestCase):
def setUp(self):
self._orig = ni._fetchValue
ni._fetchValue = _es
def tearDown(self):
ni._fetchValue = self._orig
def test_detect(self):
self.assertTrue(ni._detectES("GET", "q"))
def test_extract(self):
template = ni._fetchValue("GET", "q", "*")
value = ni._extract(template,
lambda v: ni._fetchValue("GET", "q", v),
lambda n: "/.{%d,}/" % n,
lambda known, klass: "/%s%s.*/" % (ni._lucene(known), klass))
self.assertEqual(value, SECRET)
def test_not_injectable(self):
ni._fetchValue = lambda *args, **kwargs: MATCH
self.assertIsNone(ni._detectES("GET", "q"))
def _cypher(place, parameter, value):
if "'1'='1" in value:
return MATCH
if "'1'='2" in value:
return NOMATCH
m = re.search(r"=~ '\^(.*)$", value) # the regex body after the =~ operator
if m:
try:
return MATCH if re.match("^(?:%s)$" % m.group(1), SECRET) is not None else NOMATCH
except re.error:
return NOMATCH
return NOMATCH
class TestNoSqlCypher(unittest.TestCase):
def setUp(self):
self._orig = ni._fetchValue
ni._fetchValue = _cypher
def tearDown(self):
ni._fetchValue = self._orig
def test_detect(self):
self.assertTrue(ni._detectCypher("GET", "password"))
def test_extract(self):
template = ni._fetchValue("GET", "password", ni.NOSQL_SENTINEL + "' OR '1'='1")
value = ni._extract(template,
lambda v: ni._fetchValue("GET", "password", v),
lambda n: "%s' OR u.password =~ '^.{%d,}" % (ni.NOSQL_SENTINEL, n),
lambda known, klass: "%s' OR u.password =~ '^%s%s.*" % (ni.NOSQL_SENTINEL, ni._javaEscape(known), klass))
self.assertEqual(value, SECRET)
def _aql(place, parameter, value):
m = re.search(r"=~ '(\^[^']*)'", value) # the regex body inside =~ '...'
if m:
try: # ArangoDB =~ is a partial (unanchored) match
return MATCH if re.search(m.group(1), SECRET) is not None else NOMATCH
except re.error:
return NOMATCH
if "'1'=='1" in value:
return MATCH
return NOMATCH
class TestNoSqlArango(unittest.TestCase):
def setUp(self):
self._orig = ni._fetchValue
ni._fetchValue = _aql
def tearDown(self):
ni._fetchValue = self._orig
def test_detect(self):
self.assertTrue(ni._detectAQL("GET", "password"))
def test_extract(self):
template = ni._fetchValue("GET", "password", ni.NOSQL_SENTINEL + "' || '1'=='1")
value = ni._extract(template,
lambda v: ni._fetchValue("GET", "password", v),
lambda n: "%s' || (u.password =~ '^.{%d,}') || '1'=='2" % (ni.NOSQL_SENTINEL, n),
lambda known, klass: "%s' || (u.password =~ '^%s%s') || '1'=='2" % (ni.NOSQL_SENTINEL, ni._javaEscape(known), klass))
self.assertEqual(value, SECRET)
def _n1ql(place, parameter, value):
m = re.search(r"REGEXP_CONTAINS\([^,]+, '([^']*)'\)", value)
if m:
try: # model the single-quoted string layer (collapse the doubled backslashes)
return MATCH if re.search(m.group(1).replace("\\\\", "\\"), SECRET) is not None else NOMATCH
except re.error:
return NOMATCH
if "=~" in value: # N1QL has no =~ operator -> engine error
return "error: syntax error near '=~'"
if "'1'='1" in value:
return MATCH
return NOMATCH
class TestNoSqlN1QL(unittest.TestCase):
"""Couchbase N1QL shares the ' OR '1'='1 break-out with Neo4j; _resolve() must disambiguate by the
regexp-match primitive (=~ fails, REGEXP_CONTAINS works) and still extract"""
def setUp(self):
self._f, self._fv = ni._fetch, ni._fetchValue
ni._fetch = lambda *args, **kwargs: "" # keep MongoDB operator detection out of the way
ni._fetchValue = _n1ql
ni.conf.parameters = {"GET": "name=luther&password=x"}
def tearDown(self):
ni._fetch, ni._fetchValue = self._f, self._fv
def test_resolve_disambiguates_couchbase(self):
vector = ni._resolve("GET", "password", "password")
self.assertEqual(vector.dbms, "Couchbase")
self.assertEqual(vector.bypass, "' OR '1'='1")
def test_extract(self):
vector = ni._resolve("GET", "password", "password")
self.assertEqual(ni._extract(vector.template, vector.fetch, vector.lengthValue, vector.charValue, vector.truth), SECRET)
def _whereTruth(payload):
# emulate the $where timing oracle: a payload "delays" (=> True) iff its embedded JS condition holds
m = re.search(r"length>=(\d+)", payload)
if m:
return len(SECRET) >= int(m.group(1))
m = re.search(r"/\^([^/]*)/\.test", payload)
if m:
return re.search("^" + m.group(1), SECRET) is not None
return False
class TestNoSqlWhere(unittest.TestCase):
"""MongoDB $where time-based: validates the server-side-JS payload shapes and the time-based
extraction loop (timing predicate emulated deterministically)"""
def setUp(self):
ni.conf.timeSec = 5
def test_extract(self):
key = "password"
lengthValue = lambda n: ni._whereDelay("d.%s&&d.%s.length>=%d" % (key, key, n))
charValue = lambda known, klass: ni._whereDelay("d.%s&&/^%s%s/.test(d.%s)" % (key, ni._javaEscape(known), klass, key))
self.assertEqual(ni._extract(None, None, lengthValue, charValue, _whereTruth), SECRET)
def _jswhere(place, parameter, value):
# emulate a content-bearing MongoDB $where (server-side JavaScript) endpoint
if " OR " in value or " =~ " in value: # not valid JS -> consistent (non-diverging) error
return ""
m = re.search(r"/(.)/\.test\('x'\)", value) # JS regexp-test disambiguation probe
if m:
return MATCH if re.search(m.group(1), "x") is not None else NOMATCH
m = re.search(r"/\^([^/]*)/\.test\(this\.password\)", value) # value extraction
if m:
try:
return MATCH if re.search("^" + m.group(1), SECRET) is not None else NOMATCH
except re.error:
return NOMATCH
m = re.search(r"length>=(\d+)", value) # length search
if m:
return MATCH if len(SECRET) >= int(m.group(1)) else NOMATCH
if "'1'=='1" in value or "this.password)" in value: # boolean detection / bound always-true template
return MATCH
return NOMATCH
class TestNoSqlWhereContent(unittest.TestCase):
"""Content-bearing MongoDB $where shares the ' || '1'=='1 break-out with ArangoDB; _resolve() must
disambiguate (AQL '=~' fails, a JS /re/.test() holds) and extract via the content oracle"""
def setUp(self):
self._f, self._fv = ni._fetch, ni._fetchValue
ni._fetch = lambda *args, **kwargs: ""
ni._fetchValue = _jswhere
ni.conf.parameters = {"GET": "username=luther&password=x"}
def tearDown(self):
ni._fetch, ni._fetchValue = self._f, self._fv
def test_resolve_where_content(self):
vector = ni._resolve("GET", "password", "password")
self.assertEqual(vector.dbms, "MongoDB ($where)")
self.assertEqual(vector.bypass, "' || '1'=='1")
def test_extract(self):
vector = ni._resolve("GET", "password", "password")
self.assertEqual(ni._extract(vector.template, vector.fetch, vector.lengthValue, vector.charValue, vector.truth), SECRET)
class TestNoSqlWhereDump(unittest.TestCase):
"""$where whole-document dump: Object.keys(this) enumeration drives name + value recovery for every
field (per-field char recovery itself is covered by TestNoSqlWhere)"""
DOC = [("id", "1"), ("username", "luther"), ("password", "s3cr3t"), ("role", "admin")]
def setUp(self):
self._orig = ni._whereField
names = [name for name, _ in self.DOC]
values = dict(self.DOC)
def fake(place, parameter, bound, expr, threshold):
m = re.search(r"Object\.keys\(d\)\[(\d+)\]", expr)
if m:
index = int(m.group(1))
return names[index] if index < len(names) else None
m = re.search(r"d\['([^']*)'\]", expr)
if m:
return values.get(m.group(1))
return None
ni._whereField = fake
def tearDown(self):
ni._whereField = self._orig
def test_dump(self):
columns, rows = ni._whereDump("GET", "password", "", 0)
self.assertEqual(columns, ["id", "username", "password", "role"])
self.assertEqual(rows, [["1", "luther", "s3cr3t", "admin"]])
def test_empty_document(self):
ni._whereField = lambda *args, **kwargs: None
self.assertIsNone(ni._whereDump("GET", "password", "", 0))
class TestNoSqlEnumDump(unittest.TestCase):
"""Content-based whole-document dump (e.g. Neo4j keys(u)): enumerate field names then values"""
DOC = [("id", "1"), ("username", "luther"), ("password", "s3cr3t"), ("role", "admin")]
def setUp(self):
self._ef, self._fv = ni._enumField, ni._fetchValue
ni._fetchValue = lambda *args, **kwargs: "Welcome" # non-error single-record template
names = [name for name, _ in self.DOC]
values = dict(self.DOC)
def fake(place, parameter, template, payloadFor):
probe = payloadFor("X") # render to inspect the target expression
m = re.search(r"\(u\)\[(\d+)\]", probe) # keys/ATTRIBUTES/OBJECT_NAMES(u)[i]
if m:
index = int(m.group(1))
return names[index] if index < len(names) else None
m = re.search(r"u\['([^']*)'\]", probe) # toString/TO_STRING/TOSTRING(u['name'])
if m:
return values.get(m.group(1))
return None
ni._enumField = fake
def tearDown(self):
ni._enumField, ni._fetchValue = self._ef, self._fv
def _check(self, keysExpr, valueExpr):
makePayload = lambda expr, rb: "X' OR %s =~ '^%s.*" % (expr, rb)
columns, rows = ni._enumDump("GET", "password", makePayload, keysExpr, valueExpr)
self.assertEqual(columns, ["id", "username", "password", "role"])
self.assertEqual(rows, [["1", "luther", "s3cr3t", "admin"]])
def test_cypher(self):
self._check(lambda i: "keys(u)[%d]" % i, lambda n: "toString(u[%s])" % ni._propLiteral(n))
def test_aql(self):
self._check(lambda i: "ATTRIBUTES(u)[%d]" % i, lambda n: "TO_STRING(u[%s])" % ni._propLiteral(n))
def test_n1ql(self):
self._check(lambda i: "OBJECT_NAMES(u)[%d]" % i, lambda n: "TOSTRING(u[%s])" % ni._propLiteral(n))
class TestNoSqlBypass(unittest.TestCase):
"""Confirmed injection must surface the always-true (authentication/filter bypass) payload"""
def setUp(self):
self._f = ni._fetch
ni._fetch = _mongo
def tearDown(self):
ni._fetch = self._f
def test_mongo_bypass(self):
vector = ni._resolve("GET", "password", "password")
self.assertEqual(vector.dbms, "MongoDB")
self.assertEqual(vector.bypass, '{"$ne": null}')
class TestNoSqlInband(unittest.TestCase):
"""In-band exposure gate: _inband() returns the always-true response only when it carries
materially more reflected content than the original request"""
def setUp(self):
self._fv = ni._fetchValue
ni.conf.parameters = {"GET": "id=1"}
def tearDown(self):
ni._fetchValue = self._fv
def test_exposure_detected(self):
ni._fetchValue = lambda place, parameter, value: "
1
luther
" # original (one row)
template = "
1
luther
2
fluffy
3
wu
"
self.assertEqual(ni._inband("GET", "id", template), template)
def test_no_exposure_when_not_larger(self):
ni._fetchValue = lambda place, parameter, value: "X" * 200 # original (large)
self.assertIsNone(ni._inband("GET", "id", "Welcome")) # always-true smaller -> no dump
class TestNoSqlRecords(unittest.TestCase):
"""Reflected responses are parsed into (columns, rows) for a regular table dump"""
def test_html_table_without_header(self):
page = ("Results: