-
Notifications
You must be signed in to change notification settings - Fork 758
Advanced Usage
xwings edited this page Jul 6, 2025
·
4 revisions
This guide covers advanced Qiling Framework features for sophisticated analysis and emulation scenarios.
Multi-level Instrumentation:
from qiling import Qiling
from qiling.const import QL_OS, QL_VERBOSE
class QilingInstrumenter:
    """Collect execution statistics from a Qiling instance through hooks.

    The instrumenter counts executed instructions, remembers the start
    address of every basic block entered, logs memory reads and writes, and
    records ``int 0x80`` system calls made on Linux targets.  The raw data is
    kept in public attributes; ``get_report`` condenses it into counts.
    """

    def __init__(self, ql):
        self.ql = ql
        self.instruction_count = 0
        self.api_calls = []
        self.memory_accesses = []
        self.basic_blocks = set()

    def setup_hooks(self):
        """Register every tracking callback on the wrapped Qiling instance."""
        # Instruction-level tracking
        self.ql.hook_code(self.track_instructions)
        # Basic block tracking
        self.ql.hook_block(self.track_blocks)
        # Memory access monitoring
        self.ql.hook_mem_read(self.track_memory_read)
        self.ql.hook_mem_write(self.track_memory_write)
        # System call interception
        self.ql.hook_intr(self.track_syscalls)

    def track_instructions(self, ql, address, size):
        """Count one executed instruction and print progress every 10000."""
        self.instruction_count += 1
        if self.instruction_count % 10000 == 0:
            print(f"Executed {self.instruction_count} instructions")

    def track_blocks(self, ql, address, size):
        """Remember the start address of the basic block being entered."""
        self.basic_blocks.add(address)

    def track_memory_read(self, ql, access, address, size, value):
        """Append a record describing a memory read (the value is not kept)."""
        self.memory_accesses.append({
            'type': 'read',
            'address': address,
            'size': size,
            'pc': ql.arch.regs.arch_pc,
        })

    def track_memory_write(self, ql, access, address, size, value):
        """Append a record describing a memory write, including the value."""
        self.memory_accesses.append({
            'type': 'write',
            'address': address,
            'size': size,
            'value': value,
            'pc': ql.arch.regs.arch_pc,
        })

    def track_syscalls(self, ql, intno):
        """Record the syscall number when a Linux ``int 0x80`` is raised.

        Any other interrupt number, or a non-Linux guest, is ignored.
        """
        # Test the plain integer first so unrelated interrupts return early.
        if intno != 0x80 or ql.os.type != QL_OS.LINUX:
            return
        # The int 0x80 convention passes the syscall number in eax.  The
        # original read rax, which is not a register on 32-bit x86 guests.
        # NOTE(review): eax is expected to resolve on x86-64 guests as well -
        # confirm against the Qiling register map.
        self.api_calls.append({
            'syscall': ql.arch.regs.eax,
            'pc': ql.arch.regs.arch_pc,
        })

    def get_report(self):
        """Return a dict with the count of each kind of recorded event."""
        return {
            'instruction_count': self.instruction_count,
            'basic_blocks': len(self.basic_blocks),
            'memory_accesses': len(self.memory_accesses),
            'api_calls': len(self.api_calls),
        }
# Usage
# Emulate a target with every instrumentation hook installed, then print the
# summary. 'binary' and 'rootfs' are placeholder paths.
ql = Qiling(['binary'], 'rootfs', verbose=QL_VERBOSE.DEBUG)
instrumenter = QilingInstrumenter(ql)
# Hooks are registered before emulation starts.
instrumenter.setup_hooks()
ql.run()
report = instrumenter.get_report()
print(f"Analysis complete: {report}")

# Custom API Replacement:
def hook_malloc(ql, size):
    """Allocate from the emulated heap and remember who asked for it.

    The call is logged to stdout and the allocation is recorded in
    ``ql.malloc_tracker`` (created on first use), keyed by the address that
    ``ql.os.heap.alloc`` returned.  That address is also the return value.
    """
    caller_pc = ql.arch.regs.arch_pc
    print(f"malloc({size}) called from 0x{caller_pc:x}")

    chunk = ql.os.heap.alloc(size)

    # Lazily attach the bookkeeping dict to the Qiling object.
    try:
        registry = ql.malloc_tracker
    except AttributeError:
        registry = ql.malloc_tracker = {}

    registry[chunk] = dict(size=size, pc=caller_pc, allocated=True)
    return chunk
def hook_free(ql, ptr):
    """Release ``ptr`` on the emulated heap and mark its record as freed.

    The call is logged to stdout.  If ``ql.malloc_tracker`` exists and holds
    a record for ``ptr``, its ``'allocated'`` flag is cleared.  Returns
    whatever ``ql.os.heap.free`` returns.
    """
    print(f"free(0x{ptr:x}) called from 0x{ql.arch.regs.arch_pc:x}")

    record = getattr(ql, 'malloc_tracker', {}).get(ptr)
    if record is not None:
        record['allocated'] = False

    return ql.os.heap.free(ptr)
# Set up custom heap management
# Route the target's malloc/free through the tracking hooks defined above.
# NOTE(review): newer Qiling releases appear to expose this as
# ql.os.set_api - confirm against the installed version.
ql.set_api("malloc", hook_malloc)
ql.set_api("free", hook_free)

# Windows API Monitoring:
def setup_windows_api_hooks(ql):
    """Install logging hooks for CreateFileW and WriteProcessMemory.

    Both hooks only print what they observe and return ``None``.
    """

    def hook_createfile(ql, lpFileName, dwDesiredAccess, dwShareMode,
                        lpSecurityAttributes, dwCreationDisposition,
                        dwFlagsAndAttributes, hTemplateFile):
        """Print the wide-string path passed to CreateFileW."""
        print(f"CreateFileW: {ql.os.utils.read_wstring(lpFileName)}")
        return None

    def hook_writeprocess(ql, hProcess, lpBaseAddress, lpBuffer,
                          nSize, lpNumberOfBytesWritten):
        """Print the destination, size and a preview of the written data."""
        payload = ql.mem.read(lpBuffer, nSize)
        preview = payload[:32].hex()  # first 32 bytes only
        print(f"WriteProcessMemory: 0x{lpBaseAddress:x}, size={nSize}")
        print(f"Data: {preview}...")
        return None

    registrations = (
        ("CreateFileW", hook_createfile),
        ("WriteProcessMemory", hook_writeprocess),
    )
    for api_name, callback in registrations:
        ql.set_api(api_name, callback)

# Memory Layout Analysis:
def analyze_memory_layout(ql):
    """Print one line per mapped memory region of the emulated process.

    Each entry from ``ql.mem.get_mapinfo()`` is read as
    ``(start, end, permissions, label, ...)``; a missing label is shown as
    "Unknown".  Permissions are accepted in two forms:

    * a string such as ``'r-x'`` - the original code applied ``&`` to this
      and raised ``TypeError``;
    * an integer bitmask tested against the ``UC_PROT_*`` flags.

    Either way the column shows the letters R, W and X for the rights that
    are set, padded to three characters.
    """
    mapinfo = ql.mem.get_mapinfo()

    print("Memory Layout:")
    print("-" * 60)

    for region in mapinfo:
        start, end, perm = region[:3]
        label = region[3] if len(region) > 3 else "Unknown"

        if isinstance(perm, str):
            granted = perm.lower()
            perm_str = "".join(
                letter.upper() for letter in "rwx" if letter in granted
            )
        else:
            # NOTE(review): UC_PROT_* are not imported in this snippet; they
            # are presumably the unicorn protection flags - confirm and
            # import them where integer permissions are expected.
            perm_str = "".join(
                letter
                for flag, letter in (
                    (UC_PROT_READ, "R"),
                    (UC_PROT_WRITE, "W"),
                    (UC_PROT_EXEC, "X"),
                )
                if perm & flag
            )

        print(f"0x{start:016x}-0x{end:016x} {perm_str:3} {label}")
def memory_diff(ql, addr, size, label=""):
    """Store a labelled copy of ``size`` bytes at ``addr`` for later diffing.

    Snapshots live in ``ql.memory_snapshots`` (created on first use) as
    ``label -> (addr, data)``; reusing a label overwrites the older entry.
    """
    try:
        store = ql.memory_snapshots
    except AttributeError:
        store = ql.memory_snapshots = {}

    store[label] = (addr, ql.mem.read(addr, size))
def compare_memory_snapshots(ql, label1, label2):
    """Print the byte-level differences between two stored snapshots.

    Snapshots are looked up in ``ql.memory_snapshots`` and are expected to be
    ``(addr, data)`` pairs.  Prints "Snapshot not found" when either label is
    missing - including when no snapshot was ever taken, which previously
    raised ``AttributeError`` - and "Different sizes" when the two buffers
    differ in length.  Otherwise prints the number of differing bytes
    followed by at most the first ten, addressed relative to the first
    snapshot's base address.
    """
    snapshots = getattr(ql, 'memory_snapshots', {})
    if label1 not in snapshots or label2 not in snapshots:
        print("Snapshot not found")
        return

    addr1, data1 = snapshots[label1]
    _, data2 = snapshots[label2]
    if len(data1) != len(data2):
        print("Different sizes")
        return

    changes = [
        (addr1 + offset, old, new)
        for offset, (old, new) in enumerate(zip(data1, data2))
        if old != new
    ]

    print(f"Found {len(changes)} differences:")
    for addr, old, new in changes[:10]:  # Show first 10
        print(f"  0x{addr:x}: 0x{old:02x} -> 0x{new:02x}")

# Heap Analysis:
class HeapAnalyzer:
    """Bookkeeping for heap allocations observed during emulation.

    ``allocations`` maps a live chunk address to its metadata;
    ``freed_chunks`` is the chronological list of ``(addr, metadata)`` pairs
    that were released.
    """

    def __init__(self, ql):
        self.ql = ql
        self.allocations = {}
        self.freed_chunks = []
        # Monotonic sequence number.  len(self.allocations) shrinks on free,
        # so using it as a timestamp produced duplicates.
        self._next_timestamp = 0

    def track_allocation(self, addr, size, caller):
        """Record a live chunk at ``addr`` of ``size`` bytes."""
        self.allocations[addr] = {
            'size': size,
            'caller': caller,
            'timestamp': self._next_timestamp,
        }
        self._next_timestamp += 1

    def track_free(self, addr):
        """Move the chunk at ``addr`` to the freed list; unknown addresses are ignored."""
        if addr in self.allocations:
            chunk = self.allocations.pop(addr)
            self.freed_chunks.append((addr, chunk))

    def detect_leaks(self):
        """Return the addresses of all chunks that are still allocated."""
        return list(self.allocations)

    def detect_double_free(self, addr):
        """Return True if freeing ``addr`` now would be a double free.

        That is the case when ``addr`` was freed before and is not currently
        allocated.  An address the allocator handed out again after a free is
        live, so it is no longer reported.
        """
        if addr in self.allocations:
            return False
        return any(freed_addr == addr for freed_addr, _ in self.freed_chunks)

    def get_stats(self):
        """Return live/freed chunk counts and the total size of live chunks."""
        total_allocated = sum(chunk['size'] for chunk in self.allocations.values())
        return {
            'active_chunks': len(self.allocations),
            'freed_chunks': len(self.freed_chunks),
            'total_allocated': total_allocated,
        }

# Function Call Tracking:
class FunctionTracker:
    """Rebuild a rough call graph by inspecting x86 opcodes as they execute.

    Only three encodings are recognised: ``E8`` (call rel32), ``C3``/``C2``
    (ret) and ``FF D0`` (call rax).
    """

    def __init__(self, ql):
        self.ql = ql
        self.call_stack = []
        self.function_map = {}
        self.call_graph = {}

    def setup_tracking(self):
        """Install the per-instruction hook."""
        self.ql.hook_code(self.track_calls_and_returns)

    def track_calls_and_returns(self, ql, address, size):
        """Classify the instruction at ``address`` as call, return or neither."""
        code = ql.mem.read(address, min(size, 5))
        opcode = code[0]

        # x86/x64 call detection
        if opcode == 0xe8:  # CALL rel32
            self.handle_call(address, self._get_call_target(address, code))
            return

        if opcode in (0xc3, 0xc2):  # RET
            self.handle_return(address)
            return

        if len(code) >= 2 and code[:2] == b'\xff\xd0':  # CALL reg
            # Simplified: only the rax form is decoded.
            self.handle_call(address, ql.arch.regs.rax)

    def _get_call_target(self, addr, code):
        """Return the destination of a 5-byte ``call rel32``, or 0 if truncated."""
        if len(code) < 5:
            return 0
        rel32 = int.from_bytes(code[1:5], byteorder='little', signed=True)
        # The displacement is relative to the end of the call instruction.
        return addr + 5 + rel32

    def handle_call(self, caller, target):
        """Push the call onto the shadow stack and add an edge to the graph."""
        self.call_stack.append((caller, target))
        self.call_graph.setdefault(caller, []).append(target)

    def handle_return(self, ret_addr):
        """Pop the most recent call, if any."""
        if self.call_stack:
            self.call_stack.pop()

    def get_function_stats(self):
        """Return ``{target: number of recorded calls to it}``."""
        tally = {}
        all_targets = (
            callee
            for callees in self.call_graph.values()
            for callee in callees
        )
        for callee in all_targets:
            tally[callee] = tally.get(callee, 0) + 1
        return tally

# Simple Taint Tracking:
class TaintTracker:
    """Minimal byte-granular taint bookkeeping.

    Tainted memory is tracked as a set of individual byte addresses and
    tainted registers as a set of names.  The write and propagation hooks are
    placeholders that do nothing yet.
    """

    def __init__(self, ql):
        self.ql = ql
        self.tainted_memory = set()
        self.tainted_registers = set()
        self.taint_sources = []

    def mark_memory_tainted(self, addr, size, source="unknown"):
        """Mark ``size`` bytes starting at ``addr`` as tainted and log the source."""
        self.tainted_memory.update(range(addr, addr + size))
        self.taint_sources.append((addr, size, source))

    def mark_register_tainted(self, reg_name):
        """Mark register as tainted"""
        self.tainted_registers.add(reg_name)

    def setup_taint_tracking(self):
        """Install the memory and instruction hooks on the Qiling instance."""
        self.ql.hook_mem_read(self.track_taint_read)
        self.ql.hook_mem_write(self.track_taint_write)
        self.ql.hook_code(self.track_taint_propagation)

    def track_taint_read(self, ql, access, address, size, value):
        """Report a read that touches at least one tainted byte.

        The message is printed once per access.  Previously it was repeated,
        with the same base address, for every tainted byte in the range.
        """
        if any((address + i) in self.tainted_memory for i in range(size)):
            print(f"Reading tainted memory at 0x{address:x}")
            # Marking the destination as tainted is not implemented.

    def track_taint_write(self, ql, access, address, size, value):
        """Placeholder: propagating taint to written memory is not implemented."""
        # Simplified: if any source register is tainted, mark dest as tainted

    def track_taint_propagation(self, ql, address, size):
        """Placeholder: per-instruction taint propagation is not implemented."""
        # This is a simplified version
        pass

    def is_tainted(self, addr):
        """Return True if the single byte at ``addr`` is tainted."""
        return addr in self.tainted_memory

    def get_taint_report(self):
        """Summarise the current taint state.

        ``'tainted_memory_regions'`` is the number of tainted bytes, not the
        number of contiguous regions.
        """
        return {
            'tainted_memory_regions': len(self.tainted_memory),
            'tainted_registers': list(self.tainted_registers),
            'taint_sources': self.taint_sources,
        }

# Linux Syscall Hijacking:
def custom_linux_syscalls(ql):
    """Replace the Linux ``open`` and ``read`` syscalls with logging versions.

    NOTE(review): the helpers used below (``ql.os.syscall_open``,
    ``ql.os.syscall_read``, ``ql.os.fd_table.open``) are taken as-is from the
    original snippet - verify they exist in the Qiling version in use.
    """

    def my_open(ql, filename_ptr, flags, mode):
        """Log the path; redirect /etc/passwd to a decoy file."""
        filename = ql.os.utils.read_cstring(filename_ptr)
        print(f"Custom open: {filename}")

        if filename != "/etc/passwd":
            # Default behavior
            return ql.os.syscall_open(filename_ptr, flags, mode)

        # Redirect to fake file
        return ql.os.fd_table.open("/tmp/fake_passwd", flags, mode)

    def my_read(ql, fd, buf, count):
        """Perform the read, then print up to 32 bytes of what was read."""
        print(f"Custom read: fd={fd}, count={count}")

        nread = ql.os.syscall_read(fd, buf, count)
        if nread > 0:
            print(f"Read data: {ql.mem.read(buf, nread)[:32]}")
        return nread

    # Replace syscalls
    for syscall_name, handler in (("open", my_open), ("read", my_read)):
        ql.os.set_syscall(syscall_name, handler)

# Windows API Replacement:
def custom_windows_apis(ql):
    """Install replacement hooks for two Windows APIs.

    ``InternetCheckConnectionW`` always reports success (returns 1) and
    ``RegOpenKeyExW`` prints the sub-key that was requested.
    """

    def fake_internet_check(ql):
        """Always return 'connected' for internet checks"""
        print("Faking internet connectivity")
        return 1  # TRUE

    def log_registry_access(ql, hKey, lpSubKey, ulOptions, samDesired, phkResult):
        """Print the requested sub-key; a null pointer is shown as "None"."""
        if lpSubKey:
            subkey = ql.os.utils.read_wstring(lpSubKey)
        else:
            subkey = "None"
        print(f"Registry access: {subkey}")
        return None

    replacements = (
        ("InternetCheckConnectionW", fake_internet_check),
        ("RegOpenKeyExW", log_registry_access),
    )
    for api_name, callback in replacements:
        ql.set_api(api_name, callback)

# Network Simulation:
# The hook signatures below keep a ``len`` parameter, which shadows the
# builtin inside those methods, so keep a private handle on the real function.
_builtin_len = len


class NetworkSimulator:
    """Fake network peer that answers socket syscalls with canned data.

    ``responses`` maps ``(host, port)`` to the bytes that ``recv`` should
    deliver; ``connections`` maps a socket descriptor to the address info
    recorded when it was connected.
    """

    def __init__(self, ql):
        self.ql = ql
        self.connections = {}
        self.responses = {}

    def add_response(self, host, port, response):
        """Add canned response for host:port"""
        self.responses[(host, port)] = response

    def setup_network_hooks(self):
        """Replace the socket, connect, send and recv syscalls with the hooks."""
        # Hook socket-related syscalls
        self.ql.os.set_syscall("socket", self.hook_socket)
        self.ql.os.set_syscall("connect", self.hook_connect)
        self.ql.os.set_syscall("send", self.hook_send)
        self.ql.os.set_syscall("recv", self.hook_recv)

    def hook_socket(self, ql, domain, type, protocol):
        """Log the request and delegate to ``ql.os.syscall_socket``."""
        print(f"Socket created: domain={domain}, type={type}")
        return ql.os.syscall_socket(domain, type, protocol)

    def hook_connect(self, ql, sockfd, addr_ptr, addrlen):
        """Pretend the connection succeeded and remember its destination."""
        # Parse sockaddr structure
        addr_info = self.parse_sockaddr(ql, addr_ptr, addrlen)
        print(f"Connect to: {addr_info}")
        # Simulate connection
        self.connections[sockfd] = addr_info
        return 0  # Success

    def hook_send(self, ql, sockfd, buf, len, flags):
        """Log up to 64 bytes of the outgoing data and report it all sent."""
        data = ql.mem.read(buf, len)
        print(f"Send data: {data[:64]}")  # First 64 bytes
        return len  # Pretend all data sent

    def hook_recv(self, ql, sockfd, buf, len, flags):
        """Copy the canned response for this socket's peer into ``buf``.

        At most ``len`` bytes are written.  Returns the number of bytes
        written, or 0 when the socket is not connected or has no canned
        response.  The original called ``len(response)`` while ``len`` was
        the integer parameter, which raised ``TypeError``.
        """
        addr_info = self.connections.get(sockfd)
        if addr_info is None:
            return 0  # No data

        key = (addr_info.get('host'), addr_info.get('port'))
        if key not in self.responses:
            return 0  # No data

        # Slicing clamps to the response length; max() guards a negative count.
        chunk = self.responses[key][:max(len, 0)]
        ql.mem.write(buf, chunk)
        return _builtin_len(chunk)

    def parse_sockaddr(self, ql, addr_ptr, addrlen):
        """Return address info for a sockaddr (stub).

        The bytes are read from guest memory but not decoded yet; a fixed
        127.0.0.1:80 endpoint is always returned.
        """
        # Simplified sockaddr parsing
        data = ql.mem.read(addr_ptr, addrlen)
        # Parse based on address family
        return {'family': 'inet', 'host': '127.0.0.1', 'port': 80}

# Performance Profiler:
import time
from collections import defaultdict
class QilingProfiler:
    """Collect instruction hotspots and rough call timings during emulation.

    ``hotspots`` counts how often each address was executed.
    ``function_times`` maps the address of a call instruction to the list of
    wall-clock durations measured until a matching return was seen.
    """

    def __init__(self, ql):
        self.ql = ql
        self.start_time = None
        self.function_times = defaultdict(list)
        self.call_stack = []
        self.hotspots = defaultdict(int)

    def start_profiling(self):
        """Start the clock and install the profiling hooks."""
        self.start_time = time.time()
        self.ql.hook_code(self.profile_instructions)
        # NOTE(review): hook_address_range and loader.load_size are used as
        # in the original snippet - confirm they exist in the Qiling version
        # in use.
        self.ql.hook_address_range(self.profile_function_entry,
                                   self.ql.loader.load_address,
                                   self.ql.loader.load_address + self.ql.loader.load_size)

    def profile_instructions(self, ql, address, size):
        """Count one execution of the instruction at ``address``."""
        self.hotspots[address] += 1

    def profile_function_entry(self, ql, address):
        """Start a timer on a call opcode and stop the newest one on a return."""
        # Simplified function detection
        code = ql.mem.read(address, 5)
        if code[0] == 0xe8:  # CALL
            func_start = time.time()
            self.call_stack.append((address, func_start))
        elif code[0] in [0xc3, 0xc2]:  # RET
            if self.call_stack:
                func_addr, func_start = self.call_stack.pop()
                func_time = time.time() - func_start
                self.function_times[func_addr].append(func_time)

    def get_profile_report(self):
        """Return total time, instruction count, top-10 hotspots and call stats.

        ``'total_time'`` is 0.0 when profiling was never started (previously
        a ``TypeError``).  Addresses with no recorded timings are left out of
        ``'function_stats'``, because their average and maximum are undefined.
        """
        if self.start_time is None:
            total_time = 0.0
        else:
            total_time = time.time() - self.start_time

        # Top hotspots
        top_hotspots = sorted(self.hotspots.items(),
                              key=lambda x: x[1], reverse=True)[:10]

        # Function timing stats
        func_stats = {}
        for func_addr, times in self.function_times.items():
            if not times:
                continue
            elapsed = sum(times)
            func_stats[func_addr] = {
                'calls': len(times),
                'total_time': elapsed,
                'avg_time': elapsed / len(times),
                'max_time': max(times),
            }

        return {
            'total_time': total_time,
            'instruction_count': sum(self.hotspots.values()),
            'top_hotspots': top_hotspots,
            'function_stats': func_stats,
        }

# Lazy Loading and Caching:
class OptimizedQiling:
    """Qiling wrapper that serves ``ql.mem.read`` from a page cache.

    NOTE: cached pages are never invalidated, so a read can return stale
    bytes after a cached page has been written to.  Use only for memory that
    is not modified during the run.
    """

    PAGE_SIZE = 0x1000

    def __init__(self, argv, rootfs, **kwargs):
        self.ql = Qiling(argv, rootfs, libcache=True, **kwargs)
        self.page_cache = {}
        self.setup_optimizations()

    def setup_optimizations(self):
        """Set the loader's lazy-load flag and reroute ``ql.mem.read``."""
        # Enable lazy loading for large binaries
        # NOTE(review): lazy_load is set as in the original snippet - confirm
        # the loader actually honours this attribute.
        self.ql.loader.lazy_load = True
        # Set up page-based memory caching
        self.original_mem_read = self.ql.mem.read
        self.ql.mem.read = self.cached_mem_read

    def cached_mem_read(self, addr, size):
        """Read ``size`` bytes at ``addr``, using the page cache when possible.

        A request that extends past the end of its page is passed straight to
        the original reader; slicing one cached page would silently truncate
        it.  If the whole page cannot be loaded, the exact range is read
        directly instead.
        """
        page_addr = addr & ~(self.PAGE_SIZE - 1)  # Page-align
        offset = addr - page_addr

        if offset + size > self.PAGE_SIZE:
            return self.original_mem_read(addr, size)

        cached_page = self.page_cache.get(page_addr)
        if cached_page is None:
            try:
                cached_page = self.original_mem_read(page_addr, self.PAGE_SIZE)
            except Exception:
                # The page may be only partially mapped; try the exact range.
                return self.original_mem_read(addr, size)
            self.page_cache[page_addr] = cached_page

        return cached_page[offset:offset + size]

    def run_optimized(self, **kwargs):
        """Warm the cache with the entry point's page, then run the emulation."""
        # Pre-warm cache for critical sections
        if hasattr(self.ql.loader, 'entry_point'):
            # A one-byte read is enough to load the page holding the entry point.
            self.cached_mem_read(self.ql.loader.entry_point, 1)
        return self.ql.run(**kwargs)


class ComparativeAnalyzer:
    """Run the same instrumentation over several targets and compare counts."""

    def __init__(self, targets, rootfs_list):
        self.targets = targets
        self.rootfs_list = rootfs_list
        self.results = {}

    def analyze_all(self):
        """Analyze each ``(target, rootfs)`` pair and store the result by target."""
        for i, (target, rootfs) in enumerate(zip(self.targets, self.rootfs_list)):
            print(f"Analyzing target {i+1}: {target}")
            self.results[target] = self.analyze_single(target, rootfs)

    def analyze_single(self, target, rootfs):
        """Emulate one target with instrumentation.

        Returns the instrumenter's report, or ``{'error': message}`` if
        emulation raised.
        """
        ql = Qiling([target], rootfs)
        # Set up common instrumentation
        analyzer = QilingInstrumenter(ql)
        analyzer.setup_hooks()
        try:
            ql.run(timeout=30000000)  # 30 second timeout
            return analyzer.get_report()
        except Exception as e:
            return {'error': str(e)}

    def compare_results(self):
        """Compare analysis results across targets.

        Returns ``{metric: {target: value}}``; targets whose analysis failed
        are left out.
        """
        metrics = ['instruction_count', 'basic_blocks', 'memory_accesses', 'api_calls']
        return {
            metric: {
                target: result.get(metric, 0)
                for target, result in self.results.items()
                if 'error' not in result
            }
            for metric in metrics
        }

    def find_anomalies(self):
        """Identify targets whose metrics deviate more than 50% from the mean.

        Metrics with fewer than two samples are skipped, and so are metrics
        whose mean is 0: the relative deviation is undefined there and the
        division previously raised ``ZeroDivisionError``.
        """
        anomalies = []
        for metric, values in self.compare_results().items():
            if len(values) < 2:
                continue
            avg_value = sum(values.values()) / len(values)
            if avg_value == 0:
                continue
            for target, value in values.items():
                deviation = abs(value - avg_value) / avg_value
                if deviation > 0.5:  # 50% deviation threshold
                    anomalies.append({
                        'target': target,
                        'metric': metric,
                        'value': value,
                        'average': avg_value,
                        'deviation': deviation,
                    })
        return anomalies

# This advanced usage guide provides sophisticated techniques for binary
# analysis, emulation customization, and performance optimization. These
# patterns can be combined and extended based on specific analysis requirements.
- Home
- Getting Started
- Core Concepts
- Usage
- Features
- Tutorials
- Development
- Resources