|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +Simple example of how to use Qiling together with AFLplusplus. |
| 4 | +This is tested with the recent Qiling framework (the one you cloned), |
| 5 | +afl++ from https://github.com/AFLplusplus/AFLplusplus |
| 6 | +
|
| 7 | +After building afl++, make sure you install `unicorn_mode/setup_unicorn.sh` |
| 8 | +
|
| 9 | +Then, run this file using afl++ unicorn mode with |
| 10 | +afl-fuzz -i ./afl_inputs -o ./afl_outputs -m none -U -- python3 ./fuzz_x8664_linux.py @@ |
| 11 | +""" |
| 12 | + |
| 13 | +# This is new. Instead of unicorn, we import unicornafl. It's the same Uc with some new `afl_` functions |
| 14 | +import unicornafl |
| 15 | + |
| 16 | +# Make sure Qiling uses our patched unicorn instead of it's own, second so without instrumentation! |
| 17 | +unicornafl.monkeypatch() |
| 18 | + |
| 19 | +import sys, os |
| 20 | +from binascii import hexlify |
| 21 | + |
| 22 | +from capstone import * |
| 23 | + |
| 24 | +sys.path.append("../../..") |
| 25 | +from qiling import * |
| 26 | + |
| 27 | +# we cache this for some extra speed |
| 28 | +stdin_fstat = os.fstat(sys.stdin.fileno()) |
| 29 | + |
| 30 | +# This is mostly taken from the crackmes |
| 31 | +class MyPipe(): |
| 32 | + def __init__(self): |
| 33 | + self.buf = b'' |
| 34 | + |
| 35 | + def write(self, s): |
| 36 | + self.buf += s |
| 37 | + |
| 38 | + def read(self, size): |
| 39 | + if size <= len(self.buf): |
| 40 | + ret = self.buf[: size] |
| 41 | + self.buf = self.buf[size:] |
| 42 | + else: |
| 43 | + ret = self.buf |
| 44 | + self.buf = '' |
| 45 | + return ret |
| 46 | + |
| 47 | + def fileno(self): |
| 48 | + return 0 |
| 49 | + |
| 50 | + def show(self): |
| 51 | + pass |
| 52 | + |
| 53 | + def clear(self): |
| 54 | + pass |
| 55 | + |
| 56 | + def flush(self): |
| 57 | + pass |
| 58 | + |
| 59 | + def close(self): |
| 60 | + self.outpipe.close() |
| 61 | + |
| 62 | + def fstat(self): |
| 63 | + return stdin_fstat |
| 64 | + |
| 65 | + |
| 66 | +def main(input_file, enable_trace=False): |
| 67 | + stdin = MyPipe() |
| 68 | + ql = Qiling(["./arm_fuzz"], "../../rootfs/arm_qnx", |
| 69 | + stdin=stdin, |
| 70 | + stdout=1 if enable_trace else None, |
| 71 | + stderr=1 if enable_trace else None, |
| 72 | + console = True if enable_trace else False) |
| 73 | + |
| 74 | + # or this for output: |
| 75 | + # ... stdout=sys.stdout, stderr=sys.stderr) |
| 76 | + |
| 77 | + def place_input_callback(uc, input, _, data): |
| 78 | + stdin.write(input) |
| 79 | + |
| 80 | + def start_afl(_ql: Qiling): |
| 81 | + """ |
| 82 | + Callback from inside |
| 83 | + """ |
| 84 | + # We start our AFL forkserver or run once if AFL is not available. |
| 85 | + # This will only return after the fuzzing stopped. |
| 86 | + try: |
| 87 | + #print("Starting afl_fuzz().") |
| 88 | + if not _ql.uc.afl_fuzz(input_file=input_file, |
| 89 | + place_input_callback=place_input_callback, |
| 90 | + exits=[ql.os.exit_point]): |
| 91 | + print("Ran once without AFL attached.") |
| 92 | + os._exit(0) # that's a looot faster than tidying up. |
| 93 | + except unicornafl.UcAflError as ex: |
| 94 | + # This hook trigers more than once in this example. |
| 95 | + # If this is the exception cause, we don't care. |
| 96 | + # TODO: Chose a better hook position :) |
| 97 | + if ex != unicornafl.UC_AFL_RET_CALLED_TWICE: |
| 98 | + raise |
| 99 | + |
| 100 | + LIBC_BASE = int(ql.profile.get("OS32", "interp_address"), 16) |
| 101 | + |
| 102 | + # crash in case we reach SignalKill |
| 103 | + ql.hook_address(callback=lambda x: os.abort(), address=LIBC_BASE + 0x456d4) |
| 104 | + |
| 105 | + # Add hook at main() that will fork Unicorn and start instrumentation. |
| 106 | + main_addr = 0x08048aa0 |
| 107 | + ql.hook_address(callback=start_afl, address=main_addr) |
| 108 | + |
| 109 | + if enable_trace: |
| 110 | + # The following lines are only for `-t` debug output |
| 111 | + md = ql.create_disassembler() |
| 112 | + count = [0] |
| 113 | + |
| 114 | + def spaced_hex(data): |
| 115 | + return b' '.join(hexlify(data)[i:i+2] for i in range(0, len(hexlify(data)), 2)).decode('utf-8') |
| 116 | + |
| 117 | + def disasm(count, ql, address, size): |
| 118 | + buf = ql.mem.read(address, size) |
| 119 | + try: |
| 120 | + for i in md.disasm(buf, address): |
| 121 | + return "{:08X}\t{:08X}: {:24s} {:10s} {:16s}".format(count[0], i.address, spaced_hex(buf), i.mnemonic, |
| 122 | + i.op_str) |
| 123 | + except: |
| 124 | + import traceback |
| 125 | + print(traceback.format_exc()) |
| 126 | + |
| 127 | + def trace_cb(ql, address, size, count): |
| 128 | + rtn = '{:100s}'.format(disasm(count, ql, address, size)) |
| 129 | + print(rtn) |
| 130 | + count[0] += 1 |
| 131 | + |
| 132 | + ql.hook_code(trace_cb, count) |
| 133 | + |
| 134 | + # okay, ready to roll. |
| 135 | + # try: |
| 136 | + ql.run() |
| 137 | + # except Exception as ex: |
| 138 | + # # Probable unicorn memory error. Treat as crash. |
| 139 | + # print(ex) |
| 140 | + # os.abort() |
| 141 | + |
| 142 | + os._exit(0) # that's a looot faster than tidying up. |
| 143 | + |
| 144 | + |
| 145 | +if __name__ == "__main__": |
| 146 | + if len(sys.argv) == 1: |
| 147 | + raise ValueError("No input file provided.") |
| 148 | + if len(sys.argv) > 2 and sys.argv[1] == "-t": |
| 149 | + main(sys.argv[2], enable_trace=True) |
| 150 | + else: |
| 151 | + main(sys.argv[1]) |
0 commit comments