-
Notifications
You must be signed in to change notification settings - Fork 771
Examples
xwings edited this page Jul 6, 2025
·
2 revisions
This page provides practical examples demonstrating various Qiling Framework capabilities.
Linux x86_64 Binary:
from qiling import Qiling
from qiling.const import QL_VERBOSE
def basic_linux_emulation():
# Simple Linux binary emulation
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux',
verbose=QL_VERBOSE.DEBUG)
ql.run()
if __name__ == "__main__":
basic_linux_emulation()Windows x86 Binary:
from qiling import Qiling
def basic_windows_emulation():
# Windows binary emulation (requires Windows DLLs)
ql = Qiling(['examples/rootfs/x86_windows/bin/x86_hello.exe'],
'examples/rootfs/x86_windows')
ql.run()
if __name__ == "__main__":
basic_windows_emulation()Linux x86_64 execve Shellcode:
from qiling import Qiling
from qiling.const import QL_ARCH, QL_OS, QL_VERBOSE
def shellcode_example():
# execve("/bin/sh", NULL, NULL) shellcode
shellcode = bytes.fromhex('''
48c7c03c00000048c7c700000000488d35060000000f05
2f62696e2f736800
''')
ql = Qiling(code=shellcode,
rootfs='examples/rootfs/x8664_linux',
archtype=QL_ARCH.X8664,
ostype=QL_OS.LINUX,
verbose=QL_VERBOSE.DEBUG)
ql.run()
if __name__ == "__main__":
shellcode_example()Windows x86 MessageBox Shellcode:
from qiling import Qiling
from qiling.const import QL_ARCH, QL_OS
def windows_shellcode():
# MessageBox shellcode
shellcode = bytes.fromhex('''
fc4881e4f0ffffffe8d0000000415141505251564831d265488b52603e488b52
183e488b52203e488b72503e480fb74a4a4d31c94831c0ac3c617c022c2041c1
c90d4101c1e2ed5241513e488b52203e8b423c4801d03e8b80880000004885c0
746f4801d0503e8b48183e448b40204901d0e35c48ffc93e418b34884801d64d
31c94831c0ac41c1c90d4101c138e075f13e4c034c24084539d175d6583e448b
40244901d0663e418b0c483e448b401c4901d03e418b04884801d0415841585e
595a41584159415a4883ec204152ffe05841595a3e488b12e949ffffff5d49c7
c1000000003e488d95fe0000003e4c8d850f0100004831c941ba45835607ffd5
4831c941baf0b5a256ffd548656c6c6f2c2066726f6d204d534621004d657373
616765426f7800
''')
ql = Qiling(code=shellcode,
rootfs='examples/rootfs/x8664_windows',
archtype=QL_ARCH.X8664,
ostype=QL_OS.WINDOWS)
ql.run()
if __name__ == "__main__":
windows_shellcode()Hooking malloc and free:
from qiling import Qiling
def memory_tracking_example():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
# Track memory allocations
allocations = {}
def hook_malloc(ql, size):
addr = ql.os.heap.alloc(size)
allocations[addr] = size
print(f"malloc({size}) = 0x{addr:x}")
return addr
def hook_free(ql, ptr):
if ptr in allocations:
size = allocations.pop(ptr)
print(f"free(0x{ptr:x}) [size: {size}]")
ql.os.heap.free(ptr)
return 0
# Set up hooks
ql.set_api("malloc", hook_malloc)
ql.set_api("free", hook_free)
ql.run()
# Show remaining allocations (potential leaks)
if allocations:
print("Potential memory leaks:")
for addr, size in allocations.items():
print(f" 0x{addr:x}: {size} bytes")
if __name__ == "__main__":
memory_tracking_example()Windows API Monitoring:
from qiling import Qiling
def windows_api_monitoring():
ql = Qiling(['examples/rootfs/x86_windows/bin/x86_hello.exe'],
'examples/rootfs/x86_windows')
def hook_createfile(ql, lpFileName, dwDesiredAccess, dwShareMode,
lpSecurityAttributes, dwCreationDisposition,
dwFlagsAndAttributes, hTemplateFile):
filename = ql.os.utils.read_wstring(lpFileName)
access_str = "GENERIC_READ" if dwDesiredAccess & 0x80000000 else "GENERIC_WRITE"
print(f"CreateFileW: '{filename}' with {access_str}")
return None # Call original
def hook_writeprocess(ql, hProcess, lpBaseAddress, lpBuffer,
nSize, lpNumberOfBytesWritten):
if lpBuffer and nSize > 0:
data = ql.mem.read(lpBuffer, min(nSize, 32))
print(f"WriteProcessMemory: addr=0x{lpBaseAddress:x}, size={nSize}")
print(f"Data preview: {data.hex()}")
return None
ql.set_api("CreateFileW", hook_createfile)
ql.set_api("WriteProcessMemory", hook_writeprocess)
ql.run()
if __name__ == "__main__":
windows_api_monitoring()Execution Tracer:
from qiling import Qiling
def execution_tracer():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
instruction_count = 0
trace_file = open("execution_trace.txt", "w")
def trace_instruction(ql, address, size):
nonlocal instruction_count
instruction_count += 1
# Read and disassemble instruction
code = ql.mem.read(address, size)
disasm = ql.arch.utils.disassembler(ql, address, size)
trace_file.write(f"0x{address:016x}: {code.hex()} {disasm}\n")
if instruction_count % 1000 == 0:
print(f"Executed {instruction_count} instructions")
def trace_memory_write(ql, access, address, size, value):
trace_file.write(f"MEM_WRITE: 0x{address:x} = 0x{value:x} ({size} bytes)\n")
ql.hook_code(trace_instruction)
ql.hook_mem_write(trace_memory_write)
try:
ql.run()
finally:
trace_file.close()
print(f"Trace written to execution_trace.txt ({instruction_count} instructions)")
if __name__ == "__main__":
execution_tracer()Function Call Tracker:
from qiling import Qiling
def function_call_tracker():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
call_stack = []
function_calls = {}
def track_calls(ql, address, size):
code = ql.mem.read(address, min(size, 5))
# x86_64 call instruction detection
if len(code) >= 5 and code[0] == 0xe8: # CALL rel32
# Calculate target address
offset = int.from_bytes(code[1:5], byteorder='little', signed=True)
target = address + 5 + offset
call_stack.append((address, target))
function_calls[target] = function_calls.get(target, 0) + 1
print(f"CALL 0x{target:x} from 0x{address:x} (depth: {len(call_stack)})")
elif len(code) >= 1 and code[0] in [0xc3, 0xc2]: # RET
if call_stack:
caller, target = call_stack.pop()
print(f"RET from 0x{target:x} to 0x{caller:x} (depth: {len(call_stack)})")
ql.hook_code(track_calls)
ql.run()
print("\\nFunction call statistics:")
sorted_calls = sorted(function_calls.items(), key=lambda x: x[1], reverse=True)
for addr, count in sorted_calls[:10]:
print(f"0x{addr:x}: {count} calls")
if __name__ == "__main__":
function_call_tracker()from qiling import Qiling
class SimpleTaintAnalyzer:
def __init__(self, ql):
self.ql = ql
self.tainted_memory = set()
self.tainted_registers = set()
def mark_memory_tainted(self, addr, size):
for i in range(size):
self.tainted_memory.add(addr + i)
def mark_register_tainted(self, reg_name):
self.tainted_registers.add(reg_name)
def setup_hooks(self):
self.ql.hook_mem_read(self.track_memory_read)
self.ql.hook_mem_write(self.track_memory_write)
self.ql.hook_code(self.track_instruction)
def track_memory_read(self, ql, access, address, size, value):
# Check if reading tainted memory
for i in range(size):
if (address + i) in self.tainted_memory:
print(f"Reading tainted memory at 0x{address + i:x}")
break
def track_memory_write(self, ql, access, address, size, value):
# Simple taint propagation: if any source is tainted, mark dest as tainted
# This is a simplified version - real taint analysis would analyze instructions
pass
def track_instruction(self, ql, address, size):
# Analyze instruction for taint propagation
# This would need proper instruction analysis
pass
def taint_analysis_example():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
analyzer = SimpleTaintAnalyzer(ql)
analyzer.setup_hooks()
# Mark user input as tainted (example)
def mark_user_input(ql):
# If there's a read() call, mark the buffer as tainted
buffer_addr = ql.arch.regs.rsi # Second argument (buffer)
buffer_size = ql.arch.regs.rdx # Third argument (size)
analyzer.mark_memory_tainted(buffer_addr, buffer_size)
print(f"Marked buffer 0x{buffer_addr:x} as tainted ({buffer_size} bytes)")
# Hook read() syscall to mark input as tainted
ql.set_api("read", lambda ql, fd, buf, count: (
mark_user_input(ql) if fd == 0 else None, # stdin
ql.os.syscall_read(fd, buf, count)
)[1])
ql.run()
if __name__ == "__main__":
taint_analysis_example()from qiling import Qiling
from qiling.extensions.coverage import QlCoverage
def coverage_analysis():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
# Set up coverage collection
coverage = QlCoverage(ql)
# Add additional coverage hooks
basic_blocks = set()
def track_basic_blocks(ql, address, size):
basic_blocks.add(address)
ql.hook_block(track_basic_blocks)
# Run with coverage
ql.run()
# Get coverage results
coverage_data = coverage.get_coverage()
print(f"Coverage Analysis Results:")
print(f"Basic blocks executed: {len(basic_blocks)}")
print(f"Instructions executed: {len(coverage_data)}")
# Write coverage to file
with open("coverage_report.txt", "w") as f:
f.write("Basic Blocks:\\n")
for addr in sorted(basic_blocks):
f.write(f"0x{addr:x}\\n")
f.write("\\nInstructions:\\n")
for addr in sorted(coverage_data):
f.write(f"0x{addr:x}\\n")
print("Coverage report written to coverage_report.txt")
if __name__ == "__main__":
coverage_analysis()from qiling import Qiling
from qiling.os import QlOs
class CustomOS(QlOs):
def __init__(self, ql):
super().__init__(ql)
self.setup_custom_syscalls()
def setup_custom_syscalls(self):
"""Set up custom system calls"""
def custom_exit(ql, exit_code):
print(f"Custom exit with code: {exit_code}")
ql.emu_stop()
return 0
def custom_write(ql, fd, buf, count):
data = ql.mem.read(buf, count)
print(f"Custom write: {data.decode('utf-8', errors='ignore')}")
return count
self.set_syscall("exit", custom_exit)
self.set_syscall("write", custom_write)
def custom_os_example():
# This would require a specially crafted binary
# that uses minimal system calls
ql = Qiling(['custom_binary'], 'minimal_rootfs')
# Replace OS with custom implementation
ql.os = CustomOS(ql)
ql.run()
# Note: This is a conceptual example
# Real implementation would require more setupfrom qiling import Qiling
import pickle
def snapshot_restore_example():
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
snapshots = {}
def take_snapshot(name):
"""Take a snapshot of current state"""
snapshot = {
'registers': ql.arch.regs.save(),
'memory': ql.mem.save()
}
snapshots[name] = snapshot
print(f"Snapshot '{name}' taken")
def restore_snapshot(name):
"""Restore from snapshot"""
if name in snapshots:
snapshot = snapshots[name]
ql.arch.regs.restore(snapshot['registers'])
ql.mem.restore(snapshot['memory'])
print(f"Restored from snapshot '{name}'")
else:
print(f"Snapshot '{name}' not found")
# Hook specific addresses to take snapshots
def snapshot_hook(ql):
pc = ql.arch.regs.arch_pc
take_snapshot(f"pc_{pc:x}")
# Set up snapshot points
ql.hook_address(snapshot_hook, 0x401000) # Example address
# Run and take snapshots
ql.run()
# Demonstrate restore
if snapshots:
first_snapshot = list(snapshots.keys())[0]
restore_snapshot(first_snapshot)
print(f"Restored to {first_snapshot}, can continue execution")
if __name__ == "__main__":
snapshot_restore_example()from qiling import Qiling
from qiling.const import QL_ARCH, QL_OS
def multi_arch_analysis():
"""Analyze the same shellcode on different architectures"""
# Simple NOP shellcode for different architectures
shellcodes = {
QL_ARCH.X8664: b'\\x90\\x90\\x90\\x90', # NOP on x64
QL_ARCH.X86: b'\\x90\\x90\\x90\\x90', # NOP on x86
QL_ARCH.ARM: b'\\x00\\x00\\xa0\\xe1', # NOP on ARM
QL_ARCH.ARM64: b'\\x1f\\x20\\x03\\xd5', # NOP on ARM64
}
rootfs_map = {
QL_ARCH.X8664: 'examples/rootfs/x8664_linux',
QL_ARCH.X86: 'examples/rootfs/x86_linux',
QL_ARCH.ARM: 'examples/rootfs/arm_linux',
QL_ARCH.ARM64: 'examples/rootfs/arm64_linux',
}
for arch, shellcode in shellcodes.items():
print(f"\\nAnalyzing on {arch.name}:")
try:
ql = Qiling(code=shellcode,
rootfs=rootfs_map.get(arch, 'examples/rootfs/x8664_linux'),
archtype=arch,
ostype=QL_OS.LINUX)
# Track execution
instructions = []
def track_instruction(ql, address, size):
code = ql.mem.read(address, size)
instructions.append((address, code.hex()))
ql.hook_code(track_instruction)
ql.run()
print(f"Executed {len(instructions)} instructions:")
for addr, code in instructions:
print(f" 0x{addr:x}: {code}")
except Exception as e:
print(f"Error on {arch.name}: {e}")
if __name__ == "__main__":
multi_arch_analysis()from qiling import Qiling
import random
def basic_fuzzer():
"""Simple mutation-based fuzzer"""
def mutate_data(data):
"""Apply random mutations to data"""
if not data:
return b"\\x00" * random.randint(1, 100)
data = bytearray(data)
for _ in range(random.randint(1, 3)):
if random.choice([True, False]) and data:
# Bit flip
pos = random.randint(0, len(data) - 1)
bit = random.randint(0, 7)
data[pos] ^= (1 << bit)
else:
# Byte insertion
pos = random.randint(0, len(data))
data.insert(pos, random.randint(0, 255))
return bytes(data)
def test_input(input_data):
"""Test input with target program"""
try:
ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
'examples/rootfs/x8664_linux')
# Simulate input (this would depend on how your target reads input)
# For demonstration, we'll just run the program
ql.run(timeout=1000000) # 1 second timeout
return True
except Exception as e:
print(f"Crash found with input: {input_data[:32].hex()}")
print(f"Error: {e}")
return False
# Seed inputs
seeds = [
b"Hello",
b"A" * 100,
b"\\x00\\x01\\x02\\x03",
]
crashes = 0
for i in range(1000): # 1000 iterations
seed = random.choice(seeds)
mutated = mutate_data(seed)
if not test_input(mutated):
crashes += 1
if i % 100 == 0:
print(f"Iteration {i}, crashes found: {crashes}")
print(f"Fuzzing complete. Total crashes: {crashes}")
if __name__ == "__main__":
basic_fuzzer()from qiling import Qiling
def extract_binary_info(binary_path, rootfs_path):
"""Extract information from a binary"""
ql = Qiling([binary_path], rootfs_path)
print(f"Binary Analysis: {binary_path}")
print("=" * 50)
# Basic information
print(f"Architecture: {ql.arch.type}")
print(f"OS Type: {ql.os.type}")
print(f"Entry Point: 0x{ql.loader.entry_point:x}")
print(f"Load Address: 0x{ql.loader.load_address:x}")
# Memory layout
print("\\nMemory Layout:")
for region in ql.mem.get_mapinfo():
start, end, perm, info = region[:4]
perm_str = ""
if perm & 1: perm_str += "R"
if perm & 2: perm_str += "W"
if perm & 4: perm_str += "X"
print(f" 0x{start:016x}-0x{end:016x} {perm_str:3} {info}")
# Imported functions (if available)
if hasattr(ql.loader, 'import_symbols') and ql.loader.import_symbols:
print("\\nImported Functions:")
for symbol, addr in ql.loader.import_symbols.items():
print(f" {symbol}: 0x{addr:x}")
# Exported functions (if available)
if hasattr(ql.loader, 'export_symbols') and ql.loader.export_symbols:
print("\\nExported Functions:")
for symbol, addr in ql.loader.export_symbols.items():
print(f" {symbol}: 0x{addr:x}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print("Usage: python script.py <binary> <rootfs>")
sys.exit(1)
extract_binary_info(sys.argv[1], sys.argv[2])from qiling import Qiling
import os
def automated_unpacker(packed_binary, rootfs_path, output_dir):
"""Automated malware unpacker using Qiling"""
ql = Qiling([packed_binary], rootfs_path)
# Track memory writes for potential unpacking
memory_writes = []
executable_regions = []
def track_memory_writes(ql, access, address, size, value):
memory_writes.append((address, size, value))
def track_memory_mapping(ql, address, size, perms):
if perms & 4: # Executable
executable_regions.append((address, size))
print(f"New executable region: 0x{address:x} (size: 0x{size:x})")
def check_for_pe_header(ql, address, size, value):
"""Check if written data contains PE header"""
if size >= 2 and value == 0x5a4d: # "MZ" header
print(f"Potential PE header at 0x{address:x}")
# Try to dump potential unpacked PE
try:
# Read potential PE size (simplified)
pe_data = ql.mem.read(address, min(0x100000, size)) # Read up to 1MB
output_file = os.path.join(output_dir, f"unpacked_{address:x}.exe")
with open(output_file, "wb") as f:
f.write(pe_data)
print(f"Dumped potential unpacked PE to {output_file}")
except Exception as e:
print(f"Error dumping PE: {e}")
# Set up hooks
ql.hook_mem_write(track_memory_writes)
ql.hook_mem_write(check_for_pe_header)
# Hook VirtualAlloc for tracking new memory regions
def hook_virtualalloc(ql, lpAddress, dwSize, flAllocationType, flProtect):
addr = ql.os.heap.alloc(dwSize)
if flProtect & 0x40: # PAGE_EXECUTE_READWRITE
executable_regions.append((addr, dwSize))
print(f"VirtualAlloc: executable region 0x{addr:x} (size: 0x{dwSize:x})")
return addr
ql.set_api("VirtualAlloc", hook_virtualalloc)
# Create output directory
os.makedirs(output_dir, exist_ok=True)
try:
ql.run(timeout=30000000) # 30 second timeout
except Exception as e:
print(f"Emulation ended: {e}")
print(f"\\nUnpacking summary:")
print(f"Memory writes: {len(memory_writes)}")
print(f"Executable regions: {len(executable_regions)}")
# Dump all executable regions
for i, (addr, size) in enumerate(executable_regions):
try:
data = ql.mem.read(addr, size)
output_file = os.path.join(output_dir, f"exec_region_{i}_{addr:x}.bin")
with open(output_file, "wb") as f:
f.write(data)
print(f"Dumped executable region to {output_file}")
except Exception as e:
print(f"Error dumping region {i}: {e}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 4:
print("Usage: python unpacker.py <packed_binary> <rootfs> <output_dir>")
sys.exit(1)
automated_unpacker(sys.argv[1], sys.argv[2], sys.argv[3])These examples demonstrate various Qiling capabilities from basic emulation to advanced analysis techniques. Each example can be modified and extended based on specific analysis requirements.
For more examples, visit the Qiling GitHub repository.
- Home
- Getting Started
- Core Concepts
- Usage
- Features
- Tutorials
- Development
- Resources