11#!/usr/bin/env python3
2- """
3- Simple example of how to use Qiling together with AFLplusplus.
4- This is tested with the recent Qiling framework (the one you cloned),
5- afl++ from https://github.com/AFLplusplus/AFLplusplus
62
7- After building afl++, make sure you install `unicorn_mode/setup_unicorn.sh`
3+ """Simple example of how to use Qiling together with AFLplusplus.
4+
5+ Steps:
6+ o Clone and build AFL++
7+ $ git clone https://github.com/AFLplusplus/AFLplusplus.git
8+ $ make -C AFLplusplus
9+
10+ o Build Unicorn support
11+ $ ( cd AFLplusplus/unicorn_mode ; ./build_unicorn_support.sh )
812
9- Then, run this file using afl++ unicorn mode with
10- afl-fuzz -i ./afl_inputs -o ./afl_outputs -m none -U -- python3 ./fuzz_x8664_linux.py @@
13+ o Start fuzzing
14+ $ AFL_AUTORESUME=1 AFL_PATH="$(realpath ./AFLplusplus)" PATH="$AFL_PATH:$PATH" afl-fuzz -i afl_inputs -o afl_outputs -U -- python3 ./fuzz_x8664_linux.py @@
15+
16+ o Cleanup results
17+ $ rm -fr afl_outputs/default/
1118"""
1219
1320# This is new. Instead of unicorn, we import unicornafl. It's the same Uc with some new `afl_` functions
14- import unicornafl
21+ import unicornafl as UcAfl
1522
1623# Make sure Qiling uses our patched unicorn instead of it's own, second so without instrumentation!
17- unicornafl .monkeypatch ()
24+ UcAfl .monkeypatch ()
1825
19- import sys , os
20- from binascii import hexlify
26+ import os
27+ import sys
2128
22- from capstone import *
29+ from typing import Any , Optional
2330
2431sys .path .append ("../../.." )
25- from qiling import *
26-
27- # we cache this for some extra speed
28- stdin_fstat = os .fstat (sys .stdin .fileno ())
32+ from qiling import Qiling
33+ from qiling .const import QL_VERBOSE
34+ from qiling .os .posix import stat
2935
30- # This is mostly taken from the crackmes
3136class MyPipe ():
37+ """Fake stdin to handle incoming fuzzed keystrokes.
38+ """
39+
3240 def __init__ (self ):
3341 self .buf = b''
3442
35- def write (self , s ):
43+ def write (self , s : bytes ):
3644 self .buf += s
3745
38- def read (self , size ):
39- if size <= len (self .buf ):
40- ret = self .buf [: size ]
41- self .buf = self .buf [size :]
42- else :
43- ret = self .buf
44- self .buf = ''
46+ def read (self , size : int ) -> bytes :
47+ ret = self .buf [:size ]
48+ self .buf = self .buf [size :]
49+
4550 return ret
4651
47- def fileno (self ):
52+ def fileno (self ) -> int :
4853 return 0
4954
5055 def show (self ):
@@ -59,98 +64,62 @@ def flush(self):
5964 def close (self ):
6065 self .outpipe .close ()
6166
62- def fstat (self ):
63- return stdin_fstat
67+ def lseek (self , offset : int , origin : int ):
68+ pass
6469
70+ def fstat (self ):
71+ return stat .Fstat (self .fileno ())
6572
66- def main (input_file , enable_trace = False ):
73+ def main (input_file : str ):
6774 stdin = MyPipe ()
68- ql = Qiling (["./x8664_fuzz" ], "../../rootfs/x8664_linux" ,
69- stdin = stdin ,
70- stdout = 1 if enable_trace else None ,
71- stderr = 1 if enable_trace else None ,
72- console = True if enable_trace else False )
7375
74- # or this for output:
75- # ... stdout=sys.stdout, stderr=sys.stderr)
76+ ql = Qiling (["./x8664_fuzz" ], "../../rootfs/x8664_linux" ,
77+ verbose = QL_VERBOSE .OFF , # keep qiling logging off
78+ console = False , # thwart program output
79+ stdin = stdin , # redirect stdin to our fake one
80+ stdout = None ,
81+ stderr = None )
82+
83+ def place_input_callback (uc : UcAfl .Uc , input : bytes , persistent_round : int , data : Any ) -> Optional [bool ]:
84+ """Called with every newly generated input.
85+ """
7686
77- def place_input_callback (uc , input , _ , data ):
7887 stdin .write (input )
7988
8089 def start_afl (_ql : Qiling ):
90+ """Callback from inside.
8191 """
82- Callback from inside
83- """
92+
8493 # We start our AFL forkserver or run once if AFL is not available.
8594 # This will only return after the fuzzing stopped.
8695 try :
87- #print("Starting afl_fuzz().")
88- if not _ql .uc .afl_fuzz (input_file = input_file ,
89- place_input_callback = place_input_callback ,
90- exits = [ql .os .exit_point ]):
91- print ("Ran once without AFL attached." )
92- os ._exit (0 ) # that's a looot faster than tidying up.
93- except unicornafl .UcAflError as ex :
96+ if not _ql .uc .afl_fuzz (input_file = input_file , place_input_callback = place_input_callback , exits = [ql .os .exit_point ]):
97+ _ql .log .warning ("Ran once without AFL attached" )
98+ os ._exit (0 )
99+
100+ except UcAfl .UcAflError as ex :
94101 # This hook trigers more than once in this example.
95102 # If this is the exception cause, we don't care.
103+
96104 # TODO: Chose a better hook position :)
97- if ex != unicornafl .UC_AFL_RET_CALLED_TWICE :
105+ if ex . errno != UcAfl .UC_AFL_RET_CALLED_TWICE :
98106 raise
99-
100- # 64 bit loader addrs are placed at 0x7ffbf0100000
101- # see loader/elf.py:load_with_ld(..)
102- X64BASE = int (ql .profile .get ("OS64" , "load_address" ), 16 )
103-
104- # crash in case we reach stackcheck_fail:
105- # 1225: e8 16 fe ff ff callq 1040 <__stack_chk_fail@plt>
106- ql .hook_address (callback = lambda x : os .abort (), address = X64BASE + 0x1225 )
107-
108- # Add hook at main() that will fork Unicorn and start instrumentation.
109- # main starts at X64BASE + 0x122c
110- main_addr = X64BASE + 0x122c
111- ql .hook_address (callback = start_afl , address = main_addr )
112-
113- if enable_trace :
114- # The following lines are only for `-t` debug output
115-
116- md = Cs (CS_ARCH_X86 , CS_MODE_64 )
117- count = [0 ]
118-
119- def spaced_hex (data ):
120- return b' ' .join (hexlify (data )[i :i + 2 ] for i in range (0 , len (hexlify (data )), 2 )).decode ('utf-8' )
121-
122- def disasm (count , ql , address , size ):
123- buf = ql .mem .read (address , size )
124- try :
125- for i in md .disasm (buf , address ):
126- return "{:08X}\t {:08X}: {:24s} {:10s} {:16s}" .format (count [0 ], i .address , spaced_hex (buf ), i .mnemonic ,
127- i .op_str )
128- except :
129- import traceback
130- print (traceback .format_exc ())
131-
132- def trace_cb (ql , address , size , count ):
133- rtn = '{:100s}' .format (disasm (count , ql , address , size ))
134- print (rtn )
135- count [0 ] += 1
136-
137- ql .hook_code (trace_cb , count )
138-
139- # okay, ready to roll.
140- # try:
141- ql .run ()
142- # except Exception as ex:
143- # # Probable unicorn memory error. Treat as crash.
144- # print(ex)
145- # os.abort()
146107
147- os ._exit (0 ) # that's a looot faster than tidying up.
108+ # get image base address
109+ ba = ql .loader .images [0 ].base
110+
111+ # make process crash whenever __stack_chk_fail@plt is about to be called.
112+ # this way afl will count stack protection violations as crashes
113+ ql .hook_address (callback = lambda x : os .abort (), address = ba + 0x1225 )
148114
115+ # set a hook on main() to let unicorn fork and start instrumentation
116+ ql .hook_address (callback = start_afl , address = ba + 0x122c )
117+
118+ # okay, ready to roll
119+ ql .run ()
149120
150121if __name__ == "__main__" :
151122 if len (sys .argv ) == 1 :
152123 raise ValueError ("No input file provided." )
153- if len (sys .argv ) > 2 and sys .argv [1 ] == "-t" :
154- main (sys .argv [2 ], enable_trace = True )
155- else :
156- main (sys .argv [1 ])
124+
125+ main (sys .argv [1 ])
0 commit comments