Skip to content
1 change: 1 addition & 0 deletions examples/extensions/r2/hello_r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def my_sandbox(path, rootfs):
ql.hook_address(func, r2.functions['main'].offset)
# enable trace powered by r2 symsmap
# r2.enable_trace()
r2.set_backtrace(0x401906)
ql.run()

if __name__ == "__main__":
Expand Down
88 changes: 88 additions & 0 deletions examples/extensions/r2/mem_r2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys
from types import FrameType
sys.path.append('..')

from tests.test_elf import ELFTest
from qiling import Qiling
from qiling.const import QL_VERBOSE
from qiling.extensions.r2 import R2

def test_elf_linux_arm():
def my_puts(ql: Qiling):
params = ql.os.resolve_fcall_params(ELFTest.PARAMS_PUTS)
print(f'puts("{params["s"]}")')
# all_mem = ql.mem.save()
# for lbound, ubound, perm, _, _, _data in ql.mem.map_info:
# print(f"{lbound:#x} - {ubound:#x} {ubound - lbound:#x} {len(_data):#x} {perm:#x}")
# print()
# ql.mem.restore(all_mem)

ql = Qiling(["../examples/rootfs/arm_linux/bin/arm_stat64"], "../examples/rootfs/arm_linux", verbose=QL_VERBOSE.DEBUG)
ql.os.set_api('puts', my_puts)
ql.run()
del ql

def fn(frame: FrameType, msg, arg):
if msg == 'return':
print("Return: ", arg)
return
if msg != 'call': return
# Filter as appropriate
if 'memory' not in frame.f_code.co_filename: return
if '<' in frame.f_code.co_name: return
caller = frame.f_back.f_code.co_name
print("Called ", frame.f_code.co_name, "from ", caller)
for i in range(frame.f_code.co_argcount):
name = frame.f_code.co_varnames[i]
var = frame.f_locals[name]
if isinstance(var, (bytes, bytearray)):
var = f'{type(var)} len {len(var)}'
print(" Argument", name, "is", var)

sys.settrace(fn)

def unmap_hook(ql: "Qiling", access: int, addr: int, size: int, value: int):
print(f"Unmapped memory access at {addr:#x} - {addr + size:#x} with {value:#x} in type {access}")

def mem_cmp_hook(ql: "Qiling", addr: int, size: int):
mapinfo = ql.mem.map_info
for i, mem_region in enumerate(ql.uc.mem_regions()):
assert (mapinfo[i][0], mapinfo[i][1] - 1, mapinfo[i][2]) == mem_region
uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1)
data = ql.mem.map_info[i][5]
if uc_mem == data: continue
print(f"Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info from {addr:#x}")
for line in ql.mem.get_formatted_mapinfo():
print(line)
with open("mem.bin", "wb") as f:
f.write(uc_mem)
with open("map.bin", "wb") as f:
f.write(data)
assert False

def addr_hook(ql: "Qiling"):
mapinfo = ql.mem.map_info
for i, mem_region in enumerate(ql.uc.mem_regions()):
if i != 8: continue
uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1)
with open('right.bin', 'wb') as f:
f.write(uc_mem)

if __name__ == '__main__':
# from tests.test_shellcode import X8664_LIN
env = {'LD_DEBUG': 'all'}
# ql = Qiling(rootfs="rootfs/x8664_linux", code=X8664_LIN, archtype="x8664", ostype="linux", verbose=QL_VERBOSE.DEBUG)
# ql = Qiling(["rootfs/x86_windows/bin/x86_hello.exe"], "rootfs/x86_windows")
# ql = Qiling(["rootfs/arm_linux/bin/arm_hello_static"], "rootfs/arm_linux", verbose=QL_VERBOSE.DISASM)
# ql = Qiling(["rootfs/arm_linux/bin/arm_hello"], "rootfs/arm_linux", env=env, verbose=QL_VERBOSE.DEBUG)
ql = Qiling(["rootfs/x86_linux/bin/x86_hello"], "rootfs/x86_linux", verbose=QL_VERBOSE.DEBUG)
# ql.hook_mem_unmapped(unmap_hook)
# ql.hook_code(mem_cmp_hook)
# mprot_addr = 0x047d4824
# ql.hook_address(addr_hook, mprot_addr)
# ql.debugger = 'qdb'
# ql = Qiling(["rootfs/x8664_linux/bin/testcwd"], "rootfs/x8664_linux", verbose=QL_VERBOSE.DEBUG)
for line in ql.mem.get_formatted_mapinfo():
print(line)
ql.run()
# test_elf_linux_arm()
2 changes: 1 addition & 1 deletion qiling/arch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, ql: Qiling):

@lru_cache(maxsize=64)
def get_base_and_name(self, addr: int) -> Tuple[int, str]:
for begin, end, _, name, _ in self.ql.mem.map_info:
for begin, end, _, name, _, _ in self.ql.mem.map_info:
if begin <= addr < end:
return begin, basename(name)

Expand Down
75 changes: 75 additions & 0 deletions qiling/extensions/r2/callstack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class CallStack:
"""Linked Frames
See https://github.com/angr/angr/blob/master/angr/state_plugins/callstack.py
"""
addr: int
sp: int
bp: int
name: str = None # 'name + offset'
next: Optional['CallStack'] = None

def __iter__(self) -> Iterator['CallStack']:
"""
Iterate through the callstack, from top to bottom
(most recent first).
"""
i = self
while i is not None:
yield i
i = i.next

def __getitem__(self, k):
"""
Returns the CallStack at index k, indexing from the top of the stack.
"""
orig_k = k
for i in self:
if k == 0:
return i
k -= 1
raise IndexError(orig_k)

def __len__(self):
"""
Get how many frames there are in the current call stack.

:return: Number of frames
:rtype: int
"""

o = 0
for _ in self:
o += 1
return o

def __repr__(self):
"""
Get a string representation.

:return: A printable representation of the CallStack object
:rtype: str
"""
return "<CallStack (depth %d)>" % len(self)

def __str__(self):
return "Backtrace:\n" + "\n".join(f"Frame {i}: [{f.name}] {f.addr:#x} sp={f.sp:#x}, bp={f.bp:#x}" for i, f in enumerate(self))

def __eq__(self, other):
if not isinstance(other, CallStack):
return False

if self.addr != other.addr or self.sp != other.sp or self.bp != other.bp:
return False

return self.next == other.next

def __ne__(self, other):
return not (self == other)

def __hash__(self):
return hash(tuple((c.addr, c.sp, c.bp) for c in self))
79 changes: 61 additions & 18 deletions qiling/extensions/r2/r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from qiling.const import QL_ARCH
from qiling.extensions import trace
from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL
from .callstack import CallStack

if TYPE_CHECKING:
from qiling.core import Qiling
Expand Down Expand Up @@ -141,10 +142,8 @@ def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0):
self.loadaddr = loadaddr # r2 -m [addr] map file at given address
self.analyzed = False
self._r2c = libr.r_core.r_core_new()
if ql.code:
self._setup_code(ql.code)
else:
self._setup_file(ql.path)
self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t))
self._setup_mem(ql)

def _qlarch2r(self, archtype: QL_ARCH) -> str:
return {
Expand All @@ -161,20 +160,21 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str:
QL_ARCH.PPC: "ppc",
}[archtype]

def _setup_code(self, code: bytes):
path = f'malloc://{len(code)}'.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)
self._cmd(f'wx {code.hex()}')
def _rbuf_map(self, buf: bytearray, perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0):
rbuf = libr.r_buf_new_with_pointers(ctypes.c_ubyte.from_buffer(buf), len(buf), False)
rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t))
desc = libr.r_io_open_buffer(self._r2i, rbuf, perm, 0) # last arg `mode` is always 0 in r2 code
libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(buf))

def _setup_mem(self, ql: 'Qiling'):
if not hasattr(ql, '_mem'):
return
for start, end, perms, _label, _mmio, buf in ql.mem.map_info:
self._rbuf_map(buf, perms, start)
# set architecture and bits for r2 asm
arch = self._qlarch2r(self.ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}")

def _setup_file(self, path: str):
path = path.encode()
fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr)
libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr)

arch = self._qlarch2r(ql.arch.type)
self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}")

def _cmd(self, cmd: str) -> str:
r = libr.r_core.r_core_cmd_str(
self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8")))
Expand Down Expand Up @@ -268,6 +268,40 @@ def dis_nbytes(self, addr: int, size: int) -> List[Instruction]:
insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")]
return insts

def dis_ninsts(self, addr: int, n: int=1) -> List[Instruction]:
insts = [Instruction(**dic) for dic in self._cmdj(f"pdj {n} @ {addr}")]
return insts

def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallStack]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More arch support

'''Fuzzy backtrace, see https://github.com/radareorg/radare2/blob/master/libr/debug/p/native/bt/fuzzy_all.c#L38
Args:
at: address to start walking stack, default to current SP
depth: limit of stack walking
Returns:
List of Frame
'''
sp = at or self.ql.arch.regs.arch_sp
wordsize = self.ql.arch.bits // 8
frame = None
cursp = oldsp = sp
for i in range(depth):
addr = self.ql.stack_read(i * wordsize)
inst = self.dis_ninsts(addr)[0]
if inst.type.lower() == 'call':
newframe = CallStack(addr=addr, sp=cursp, bp=oldsp, name=self.at(addr), next=frame)
frame = newframe
oldsp = cursp
cursp += wordsize
return frame

def set_backtrace(self, target: Union[int, str]):
'''Set backtrace at target address before executing'''
if isinstance(target, str):
target = self.where(target)
def bt_hook(__ql: "Qiling", *args):
print(self._backtrace_fuzzy())
self.ql.hook_address(bt_hook, target)

def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int:
'''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code
:param ql: Qiling instance
Expand All @@ -279,7 +313,7 @@ def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=No
anibbles = ql.arch.bits // 4
progress = 0
for inst in self.dis_nbytes(addr, size):
if inst.type.lower() == 'invalid':
if inst.type.lower() in ('invalid', 'ill'):
break # stop disasm
name, offset = self.at(inst.offset, parse=True)
if filt is None or filt.search(name):
Expand All @@ -301,5 +335,14 @@ def enable_trace(self, mode='full'):
elif mode == 'history':
trace.enable_history_trace(self.ql)

def shell(self):
while True:
offset = self._r2c.contents.offset
print(f"[{offset:#x}]> ", end="")
cmd = input()
if cmd.strip() == "q":
break
print(self._cmd(cmd))

def __del__(self):
libr.r_core.r_core_free(self._r2c)
Loading