Adding tests for http2 functionality

This commit is contained in:
Miroslav Štampar 2026-07-01 15:19:30 +02:00
parent 3e7d064cc9
commit 62a7bf3b03
4 changed files with 329 additions and 4 deletions

View file

@ -189,7 +189,7 @@ f8de57606325456928e46ae2896f5f8bbec9ad18b1c644b492a566fa992216f6 lib/core/decor
9bf174058f15d14e24e94f9aaf42df045119d3617c6c54bd2f3af79b462f331d lib/core/replication.py
0b8c38a01bb01f843d94a6c5f2075ee47520d0c4aa799cecea9c3e2c5a4a23a6 lib/core/revision.py
888daba83fd4a34e9503fe21f01fef4cc730e5cde871b1d40e15d4cbc847d56c lib/core/session.py
61024490352e898a43f1cb001fb79276d185ef3579b6230df46badf573336833 lib/core/settings.py
39884227376b9370b8ef246d791b98346a7acba146f9ca12a5bf540a252b31ba lib/core/settings.py
c7804223319e18eb0b8e2cbf0a8b6896d1cefb7b0b1a2e9f1cf826a8a3b56750 lib/core/shell.py
a2e98a94b231432736d6b304fc75525c8b5fdb4768c418387c5b4c1a610dad64 lib/core/subprocessng.py
19f1e3c5e3ba703d28d510cd7a9ab8284d5fbe9df5ce7e77c86e5931571364b7 lib/core/target.py
@ -215,7 +215,7 @@ bc61bc944b81a7670884f82231033a6ac703324b34b071c9834886a92e249d0e lib/request/ch
c96deaa69743d2cf4ae48f2ae0036f7e11b838f97a0e8c7f1205c61e9dd36bc1 lib/request/connect.py
8e06682280fce062eef6174351bfebcb6040e19976acff9dc7b3699779783498 lib/request/direct.py
a6b37b436838caeb197fea858d0a39fadbff4736256e741b5fcec1f28fcf1ce0 lib/request/dns.py
21e8e2d44788b124f741b76a483ce9528ca53ff6da6691808ee679fe91128050 lib/request/http2.py
3afb06089f2801d5a12458a313b278db62c17a8d8fd3b8c46f07670699119af3 lib/request/http2.py
92c81cc31ff4a396723242058fb2152c9e9745f8412d01ea74480b048a53af6c lib/request/httpshandler.py
1966ca704961fb987ab757f0a4afddbf841d1a880631b701487c75cef63d60c3 lib/request/__init__.py
7a0ac2522213e756348fd871a7af74cc963bdc82f9d7ade57be5de42b5bf7cab lib/request/inject.py
@ -622,6 +622,7 @@ f1f38f8b8ca667caadcb027d1a20eb895be4ef0935511114db235e66903bb463 tests/test_gra
cc7677bc6c568c395112c1aa7d01e1d664e4d5940c86cb4d44987172864bae6f tests/test_hash_crack.py
0336c875dd2b6554bff6eafd746229e38c69ca8070cd933d45cf27c82ef3e05f tests/test_hashdb.py
c04e8358fb6df45f69f2f26435c971acde280535bf304e84d30cf2681158c6a7 tests/test_hash.py
b23bf934dafe54c241761517a7b8c139159aa4b941db10832a626a51fea81e35 tests/test_http2.py
d539d0ae758b5bb91e314ab82ab4fe03d6fb2f8b377d16aefa6d7d1d77a7d5a9 tests/test_identifiers_output.py
5372270b7ed82b62f273c2e9bd1f7ecd8605371e66cd0ad70663762cb08d42f1 tests/test_inference_engine.py
0fc7bd9bae4fbd09f51027780b7a8e72eab73810dccdfdf87ed9e489e6e671c9 tests/test_ldap.py

View file

@ -20,7 +20,7 @@ from lib.core.enums import OS
from thirdparty import six
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.10.7.4"
VERSION = "1.10.7.5"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)

View file

@ -154,6 +154,11 @@ FLAG_PRIORITY = 0x20
CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
def encode_frame(ftype, flags, stream_id, payload=b""):
"""Serialize an HTTP/2 frame (RFC 7540 s4.1): 24-bit length + type + flags + 31-bit stream id.
>>> decode_frame_header(encode_frame(HEADERS, FLAG_END_HEADERS, 1, b'abc')[:9])
(3, 1, 4, 1)
"""
if len(payload) > 0xffffff:
raise ValueError("frame payload exceeds 24-bit length")
header = struct.pack("!I", len(payload))[1:] # 24-bit length (drop MSB of the 32-bit pack)
@ -161,6 +166,11 @@ def encode_frame(ftype, flags, stream_id, payload=b""):
return header + payload
def decode_frame_header(nine):
"""Parse the 9-byte frame header into (length, type, flags, stream_id); the reserved high bit of the stream id is masked off.
>>> decode_frame_header(encode_frame(DATA, 0, 0x80000001, b'')[:9])
(0, 0, 0, 1)
"""
if len(nine) != 9:
raise ValueError("frame header must be exactly 9 bytes")
length = struct.unpack("!I", b"\x00" + nine[:3])[0]
@ -169,6 +179,13 @@ def decode_frame_header(nine):
# ---------- Huffman ----------
def huffman_encode(data):
"""Huffman-encode a byte string per the RFC 7541 static table (s5.2), padding with EOS 1-bits.
>>> huffman_decode(huffman_encode(b'www.example.com')) == b'www.example.com'
True
>>> huffman_encode(b'') == b''
True
"""
if not data:
return b""
acc = 0
@ -224,6 +241,13 @@ def huffman_decode(data):
# ---------- integer / string (RFC 7541 5.1 / 5.2) ----------
def encode_integer(value, prefix_bits, first_byte=0):
"""Encode an integer with an N-bit prefix (RFC 7541 s5.1); the C.1.2 example is 1337 / 5-bit prefix.
>>> list(encode_integer(10, 5))
[10]
>>> list(encode_integer(1337, 5))
[31, 154, 10]
"""
mask = (1 << prefix_bits) - 1
if value < mask:
return bytearray([first_byte | value])
@ -236,6 +260,11 @@ def encode_integer(value, prefix_bits, first_byte=0):
return out
def decode_integer(data, pos, prefix_bits):
"""Decode an N-bit-prefixed integer, returning (value, new_pos) (RFC 7541 s5.1).
>>> decode_integer(bytearray([31, 154, 10]), 0, 5)
(1337, 3)
"""
mask = (1 << prefix_bits) - 1
value = data[pos] & mask
pos += 1
@ -296,6 +325,11 @@ class Decoder(object):
return self.dynamic[index]
def decode(self, data):
"""Decode an HPACK header block into a list of (name, value) byte pairs (RFC 7541 s6).
>>> Decoder().decode(bytes(bytearray([0x82, 0x86, 0x84]))) == [(b':method', b'GET'), (b':scheme', b'http'), (b':path', b'/')]
True
"""
data = bytearray(data)
pos = 0
headers = []
@ -469,7 +503,14 @@ def h2_request(host, port=443, method="GET", path="/", authority=None, headers=N
class H2Response(object):
"""A urllib-response-compatible wrapper around a native HTTP/2 response, so the rest of sqlmap's
request pipeline can consume it exactly like a urllib response (code/msg/info()/read()/geturl())."""
request pipeline can consume it exactly like a urllib response (code/msg/info()/read()/geturl()).
>>> r = H2Response('https://x/', 200, [(b':status', b'200'), (b'content-type', b'text/html')], b'body')
>>> (r.code, r.msg, r.read() == b'body', r.geturl())
(200, 'OK', True, 'https://x/')
>>> ':status' in r.info()
False
"""
def __init__(self, url, status, headers, body):
self.url = url

283
tests/test_http2.py Normal file
View file

@ -0,0 +1,283 @@
#!/usr/bin/env python
"""
Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
Unit coverage for the PURE (network-free) parts of the native HTTP/2 client in
lib/request/http2.py: the RFC 7540 frame codec, the RFC 7541 HPACK integer /
Huffman / string primitives, the HPACK Decoder/Encoder (static + dynamic table),
and the urllib-compatible H2Response wrapper.
Nothing here opens a socket or negotiates TLS - only the deterministic codecs and
the response adapter are exercised. Known vectors are the canonical RFC 7541
examples; everything else is a round-trip / invariant check.
stdlib unittest only (no pytest / no pip); works on Python 2.7 and 3.x.
"""
import binascii
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _testutils import bootstrap
bootstrap()
from lib.request.http2 import (
Decoder,
Encoder,
H2Response,
REDIRECT_CODES,
STATIC_LEN,
STATIC_TABLE,
DATA,
HEADERS,
FLAG_END_HEADERS,
FLAG_END_STREAM,
decode_frame_header,
decode_integer,
decode_string,
encode_frame,
encode_integer,
encode_string,
huffman_decode,
huffman_encode,
)
def _b(*ints):
# build a bytes object from ints (identical on Python 2 and 3)
return bytes(bytearray(ints))
class TestFrameCodec(unittest.TestCase):
def test_roundtrip(self):
header = encode_frame(HEADERS, FLAG_END_HEADERS, 1, b"abc")[:9]
self.assertEqual(decode_frame_header(header), (3, HEADERS, FLAG_END_HEADERS, 1))
def test_payload_is_appended_verbatim(self):
frame = encode_frame(DATA, 0, 1, b"hello")
self.assertEqual(frame[9:], b"hello")
def test_reserved_stream_bit_is_masked(self):
# the high (reserved) bit of the 31-bit stream id must be dropped on both ends
header = encode_frame(DATA, 0, 0x80000001, b"")[:9]
self.assertEqual(decode_frame_header(header), (0, DATA, 0, 1))
def test_zero_length_payload(self):
header = encode_frame(DATA, FLAG_END_STREAM, 1, b"")[:9]
length, _, flags, _ = decode_frame_header(header)
self.assertEqual(length, 0)
self.assertEqual(flags, FLAG_END_STREAM)
def test_oversized_payload_rejected(self):
with self.assertRaises(ValueError):
encode_frame(DATA, 0, 1, b"x" * (0xFFFFFF + 1))
def test_bad_header_length_rejected(self):
with self.assertRaises(ValueError):
decode_frame_header(b"123")
class TestIntegerCoding(unittest.TestCase):
def test_rfc_c11_small(self):
# RFC 7541 C.1.1: 10 with a 5-bit prefix fits in the prefix
self.assertEqual(list(encode_integer(10, 5)), [10])
def test_rfc_c12_multibyte(self):
# RFC 7541 C.1.2: 1337 with a 5-bit prefix
self.assertEqual(list(encode_integer(1337, 5)), [31, 154, 10])
self.assertEqual(decode_integer(bytearray([31, 154, 10]), 0, 5), (1337, 3))
def test_rfc_c13_full_byte_prefix(self):
# RFC 7541 C.1.3: 42 starting from a full (8-bit prefix at an octet boundary)
self.assertEqual(list(encode_integer(42, 8)), [42])
def test_roundtrip_across_prefixes(self):
for prefix in (4, 5, 6, 7, 8):
for value in (0, 1, 2, 30, 31, 32, 127, 128, 255, 256, 16384, 1000000):
encoded = bytearray(encode_integer(value, prefix))
decoded, pos = decode_integer(encoded, 0, prefix)
self.assertEqual(decoded, value)
self.assertEqual(pos, len(encoded))
def test_first_byte_bits_preserved(self):
# a caller-supplied opcode in the high bits must survive a small value
self.assertEqual(bytearray(encode_integer(5, 7, 0x80))[0], 0x80 | 5)
class TestHuffman(unittest.TestCase):
def test_known_vector_www_example_com(self):
# RFC 7541 C.4.1
self.assertEqual(binascii.hexlify(huffman_encode(b"www.example.com")), b"f1e3c2e5f23a6ba0ab90f4ff")
def test_empty(self):
self.assertEqual(huffman_encode(b""), b"")
self.assertEqual(huffman_decode(b""), b"")
def test_roundtrip(self):
for sample in (b"a", b"hello world", b"/index.html?a=1&b=2",
b"GET", b"application/json", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789",
bytes(bytearray(range(256)))):
self.assertEqual(huffman_decode(huffman_encode(sample)), sample)
def test_shrinks_typical_text(self):
sample = b"www.example.com"
self.assertLess(len(huffman_encode(sample)), len(sample))
def test_padding_too_long_rejected(self):
# 0xfe walks eight 1-bits into a long (unterminated) code -> more than a byte of padding
with self.assertRaises(ValueError):
huffman_decode(_b(0xFE))
class TestStringCoding(unittest.TestCase):
def test_huffman_branch_roundtrip(self):
encoded = encode_string(b"custom-value")
self.assertTrue(bytearray(encoded)[0] & 0x80) # huffman flag set for compressible text
self.assertEqual(decode_string(bytearray(encoded), 0), (b"custom-value", len(encoded)))
def test_literal_branch_when_huffman_would_not_shrink(self):
encoded = encode_string(_b(0xFF))
self.assertFalse(bytearray(encoded)[0] & 0x80) # falls back to a literal string
self.assertEqual(decode_string(bytearray(encoded), 0), (_b(0xFF), len(encoded)))
def test_disable_huffman(self):
encoded = encode_string(b"abc", huffman=False)
self.assertFalse(bytearray(encoded)[0] & 0x80)
self.assertEqual(decode_string(bytearray(encoded), 0), (b"abc", len(encoded)))
class TestHpackDecoder(unittest.TestCase):
def test_indexed_static_entries(self):
# 0x82/0x86/0x84 -> static indices 2, 6, 4
self.assertEqual(
Decoder().decode(_b(0x82, 0x86, 0x84)),
[(b":method", b"GET"), (b":scheme", b"http"), (b":path", b"/")],
)
def test_static_lookup_bounds(self):
d = Decoder()
self.assertEqual(d._get(1), (b":authority", b""))
self.assertEqual(d._get(2), (b":method", b"GET"))
self.assertEqual(d._get(STATIC_LEN), STATIC_TABLE[-1])
def test_index_zero_rejected(self):
with self.assertRaises(ValueError):
Decoder()._get(0)
def test_index_out_of_range_rejected(self):
with self.assertRaises(ValueError):
Decoder()._get(STATIC_LEN + 1) # no dynamic entries yet
def test_literal_incremental_indexing_populates_dynamic_table(self):
# 0x40 = literal with incremental indexing, new name
block = bytearray([0x40]) + encode_string(b"custom-key") + encode_string(b"custom-value")
d = Decoder()
self.assertEqual(d.decode(bytes(block)), [(b"custom-key", b"custom-value")])
# entry is now addressable at the first dynamic index (STATIC_LEN + 1)
self.assertEqual(d._get(STATIC_LEN + 1), (b"custom-key", b"custom-value"))
self.assertEqual(d._size, 32 + len(b"custom-key") + len(b"custom-value"))
def test_literal_without_indexing_does_not_touch_dynamic_table(self):
block = bytearray([0x00]) + encode_string(b"k") + encode_string(b"v")
d = Decoder()
self.assertEqual(d.decode(bytes(block)), [(b"k", b"v")])
self.assertEqual(d.dynamic, [])
def test_dynamic_table_eviction(self):
d = Decoder(max_size=40) # each 2+2 byte entry costs 32+2+2 = 36
d._add(b"aa", b"bb")
self.assertEqual(len(d.dynamic), 1)
d._add(b"cc", b"dd") # 72 > 40 -> oldest evicted
self.assertEqual(d.dynamic, [(b"cc", b"dd")])
self.assertEqual(d._size, 36)
def test_dynamic_size_update_clears(self):
d = Decoder()
d._add(b"x", b"y")
d.decode(_b(0x20)) # 0x20 = dynamic table size update to 0
self.assertEqual(d.max_size, 0)
self.assertEqual(d.dynamic, [])
class TestHpackEncoderRoundTrip(unittest.TestCase):
def test_roundtrip_through_decoder(self):
headers = [
(b":method", b"GET"),
(b":scheme", b"https"),
(b":path", b"/a/b?c=d"),
(b":authority", b"example.com"),
(b"user-agent", b"sqlmap"),
(b"accept", b""), # empty value
(b"x-custom", b"\x00\x01\xff"), # non-ASCII value
]
self.assertEqual(Decoder().decode(Encoder().encode(headers)), headers)
def test_encoder_output_is_bytes(self):
self.assertIsInstance(Encoder().encode([(b"a", b"b")]), bytes)
class TestH2Response(unittest.TestCase):
def _make(self, status=200, headers=None, body=b"body"):
headers = headers if headers is not None else [(b":status", b"200"), (b"content-type", b"text/html")]
return H2Response("https://target/x", status, headers, body)
def test_basic_fields(self):
r = self._make()
self.assertEqual(r.code, 200)
self.assertEqual(r.status, 200)
self.assertEqual(r.msg, "OK")
self.assertEqual(r.http_version, "HTTP/2.0")
self.assertEqual(r.geturl(), "https://target/x")
def test_unknown_status_message(self):
self.assertEqual(self._make(status=799).msg, "")
def test_pseudo_headers_stripped(self):
r = self._make()
self.assertNotIn(":status", r.info())
self.assertEqual(r.info().get("content-type"), "text/html")
def test_read_full_then_empty(self):
r = self._make(body=b"hello")
self.assertEqual(r.read(), b"hello")
self.assertEqual(r.read(), b"") # offset exhausted
def test_read_in_chunks(self):
r = self._make(body=b"abcdef")
self.assertEqual(r.read(2), b"ab")
self.assertEqual(r.read(3), b"cde")
self.assertEqual(r.read(10), b"f") # asking past the end returns the remainder
self.assertEqual(r.read(10), b"")
def test_str_header_names_accepted(self):
# headers may arrive already decoded to str (not only bytes)
r = H2Response("https://t/", 200, [("content-type", "application/json")], b"{}")
self.assertEqual(r.info().get("content-type"), "application/json")
def test_mimetools_style_headers_list(self):
# patchHeaders() relies on a '.headers' list of "Name: value\r\n" lines being present
r = self._make()
self.assertTrue(hasattr(r.info(), "headers"))
self.assertIn("content-type: text/html\r\n", r.info().headers)
def test_close_is_noop(self):
self.assertIsNone(self._make().close())
class TestConstants(unittest.TestCase):
def test_redirect_codes(self):
for code in (301, 302, 303, 307, 308):
self.assertIn(code, REDIRECT_CODES)
self.assertNotIn(200, REDIRECT_CODES)
def test_static_table_length(self):
self.assertEqual(STATIC_LEN, len(STATIC_TABLE))
self.assertEqual(STATIC_LEN, 61) # RFC 7541 Appendix A
if __name__ == "__main__":
unittest.main(verbosity=2)