-
Notifications
You must be signed in to change notification settings - Fork 758
Memory Management
xwings edited this page Jul 6, 2025
·
2 revisions
Comprehensive guide to Qiling Framework's advanced memory management capabilities, including virtual memory systems, heap management, and memory analysis techniques.
Qiling's memory management system provides a sophisticated virtual memory environment that accurately emulates target operating systems while offering powerful analysis capabilities. The framework supports multiple memory models, advanced debugging features, and comprehensive memory instrumentation.
┌─────────────────────────────────────────────────────────────┐
│ Memory Management Architecture │
├─────────────────────────────────────────────────────────────┤
│ Analysis Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Memory │ │ Heap │ │ Memory │ │
│ │ Hooks │ │ Analysis │ │ Forensics │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Memory Management Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Virtual │ │ Stack │ │ Heap │ │
│ │ Memory │ │ Management │ │ Management │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ OS-Specific Memory Models │
│ Windows │ Linux │ macOS │ UEFI │ Embedded │
├─────────────────────────────────────────────────────────────┤
│ Hardware Abstraction Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Memory │ │ Address │ │ Cache │ │
│ │ Protection │ │ Translation │ │ Management │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Unicorn Engine │
│ (Physical Memory Emulation) │
└─────────────────────────────────────────────────────────────┘
from qiling import Qiling
from qiling.const import QL_ARCH, QL_OS
class MemoryManager:
def __init__(self, ql):
self.ql = ql
self.allocations = {}
self.protection_changes = []
def allocate_memory(self, size, alignment=0x1000, permissions=7):
"""Allocate aligned memory region"""
# Find suitable address
address = self.find_free_address_space(size, alignment)
# Map memory with specified permissions
self.ql.mem.map(address, size, permissions)
# Track allocation
self.allocations[address] = {
'size': size,
'permissions': permissions,
'alignment': alignment,
'allocated_at': self.ql.arch.regs.arch_pc
}
return address
def find_free_address_space(self, size, alignment):
"""Find free address space for allocation"""
# Get current memory map
memory_map = self.ql.mem.get_mapinfo()
# Start searching from a safe address
search_start = 0x10000000 # Start at 256MB
current_addr = self.align_address(search_start, alignment)
for region in sorted(memory_map, key=lambda x: x[0]):
region_start, region_end = region[0], region[1]
# Check if we have enough space before this region
if current_addr + size <= region_start:
return current_addr
# Move past this region
current_addr = self.align_address(region_end, alignment)
# Use address after all mapped regions
return current_addr
def align_address(self, address, alignment):
"""Align address to specified boundary"""
return (address + alignment - 1) & ~(alignment - 1)
def deallocate_memory(self, address):
"""Deallocate memory region"""
if address in self.allocations:
allocation = self.allocations[address]
self.ql.mem.unmap(address, allocation['size'])
del self.allocations[address]
return True
return False
def change_protection(self, address, size, new_permissions):
"""Change memory protection"""
old_permissions = self.get_memory_permissions(address)
self.ql.mem.protect(address, size, new_permissions)
self.protection_changes.append({
'address': address,
'size': size,
'old_permissions': old_permissions,
'new_permissions': new_permissions,
'changed_at': self.ql.arch.regs.arch_pc
})
def get_memory_permissions(self, address):
"""Get current memory permissions"""
memory_map = self.ql.mem.get_mapinfo()
for region in memory_map:
start, end, perms = region[0], region[1], region[2]
if start <= address < end:
return perms
return 0 # No permissions if not mapped
def get_memory_statistics(self):
"""Get comprehensive memory statistics"""
total_allocated = sum(alloc['size'] for alloc in self.allocations.values())
memory_map = self.ql.mem.get_mapinfo()
stats = {
'total_allocated': total_allocated,
'active_allocations': len(self.allocations),
'protection_changes': len(self.protection_changes),
'memory_regions': len(memory_map),
'virtual_memory_usage': sum(region[1] - region[0] for region in memory_map)
}
return stats
# Usage example
def demonstrate_memory_management():
ql = Qiling(code=b'\x90' * 100, archtype=QL_ARCH.X8664, ostype=QL_OS.LINUX)
memory_mgr = MemoryManager(ql)
# Allocate memory regions
buffer1 = memory_mgr.allocate_memory(0x1000, alignment=0x1000)
buffer2 = memory_mgr.allocate_memory(0x2000, alignment=0x1000)
# Write data to allocated memory
ql.mem.write(buffer1, b'Hello, World!' * 100)
# Change memory protection
memory_mgr.change_protection(buffer1, 0x1000, 5) # Read + Execute
# Get statistics
stats = memory_mgr.get_memory_statistics()
print(f"Memory statistics: {stats}")import struct
from collections import defaultdict
class MemoryAnalyzer:
def __init__(self, ql):
self.ql = ql
self.memory_accesses = defaultdict(list)
self.memory_hotspots = defaultdict(int)
self.memory_patterns = {}
def setup_memory_monitoring(self):
"""Set up comprehensive memory access monitoring"""
def track_memory_read(ql, access, address, size, value):
self.memory_accesses['reads'].append({
'address': address,
'size': size,
'pc': ql.arch.regs.arch_pc,
'timestamp': len(self.memory_accesses['reads'])
})
# Track hotspots
self.memory_hotspots[address] += 1
# Analyze access patterns
self.analyze_access_pattern(address, size, 'read')
def track_memory_write(ql, access, address, size, value):
self.memory_accesses['writes'].append({
'address': address,
'size': size,
'value': value,
'pc': ql.arch.regs.arch_pc,
'timestamp': len(self.memory_accesses['writes'])
})
# Track hotspots
self.memory_hotspots[address] += 1
# Analyze access patterns
self.analyze_access_pattern(address, size, 'write')
# Check for suspicious patterns
self.check_suspicious_writes(address, size, value)
def track_memory_fetch(ql, access, address, size, value):
self.memory_accesses['fetches'].append({
'address': address,
'size': size,
'pc': ql.arch.regs.arch_pc,
'timestamp': len(self.memory_accesses['fetches'])
})
self.ql.hook_mem_read(track_memory_read)
self.ql.hook_mem_write(track_memory_write)
self.ql.hook_mem_fetch(track_memory_fetch)
def analyze_access_pattern(self, address, size, access_type):
"""Analyze memory access patterns"""
# Detect sequential access patterns
if access_type in self.memory_patterns:
last_access = self.memory_patterns[access_type]
if address == last_access['address'] + last_access['size']:
# Sequential access detected
if 'sequential_count' not in last_access:
last_access['sequential_count'] = 1
last_access['sequential_count'] += 1
self.memory_patterns[access_type] = {'address': address, 'size': size}
def check_suspicious_writes(self, address, size, value):
"""Check for suspicious memory write patterns"""
# Check for potential shellcode injection
if size >= 4:
# Convert value to bytes for analysis
try:
if isinstance(value, int):
data = value.to_bytes(size, byteorder='little')
else:
data = value
# Check for common shellcode patterns
if self.contains_shellcode_patterns(data):
print(f"Potential shellcode injection at 0x{address:x}")
# Check for PE header injection
if data[:2] == b'MZ':
print(f"PE header injection detected at 0x{address:x}")
except Exception:
pass
def contains_shellcode_patterns(self, data):
"""Check for common shellcode patterns"""
shellcode_patterns = [
b'\x90' * 4, # NOP sled
b'\x31\xc0', # XOR EAX, EAX
b'\x68', # PUSH immediate
b'\xff\xe4', # JMP ESP
b'\xeb', # Short jump
]
for pattern in shellcode_patterns:
if pattern in data:
return True
return False
def find_memory_leaks(self):
"""Detect potential memory leaks"""
# Analyze allocation vs deallocation patterns
allocated_regions = set()
freed_regions = set()
# This would need to be integrated with heap monitoring
# For now, we'll analyze general memory access patterns
potential_leaks = []
for address, accesses in self.memory_accesses.items():
if len(accesses) > 100: # High access count
# Check if memory is still being accessed
recent_accesses = [a for a in accesses if a['timestamp'] > len(accesses) - 10]
if not recent_accesses:
potential_leaks.append(address)
return potential_leaks
def analyze_memory_corruption(self):
"""Analyze potential memory corruption"""
corruption_indicators = []
# Look for writes to executable memory
for write in self.memory_accesses['writes']:
address = write['address']
if self.is_executable_memory(address):
corruption_indicators.append({
'type': 'write_to_executable',
'address': address,
'pc': write['pc']
})
# Look for large overwrites
for write in self.memory_accesses['writes']:
if write['size'] > 1024: # Large write
corruption_indicators.append({
'type': 'large_write',
'address': write['address'],
'size': write['size'],
'pc': write['pc']
})
return corruption_indicators
def is_executable_memory(self, address):
"""Check if memory region is executable"""
memory_map = self.ql.mem.get_mapinfo()
for region in memory_map:
start, end, perms = region[0], region[1], region[2]
if start <= address < end:
return bool(perms & 4) # Execute permission
return False
def generate_memory_report(self):
"""Generate comprehensive memory analysis report"""
return {
'access_summary': {
'total_reads': len(self.memory_accesses['reads']),
'total_writes': len(self.memory_accesses['writes']),
'total_fetches': len(self.memory_accesses['fetches'])
},
'hotspots': dict(sorted(self.memory_hotspots.items(),
key=lambda x: x[1], reverse=True)[:10]),
'potential_leaks': self.find_memory_leaks(),
'corruption_indicators': self.analyze_memory_corruption(),
'access_patterns': self.memory_patterns
}class HeapAnalyzer:
def __init__(self, ql):
self.ql = ql
self.heap_operations = []
self.active_allocations = {}
self.freed_allocations = {}
self.heap_corruption_checks = True
def setup_heap_monitoring(self):
"""Set up comprehensive heap monitoring"""
# Hook malloc family functions
self.ql.set_api("malloc", self.hook_malloc)
self.ql.set_api("free", self.hook_free)
self.ql.set_api("realloc", self.hook_realloc)
self.ql.set_api("calloc", self.hook_calloc)
# Hook Windows heap functions
self.ql.set_api("HeapAlloc", self.hook_heapalloc)
self.ql.set_api("HeapFree", self.hook_heapfree)
self.ql.set_api("HeapReAlloc", self.hook_heaprealloc)
# Hook VirtualAlloc family
self.ql.set_api("VirtualAlloc", self.hook_virtualalloc)
self.ql.set_api("VirtualFree", self.hook_virtualfree)
def hook_malloc(self, ql, size):
"""Hook malloc function"""
# Call original malloc
addr = ql.os.heap.alloc(size)
# Track allocation
self.track_allocation('malloc', addr, size, ql.arch.regs.arch_pc)
# Add heap corruption detection metadata
if self.heap_corruption_checks:
self.add_heap_guards(addr, size)
return addr
def hook_free(self, ql, ptr):
"""Hook free function"""
# Validate free operation
if not self.validate_free(ptr):
print(f"Invalid free detected: 0x{ptr:x}")
return -1
# Check for heap corruption before freeing
if self.heap_corruption_checks:
self.check_heap_guards(ptr)
# Track deallocation
self.track_deallocation('free', ptr, ql.arch.regs.arch_pc)
# Call original free
return ql.os.heap.free(ptr)
def hook_realloc(self, ql, ptr, size):
"""Hook realloc function"""
old_size = 0
if ptr != 0 and ptr in self.active_allocations:
old_size = self.active_allocations[ptr]['size']
# Call original realloc
new_addr = ql.os.heap.realloc(ptr, size)
# Update tracking
if ptr != 0:
self.track_deallocation('realloc_free', ptr, ql.arch.regs.arch_pc)
self.track_allocation('realloc', new_addr, size, ql.arch.regs.arch_pc)
return new_addr
def hook_calloc(self, ql, num, size):
"""Hook calloc function"""
total_size = num * size
addr = ql.os.heap.alloc(total_size)
# Zero the memory (calloc behavior)
ql.mem.write(addr, b'\x00' * total_size)
# Track allocation
self.track_allocation('calloc', addr, total_size, ql.arch.regs.arch_pc)
return addr
def track_allocation(self, operation, address, size, caller_pc):
"""Track heap allocation"""
allocation_info = {
'operation': operation,
'address': address,
'size': size,
'caller_pc': caller_pc,
'timestamp': len(self.heap_operations),
'stack_trace': self.get_stack_trace()
}
self.heap_operations.append(allocation_info)
self.active_allocations[address] = allocation_info
def track_deallocation(self, operation, address, caller_pc):
"""Track heap deallocation"""
if address in self.active_allocations:
allocation_info = self.active_allocations.pop(address)
deallocation_info = {
'operation': operation,
'address': address,
'caller_pc': caller_pc,
'timestamp': len(self.heap_operations),
'original_allocation': allocation_info
}
self.heap_operations.append(deallocation_info)
self.freed_allocations[address] = deallocation_info
def validate_free(self, ptr):
"""Validate free operation"""
# Check for NULL pointer
if ptr == 0:
return True # Free(NULL) is valid
# Check if pointer was allocated
if ptr not in self.active_allocations:
# Check if already freed (double-free)
if ptr in self.freed_allocations:
print(f"Double-free detected: 0x{ptr:x}")
return False
print(f"Free of unallocated pointer: 0x{ptr:x}")
return False
return True
def add_heap_guards(self, address, size):
"""Add heap corruption detection guards"""
# Add guard bytes before and after allocation
guard_pattern = b'\xDEADBEEF'
try:
# Map additional space for guards
guard_size = len(guard_pattern)
# We'll store guard info separately since we can't always expand the allocation
guard_info = {
'pre_guard': guard_pattern,
'post_guard': guard_pattern,
'size': size
}
if not hasattr(self, 'heap_guards'):
self.heap_guards = {}
self.heap_guards[address] = guard_info
except Exception:
# Guard creation failed, continue without guards
pass
def check_heap_guards(self, address):
"""Check heap guards for corruption"""
if not hasattr(self, 'heap_guards') or address not in self.heap_guards:
return True
guard_info = self.heap_guards[address]
# This is a simplified check - in a real implementation,
# we would check actual guard bytes in memory
print(f"Heap guard check for 0x{address:x}: OK")
return True
def get_stack_trace(self):
"""Get simplified stack trace"""
# In a real implementation, this would walk the stack
# For now, we'll return the current PC
return [self.ql.arch.regs.arch_pc]
def detect_heap_vulnerabilities(self):
"""Detect common heap vulnerabilities"""
vulnerabilities = []
# Detect double-free vulnerabilities
for addr, dealloc_info in self.freed_allocations.items():
free_count = sum(1 for op in self.heap_operations
if op.get('address') == addr and 'free' in op.get('operation', ''))
if free_count > 1:
vulnerabilities.append({
'type': 'double_free',
'address': addr,
'operations': [op for op in self.heap_operations if op.get('address') == addr]
})
# Detect memory leaks
if len(self.active_allocations) > 100: # Threshold for leak detection
vulnerabilities.append({
'type': 'memory_leak',
'leaked_allocations': len(self.active_allocations),
'total_leaked_size': sum(alloc['size'] for alloc in self.active_allocations.values())
})
# Detect use-after-free patterns (simplified)
# This would require memory access monitoring
return vulnerabilities
def generate_heap_report(self):
"""Generate comprehensive heap analysis report"""
total_allocated = sum(alloc['size'] for alloc in self.active_allocations.values())
total_operations = len(self.heap_operations)
# Categorize operations
operation_counts = {}
for op in self.heap_operations:
op_type = op['operation']
operation_counts[op_type] = operation_counts.get(op_type, 0) + 1
return {
'summary': {
'total_operations': total_operations,
'active_allocations': len(self.active_allocations),
'freed_allocations': len(self.freed_allocations),
'total_allocated_memory': total_allocated
},
'operation_breakdown': operation_counts,
'vulnerabilities': self.detect_heap_vulnerabilities(),
'largest_allocations': sorted(
self.active_allocations.values(),
key=lambda x: x['size'],
reverse=True
)[:10]
}class MemoryForensicsAnalyzer:
def __init__(self, ql):
self.ql = ql
self.memory_snapshots = {}
self.forensic_markers = []
def create_memory_snapshot(self, label="snapshot"):
"""Create complete memory snapshot"""
memory_map = self.ql.mem.get_mapinfo()
snapshot = {
'label': label,
'timestamp': len(self.memory_snapshots),
'memory_regions': [],
'register_state': self.ql.arch.regs.save()
}
# Dump all mapped memory regions
for region in memory_map:
start, end, perms, info = region[0], region[1], region[2], region[3] if len(region) > 3 else "Unknown"
size = end - start
try:
data = self.ql.mem.read(start, size)
snapshot['memory_regions'].append({
'start': start,
'end': end,
'size': size,
'permissions': perms,
'info': info,
'data': data
})
except Exception as e:
print(f"Failed to read memory region 0x{start:x}-0x{end:x}: {e}")
self.memory_snapshots[label] = snapshot
return snapshot
def compare_memory_snapshots(self, snapshot1_label, snapshot2_label):
"""Compare two memory snapshots"""
if snapshot1_label not in self.memory_snapshots or snapshot2_label not in self.memory_snapshots:
return None
snap1 = self.memory_snapshots[snapshot1_label]
snap2 = self.memory_snapshots[snapshot2_label]
differences = {
'new_regions': [],
'removed_regions': [],
'modified_regions': [],
'register_changes': {}
}
# Create lookup tables
snap1_regions = {(r['start'], r['end']): r for r in snap1['memory_regions']}
snap2_regions = {(r['start'], r['end']): r for r in snap2['memory_regions']}
# Find new and removed regions
snap1_keys = set(snap1_regions.keys())
snap2_keys = set(snap2_regions.keys())
for key in snap2_keys - snap1_keys:
differences['new_regions'].append(snap2_regions[key])
for key in snap1_keys - snap2_keys:
differences['removed_regions'].append(snap1_regions[key])
# Find modified regions
for key in snap1_keys & snap2_keys:
region1 = snap1_regions[key]
region2 = snap2_regions[key]
if region1['data'] != region2['data']:
# Find specific differences
data_diff = self.find_data_differences(region1['data'], region2['data'])
differences['modified_regions'].append({
'region': key,
'changes': data_diff
})
# Compare register states
reg1 = snap1['register_state']
reg2 = snap2['register_state']
for reg_name in reg1:
if reg1[reg_name] != reg2.get(reg_name):
differences['register_changes'][reg_name] = {
'old': reg1[reg_name],
'new': reg2.get(reg_name)
}
return differences
def find_data_differences(self, data1, data2):
"""Find specific byte differences between data blocks"""
differences = []
min_len = min(len(data1), len(data2))
# Find byte-level differences
for i in range(min_len):
if data1[i] != data2[i]:
differences.append({
'offset': i,
'old_byte': data1[i],
'new_byte': data2[i]
})
# Check for size differences
if len(data1) != len(data2):
differences.append({
'type': 'size_change',
'old_size': len(data1),
'new_size': len(data2)
})
return differences
def search_memory_patterns(self, pattern, memory_type='all'):
"""Search for specific patterns in memory"""
matches = []
memory_map = self.ql.mem.get_mapinfo()
for region in memory_map:
start, end, perms, info = region[0], region[1], region[2], region[3] if len(region) > 3 else "Unknown"
# Filter by memory type if specified
if memory_type != 'all':
if memory_type == 'executable' and not (perms & 4):
continue
elif memory_type == 'writable' and not (perms & 2):
continue
elif memory_type == 'stack' and 'stack' not in info.lower():
continue
elif memory_type == 'heap' and 'heap' not in info.lower():
continue
try:
data = self.ql.mem.read(start, end - start)
# Search for pattern
offset = 0
while True:
pos = data.find(pattern, offset)
if pos == -1:
break
matches.append({
'address': start + pos,
'region_start': start,
'region_end': end,
'region_info': info,
'context': data[max(0, pos-16):pos+len(pattern)+16]
})
offset = pos + 1
except Exception:
continue
return matches
def extract_strings(self, min_length=4, encoding='utf-8'):
"""Extract strings from memory"""
strings = []
memory_map = self.ql.mem.get_mapinfo()
for region in memory_map:
start, end, perms, info = region[0], region[1], region[2], region[3] if len(region) > 3 else "Unknown"
try:
data = self.ql.mem.read(start, end - start)
# Extract ASCII strings
current_string = b""
current_start = 0
for i, byte in enumerate(data):
if 32 <= byte <= 126: # Printable ASCII
if not current_string:
current_start = i
current_string += bytes([byte])
else:
if len(current_string) >= min_length:
strings.append({
'address': start + current_start,
'string': current_string.decode('ascii', errors='ignore'),
'length': len(current_string),
'region_info': info
})
current_string = b""
# Check final string
if len(current_string) >= min_length:
strings.append({
'address': start + current_start,
'string': current_string.decode('ascii', errors='ignore'),
'length': len(current_string),
'region_info': info
})
except Exception:
continue
return sorted(strings, key=lambda x: x['length'], reverse=True)
def analyze_memory_layout(self):
"""Analyze overall memory layout"""
memory_map = self.ql.mem.get_mapinfo()
analysis = {
'total_regions': len(memory_map),
'total_memory': 0,
'executable_regions': [],
'writable_regions': [],
'read_only_regions': [],
'gaps': []
}
last_end = 0
for region in sorted(memory_map, key=lambda x: x[0]):
start, end, perms, info = region[0], region[1], region[2], region[3] if len(region) > 3 else "Unknown"
size = end - start
analysis['total_memory'] += size
# Categorize by permissions
if perms & 4: # Executable
analysis['executable_regions'].append({
'start': start,
'end': end,
'size': size,
'info': info
})
if perms & 2: # Writable
analysis['writable_regions'].append({
'start': start,
'end': end,
'size': size,
'info': info
})
if perms == 1: # Read-only
analysis['read_only_regions'].append({
'start': start,
'end': end,
'size': size,
'info': info
})
# Check for gaps
if last_end > 0 and start > last_end:
analysis['gaps'].append({
'start': last_end,
'end': start,
'size': start - last_end
})
last_end = end
return analysisThis comprehensive memory management guide provides enterprise-grade capabilities for memory analysis, heap debugging, and forensic investigation. The framework supports advanced memory protection, corruption detection, and comprehensive memory layout analysis.
- Home
- Getting Started
- Core Concepts
- Usage
- Features
- Tutorials
- Development
- Resources