-
Notifications
You must be signed in to change notification settings - Fork 759
Fuzzing
xwings edited this page Jul 6, 2025
·
2 revisions
Qiling Framework provides excellent support for fuzzing through integration with AFL++, unicornafl, and custom fuzzing engines.
Installation:
# Install AFL++
git clone https://github.com/AFLplusplus/AFLplusplus
cd AFLplusplus
make all
make install
# Install Qiling with AFL support
pip install qiling[afl]

Basic AFL++ Setup:
from qiling import Qiling
from qiling.extensions.afl import ql_afl_fuzz
def harness_function(ql, input_data, persistent_addr, exit_addr):
    """Custom harness function for AFL++ fuzzing.

    Copies ``input_data`` into a fixed guest buffer, loads the buffer address
    and the input length into ``rdi``/``rsi`` and emulates from
    ``persistent_addr`` until ``exit_addr``.

    Args:
        ql: Qiling instance to drive.
        input_data: Raw bytes of the current test case.
        persistent_addr: Guest address at which emulation begins.
        exit_addr: Guest address at which emulation stops.
    """
    input_addr = 0x10000000  # Target input buffer
    page_size = 0x1000

    # Map whole pages, and only the ones that are still missing: the harness
    # runs once per test case, so the buffer is usually mapped already, and a
    # later, longer input may need additional pages. One page of slack is kept
    # past the end of the data.
    span = len(input_data) + page_size
    for page in range(input_addr, input_addr + span, page_size):
        if not ql.mem.is_mapped(page, page_size):
            ql.mem.map(page, page_size)

    # Set input data in target location
    ql.mem.write(input_addr, input_data)

    # Set up registers for target function
    ql.arch.regs.rdi = input_addr  # First argument
    ql.arch.regs.rsi = len(input_data)  # Second argument

    # Run target function
    ql.run(begin=persistent_addr, end=exit_addr)
def main():
    """Create the emulator for ``./target_binary`` and hand it to AFL++."""
    # Initialize Qiling
    ql = Qiling(['./target_binary'], './rootfs', verbose=0)

    # Set up AFL++ fuzzing
    # NOTE(review): the keyword names below are the ones used throughout this
    # guide; confirm them against the installed qiling.extensions.afl version.
    ql_afl_fuzz(
        ql,
        input_file="./afl_inputs/seed",
        persistent_addr=0x401000,  # Function entry point
        exit_addr=0x401100,  # Function exit point
        harness_func=harness_function,
    )
if __name__ == "__main__":
    main()

Persistent mode significantly improves fuzzing performance by avoiding process restart overhead:
from qiling import Qiling
from qiling.extensions.afl import ql_afl_fuzz_custom
def persistent_harness(ql, input_data, persistent_addr, exit_addrs):
    """Persistent mode harness.

    Resets the stack registers, places ``input_data`` in a fixed guest buffer,
    passes its address and length in ``rdi``/``rsi`` and emulates from
    ``persistent_addr`` to the first address in ``exit_addrs``.

    Args:
        ql: Qiling instance to drive.
        input_data: Raw bytes of the current test case.
        persistent_addr: Guest address at which each iteration begins.
        exit_addrs: Sequence of exit addresses; only the first one is used
            as the emulation end address.

    Returns:
        True when emulation finished without raising, False when it raised
        (reported as a crash).
    """
    # Reset state for each iteration
    # NOTE(review): the stack pointer is derived from the heap start address;
    # confirm that this lands in mapped stack memory for the target.
    ql.arch.regs.rsp = ql.os.heap.start_address - 0x1000  # Reset stack
    ql.arch.regs.rbp = ql.arch.regs.rsp

    # Place input data. At least 0x10000 bytes are kept mapped; pages are
    # checked one by one so that empty inputs and inputs larger than the
    # initial region neither remap existing pages nor overflow the buffer.
    input_buffer = 0x20000000
    page_size = 0x1000
    span = max(len(input_data), 0x10000)
    for page in range(input_buffer, input_buffer + span, page_size):
        if not ql.mem.is_mapped(page, page_size):
            ql.mem.map(page, page_size)
    ql.mem.write(input_buffer, input_data)

    # Set function arguments
    ql.arch.regs.rdi = input_buffer
    ql.arch.regs.rsi = len(input_data)

    # Execute target function
    try:
        ql.run(begin=persistent_addr, end=exit_addrs[0])
    except Exception as e:
        # Crash detected
        print(f"Crash: {e}")
        return False
    return True
def setup_persistent_fuzzing():
    """Start persistent-mode fuzzing of ``./vulnerable_program``."""
    ql = Qiling(['./vulnerable_program'], './rootfs')

    # Configure persistent fuzzing
    # NOTE(review): keyword names are the ones used throughout this guide;
    # confirm them against the installed qiling.extensions.afl version.
    ql_afl_fuzz_custom(
        ql=ql,
        input_file="@@",  # AFL placeholder for input file
        persistent_addr=0x401000,
        exit_addrs=[0x401200, 0x401300],  # Multiple exit points
        harness_func=persistent_harness,
        persistent_iters=1000,  # Iterations before restart
    )


def file_fuzzing_example():
    """Fuzz a program that reads from files"""

    def file_harness(ql, input_data, persistent_addr, exit_addrs):
        """Write the test case to disk and run the target on it.

        Returns True when emulation finished without raising, else False.
        """
        # Write input to a file the target will read
        # NOTE(review): this is a host path; confirm that the emulated target
        # resolves it to the same file (it may be looked up inside the rootfs).
        input_file = "/tmp/fuzz_input"
        with open(input_file, "wb") as f:
            f.write(input_data)

        # Set up command line arguments
        ql.argv = ['./target_program', input_file]

        # Run from main function
        try:
            ql.run(begin=persistent_addr, end=exit_addrs[0])
        except Exception:
            # Only ordinary errors count as a crash; KeyboardInterrupt and
            # SystemExit must still be able to stop the fuzzer.
            return False
        return True

    ql = Qiling(['./file_parser'], './rootfs')
    ql_afl_fuzz_custom(
        ql=ql,
        input_file="./seeds/sample.dat",
        persistent_addr=0x401000,  # main function
        exit_addrs=[0x402000],
        harness_func=file_harness,
    )


from qiling import Qiling
import socket
import threading
class NetworkFuzzer:
    """Feed test data to an emulated network server over a loopback socket.

    The socket-related syscalls of the emulated program are redirected to the
    ``hook_*`` methods of this class, which back them with real host sockets.
    When the target accepts a connection, a helper thread connects to it and
    sends the current ``test_data``.

    NOTE(review): ``ql.os.fd_table`` / ``get_free_fd`` are used as given in
    this guide; confirm them against the installed Qiling version.
    """

    def __init__(self, target_binary, rootfs):
        self.ql = Qiling([target_binary], rootfs, multithread=True)
        self.server_port = 8080
        self.setup_network_environment()

    def setup_network_environment(self):
        """Set up network emulation environment"""
        # Hook socket operations
        self.ql.os.set_syscall("socket", self.hook_socket)
        self.ql.os.set_syscall("bind", self.hook_bind)
        self.ql.os.set_syscall("listen", self.hook_listen)
        self.ql.os.set_syscall("accept", self.hook_accept)
        self.ql.os.set_syscall("recv", self.hook_recv)

        self.test_data = b""  # payload sent to the target on accept
        self.connection_fd = None  # guest fd of the accepted connection

    def hook_socket(self, ql, domain, type, protocol):
        """Create a real host socket and return the guest fd it is stored under."""
        # Create real socket for emulation
        real_socket = socket.socket(domain, type, protocol)
        fd = ql.os.fd_table.get_free_fd()
        ql.os.fd_table[fd] = real_socket
        return fd

    def hook_bind(self, ql, sockfd, addr_ptr, addrlen):
        """Bind the socket to localhost:``server_port``.

        The address requested by the guest (``addr_ptr``) is ignored.
        """
        sock = ql.os.fd_table[sockfd]
        sock.bind(('localhost', self.server_port))
        return 0

    def hook_listen(self, ql, sockfd, backlog):
        """Put the host socket behind ``sockfd`` into listening mode."""
        sock = ql.os.fd_table[sockfd]
        sock.listen(backlog)
        return 0

    def hook_accept(self, ql, sockfd, addr_ptr, addrlen_ptr):
        """Accept a connection that a helper thread opens to deliver ``test_data``.

        Returns the guest fd under which the accepted connection is stored.
        """
        sock = ql.os.fd_table[sockfd]

        # Send test data when connection is accepted
        def send_test_data():
            # The context manager closes the client socket even if connecting
            # or sending fails; sendall() keeps going until every byte is sent.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_sock:
                client_sock.connect(('localhost', self.server_port))
                client_sock.sendall(self.test_data)

        # Start client thread
        client_thread = threading.Thread(target=send_test_data)
        client_thread.daemon = True
        client_thread.start()

        # Accept connection
        conn, addr = sock.accept()
        fd = ql.os.fd_table.get_free_fd()
        ql.os.fd_table[fd] = conn
        self.connection_fd = fd
        return fd

    def hook_recv(self, ql, sockfd, buf, length, flags):
        """Read up to ``length`` bytes from the fuzzed connection into guest memory.

        The size parameter is named ``length`` so that it does not shadow the
        builtin ``len`` used for the return value.

        Returns the number of bytes written to ``buf``, or 0 when ``sockfd``
        is not the accepted fuzzing connection.
        """
        if sockfd != self.connection_fd:
            return 0

        # Receive fuzzed data
        sock = ql.os.fd_table[sockfd]
        data = sock.recv(length)
        ql.mem.write(buf, data)
        return len(data)

    def fuzz_with_data(self, test_data):
        """Fuzz server with specific test data

        Returns True when emulation finished without raising, else False.
        """
        self.test_data = test_data
        try:
            self.ql.run(timeout=5000000)  # 5 second timeout
        except Exception:
            print(f"Crash with input: {test_data.hex()}")
            return False
        return True
def network_fuzzing_example():
    """Example of network protocol fuzzing"""
    fuzzer = NetworkFuzzer('./http_server', './rootfs')

    # Basic HTTP requests
    test_cases = [
        b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
        b"GET " + b"A" * 1000 + b" HTTP/1.1\r\n\r\n",  # Long path
        b"POST / HTTP/1.1\r\nContent-Length: -1\r\n\r\n",  # Negative length
        b"\x00\x01\x02\x03" * 100,  # Binary data
    ]

    for i, test_case in enumerate(test_cases):
        print(f"Testing case {i+1}")
        fuzzer.fuzz_with_data(test_case)


def udp_fuzzing_setup():
    """Set up UDP protocol fuzzing"""

    def udp_harness(ql, input_data, persistent_addr, exit_addrs):
        """Hand one packet to the packet-processing function.

        Returns True when emulation finished without raising, else False.
        """
        # Simulate UDP packet reception. At least 0x10000 bytes are kept
        # mapped; pages are checked one by one so that empty or oversized
        # packets neither remap existing pages nor overflow the buffer.
        packet_buffer = 0x30000000
        page_size = 0x1000
        span = max(len(input_data), 0x10000)
        for page in range(packet_buffer, packet_buffer + span, page_size):
            if not ql.mem.is_mapped(page, page_size):
                ql.mem.map(page, page_size)
        ql.mem.write(packet_buffer, input_data)

        # Set up packet processing function arguments
        ql.arch.regs.rdi = packet_buffer  # Packet data
        ql.arch.regs.rsi = len(input_data)  # Packet length

        try:
            ql.run(begin=persistent_addr, end=exit_addrs[0])
        except Exception:
            # Only ordinary errors count as a crash; KeyboardInterrupt and
            # SystemExit must still be able to stop the fuzzer.
            return False
        return True

    ql = Qiling(['./udp_server'], './rootfs')
    ql_afl_fuzz_custom(
        ql=ql,
        input_file="./seeds/udp_packet.bin",
        persistent_addr=0x401500,  # packet_handler function
        exit_addrs=[0x401600],
        harness_func=udp_harness,
    )


import random
import struct
class QilingMutationFuzzer:
    """Simple mutation fuzzer that runs one target function under Qiling.

    Seeds are mutated at random and executed; exceptions raised by the
    emulator are recorded as crashes, and an XOR of all executed instruction
    addresses serves as a cheap fingerprint of the execution path.
    """

    def __init__(self, ql, target_function, seed_inputs):
        self.ql = ql
        self.target_function = target_function
        self.seed_inputs = seed_inputs
        self.crashes = []
        self.unique_paths = set()
        # Register snapshot taken by fuzz(); None until then so that
        # execute_input() can also be called on its own.
        self.initial_state = None

    def mutate_input(self, data):
        """Apply random mutations to input data

        Empty input yields 1-100 zero bytes; otherwise 1-5 randomly chosen
        mutations are applied to a copy of ``data``. A mutation whose
        precondition is not met (e.g. too little data) is skipped.
        """
        if not data:
            return b"\x00" * random.randint(1, 100)

        data = bytearray(data)
        mutation_count = random.randint(1, 5)

        for _ in range(mutation_count):
            mutation_type = random.choice([
                'bit_flip', 'byte_flip', 'insert', 'delete',
                'duplicate', 'shuffle', 'arithmetic'
            ])

            if mutation_type == 'bit_flip' and data:
                pos = random.randint(0, len(data) - 1)
                bit = random.randint(0, 7)
                data[pos] ^= (1 << bit)
            elif mutation_type == 'byte_flip' and data:
                pos = random.randint(0, len(data) - 1)
                data[pos] = random.randint(0, 255)
            elif mutation_type == 'insert':
                pos = random.randint(0, len(data))
                data.insert(pos, random.randint(0, 255))
            elif mutation_type == 'delete' and data:
                pos = random.randint(0, len(data) - 1)
                del data[pos]
            elif mutation_type == 'duplicate' and data:
                chunk_size = min(random.randint(1, 10), len(data))
                start = random.randint(0, len(data) - chunk_size)
                chunk = data[start:start + chunk_size]
                pos = random.randint(0, len(data))
                data[pos:pos] = chunk
            elif mutation_type == 'shuffle' and len(data) > 1:
                # Reorder the bytes of a short random window in place.
                chunk_size = min(random.randint(2, 10), len(data))
                start = random.randint(0, len(data) - chunk_size)
                chunk = list(data[start:start + chunk_size])
                random.shuffle(chunk)
                data[start:start + chunk_size] = bytes(chunk)
            elif mutation_type == 'arithmetic' and len(data) >= 4:
                pos = random.randint(0, len(data) - 4)
                value = struct.unpack('<I', data[pos:pos+4])[0]
                value += random.randint(-100, 100)
                # Mask so the result wraps around instead of overflowing '<I'.
                data[pos:pos+4] = struct.pack('<I', value & 0xffffffff)

        return bytes(data)

    def execute_input(self, input_data):
        """Execute target with given input

        Returns:
            ``(success, path_hash)``: ``success`` is False when emulation
            raised (the crash is appended to ``self.crashes``); ``path_hash``
            is the XOR of the addresses reported to the code hook.
        """
        # Reset emulator state (only registers are restored, not memory)
        if self.initial_state is not None:
            self.ql.arch.regs.restore(self.initial_state)

        # Set up input. Whole pages are mapped, and only those still missing,
        # so repeated calls and growing inputs do not remap existing pages.
        input_addr = 0x40000000
        page_size = 0x1000
        span = len(input_data) + page_size
        for page in range(input_addr, input_addr + span, page_size):
            if not self.ql.mem.is_mapped(page, page_size):
                self.ql.mem.map(page, page_size)
        self.ql.mem.write(input_addr, input_data)

        # Set function arguments
        self.ql.arch.regs.rdi = input_addr
        self.ql.arch.regs.rsi = len(input_data)

        # Track execution path
        path_hash = 0

        def track_path(ql, address, size):
            nonlocal path_hash
            path_hash ^= address

        hook_id = self.ql.hook_code(track_path)

        try:
            self.ql.run(begin=self.target_function, end=self.target_function + 0x1000,
                        timeout=1000000)  # 1 second timeout
            success = True
        except Exception as e:
            success = False
            self.crashes.append({
                'input': input_data,
                'error': str(e),
                'path_hash': path_hash
            })
        finally:
            # Always remove the hook, otherwise hooks pile up across runs.
            self.ql.hook_del(hook_id)

        # Track unique execution paths
        self.unique_paths.add(path_hash)

        return success, path_hash

    def fuzz(self, iterations=10000):
        """Main fuzzing loop

        Returns a dict with the recorded ``crashes``, the number of
        ``unique_paths`` and ``total_iterations``.
        """
        # Save initial state
        self.initial_state = self.ql.arch.regs.save()

        print(f"Starting fuzzing for {iterations} iterations")

        for i in range(iterations):
            # Select seed input (an empty seed list falls back to b"", for
            # which mutate_input() generates a fresh buffer)
            seed = random.choice(self.seed_inputs) if self.seed_inputs else b""

            # Mutate input
            mutated_input = self.mutate_input(seed)

            # Execute
            self.execute_input(mutated_input)

            if i % 1000 == 0:
                print(f"Iteration {i}: {len(self.crashes)} crashes, "
                      f"{len(self.unique_paths)} unique paths")

        return {
            'crashes': self.crashes,
            'unique_paths': len(self.unique_paths),
            'total_iterations': iterations
        }
# Usage example
def custom_fuzzing_example():
    """Run QilingMutationFuzzer on ``./vulnerable_function`` and print a summary."""
    ql = Qiling(['./vulnerable_function'], './rootfs')

    seed_inputs = [
        b"Hello, World!",
        b"A" * 100,
        b"\x00" * 50,
        struct.pack('<I', 0x41414141)
    ]

    fuzzer = QilingMutationFuzzer(ql, 0x401000, seed_inputs)
    results = fuzzer.fuzz(iterations=50000)

    print("Fuzzing complete:")
    print(f"Crashes found: {len(results['crashes'])}")
    print(f"Unique paths: {results['unique_paths']}")

    # Analyze crashes
    for i, crash in enumerate(results['crashes'][:5]):  # Show first 5
        print(f"Crash {i+1}: {crash['error']}")
        print(f"Input: {crash['input'][:32].hex()}...")


class ProtocolGrammarFuzzer:
    """Generate packets from a small grammar and run them through a target function."""

    def __init__(self, ql, target_function):
        self.ql = ql
        self.target_function = target_function
        self.grammar = self.define_protocol_grammar()

    def define_protocol_grammar(self):
        """Define protocol grammar rules

        Each rule maps a name to a list of alternatives. A ``str`` alternative
        is a template whose ``<name>`` tokens are expanded recursively; a
        ``bytes`` alternative is emitted as-is. The random alternatives are
        generated once, when this method is called.
        """
        return {
            'packet': ['<header><payload>'],
            'header': ['<magic><length><type>'],
            'magic': [b'\x42\x42\x42\x42'],
            'length': ['<uint32>'],
            'type': [b'\x01', b'\x02', b'\x03', b'\xff'],
            'payload': ['<data>', '<string>', '<binary>'],
            'data': [b'A' * random.randint(0, 100) for _ in range(10)],
            'string': [b'hello', b'world', b'test', b''],
            'binary': [bytes([random.randint(0, 255) for _ in range(random.randint(0, 50))])
                       for _ in range(10)],
            'uint32': [struct.pack('<I', random.randint(0, 0xffffffff)) for _ in range(20)]
        }

    def generate_from_grammar(self, symbol):
        """Generate data from grammar symbol

        ``<name>`` picks a random alternative of rule ``name`` (an unknown
        rule yields ``b''``); anything else is a terminal and is returned as
        bytes.
        """
        # Terminal symbols: bytes pass through, plain strings are encoded.
        if isinstance(symbol, bytes):
            return symbol
        if not (symbol.startswith('<') and symbol.endswith('>')):
            return symbol.encode()

        # Non-terminal symbol
        rule_name = symbol[1:-1]
        if rule_name not in self.grammar:
            return b''

        choice = random.choice(self.grammar[rule_name])
        if not isinstance(choice, str):
            return choice

        # Expand the template: every '<...>' token is generated recursively,
        # every other character (including an unmatched '<') is kept literally.
        pieces = []
        i = 0
        while i < len(choice):
            end = choice.find('>', i) if choice[i] == '<' else -1
            if end != -1:
                pieces.append(self.generate_from_grammar(choice[i:end + 1]))
                i = end + 1
            else:
                pieces.append(choice[i].encode())
                i += 1
        return b''.join(pieces)

    def fuzz_with_grammar(self, iterations=1000):
        """Fuzz using grammar-based generation

        Returns the list of packets for which ``test_packet`` reported a crash.
        """
        crashes = []

        for i in range(iterations):
            # Generate packet from grammar
            packet = self.generate_from_grammar('<packet>')

            # Test packet
            success = self.test_packet(packet)
            if not success:
                crashes.append(packet)

            if i % 100 == 0:
                print(f"Generated {i} packets, {len(crashes)} crashes")

        return crashes

    def test_packet(self, packet_data):
        """Test generated packet

        Returns True when emulation finished without raising, else False.
        """
        try:
            # Place packet in memory. Whole pages are mapped, and only those
            # still missing, so repeated calls do not remap existing pages.
            packet_addr = 0x50000000
            page_size = 0x1000
            span = len(packet_data) + page_size
            for page in range(packet_addr, packet_addr + span, page_size):
                if not self.ql.mem.is_mapped(page, page_size):
                    self.ql.mem.map(page, page_size)
            self.ql.mem.write(packet_addr, packet_data)

            # Set up function call
            self.ql.arch.regs.rdi = packet_addr
            self.ql.arch.regs.rsi = len(packet_data)

            # Execute
            self.ql.run(begin=self.target_function,
                        end=self.target_function + 0x1000,
                        timeout=500000)
            return True
        except Exception:
            print(f"Crash with packet: {packet_data[:32].hex()}")
            return False


# NOTE(review): confirm this import path against the installed Qiling version.
from qiling.extensions.coverage import QlCoverage
class CoverageGuidedFuzzer:
    """Mutation fuzzer that keeps inputs which reach previously unseen coverage.

    NOTE(review): ``QlCoverage(ql)`` with ``reset()`` / ``get_coverage()`` is
    used as given in this guide, and ``get_coverage()`` is assumed to return a
    set; confirm against the installed Qiling version.
    """

    def __init__(self, ql, target_function):
        self.ql = ql
        self.target_function = target_function
        self.coverage = QlCoverage(ql)
        self.interesting_inputs = []
        self.total_coverage = set()

    def mutate_input(self, data):
        """Return a randomly mutated copy of ``data``.

        Empty input yields 1-100 random bytes; otherwise 1-5 bit flips, byte
        insertions or byte deletions are applied.
        """
        if not data:
            return bytes([random.randint(0, 255)
                          for _ in range(random.randint(1, 100))])

        buf = bytearray(data)
        for _ in range(random.randint(1, 5)):
            op = random.choice(('flip', 'insert', 'delete'))
            if op == 'insert' or not buf:
                # Also the fallback once deletions have emptied the buffer.
                buf.insert(random.randint(0, len(buf)), random.randint(0, 255))
            elif op == 'flip':
                buf[random.randrange(len(buf))] ^= 1 << random.randint(0, 7)
            else:
                del buf[random.randrange(len(buf))]
        return bytes(buf)

    def test_input_with_coverage(self, input_data):
        """Test input and collect coverage information

        Returns True only when the run finished without raising and reached
        coverage not yet in ``self.total_coverage``; otherwise False.
        """
        # Reset coverage
        self.coverage.reset()

        # Set up input. Whole pages are mapped, and only those still missing,
        # so repeated calls and growing inputs do not remap existing pages.
        input_addr = 0x60000000
        page_size = 0x1000
        span = len(input_data) + page_size
        for page in range(input_addr, input_addr + span, page_size):
            if not self.ql.mem.is_mapped(page, page_size):
                self.ql.mem.map(page, page_size)
        self.ql.mem.write(input_addr, input_data)

        # Set function arguments
        self.ql.arch.regs.rdi = input_addr
        self.ql.arch.regs.rsi = len(input_data)

        try:
            self.ql.run(begin=self.target_function,
                        end=self.target_function + 0x1000)

            # Get coverage
            current_coverage = self.coverage.get_coverage()

            # Check if this input found new coverage
            new_coverage = current_coverage - self.total_coverage
            if new_coverage:
                self.total_coverage.update(current_coverage)
                self.interesting_inputs.append(input_data)
                print(f"New coverage found: {len(new_coverage)} new blocks")
                return True
        except Exception as e:
            print(f"Crash: {e}")
            return False

        return False

    def coverage_guided_fuzz(self, seed_inputs, iterations=10000):
        """Perform coverage-guided fuzzing

        Returns a dict with the number of covered blocks (``total_coverage``),
        the number of ``interesting_inputs`` and the list of
        ``coverage_blocks``.
        """
        # Start with seed inputs
        queue = list(seed_inputs)

        for i in range(iterations):
            if not queue:
                # Generate new random input if queue is empty
                input_data = bytes([random.randint(0, 255)
                                    for _ in range(random.randint(1, 100))])
            else:
                # Select input from queue and mutate
                base_input = random.choice(queue)
                input_data = self.mutate_input(base_input)

            # Test with coverage tracking
            is_interesting = self.test_input_with_coverage(input_data)
            if is_interesting:
                queue.append(input_data)

            if i % 1000 == 0:
                print(f"Iteration {i}: {len(self.total_coverage)} blocks covered, "
                      f"{len(self.interesting_inputs)} interesting inputs")

        return {
            'total_coverage': len(self.total_coverage),
            'interesting_inputs': len(self.interesting_inputs),
            'coverage_blocks': list(self.total_coverage)
        }


# - Focus on input parsing functions
- Target network protocol handlers
- Identify file format parsers
- Look for memory allocation routines
- Use valid protocol samples
- Include edge cases and boundary conditions
- Create minimal test cases
- Gather real-world data samples
- Use persistent mode when possible
- Minimize emulation overhead
- Focus fuzzing on specific functions
- Use efficient mutation strategies
- Collect detailed crash information
- Reproduce crashes reliably
- Analyze crash root causes
- Prioritize unique crash signatures
- Track basic block coverage
- Use feedback-driven mutations
- Explore different execution paths
- Balance breadth vs. depth
For more fuzzing examples and advanced techniques, see the fuzzing examples in the Qiling repository.
- Home
- Getting Started
- Core Concepts
- Usage
- Features
- Tutorials
- Development
- Resources