Skip to content

Commit a5bd505

Browse files
committed
trampoline overwrite experiment
- 00.check_security_env.py: check security environment - 40.trampoline_single_process_test.py: single process test - 41.analyze_trampoline_offsets.py: analyze trampoline offsets are important.
1 parent c4b4d61 commit a5bd505

13 files changed

+430
-64
lines changed

jit_poc/0. check_security_env.py

Lines changed: 0 additions & 63 deletions
This file was deleted.

jit_poc/00.check_security_env.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import subprocess
2+
import os
3+
import re
4+
5+
SYSCALLS = ["mprotect", "mmap", "execve", "execveat", "mremap", "ptrace"]
6+
7+
def run_command(cmd, require_sudo=False):
8+
"""Run command, adding sudo only if required and not already root"""
9+
if require_sudo and os.geteuid() != 0 and cmd[0] != "sudo":
10+
cmd.insert(0, "sudo")
11+
return subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
12+
13+
def check_apparmor():
14+
print("[AppArmor ] Checking AppArmor status...")
15+
try:
16+
output = run_command(["aa-status"]) # no sudo needed
17+
enforce_profiles = re.search(r"(\d+)\s+profiles are in enforce mode", output)
18+
python_profile = "/usr/local/bin/python3.14" in output
19+
if enforce_profiles:
20+
print(f"[AppArmor ] ✅ {enforce_profiles.group(1)} profiles in enforce mode")
21+
else:
22+
print("[AppArmor ] ❌ Not in enforce mode")
23+
print("[AppArmor profile] ✅ Loaded for Python3.14"
24+
if python_profile else "[AppArmor profile] ❌ Not loaded for Python3.14")
25+
except Exception as e:
26+
print(f"[AppArmor ] ❌ Error: {e}")
27+
28+
def check_seccomp():
29+
print("[Seccomp ] Checking seccomp mode...")
30+
try:
31+
with open("/proc/self/status") as f:
32+
for line in f:
33+
if line.startswith("Seccomp:"):
34+
mode = int(line.strip().split()[1])
35+
if mode == 0:
36+
print("[Seccomp ] ✅ Disabled (mode 0)")
37+
elif mode == 1:
38+
print("[Seccomp ] ⚠️ Strict mode (1)")
39+
elif mode == 2:
40+
print("[Seccomp ] ⚠️ Filter mode (2)")
41+
else:
42+
print(f"[Seccomp ] ❓ Unknown mode: {mode}")
43+
return
44+
print("[Seccomp ] ❌ Not found")
45+
except Exception as e:
46+
print(f"[Seccomp ] ❌ Error: {e}")
47+
48+
def check_auditd():
49+
print("[Auditd ] Checking auditd status...")
50+
try:
51+
status = subprocess.check_output(["systemctl", "is-active", "auditd"],
52+
stderr=subprocess.STDOUT).decode().strip()
53+
if status == "active":
54+
print("[Auditd ] ✅ Running (systemd)")
55+
else:
56+
print(f"[Auditd ] ❌ Status: {status}")
57+
except subprocess.CalledProcessError:
58+
print("[Auditd ] ❌ Not running or not installed")
59+
60+
def apply_syscall_rules():
61+
print("[Auditd rules ] ➕ Applying missing syscall rules...")
62+
for syscall in SYSCALLS:
63+
try:
64+
run_command(["auditctl", "-a", "exit,always", "-F", "arch=b64", "-S", syscall], require_sudo=True)
65+
print(f"✅ Rule added for: {syscall}")
66+
except subprocess.CalledProcessError as e:
67+
print(f"❌ Failed to add rule for {syscall}: {e}")
68+
except Exception as e:
69+
print(f"❌ Exception adding rule for {syscall}: {e}")
70+
71+
def check_auditd_rules():
72+
print("[Auditd rules ] Checking syscall audit rules...")
73+
try:
74+
rules_output = run_command(["auditctl", "-l"], require_sudo=True)
75+
syscall_rules = [line for line in rules_output.splitlines() if re.search(r"-S\s+\w+", line)]
76+
77+
if syscall_rules:
78+
print(f"[Auditd rules ] ✅ Found {len(syscall_rules)} syscall rules")
79+
else:
80+
print("[Auditd rules ] ❌ No syscall rules — applying now")
81+
apply_syscall_rules()
82+
83+
except subprocess.CalledProcessError as e:
84+
print(f"[Auditd rules ] ❌ Error: {e.output.decode().strip()}")
85+
except Exception as e:
86+
print(f"[Auditd rules ] ❌ Exception: {e}")
87+
88+
if __name__ == "__main__":
89+
print("=== [ Trampoline Overwrite Experiment Pre-Check + Auto Patch ] ===")
90+
check_apparmor()
91+
check_seccomp()
92+
check_auditd()
93+
check_auditd_rules()
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import ctypes
2+
import mmap
3+
import struct
4+
import os
5+
6+
# Load shellcode (base64로 인코딩된 binary 파일)
7+
with open("shellcode.bin", "rb") as f:
8+
shellcode = f.read()
9+
print(f"[+] Loaded shellcode: {len(shellcode)} bytes")
10+
11+
# Allocate RWX memory using mmap
12+
libc = ctypes.CDLL("libc.so.6")
13+
libc.mmap.restype = ctypes.c_void_p
14+
sc_addr = libc.mmap(None, 0x1000, 7, 0x22, -1, 0)
15+
ctypes.memmove(sc_addr, shellcode, len(shellcode))
16+
print(f"[+] Shellcode mmap @ 0x{sc_addr:x}")
17+
18+
# Allocate trampoline region (RWX도 가능, shellcode 앞에 위치)
19+
tramp_addr = libc.mmap(None, 0x1000, 7, 0x22, -1, 0)
20+
print(f"[+] Trampoline mmap @ 0x{tramp_addr:x} (→ shellcode)")
21+
22+
# Create trampoline code:
23+
# ldr x16, [pc, #8]
24+
# br x16
25+
# .quad shellcode address
26+
trampoline = (
27+
b"\x50\x00\x00\x58" + # ldr x16, [pc, #8]
28+
b"\x00\x02\x1f\xd6" + # br x16
29+
struct.pack("<Q", sc_addr) # target shellcode address
30+
)
31+
ctypes.memmove(tramp_addr, trampoline, len(trampoline))
32+
33+
# Execute the trampoline using function cast
34+
print("[*] Jumping to trampoline (via overwrite)...")
35+
shell_func_type = ctypes.CFUNCTYPE(None)
36+
shell_func = shell_func_type(tramp_addr)
37+
shell_func() # 실제 실행
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#!/usr/bin/env python3
2+
import ctypes
3+
import mmap
4+
import multiprocessing
5+
import json
6+
import time
7+
import os
8+
import struct
9+
import traceback
10+
import fcntl
11+
12+
# Lightweight shellcode: write "OK" then exit(0)
13+
OK_EXIT_CODE = (
14+
b"\x20\x00\x80\xd2" # movz x0, #1
15+
b"\xe1\x00\x00\x10" # adr x1, #0x20
16+
b"\x42\x00\x80\xd2" # movz x2, #2
17+
b"\x08\x08\x80\xd2" # movz x8, #8 (write)
18+
b"\x01\x00\x00\xd4" # svc #0
19+
b"\x00\x00\x80\xd2" # movz x0, #0
20+
b"\xa8\x0b\x80\xd2" # movz x8, #93 (exit)
21+
b"\x01\x00\x00\xd4" # svc #0
22+
b"OK"
23+
)
24+
25+
def execute_once(run_id):
26+
libc = ctypes.CDLL("libc.so.6")
27+
libc.mmap.restype = ctypes.c_void_p
28+
sc_addr = libc.mmap(None, 0x1000, 7, 0x22, -1, 0) # PROT_RWX, MAP_PRIVATE|MAP_ANON
29+
ctypes.memmove(sc_addr, OK_EXIT_CODE, len(OK_EXIT_CODE))
30+
31+
tramp = (
32+
b"\x50\x00\x00\x58" + # ldr x16, #8
33+
b"\x00\x02\x1f\xd6" + # br x16
34+
struct.pack("<Q", sc_addr)
35+
)
36+
tramp_buf = ctypes.create_string_buffer(tramp)
37+
tramp_addr = ctypes.addressof(tramp_buf)
38+
39+
log = {
40+
"pid": os.getpid(),
41+
"run_id": run_id,
42+
"trampoline_addr": hex(tramp_addr),
43+
"shellcode_addr": hex(sc_addr),
44+
"status": "unknown",
45+
"timestamp": time.time()
46+
}
47+
48+
try:
49+
fn = ctypes.CFUNCTYPE(None)(tramp_addr)
50+
fn()
51+
log["status"] = "success"
52+
except Exception as e:
53+
log["status"] = "fail"
54+
log["error"] = str(e)
55+
log["traceback"] = traceback.format_exc()
56+
57+
# Safe concurrent file write using fcntl lock
58+
with open("trampoline_overwrite_results.jsonl", "a") as f:
59+
fcntl.flock(f, fcntl.LOCK_EX)
60+
f.write(json.dumps(log) + "\n")
61+
fcntl.flock(f, fcntl.LOCK_UN)
62+
63+
if __name__ == "__main__":
64+
multiprocessing.set_start_method("spawn")
65+
pool = multiprocessing.Pool(processes=8)
66+
pool.map(execute_once, list(range(100)))
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#!/usr/bin/env python3
2+
import ctypes
3+
import mmap
4+
import json
5+
import time
6+
import os
7+
import struct
8+
import traceback
9+
import fcntl
10+
11+
OK_EXIT_CODE = (
12+
b"\x20\x00\x80\xd2" # 0x00: movz x0, #1 ; stdout
13+
b"\xe1\x00\x00\x10" # 0x04: adr x1, #0x20 ; → 0x28
14+
b"\x82\x00\x80\xd2" # 0x08: movz x2, #2 ; length
15+
b"\x08\x08\x80\xd2" # 0x0c: movz x8, #8 ; sys_write
16+
b"\x01\x00\x00\xd4" # 0x10: svc #0
17+
b"\xc0\x03\x5f\xd6" # 0x14: ret
18+
b"\x1f\x20\x03\xd5" # 0x18: nop
19+
b"\x1f\x20\x03\xd5" # 0x1c: nop
20+
b"OK \n" # 0x20: message
21+
)
22+
23+
def execute_once(run_id, offset):
24+
libc = ctypes.CDLL("libc.so.6")
25+
libc.mmap.restype = ctypes.c_void_p
26+
mem_size = 0x4000
27+
mem_addr = libc.mmap(None, mem_size, 7, 0x22, -1, 0)
28+
sc_addr = mem_addr
29+
ctypes.memmove(sc_addr, OK_EXIT_CODE, len(OK_EXIT_CODE))
30+
31+
tramp = (
32+
b"\x50\x00\x00\x58" + # ldr x16, #8
33+
b"\x00\x02\x1f\xd6" + # br x16
34+
struct.pack("<Q", sc_addr)
35+
)
36+
tramp_addr = sc_addr + offset
37+
ctypes.memmove(tramp_addr, tramp, len(tramp))
38+
39+
log = {
40+
"pid": os.getpid(),
41+
"run_id": run_id,
42+
"trampoline_offset": offset,
43+
"trampoline_addr": hex(tramp_addr),
44+
"shellcode_addr": hex(sc_addr),
45+
"status": "unknown",
46+
"timestamp": time.time()
47+
}
48+
49+
try:
50+
start = time.time()
51+
fn = ctypes.CFUNCTYPE(None)(tramp_addr)
52+
fn()
53+
log["status"] = "success"
54+
log["exec_time_ms"] = round((time.time() - start) * 1000, 3)
55+
except Exception as e:
56+
log["status"] = "fail"
57+
log["error"] = str(e)
58+
log["traceback"] = traceback.format_exc()
59+
60+
with open("trampoline_offset_experiment.jsonl", "a") as f:
61+
fcntl.flock(f, fcntl.LOCK_EX)
62+
f.write(json.dumps(log) + "\n")
63+
fcntl.flock(f, fcntl.LOCK_UN)
64+
65+
if __name__ == "__main__":
66+
for i, offset in enumerate(range(0x1000, 0x3000, 0x100)):
67+
execute_once(run_id=i, offset=offset)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import ctypes
2+
import struct
3+
import sys
4+
import os
5+
6+
print("=== [ Trampoline Overwrite Shellcode Test ] ===")
7+
8+
# 1. 입력 파일로부터 shellcode 로드
9+
if len(sys.argv) < 2:
10+
print("Usage: python3.14 40a.trampoline_overwrite_test.py <shellcode.bin>")
11+
sys.exit(1)
12+
13+
path = sys.argv[1]
14+
with open(path, "rb") as f:
15+
bcode = f.read()
16+
print(f"[+] Loaded binary code: {len(bcode)} bytes")
17+
18+
# 2. RWX mmap 영역에 binary code 저장
19+
libc = ctypes.CDLL("libc.so.6")
20+
libc.mmap.restype = ctypes.c_void_p
21+
sc_addr = libc.mmap(None, 0x1000, 7, 0x22, -1, 0)
22+
ctypes.memmove(sc_addr, bcode, len(bcode))
23+
print(f"[+] Binary code mmap @ 0x{sc_addr:x}")
24+
25+
# Allocate trampoline region (RWX도 가능, shellcode 앞에 위치)
26+
tramp_addr = libc.mmap(None, 0x1000, 7, 0x22, -1, 0)
27+
print(f"[+] Trampoline mmap @ 0x{tramp_addr:x} (→ binary code)")
28+
29+
# Create trampoline code:
30+
# ldr x16, [pc, #8]
31+
# br x16
32+
# .quad shellcode address
33+
trampoline = (
34+
b"\x50\x00\x00\x58" + # ldr x16, [pc, #8]
35+
b"\x00\x02\x1f\xd6" + # br x16
36+
struct.pack("<Q", sc_addr) # target binary code address
37+
)
38+
ctypes.memmove(tramp_addr, trampoline, len(trampoline))
39+
print(f"[+] Trampoline @ 0x{tramp_addr:x} → binary code")
40+
41+
# 4. Binary code 실행 (x16 = binary code, br x16)
42+
print("[*] Jumping to trampoline (via overwrite)...")
43+
44+
try:
45+
fn = ctypes.CFUNCTYPE(None)(tramp_addr)
46+
fn()
47+
except Exception as e:
48+
print(f"[!] Exception occurred: {e}")

0 commit comments

Comments
 (0)