From 62a7bf3b03c60d8b31ccf28e24a4d6e010fa9f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0tampar?= Date: Wed, 1 Jul 2026 15:19:30 +0200 Subject: [PATCH] Adding tests for http2 functionality --- data/txt/sha256sums.txt | 5 +- lib/core/settings.py | 2 +- lib/request/http2.py | 43 +++++- tests/test_http2.py | 283 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 tests/test_http2.py diff --git a/data/txt/sha256sums.txt b/data/txt/sha256sums.txt index 956c8865d..751527657 100644 --- a/data/txt/sha256sums.txt +++ b/data/txt/sha256sums.txt @@ -189,7 +189,7 @@ f8de57606325456928e46ae2896f5f8bbec9ad18b1c644b492a566fa992216f6 lib/core/decor 9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py 0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py 888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py -61024490352e898a43f1cb001fb79276d185ef3579b6230df46badf573336833 lib/core/settings.py +39884227376b9370b8ef246d791b98346a7acba146f9ca12a5bf540a252b31ba lib/core/settings.py c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py 19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py @@ -215,7 +215,7 @@ bc61bc944b81a7670884f82231033a6ac703324b34b071c9834886a92e249d0e lib/request/ch c96deaa69743d2cf4ae48f2ae0036f7e11b838f97a0e8c7f1205c61e9dd36bc1 lib/request/connect.py 8e06682280fce062eef6174351bfebcb6040e19976acff9dc7b3699779783498 lib/request/direct.py a6b37b436838caeb197fea858d0a39fadbff4736256e741b5fcec1f28fcf1ce0 lib/request/dns.py -21e8e2d44788b124f741b76a483ce9528ca53ff6da6691808ee679fe91128050 lib/request/http2.py +3afb06089f2801d5a12458a313b278db62c17a8d8fd3b8c46f07670699119af3 lib/request/http2.py 92c81cc31ff4a396723242058fb2152c9e9745f8412d01ea74480b048a53af6c lib/request/httpshandler.py 1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/request/__init__.py 7a0ac2522213e756348fd871a7af74cc963bdc82f9d7ade57be5de42b5bf7cab lib/request/inject.py @@ -622,6 +622,7 @@ f1f38f8b8ca667caadcb027d1a20eb895be4ef0935511114db235e66903bb463 tests/test_gra cc7677bc6c568c395112c1aa7d01e1d664e4d5940c86cb4d44987172864bae6f tests/test_hash_crack.py 0336c875dd2b6554bff6eafd746229e38c69ca8070cd933d45cf27c82ef3e05f tests/test_hashdb.py c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_hash.py +b23bf934dafe54c241761517a7b8c139159aa4b941db10832a626a51fea81e35 tests/test_http2.py d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py 5372270b7ed82b62f273c2e9bd1f7ecd8605371e66cd0ad70663762cb08d42f1 tests/test_inference_engine.py 0fc7bd9bae4fbd09f51027780b7a8e72eab73810dccdfdf87ed9e489e6e671c9 tests/test_ldap.py diff --git a/lib/core/settings.py b/lib/core/settings.py index 0c7de36ad..b844d9470 100644 --- a/lib/core/settings.py +++ b/lib/core/settings.py @@ -20,7 +20,7 @@ from lib.core.enums import OS from thirdparty import six # sqlmap version (...) -VERSION = "1.10.7.4" +VERSION = "1.10.7.5" TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable" TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34} VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE) diff --git a/lib/request/http2.py b/lib/request/http2.py index 2af00c69e..81351db4c 100644 --- a/lib/request/http2.py +++ b/lib/request/http2.py @@ -154,6 +154,11 @@ FLAG_PRIORITY = 0x20 CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" def encode_frame(ftype, flags, stream_id, payload=b""): + """Serialize an HTTP/2 frame (RFC 7540 s4.1): 24-bit length + type + flags + 31-bit stream id. + + >>> decode_frame_header(encode_frame(HEADERS, FLAG_END_HEADERS, 1, b'abc')[:9]) + (3, 1, 4, 1) + """ if len(payload) > 0xffffff: raise ValueError("frame payload exceeds 24-bit length") header = struct.pack("!I", len(payload))[1:] # 24-bit length (drop MSB of the 32-bit pack) @@ -161,6 +166,11 @@ def encode_frame(ftype, flags, stream_id, payload=b""): return header + payload def decode_frame_header(nine): + """Parse the 9-byte frame header into (length, type, flags, stream_id); the reserved high bit of the stream id is masked off. + + >>> decode_frame_header(encode_frame(DATA, 0, 0x80000001, b'')[:9]) + (0, 0, 0, 1) + """ if len(nine) != 9: raise ValueError("frame header must be exactly 9 bytes") length = struct.unpack("!I", b"\x00" + nine[:3])[0] @@ -169,6 +179,13 @@ def decode_frame_header(nine): # ---------- Huffman ---------- def huffman_encode(data): + """Huffman-encode a byte string per the RFC 7541 static table (s5.2), padding with EOS 1-bits. + + >>> huffman_decode(huffman_encode(b'www.example.com')) == b'www.example.com' + True + >>> huffman_encode(b'') == b'' + True + """ if not data: return b"" acc = 0 @@ -224,6 +241,13 @@ def huffman_decode(data): # ---------- integer / string (RFC 7541 5.1 / 5.2) ---------- def encode_integer(value, prefix_bits, first_byte=0): + """Encode an integer with an N-bit prefix (RFC 7541 s5.1); the C.1.2 example is 1337 / 5-bit prefix. + + >>> list(encode_integer(10, 5)) + [10] + >>> list(encode_integer(1337, 5)) + [31, 154, 10] + """ mask = (1 << prefix_bits) - 1 if value < mask: return bytearray([first_byte | value]) @@ -236,6 +260,11 @@ def encode_integer(value, prefix_bits, first_byte=0): return out def decode_integer(data, pos, prefix_bits): + """Decode an N-bit-prefixed integer, returning (value, new_pos) (RFC 7541 s5.1). + + >>> decode_integer(bytearray([31, 154, 10]), 0, 5) + (1337, 3) + """ mask = (1 << prefix_bits) - 1 value = data[pos] & mask pos += 1 @@ -296,6 +325,11 @@ class Decoder(object): return self.dynamic[index] def decode(self, data): + """Decode an HPACK header block into a list of (name, value) byte pairs (RFC 7541 s6). + + >>> Decoder().decode(bytes(bytearray([0x82, 0x86, 0x84]))) == [(b':method', b'GET'), (b':scheme', b'http'), (b':path', b'/')] + True + """ data = bytearray(data) pos = 0 headers = [] @@ -469,7 +503,14 @@ def h2_request(host, port=443, method="GET", path="/", authority=None, headers=N class H2Response(object): """A urllib-response-compatible wrapper around a native HTTP/2 response, so the rest of sqlmap's - request pipeline can consume it exactly like a urllib response (code/msg/info()/read()/geturl()).""" + request pipeline can consume it exactly like a urllib response (code/msg/info()/read()/geturl()). + + >>> r = H2Response('https://x/', 200, [(b':status', b'200'), (b'content-type', b'text/html')], b'body') + >>> (r.code, r.msg, r.read() == b'body', r.geturl()) + (200, 'OK', True, 'https://x/') + >>> ':status' in r.info() + False + """ def __init__(self, url, status, headers, body): self.url = url diff --git a/tests/test_http2.py b/tests/test_http2.py new file mode 100644 index 000000000..7c7626481 --- /dev/null +++ b/tests/test_http2.py @@ -0,0 +1,283 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) +See the file 'LICENSE' for copying permission + +Unit coverage for the PURE (network-free) parts of the native HTTP/2 client in +lib/request/http2.py: the RFC 7540 frame codec, the RFC 7541 HPACK integer / +Huffman / string primitives, the HPACK Decoder/Encoder (static + dynamic table), +and the urllib-compatible H2Response wrapper. + +Nothing here opens a socket or negotiates TLS - only the deterministic codecs and +the response adapter are exercised. Known vectors are the canonical RFC 7541 +examples; everything else is a round-trip / invariant check. + +stdlib unittest only (no pytest / no pip); works on Python 2.7 and 3.x. +""" + +import binascii +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from _testutils import bootstrap +bootstrap() + +from lib.request.http2 import ( + Decoder, + Encoder, + H2Response, + REDIRECT_CODES, + STATIC_LEN, + STATIC_TABLE, + DATA, + HEADERS, + FLAG_END_HEADERS, + FLAG_END_STREAM, + decode_frame_header, + decode_integer, + decode_string, + encode_frame, + encode_integer, + encode_string, + huffman_decode, + huffman_encode, +) + + +def _b(*ints): + # build a bytes object from ints (identical on Python 2 and 3) + return bytes(bytearray(ints)) + + +class TestFrameCodec(unittest.TestCase): + def test_roundtrip(self): + header = encode_frame(HEADERS, FLAG_END_HEADERS, 1, b"abc")[:9] + self.assertEqual(decode_frame_header(header), (3, HEADERS, FLAG_END_HEADERS, 1)) + + def test_payload_is_appended_verbatim(self): + frame = encode_frame(DATA, 0, 1, b"hello") + self.assertEqual(frame[9:], b"hello") + + def test_reserved_stream_bit_is_masked(self): + # the high (reserved) bit of the 31-bit stream id must be dropped on both ends + header = encode_frame(DATA, 0, 0x80000001, b"")[:9] + self.assertEqual(decode_frame_header(header), (0, DATA, 0, 1)) + + def test_zero_length_payload(self): + header = encode_frame(DATA, FLAG_END_STREAM, 1, b"")[:9] + length, _, flags, _ = decode_frame_header(header) + self.assertEqual(length, 0) + self.assertEqual(flags, FLAG_END_STREAM) + + def test_oversized_payload_rejected(self): + with self.assertRaises(ValueError): + encode_frame(DATA, 0, 1, b"x" * (0xFFFFFF + 1)) + + def test_bad_header_length_rejected(self): + with self.assertRaises(ValueError): + decode_frame_header(b"123") + + +class TestIntegerCoding(unittest.TestCase): + def test_rfc_c11_small(self): + # RFC 7541 C.1.1: 10 with a 5-bit prefix fits in the prefix + self.assertEqual(list(encode_integer(10, 5)), [10]) + + def test_rfc_c12_multibyte(self): + # RFC 7541 C.1.2: 1337 with a 5-bit prefix + self.assertEqual(list(encode_integer(1337, 5)), [31, 154, 10]) + self.assertEqual(decode_integer(bytearray([31, 154, 10]), 0, 5), (1337, 3)) + + def test_rfc_c13_full_byte_prefix(self): + # RFC 7541 C.1.3: 42 starting from a full (8-bit prefix at an octet boundary) + self.assertEqual(list(encode_integer(42, 8)), [42]) + + def test_roundtrip_across_prefixes(self): + for prefix in (4, 5, 6, 7, 8): + for value in (0, 1, 2, 30, 31, 32, 127, 128, 255, 256, 16384, 1000000): + encoded = bytearray(encode_integer(value, prefix)) + decoded, pos = decode_integer(encoded, 0, prefix) + self.assertEqual(decoded, value) + self.assertEqual(pos, len(encoded)) + + def test_first_byte_bits_preserved(self): + # a caller-supplied opcode in the high bits must survive a small value + self.assertEqual(bytearray(encode_integer(5, 7, 0x80))[0], 0x80 | 5) + + +class TestHuffman(unittest.TestCase): + def test_known_vector_www_example_com(self): + # RFC 7541 C.4.1 + self.assertEqual(binascii.hexlify(huffman_encode(b"www.example.com")), b"f1e3c2e5f23a6ba0ab90f4ff") + + def test_empty(self): + self.assertEqual(huffman_encode(b""), b"") + self.assertEqual(huffman_decode(b""), b"") + + def test_roundtrip(self): + for sample in (b"a", b"hello world", b"/index.html?a=1&b=2", + b"GET", b"application/json", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", + bytes(bytearray(range(256)))): + self.assertEqual(huffman_decode(huffman_encode(sample)), sample) + + def test_shrinks_typical_text(self): + sample = b"www.example.com" + self.assertLess(len(huffman_encode(sample)), len(sample)) + + def test_padding_too_long_rejected(self): + # 0xfe walks eight 1-bits into a long (unterminated) code -> more than a byte of padding + with self.assertRaises(ValueError): + huffman_decode(_b(0xFE)) + + +class TestStringCoding(unittest.TestCase): + def test_huffman_branch_roundtrip(self): + encoded = encode_string(b"custom-value") + self.assertTrue(bytearray(encoded)[0] & 0x80) # huffman flag set for compressible text + self.assertEqual(decode_string(bytearray(encoded), 0), (b"custom-value", len(encoded))) + + def test_literal_branch_when_huffman_would_not_shrink(self): + encoded = encode_string(_b(0xFF)) + self.assertFalse(bytearray(encoded)[0] & 0x80) # falls back to a literal string + self.assertEqual(decode_string(bytearray(encoded), 0), (_b(0xFF), len(encoded))) + + def test_disable_huffman(self): + encoded = encode_string(b"abc", huffman=False) + self.assertFalse(bytearray(encoded)[0] & 0x80) + self.assertEqual(decode_string(bytearray(encoded), 0), (b"abc", len(encoded))) + + +class TestHpackDecoder(unittest.TestCase): + def test_indexed_static_entries(self): + # 0x82/0x86/0x84 -> static indices 2, 6, 4 + self.assertEqual( + Decoder().decode(_b(0x82, 0x86, 0x84)), + [(b":method", b"GET"), (b":scheme", b"http"), (b":path", b"/")], + ) + + def test_static_lookup_bounds(self): + d = Decoder() + self.assertEqual(d._get(1), (b":authority", b"")) + self.assertEqual(d._get(2), (b":method", b"GET")) + self.assertEqual(d._get(STATIC_LEN), STATIC_TABLE[-1]) + + def test_index_zero_rejected(self): + with self.assertRaises(ValueError): + Decoder()._get(0) + + def test_index_out_of_range_rejected(self): + with self.assertRaises(ValueError): + Decoder()._get(STATIC_LEN + 1) # no dynamic entries yet + + def test_literal_incremental_indexing_populates_dynamic_table(self): + # 0x40 = literal with incremental indexing, new name + block = bytearray([0x40]) + encode_string(b"custom-key") + encode_string(b"custom-value") + d = Decoder() + self.assertEqual(d.decode(bytes(block)), [(b"custom-key", b"custom-value")]) + # entry is now addressable at the first dynamic index (STATIC_LEN + 1) + self.assertEqual(d._get(STATIC_LEN + 1), (b"custom-key", b"custom-value")) + self.assertEqual(d._size, 32 + len(b"custom-key") + len(b"custom-value")) + + def test_literal_without_indexing_does_not_touch_dynamic_table(self): + block = bytearray([0x00]) + encode_string(b"k") + encode_string(b"v") + d = Decoder() + self.assertEqual(d.decode(bytes(block)), [(b"k", b"v")]) + self.assertEqual(d.dynamic, []) + + def test_dynamic_table_eviction(self): + d = Decoder(max_size=40) # each 2+2 byte entry costs 32+2+2 = 36 + d._add(b"aa", b"bb") + self.assertEqual(len(d.dynamic), 1) + d._add(b"cc", b"dd") # 72 > 40 -> oldest evicted + self.assertEqual(d.dynamic, [(b"cc", b"dd")]) + self.assertEqual(d._size, 36) + + def test_dynamic_size_update_clears(self): + d = Decoder() + d._add(b"x", b"y") + d.decode(_b(0x20)) # 0x20 = dynamic table size update to 0 + self.assertEqual(d.max_size, 0) + self.assertEqual(d.dynamic, []) + + +class TestHpackEncoderRoundTrip(unittest.TestCase): + def test_roundtrip_through_decoder(self): + headers = [ + (b":method", b"GET"), + (b":scheme", b"https"), + (b":path", b"/a/b?c=d"), + (b":authority", b"example.com"), + (b"user-agent", b"sqlmap"), + (b"accept", b""), # empty value + (b"x-custom", b"\x00\x01\xff"), # non-ASCII value + ] + self.assertEqual(Decoder().decode(Encoder().encode(headers)), headers) + + def test_encoder_output_is_bytes(self): + self.assertIsInstance(Encoder().encode([(b"a", b"b")]), bytes) + + +class TestH2Response(unittest.TestCase): + def _make(self, status=200, headers=None, body=b"body"): + headers = headers if headers is not None else [(b":status", b"200"), (b"content-type", b"text/html")] + return H2Response("https://target/x", status, headers, body) + + def test_basic_fields(self): + r = self._make() + self.assertEqual(r.code, 200) + self.assertEqual(r.status, 200) + self.assertEqual(r.msg, "OK") + self.assertEqual(r.http_version, "HTTP/2.0") + self.assertEqual(r.geturl(), "https://target/x") + + def test_unknown_status_message(self): + self.assertEqual(self._make(status=799).msg, "") + + def test_pseudo_headers_stripped(self): + r = self._make() + self.assertNotIn(":status", r.info()) + self.assertEqual(r.info().get("content-type"), "text/html") + + def test_read_full_then_empty(self): + r = self._make(body=b"hello") + self.assertEqual(r.read(), b"hello") + self.assertEqual(r.read(), b"") # offset exhausted + + def test_read_in_chunks(self): + r = self._make(body=b"abcdef") + self.assertEqual(r.read(2), b"ab") + self.assertEqual(r.read(3), b"cde") + self.assertEqual(r.read(10), b"f") # asking past the end returns the remainder + self.assertEqual(r.read(10), b"") + + def test_str_header_names_accepted(self): + # headers may arrive already decoded to str (not only bytes) + r = H2Response("https://t/", 200, [("content-type", "application/json")], b"{}") + self.assertEqual(r.info().get("content-type"), "application/json") + + def test_mimetools_style_headers_list(self): + # patchHeaders() relies on a '.headers' list of "Name: value\r\n" lines being present + r = self._make() + self.assertTrue(hasattr(r.info(), "headers")) + self.assertIn("content-type: text/html\r\n", r.info().headers) + + def test_close_is_noop(self): + self.assertIsNone(self._make().close()) + + +class TestConstants(unittest.TestCase): + def test_redirect_codes(self): + for code in (301, 302, 303, 307, 308): + self.assertIn(code, REDIRECT_CODES) + self.assertNotIn(200, REDIRECT_CODES) + + def test_static_table_length(self): + self.assertEqual(STATIC_LEN, len(STATIC_TABLE)) + self.assertEqual(STATIC_LEN, 61) # RFC 7541 Appendix A + + +if __name__ == "__main__": + unittest.main(verbosity=2)