Adding support for better JSON comparison

This commit is contained in:
Miroslav Štampar 2026-06-16 10:02:44 +02:00
parent cc7f803d60
commit a0cbfba9bd
7 changed files with 256 additions and 32 deletions

View file

@ -160,7 +160,7 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/
df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh
1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py
9e5e4d3d9acb767412259895a3ee75e1a5f42d0b9923f17605d771db384a6f60 extra/vulnserver/vulnserver.py
072a2c19162cc4e76476cf474134f18a5ec45cce9a4e4d216dad8e7a71ece048 extra/vulnserver/vulnserver.py
b8411d1035bb49b073476404e61e1be7f4c61e205057730e2f7880beadcd5f60 lib/controller/action.py
6da812281a69c8b7a5181c2f76374dc695e4727b2936042651bacbeda4e6bcc9 lib/controller/checks.py
969737ac9cd3fa7bac8b582a85016bd348ba2087daa3644a570a9127e686363b lib/controller/controller.py
@ -168,7 +168,7 @@ d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py
b36b085ff1b5797e375c1e2ca3b12c7ab4204f48acd1a1efb075cff8302d9750 lib/core/agent.py
ca3e5ce56cb1cae0a8e815425ab6810068004bffe8861d1037c7c87c0ae02477 lib/core/bigarray.py
7fc5a845a78e6fb7b1a2fdef2fe529510ac5f2c9fac78de588844b4a8c1504e1 lib/core/common.py
734a00fd87c67cde48d9ab9b5cdfa8b064300939898c4de2636e91d16a4223ba lib/core/common.py
8f1272487e1adfcc8c755a2f56f0c6d21eac5e685a73a9a159482f9dc9142bc5 lib/core/compat.py
742bce10b97034966021ec60c7ac294db4af4fe7893613d63172a02c29f009f8 lib/core/convert.py
c03dc585f89642cfd81b087ac2723e3e1bb3bfa8c60e6f5fe58ef3b0113ebfe6 lib/core/data.py
@ -189,11 +189,11 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch
48797d6c34dd9bb8a53f7f3794c85f4288d82a9a1d6be7fcf317d388cb20d4b3 lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
878a1bbd202fa07ded97ab33e630b196e159aec49a6377d01247c4ccb1152a37 lib/core/settings.py
222177a7a8e4c16ec4eae9f9542794ebf46a34b29390e967fe9fc26189261372 lib/core/settings.py
cd5a66deee8963ba8e7e9af3dd36eb5e8127d4d68698811c29e789655f507f82 lib/core/shell.py
bcb5d8090d5e3e0ef2a586ba09ba80eef0c6d51feb0f611ed25299fbb254f725 lib/core/subprocessng.py
70ea3768f1b3062b22d20644df41c86238157ec80dd43da40545c620714273c6 lib/core/target.py
daf2ad65fcea430b6272e3c538022c9871fdc3aba78f71669130fb0bc954c78e lib/core/testing.py
40b703993441fcd10ab06545b7dbe4a4762ab1ff517592a7e104a52785e62586 lib/core/testing.py
e3e653364d08d04d7492aa40a2bd29c6a28f4d78fecdd6c10f21f6cb28b98b4c lib/core/threads.py
b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py
53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py
@ -211,7 +211,7 @@ c2f34e27578742e729c2fa9c1d4f0a0d8f8f7f4cf0fc14c62ec817a260c71dec lib/parse/site
1be3da334411657461421b8a26a0f2ff28e1af1e28f1e963c6c92768f9b0847c lib/request/basicauthhandler.py
369484a2999d29f49bf839a329d1686ed94f6ea27c695e027fe08c8da51f30a3 lib/request/basic.py
bc61bc944b81a7670884f82231033a6ac703324b34b071c9834886a92e249d0e lib/request/chunkedhandler.py
09c2d8786fb5280f5f14a7b4345ecb2e7c2ca836ee06a6cf9b51770df923d94c lib/request/comparison.py
390cc4882ba9c76e16a5376ba6d856079e7cb47a3e4ee11925139e637ce05050 lib/request/comparison.py
ec14b5139cd6b03aa167a7b91fab913baf042d4370471390c13eed325eeb245f lib/request/connect.py
8e06682280fce062eef6174351bfebcb6040e19976acff9dc7b3699779783498 lib/request/direct.py
cf019248253a5d7edb7bc474aa020b9e8625d73008a463c56ba2b539d7f2d8ec lib/request/dns.py
@ -571,6 +571,7 @@ d4d7d3525d25ce72bf38bd38b5fdf61144e381993d63be7dc72b2b4811ffab67 tests/test_big
27ad87c0ea377e0657bd6f6a4eaa0e9756aa9d28ec0483bdadeb3f66dcc4660d tests/test_charset.py
9e678a56e16211c49ab4995b6c658d3f122bfa3b357d9e17ff38f5a489ace6ad tests/test_cloak.py
a48c411fea864e6bcd6a1c7e1a35094b8cda8d15088fd9e7b0270542ae20daa9 tests/test_common_helpers.py
899bc085e96d68f8a8cbe0d7e55863e98ef37b73ab0e4234f7d969e31ea2d23a tests/test_comparison_json.py
7b72d4f850bbd059b8e95fceb45a58470354cb7270c99b0e9981aaa189af20d1 tests/test_comparison.py
8593f14a18c4445c58b2e59462adcb761074ac7217cd7c3808519a90ba279bda tests/test_convert.py
5016119bdb57094381afdca35ef29a4a6641e26e4b48a9119f1db633e6123d29 tests/test_datafiles.py

View file

@ -229,6 +229,7 @@ class ReqHandler(BaseHTTPRequestHandler):
self.wfile.write(b"<!DOCTYPE html><html><head><title>vulnserver</title></head><body><h3>GET:</h3><a href='/?id=1'>link</a><hr><h3>POST:</h3><form method='post'>ID: <input type='text' name='id'><input type='submit' value='Submit'></form></body></html>")
else:
code, output = OK, "<body><html>"
contentType = "text/html"
try:
if self.params.get("echo", ""):
@ -247,38 +248,48 @@ class ReqHandler(BaseHTTPRequestHandler):
_cursor.execute("SELECT * FROM users WHERE id=%s LIMIT 0, 1" % self.params["id"])
results = _cursor.fetchall()
output += "<b>SQL results:</b><br>\n"
if self.params.get("code", ""):
if not results:
if self.params.get("json", ""):
# JSON response mode: serialize the SAME query results as application/json
# (exercises the structure-aware comparison oracle end to end). HTML branches
# below are untouched, so existing tests are unaffected.
if self.params.get("code", "") and not results:
code = INTERNAL_SERVER_ERROR
else:
contentType = "application/json"
output = json.dumps({"results": [list(row) for row in results], "count": len(results)})
else:
if results:
output += "<table border=\"1\">\n"
output += "<b>SQL results:</b><br>\n"
for row in results:
output += "<tr>"
for value in row:
output += "<td>%s</td>" % value
output += "</tr>\n"
output += "</table>\n"
if self.params.get("code", ""):
if not results:
code = INTERNAL_SERVER_ERROR
else:
output += "no results found"
if results:
output += "<table border=\"1\">\n"
if not results:
output = "<title>No results</title>" + output
else:
output = "<title>Results</title>" + output
for row in results:
output += "<tr>"
for value in row:
output += "<td>%s</td>" % value
output += "</tr>\n"
output += "</body></html>"
output += "</table>\n"
else:
output += "no results found"
if not results:
output = "<title>No results</title>" + output
else:
output = "<title>Results</title>" + output
output += "</body></html>"
except Exception as ex:
code = INTERNAL_SERVER_ERROR
output = "%s: %s" % (re.search(r"'([^']+)'", str(type(ex))).group(1), ex)
self.send_response(code)
self.send_header("Content-type", "text/html")
self.send_header("Content-type", contentType)
self.send_header("Connection", "close")
if self.raw_requestline.startswith(b"HEAD"):

View file

@ -1442,6 +1442,45 @@ def parseJson(content):
return retVal
def jsonMinimize(content):
    """
    Returns an order-independent canonical "leaf-path" projection of a JSON document, used for
    structure-aware response comparison (key reordering and insignificant whitespace do not
    change the projection, while a changed value, array length or emptied nested object does).
    Returns None when no projection can be produced (content is not parseable JSON, or it is
    nested too deeply to be processed), so callers can fall back to text comparison

    >>> jsonMinimize('{"b": 2, "a": 1}') == jsonMinimize('{"a":1, "b":2}')
    True
    >>> jsonMinimize('{"a": {"b": 1}}') == '.a.b=1'
    True
    >>> jsonMinimize('not json') is None
    True
    >>> jsonMinimize('{}') == ''
    True
    >>> jsonMinimize('{"a": {}}') == jsonMinimize('{}')
    False
    """

    try:
        data = json.loads(content)
    except (ValueError, TypeError, RuntimeError):
        # RuntimeError covers the recursion-depth error raised for pathologically nested input
        return None

    lines = []

    def _walk(obj, path):
        if isinstance(obj, dict):
            if obj:
                for key in sorted(obj):  # sorted keys -> key-order/whitespace immune
                    _walk(obj[key], "%s.%s" % (path, key))
            elif path:
                # nested empty object is a leaf on its own (otherwise it would vanish from the
                # projection); top-level '{}' intentionally stays the empty projection
                lines.append("%s={}" % path)
        elif isinstance(obj, (list, tuple)):
            lines.append("%s.__len__=%d" % (path, len(obj)))  # length change always registers
            for index, item in enumerate(obj):  # index kept -> order-sensitive (correct for result sets)
                _walk(item, "%s[%d]" % (path, index))
        else:
            lines.append("%s=%s" % (path, obj))  # scalar values kept (boolean detection flips values)

    try:
        _walk(data, "")
    except RuntimeError:
        # document parsed, but is too deep to traverse recursively -> let the caller use text comparison
        return None

    return "\n".join(sorted(lines))
def parsePasswordHash(password):
"""
In case of Microsoft SQL Server password hash value is expanded to its components

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.6.118"
VERSION = "1.10.6.119"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)

View file

@ -55,6 +55,7 @@ def vulnTest():
("--dummy", ("all tested parameters do not appear to be injectable", "does not seem to be injectable", "there is not at least one", "~might be injectable")),
("-u \"<url>&id2=1\" -p id2 -v 5 --flush-session --level=5 --text-only --test-filter=\"AND boolean-based blind - WHERE or HAVING clause (MySQL comment)\"", ("~1AND",)),
("--list-tampers", ("between", "MySQL", "xforwardedfor")),
("-u \"<url>&json=1\" -p id --flush-session --technique=B --banner", ("Type: boolean-based blind", "banner: '3.")), # JSON-response detection via the structure-aware oracle (no --string hint)
("-r <request> --flush-session -v 5 --test-skip=\"heavy\" --save=<config>", ("CloudFlare", "web application technology: Express", "possible DBMS: 'SQLite'", "User-Agent: foobar", "~Type: time-based blind", "saved command line options to the configuration file")),
("-c <config>", ("CloudFlare", "possible DBMS: 'SQLite'", "User-Agent: foobar", "~Type: time-based blind")),
("-l <log> --flush-session --keep-alive --skip-waf -vvvvv --technique=U --union-from=users --banner --parse-errors", ("banner: '3.", "ORDER BY term out of range", "~xp_cmdshell", "Connection: keep-alive")),

View file

@ -11,6 +11,7 @@ import re
from lib.core.common import extractRegexResult
from lib.core.common import getFilteredPageContent
from lib.core.common import jsonMinimize
from lib.core.common import listToStrValue
from lib.core.common import removeDynamicContent
from lib.core.common import getLastRequestHTTPError
@ -20,6 +21,7 @@ from lib.core.convert import getBytes
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.enums import HTTP_HEADER
from lib.core.exception import SqlmapNoneDataException
from lib.core.settings import DEFAULT_PAGE_ENCODING
from lib.core.settings import DIFF_TOLERANCE
@ -34,6 +36,20 @@ from lib.core.settings import URI_HTTP_HEADER
from lib.core.threads import getCurrentThreadData
from thirdparty import six
def _isJsonResponse(headers):
"""
Returns True if the response Content-Type indicates a JSON document (e.g. 'application/json'
or a structured suffix like 'application/vnd.api+json')
"""
retVal = False
if headers:
contentType = (headers.get(HTTP_HEADER.CONTENT_TYPE) or "").split(';')[0].strip().lower()
retVal = contentType == "application/json" or contentType.endswith("+json")
return retVal
def comparison(page, headers, code=None, getRatioValue=False, pageLength=None):
if not isinstance(page, (six.text_type, six.binary_type, type(None))):
logger.critical("got page of type %s; repr(page)[:200]=%s" % (type(page), repr(page)[:200]))
@ -97,6 +113,10 @@ def _comparison(page, headers, code, getRatioValue, pageLength):
seqMatcher = threadData.seqMatcher
seqMatcher.set_seq1(kb.pageTemplate)
# raw (pre-dynamic-removal) body, kept for the structured (JSON) comparison path below;
# parsing the raw form avoids removeDynamicContent splicing JSON mid-token
rawPage = page
if page:
# In case of a DBMS error page return None
if kb.errorIsNone and (wasLastResponseDBMSError() or wasLastResponseHTTPError()) and not kb.negativeLogic:
@ -148,12 +168,22 @@ def _comparison(page, headers, code, getRatioValue, pageLength):
else:
seq1, seq2 = None, None
if conf.titles:
seq1 = extractRegexResult(HTML_TITLE_REGEX, seqMatcher.a)
seq2 = extractRegexResult(HTML_TITLE_REGEX, page)
else:
seq1 = getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a
seq2 = getFilteredPageContent(page, True) if conf.textOnly else page
# Structure-aware comparison for JSON responses: compare an order-independent
# projection of the parsed bodies instead of raw text, so key reordering/whitespace
# noise does not perturb the ratio while a changed value/array-length does. Engages
# only on a JSON Content-Type with both bodies parseable; any doubt (or an explicit
# --text-only/--titles) falls back to the exact text path below.
if _isJsonResponse(headers) and not (conf.titles or conf.textOnly or kb.nullConnection):
seq1 = jsonMinimize(kb.pageTemplate)
seq2 = jsonMinimize(rawPage)
if seq1 is None or seq2 is None:
if conf.titles:
seq1 = extractRegexResult(HTML_TITLE_REGEX, seqMatcher.a)
seq2 = extractRegexResult(HTML_TITLE_REGEX, page)
else:
seq1 = getFilteredPageContent(seqMatcher.a, True) if conf.textOnly else seqMatcher.a
seq2 = getFilteredPageContent(page, True) if conf.textOnly else page
if seq1 is None or seq2 is None:
return None

View file

@ -0,0 +1,142 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
D1 - structure-aware (JSON) detection oracle. Two layers:
* jsonMinimize() (lib/core/common.py): the order-independent leaf-path projection.
* comparison() (lib/request/comparison.py): when the response Content-Type is JSON, the
similarity ratio is computed over that projection instead of raw text - so key
reordering / whitespace noise no longer perturbs it (false-positive fix) and a small
value/structure change is no longer drowned out in a large body (false-negative fix).
The headline tests assert the JSON path is *better* than the text path on the same inputs,
not merely that it runs; and that any non-JSON / unparseable / explicit-mode case falls
back to the exact text behavior (so the HTML oracle is untouched).
"""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
from lib.core.common import jsonMinimize
from lib.core.data import conf, kb
from lib.core.enums import HTTP_HEADER
from lib.core.settings import UPPER_RATIO_BOUND
from lib.core.threads import getCurrentThreadData
from lib.request.comparison import comparison
class _Headers(object):
"""Minimal stand-in for the per-response headers object the oracle receives."""
def __init__(self, contentType):
self._ct = contentType
def get(self, name, default=None):
return self._ct if (self._ct and name.lower() == HTTP_HEADER.CONTENT_TYPE.lower()) else default
@property
def headers(self):
return ["%s: %s\r\n" % (HTTP_HEADER.CONTENT_TYPE, self._ct)] if self._ct else []
class TestJsonMinimize(unittest.TestCase):
    """Checks of the jsonMinimize() projection on its own (no comparison involved)."""

    def test_order_and_whitespace_immune(self):
        # same members, different key order and spacing -> same projection
        compact = jsonMinimize('{"b":2,"a":1}')
        spaced = jsonMinimize('{ "a": 1,\n "b": 2 }')
        self.assertEqual(compact, spaced)

    def test_value_flip_differs(self):
        # flipped boolean value -> different projection
        truthy = jsonMinimize('{"ok":true}')
        falsy = jsonMinimize('{"ok":false}')
        self.assertNotEqual(truthy, falsy)

    def test_array_length_registers(self):
        # one extra array item -> different projection
        shorter = jsonMinimize('{"r":[1,2,3]}')
        longer = jsonMinimize('{"r":[1,2,3,4]}')
        self.assertNotEqual(shorter, longer)

    def test_parse_failure_is_none(self):
        # anything that is not a JSON document must give None
        rejected = ("", "{bad", "<html></html>", "{'a':1}", None)
        for candidate in rejected:
            self.assertIsNone(jsonMinimize(candidate))

    def test_valid_edge_shapes_are_not_none(self):
        # bare array, scalar, and top-level null are valid JSON -> defined (non-None) projections
        accepted = ("[1,2]", "42", "null", '"x"')
        for candidate in accepted:
            self.assertIsNotNone(jsonMinimize(candidate))

        # empty object -> empty projection (not None)
        self.assertEqual(jsonMinimize("{}"), "")
class _OracleCase(unittest.TestCase):
    """
    Base test case that snapshots the conf/kb entries touched by the tests, puts them into a
    neutral state before each test and restores the snapshot afterwards
    """

    # conf options cleared (set to None) for the duration of a test
    _FLAGS = ("string", "notString", "regexp", "code", "titles", "textOnly")

    # kb entries saved before and restored after a test
    _KB = ("matchRatio", "nullConnection", "heavilyDynamic", "skipSeqMatcher",
           "errorIsNone", "negativeLogic", "dynamicMarkings", "testMode", "pageTemplate")

    def setUp(self):
        # snapshot of current values (used by tearDown)
        self._c = {}
        for name in self._FLAGS:
            self._c[name] = conf.get(name)

        self._k = {}
        for name in self._KB:
            self._k[name] = kb.get(name)

        # neutral state for the test itself
        for name in self._FLAGS:
            conf[name] = None

        for name in ("nullConnection", "heavilyDynamic", "skipSeqMatcher", "errorIsNone", "negativeLogic", "testMode"):
            setattr(kb, name, False)

        kb.dynamicMarkings = []

    def tearDown(self):
        # put back whatever was there before setUp
        for name in self._c:
            conf[name] = self._c[name]

        for name in self._k:
            kb[name] = self._k[name]

    def ratio(self, template, page, contentType):
        """
        Returns the comparison ratio of page against template for a response carrying the given
        Content-Type, starting from a fresh (uncalibrated) state on each call
        """

        kb.matchRatio = None
        kb.pageTemplate = template

        getCurrentThreadData().lastPageTemplate = None

        responseHeaders = _Headers(contentType)
        return comparison(page, responseHeaders, getRatioValue=True)
class TestStructuredOracle(_OracleCase):
    """Checks of comparison() on JSON responses versus the plain text path."""

    def test_noise_immunity_beats_text(self):
        # Same data with keys reordered and reindented: the JSON path is expected to score above
        # the upper ratio bound, while the text path is expected to score lower than the JSON one
        template = '{"id":1,"name":"alice","role":"admin"}'
        reshuffled = '{ "role": "admin",\n "name": "alice",\n "id": 1 }'

        jsonRatio = self.ratio(template, reshuffled, "application/json")
        textRatio = self.ratio(template, reshuffled, "text/html")

        self.assertGreater(jsonRatio, UPPER_RATIO_BOUND)
        self.assertLess(textRatio, jsonRatio)

    def test_real_difference_still_detected(self):
        # a genuinely different value must still keep the ratio below the upper bound
        template = '{"role":"admin"}'
        changed = '{"role":"guest"}'

        jsonRatio = self.ratio(template, changed, "application/json")

        self.assertLess(jsonRatio, UPPER_RATIO_BOUND)

    def test_html_contenttype_uses_text_path(self):
        # a text/html response must score exactly like a response without any Content-Type
        template = '{"id":1,"name":"alice"}'
        reshuffled = '{ "name": "alice", "id": 1 }'
        conf.code = None

        htmlRatio = self.ratio(template, reshuffled, "text/html")
        bareRatio = self.ratio(template, reshuffled, None)

        self.assertEqual(htmlRatio, bareRatio)

    def test_unparseable_json_falls_back(self):
        # JSON Content-Type with a non-JSON body must score exactly like the text path
        template = "<html>x</html>"
        changed = "<html>y</html>"

        declaredJsonRatio = self.ratio(template, changed, "application/json")
        textRatio = self.ratio(template, changed, "text/html")

        self.assertEqual(declaredJsonRatio, textRatio)

    def test_structured_suffix_contenttype_gated_in(self):
        # '+json' media type (with a charset parameter) must take the JSON path too
        template = '{"id":1,"name":"alice","role":"admin"}'
        reshuffled = '{ "role":"admin", "name":"alice", "id":1 }'

        suffixRatio = self.ratio(template, reshuffled, "application/vnd.api+json; charset=utf-8")

        self.assertGreater(suffixRatio, UPPER_RATIO_BOUND)

    def test_textonly_escape_hatch_bypasses_json(self):
        # with --text-only set the JSON path must be skipped, hence a lower ratio is expected
        template = '{"id":1,"name":"alice"}'
        reshuffled = '{ "name":"alice", "id":1 }'

        withJson = self.ratio(template, reshuffled, "application/json")

        conf.textOnly = True
        withoutJson = self.ratio(template, reshuffled, "application/json")

        self.assertGreater(withJson, withoutJson)
if __name__ == "__main__":
unittest.main()