diff --git a/examples/fuzzing/dlink_dir815/dir815_mips32el_linux.py b/examples/fuzzing/dlink_dir815/dir815_mips32el_linux.py index 28afce921..22ea02ffb 100644 --- a/examples/fuzzing/dlink_dir815/dir815_mips32el_linux.py +++ b/examples/fuzzing/dlink_dir815/dir815_mips32el_linux.py @@ -5,7 +5,7 @@ # Everything about the bug and firmware https://www.exploit-db.com/exploits/33863 -import os,sys +import sys sys.path.append("../../..") from qiling import Qiling @@ -13,7 +13,7 @@ from qiling.extensions.afl import ql_afl_fuzz -def main(input_file, enable_trace=False): +def main(input_file: str): env_vars = { "REQUEST_METHOD": "POST", @@ -24,40 +24,36 @@ def main(input_file, enable_trace=False): # "CONTENT_LENGTH": "8", # no needed } - ql = Qiling(["./rootfs/htdocs/web/hedwig.cgi"], "./rootfs", - verbose=QL_VERBOSE.DEBUG, env=env_vars, console=enable_trace) + ql = Qiling(["./rootfs/htdocs/web/hedwig.cgi"], "./rootfs", verbose=QL_VERBOSE.DISABLED, env=env_vars) - def place_input_callback(ql: Qiling, input: bytes, _: int): - env_var = ("HTTP_COOKIE=uid=1234&password=").encode() - env_vars = env_var + input + b"\x00" + (ql.path).encode() + b"\x00" - ql.mem.write(ql.target_addr, env_vars) + def place_input_callback(ql: Qiling, data: bytes, _: int) -> bool: + # construct the payload + payload = b''.join((b"HTTP_COOKIE=uid=1234&password=", bytes(data), b"\x00", ql_path, b"\x00")) - def start_afl(_ql: Qiling): + # patch the value of 'HTTP_COOKIE' in memory + ql.mem.write(target_addr, payload) + + # payload is in place, we are good to go + return True + def start_afl(_ql: Qiling): """ Callback from inside """ + ql_afl_fuzz(_ql, input_file=input_file, place_input_callback=place_input_callback, exits=[ql.os.exit_point]) - addr = ql.mem.search("HTTP_COOKIE=uid=1234&password=".encode()) - ql.target_addr = addr[0] + addr = ql.mem.search(b"HTTP_COOKIE=uid=1234&password=") + target_addr = addr[0] + ql_path = ql.path.encode() - main_addr = ql.loader.elf_entry - ql.hook_address(callback=start_afl, address=main_addr) + ql.hook_address(start_afl, ql.loader.elf_entry) - try: - ql.run() - os._exit(0) - except: - if enable_trace: - print("\nFuzzer Went Shit") - os._exit(0) + ql.run() if __name__ == "__main__": - if len(sys.argv) == 1: + if len(sys.argv) < 2: raise ValueError("No input file provided.") - if len(sys.argv) > 2 and sys.argv[1] == "-t": - main(sys.argv[2], enable_trace=True) - else: - main(sys.argv[1]) + + main(sys.argv[1]) diff --git a/examples/sality.py b/examples/sality.py index 22d6f6515..be05753ba 100644 --- a/examples/sality.py +++ b/examples/sality.py @@ -159,7 +159,7 @@ def hook_StartServiceA(ql: Qiling, address: int, params): init_unseen_symbols(ql.amsint32_driver, ntoskrnl.base+0xb7695, b"NtTerminateProcess", 0, "ntoskrnl.exe") #ql.amsint32_driver.debugger= ":9999" try: - ql.amsint32_driver.load() + ql.amsint32_driver.run() return 1 except UcError as e: print("Load driver error: ", e) diff --git a/examples/tendaac1518_httpd.py b/examples/tendaac1518_httpd.py index 0a32fd275..165aff1f2 100644 --- a/examples/tendaac1518_httpd.py +++ b/examples/tendaac1518_httpd.py @@ -78,6 +78,8 @@ def __vfork(ql: Qiling): ql.os.set_syscall('vfork', __vfork) + os.unlink(fr'{ROOTFS}/proc/sys/kernel/core_pattern') + ql.run() diff --git a/qiling/arch/utils.py b/qiling/arch/utils.py index 6782c9321..4d44b545f 100644 --- a/qiling/arch/utils.py +++ b/qiling/arch/utils.py @@ -48,7 +48,7 @@ def get_base_and_name(self, addr: int) -> Tuple[int, str]: return addr, '-' def disassembler(self, ql: Qiling, address: int, size: int): - data = ql.mem.read(address, size) + data = memoryview(ql.mem.read(address, size)) # knowing that all binary sections are aligned to page boundary allows # us to 'cheat' and search for the containing image using the aligned @@ -64,11 +64,14 @@ def disassembler(self, ql: Qiling, address: int, size: int): ba, name = self.get_base_and_name(ql.mem.align(address)) anibbles = ql.arch.bits // 4 + pos = 0 - for insn in ql.arch.disassembler.disasm(data, address): - offset = insn.address - ba + for iaddr, isize, mnem, ops in ql.arch.disassembler.disasm_lite(data, address): + offset = iaddr - ba + ibytes = data[pos:pos + isize] - ql.log.info(f'{insn.address:0{anibbles}x} [{name:20s} + {offset:#08x}] {insn.bytes.hex(" "):20s} {insn.mnemonic:20s} {insn.op_str}') + ql.log.info(f'{iaddr:0{anibbles}x} [{name:20s} + {offset:#08x}] {ibytes.hex():22s} {mnem:16s} {ops}') + pos += isize if ql.verbose >= QL_VERBOSE.DUMP: for reg in ql.arch.regs.register_mapping: diff --git a/qiling/cc/__init__.py b/qiling/cc/__init__.py index 99c9e5643..a1f354818 100644 --- a/qiling/cc/__init__.py +++ b/qiling/cc/__init__.py @@ -70,6 +70,12 @@ def setReturnValue(self, val: int) -> None: raise NotImplementedError + def getReturnAddress(self) -> int: + """Get function return address. + """ + + raise NotImplementedError + def setReturnAddress(self, addr: int) -> None: """Set function return address. diff --git a/qiling/cc/arm.py b/qiling/cc/arm.py index 51d798b23..29ac126be 100644 --- a/qiling/cc/arm.py +++ b/qiling/cc/arm.py @@ -21,17 +21,22 @@ class QlArmBaseCC(QlCommonBaseCC): def getNumSlots(argbits: int) -> int: return 1 + def getReturnAddress(self) -> int: + return self.arch.regs.lr + def setReturnAddress(self, addr: int) -> None: self.arch.regs.lr = addr def unwind(self, nslots: int) -> int: # TODO: cleanup? - return self.arch.regs.lr + return self.getReturnAddress() + class aarch64(QlArmBaseCC): _retreg = UC_ARM64_REG_X0 _argregs = make_arg_list(UC_ARM64_REG_X0, UC_ARM64_REG_X1, UC_ARM64_REG_X2, UC_ARM64_REG_X3, UC_ARM64_REG_X4, UC_ARM64_REG_X5, UC_ARM64_REG_X6, UC_ARM64_REG_X7) + class aarch32(QlArmBaseCC): _retreg = UC_ARM_REG_R0 _argregs = make_arg_list(UC_ARM_REG_R0, UC_ARM_REG_R1, UC_ARM_REG_R2, UC_ARM_REG_R3) diff --git a/qiling/cc/intel.py b/qiling/cc/intel.py index ca1796034..f2e6971d1 100644 --- a/qiling/cc/intel.py +++ b/qiling/cc/intel.py @@ -15,6 +15,9 @@ class QlIntelBaseCC(QlCommonBaseCC): Supports arguments passing over registers and stack. """ + def getReturnAddress(self) -> int: + return self.arch.stack_read(0) + def setReturnAddress(self, addr: int) -> None: self.arch.stack_push(addr) diff --git a/qiling/cc/mips.py b/qiling/cc/mips.py index 9ebf23375..472b2a3ec 100644 --- a/qiling/cc/mips.py +++ b/qiling/cc/mips.py @@ -12,6 +12,9 @@ class mipso32(QlCommonBaseCC): _shadow = 4 _retaddr_on_stack = False + def getReturnAddress(self) -> int: + return self.arch.regs.ra + def setReturnAddress(self, addr: int): self.arch.regs.ra = addr diff --git a/qiling/cc/ppc.py b/qiling/cc/ppc.py index 2440fab15..b4a88f791 100644 --- a/qiling/cc/ppc.py +++ b/qiling/cc/ppc.py @@ -22,5 +22,8 @@ class ppc(QlCommonBaseCC): def getNumSlots(argbits: int): return 1 + def getReturnAddress(self) -> int: + return self.arch.regs.lr + def setReturnAddress(self, addr: int): self.arch.regs.lr = addr diff --git a/qiling/cc/riscv.py b/qiling/cc/riscv.py index 3a360bd8d..f9f09522c 100644 --- a/qiling/cc/riscv.py +++ b/qiling/cc/riscv.py @@ -22,5 +22,8 @@ class riscv(QlCommonBaseCC): def getNumSlots(argbits: int): return 1 + def getReturnAddress(self) -> int: + return self.arch.regs.ra + def setReturnAddress(self, addr: int): self.arch.regs.ra = addr diff --git a/qiling/core_struct.py b/qiling/core_struct.py index 6c0d99cca..f10fd42f4 100644 --- a/qiling/core_struct.py +++ b/qiling/core_struct.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # @@ -25,14 +25,14 @@ def __init__(self, endian: QL_ENDIAN, bit: int): QL_ENDIAN.EB: '>' }[endian] - self._fmt8 = f'{modifier}B' - self._fmt8s = f'{modifier}b' - self._fmt16 = f'{modifier}H' - self._fmt16s = f'{modifier}h' - self._fmt32 = f'{modifier}I' - self._fmt32s = f'{modifier}i' - self._fmt64 = f'{modifier}Q' - self._fmt64s = f'{modifier}q' + self._fmt8 = struct.Struct(f'{modifier}B') + self._fmt8s = struct.Struct(f'{modifier}b') + self._fmt16 = struct.Struct(f'{modifier}H') + self._fmt16s = struct.Struct(f'{modifier}h') + self._fmt32 = struct.Struct(f'{modifier}I') + self._fmt32s = struct.Struct(f'{modifier}i') + self._fmt64 = struct.Struct(f'{modifier}Q') + self._fmt64s = struct.Struct(f'{modifier}q') handlers = { 64 : (self.pack64, self.pack64s, self.unpack64, self.unpack64s), @@ -51,49 +51,49 @@ def __init__(self, endian: QL_ENDIAN, bit: int): self.unpacks = ups def pack64(self, x: int, /) -> bytes: - return struct.pack(self._fmt64, x) + return self._fmt64.pack(x) def pack64s(self, x: int, /) -> bytes: - return struct.pack(self._fmt64s, x) + return self._fmt64s.pack(x) def unpack64(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt64, x)[0] + return self._fmt64.unpack(x)[0] def unpack64s(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt64s, x)[0] + return self._fmt64s.unpack(x)[0] def pack32(self, x: int, /) -> bytes: - return struct.pack(self._fmt32, x) + return self._fmt32.pack(x) def pack32s(self, x: int, /) -> bytes: - return struct.pack(self._fmt32s, x) + return self._fmt32s.pack(x) def unpack32(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt32, x)[0] + return self._fmt32.unpack(x)[0] def unpack32s(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt32s, x)[0] + return self._fmt32s.unpack(x)[0] def pack16(self, x: int, /) -> bytes: - return struct.pack(self._fmt16, x) + return self._fmt16.pack(x) def pack16s(self, x: int, /) -> bytes: - return struct.pack(self._fmt16s, x) + return self._fmt16s.pack(x) def unpack16(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt16, x)[0] + return self._fmt16.unpack(x)[0] def unpack16s(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt16s, x)[0] + return self._fmt16s.unpack(x)[0] def pack8(self, x: int, /) -> bytes: - return struct.pack(self._fmt8, x) + return self._fmt8.pack(x) def pack8s(self, x: int, /) -> bytes: - return struct.pack(self._fmt8s, x) + return self._fmt8s.pack(x) def unpack8(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt8, x)[0] + return self._fmt8.unpack(x)[0] def unpack8s(self, x: ReadableBuffer, /) -> int: - return struct.unpack(self._fmt8s, x)[0] + return self._fmt8s.unpack(x)[0] diff --git a/qiling/extensions/afl/afl.py b/qiling/extensions/afl/afl.py index 4aef943ee..4128af5a4 100644 --- a/qiling/extensions/afl/afl.py +++ b/qiling/extensions/afl/afl.py @@ -96,8 +96,8 @@ def ql_afl_fuzz_custom(ql: Qiling, def __place_input_wrapper(uc: Uc, input_bytes: Array[c_char], iters: int, context: Any) -> bool: return place_input_callback(ql, input_bytes.raw, iters) - def __validate_crash_wrapper(uc: Uc, result: int, input_bytes: bytes, iters: int, context: Any) -> bool: - return validate_crash_callback(ql, result, input_bytes, iters) + def __validate_crash_wrapper(uc: Uc, result: int, input_bytes: Array[c_char], iters: int, context: Any) -> bool: + return validate_crash_callback(ql, result, input_bytes.raw, iters) def __fuzzing_wrapper(uc: Uc, context: Any) -> int: return fuzzing_callback(ql) diff --git a/qiling/os/disk.py b/qiling/os/disk.py index 765712ac1..ddc68e4a3 100644 --- a/qiling/os/disk.py +++ b/qiling/os/disk.py @@ -1,23 +1,33 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # +from typing import AnyStr, Optional, Union from .mapper import QlFsMappedObject +ReadableBuffer = Union[bytes, bytearray, memoryview] + + # Open a file as a Disk # host_path: The file path on the host machine. # drive_path: The drive path on the emulated system. e.g. /dev/sda \\.\PHYSICALDRIVE0 0x80 -# +# # Note: CHS and LBA support is very limited since a raw file doesn't contain enough information. # We simply assume that it is a disk with 1 head, 1 cylinder and (filesize/512) sectors. +# # See: https://en.wikipedia.org/wiki/Cylinder-head-sector # https://en.wikipedia.org/wiki/Logical_block_addressing # http://www.uruk.org/orig-grub/PC_partitioning.txt + class QlDisk(QlFsMappedObject): - def __init__(self, host_path, drive_path, n_heads=1, n_cylinders=1, sector_size=512): - self._host_path = host_path + # 512 bytes/sector + # 63 sectors/track + # 255 heads (tracks/cylinder) + # 1024 cylinders + + def __init__(self, host_path: AnyStr, drive_path, n_cylinders: int = 1, n_heads: int = 1, sector_size: int = 512): self._drive_path = drive_path self._fp = open(host_path, "rb+") self._n_heads = n_heads @@ -25,7 +35,7 @@ def __init__(self, host_path, drive_path, n_heads=1, n_cylinders=1, sector_size= self._sector_size = sector_size self.lseek(0, 2) self._filesize = self.tell() - self._n_sectors = (self._filesize - 1)// self.sector_size + 1 + self._n_sectors = (self._filesize - 1) // self.sector_size + 1 def __del__(self): if not self.fp.closed: @@ -51,50 +61,43 @@ def n_cylinders(self): def sector_size(self): return self._sector_size - @property - def host_path(self): - return self._host_path - - @property - def drive_path(self): - return self._drive_path - @property def fp(self): return self._fp # Methods from FsMappedObject - def read(self, l): - return self.fp.read(l) - - def write(self, bs): - return self.fp.write(bs) + def read(self, size: Optional[int]) -> bytes: + return self.fp.read(size) - def lseek(self, offset, origin): + def write(self, buffer: ReadableBuffer) -> int: + return self.fp.write(buffer) + + def lseek(self, offset: int, origin: int) -> int: return self.fp.seek(offset, origin) - - def tell(self): + + def tell(self) -> int: return self.fp.tell() - def close(self): - return self.fp.close() - + def close(self) -> None: + self.fp.close() + # Methods for QlDisk - def lba(self, cylinder, head, sector): + def lba(self, cylinder: int, head: int, sector: int) -> int: return (cylinder * self.n_heads + head) * self._n_sectors + sector - 1 - - def read_sectors(self, lba, cnt): + + def read_sectors(self, lba: int, cnt: int) -> bytes: self.lseek(self.sector_size * lba, 0) - return self.read(self.sector_size*cnt) - - def read_chs(self, cylinder, head, sector, cnt): + + return self.read(self.sector_size * cnt) + + def read_chs(self, cylinder: int, head: int, sector: int, cnt: int) -> bytes: return self.read_sectors(self.lba(cylinder, head, sector), cnt) - def write_sectors(self, lba, cnt, buffer): - if len(buffer) > self.sector_size * cnt: - buffer = buffer[:self.sector_size*cnt] + def write_sectors(self, lba: int, cnt: int, buffer: ReadableBuffer) -> int: + buffer = memoryview(buffer) self.lseek(self.sector_size * lba, 0) - return self.write(buffer) - - def write_chs(self, cylinder, head, sector, cnt, buffer): - return self.write_sectors(self.lba(cylinder, head, sector), cnt, buffer) \ No newline at end of file + + return self.write(buffer[:self.sector_size * cnt]) + + def write_chs(self, cylinder: int, head: int, sector: int, cnt: int, buffer: ReadableBuffer): + return self.write_sectors(self.lba(cylinder, head, sector), cnt, buffer) diff --git a/qiling/os/dos/interrupts/int21.py b/qiling/os/dos/interrupts/int21.py index da9ea64e9..0b3dc02f4 100644 --- a/qiling/os/dos/interrupts/int21.py +++ b/qiling/os/dos/interrupts/int21.py @@ -9,11 +9,6 @@ from .. import utils -# exit -def __leaf_4c(ql: Qiling): - ql.log.info("Program terminated gracefully") - ql.emu_stop() - # write a character to screen def __leaf_02(ql: Qiling): ch = ql.arch.regs.dl @@ -131,6 +126,45 @@ def __leaf_43(ql: Qiling): ql.arch.regs.cx = 0xffff ql.os.clear_cf() + +def __leaf_48(ql: Qiling): + """Allocate memory. + """ + + size = ql.arch.regs.bx * 0x10 + + # announce it but do not do anything really + ql.log.debug(f'allocating memory block at {addr:#06x} to {size:#x} bytes') + + # success + ql.os.clear_cf() + + +def __leaf_49(ql: Qiling): + """Deallocate memory. + """ + ... + + +def __leaf_4a(ql: Qiling): + """Modify memory allocation. + """ + + addr = ql.arch.regs.es + size = ql.arch.regs.bx * 0x10 + + # announce it but do not do anything really + ql.log.debug(f'resizing memory block at {addr:#06x} to {size:#x} bytes') + + # success + ql.os.clear_cf() + + +def __leaf_4c(ql: Qiling): + ql.log.info("Program terminated gracefully") + ql.emu_stop() + + def handler(ql: Qiling): ah = ql.arch.regs.ah diff --git a/qiling/os/linux/kernel_api/kernel_api.py b/qiling/os/linux/kernel_api/kernel_api.py index c16ed3256..43510e3e7 100644 --- a/qiling/os/linux/kernel_api/kernel_api.py +++ b/qiling/os/linux/kernel_api/kernel_api.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # @@ -53,31 +53,63 @@ def hook_mcount(ql, address, params): return 0 -@linux_kernel_api(params={ - "Ptr": POINTER -}) -def hook___x86_indirect_thunk_rax(ql, address, params): - return 0 +def __x86_indirect_thunk(ql: Qiling, dest: int): + ql.log.debug('retpoline to %#010x', dest) + ql.arch.regs.arch_pc = dest + +# using passthru as a hack to avoid syscall handler overwrite instruction pointer +@linux_kernel_api(passthru=True) +def hook___x86_indirect_thunk_rax(ql: Qiling, address: int, params): + __x86_indirect_thunk(ql, ql.arch.regs.rax) -@linux_kernel_api(params={ - "Ptr": POINTER -}) -def hook__copy_to_user(ql, address, params): - return 0 + +@linux_kernel_api(passthru=True) +def hook___x86_indirect_thunk_r14(ql, address, params): + __x86_indirect_thunk(ql, ql.arch.regs.r14) @linux_kernel_api(params={ - "Ptr": POINTER + "ubuf": POINTER, + "kbuf": POINTER, + "count": SIZE_T }) -def hook__copy_from_user(ql, address, params): +def hook__copy_to_user(ql: Qiling, address: int, params) -> int: + ubuf = params['ubuf'] + kbuf = params['kbuf'] + count = params['count'] + + # if user-mode buffer is not available, fail + # TODO: also fail if destination is not writeable + if not ql.mem.is_mapped(ubuf, count): + return count + + data = ql.mem.read(kbuf, count) + + ql.mem.write(ubuf, data) + return 0 @linux_kernel_api(params={ - "Ptr": POINTER + "kbuf": POINTER, + "ubuf": POINTER, + "count": SIZE_T }) -def hook___x86_indirect_thunk_r14(ql, address, params): +def hook__copy_from_user(ql: Qiling, address: int, params) -> int: + ubuf = params['ubuf'] + kbuf = params['kbuf'] + count = params['count'] + + # if user-mode buffer is not available, fail + # TODO: also fail if source is not readable + if not ql.mem.is_mapped(ubuf, count): + return count + + data = ql.mem.read(ubuf, count) + + ql.mem.write(kbuf, data) + return 0 diff --git a/qiling/os/memory.py b/qiling/os/memory.py index ec7aef19f..4c0f86545 100644 --- a/qiling/os/memory.py +++ b/qiling/os/memory.py @@ -64,6 +64,31 @@ def __init__(self, ql: Qiling, pagesize: int = 0x1000): # make sure pagesize is a power of 2 assert self.pagesize & (self.pagesize - 1) == 0, 'pagesize has to be a power of 2' + self._packers = { + (1, True): ql.pack8s, + (2, True): ql.pack16s, + (4, True): ql.pack32s, + (8, True): ql.pack64s, + + (1, False): ql.pack8, + (2, False): ql.pack16, + (4, False): ql.pack32, + (8, False): ql.pack64 + } + + self._unpackers = { + (1, True): ql.unpack8s, + (2, True): ql.unpack16s, + (4, True): ql.unpack32s, + (8, True): ql.unpack64s, + + (1, False): ql.unpack8, + (2, False): ql.unpack16, + (4, False): ql.unpack32, + (8, False): ql.unpack64 + } + + def __read_string(self, addr: int) -> str: ret = bytearray() c = self.read(addr, 1) @@ -344,22 +369,12 @@ def read_ptr(self, addr: int, size: int = 0, *, signed = False) -> int: if not size: size = self.ql.arch.pointersize - __unpack = ({ - 1: self.ql.unpack8s, - 2: self.ql.unpack16s, - 4: self.ql.unpack32s, - 8: self.ql.unpack64s - } if signed else { - 1: self.ql.unpack8, - 2: self.ql.unpack16, - 4: self.ql.unpack32, - 8: self.ql.unpack64 - }).get(size) - - if __unpack is None: + try: + _unpack = self._unpackers[(size, signed)] + except KeyError: raise QlErrorStructConversion(f"Unsupported pointer size: {size}") - return __unpack(self.read(addr, size)) + return _unpack(self.read(addr, size)) def write(self, addr: int, data: bytes) -> None: """Write bytes to a memory. @@ -385,22 +400,12 @@ def write_ptr(self, addr: int, value: int, size: int = 0, *, signed = False) -> if not size: size = self.ql.arch.pointersize - __pack = ({ - 1: self.ql.pack8s, - 2: self.ql.pack16s, - 4: self.ql.pack32s, - 8: self.ql.pack64s - } if signed else { - 1: self.ql.pack8, - 2: self.ql.pack16, - 4: self.ql.pack32, - 8: self.ql.pack64 - }).get(size) - - if __pack is None: + try: + _pack = self._packers[(size, signed)] + except KeyError: raise QlErrorStructConversion(f"Unsupported pointer size: {size}") - self.write(addr, __pack(value)) + self.write(addr, _pack(value)) def search(self, needle: Union[bytes, Pattern[bytes]], begin: Optional[int] = None, end: Optional[int] = None) -> List[int]: """Search for a sequence of bytes in memory. diff --git a/qiling/os/posix/syscall/__init__.py b/qiling/os/posix/syscall/__init__.py index 38b10e64e..1ed1125e7 100644 --- a/qiling/os/posix/syscall/__init__.py +++ b/qiling/os/posix/syscall/__init__.py @@ -14,6 +14,7 @@ from .ptrace import * from .random import * from .resource import * +from .rseq import * from .sched import * from .select import * from .sendfile import * diff --git a/qiling/os/posix/syscall/rseq.py b/qiling/os/posix/syscall/rseq.py new file mode 100644 index 000000000..403595a65 --- /dev/null +++ b/qiling/os/posix/syscall/rseq.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +from qiling import Qiling + + +def ql_syscall_rseq(ql: Qiling, rseq: int, rseq_len: int, flags: int, sig: int): + # indicate rseq is not supported by this kernel + # return -ENOSYS + + return 0 diff --git a/qiling/os/uefi/UefiSpec.py b/qiling/os/uefi/UefiSpec.py index 2259e8c35..583ef6d89 100644 --- a/qiling/os/uefi/UefiSpec.py +++ b/qiling/os/uefi/UefiSpec.py @@ -10,6 +10,10 @@ from .UefiBaseType import * from .UefiMultiPhase import * +from .protocols.EfiSimpleTextInProtocol import EFI_SIMPLE_TEXT_INPUT_PROTOCOL +from .protocols.EfiSimpleTextOutProtocol import EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL + + # definitions for EFI_TIME.Daylight EFI_TIME_ADJUST_DAYLIGHT = (1 << 1) EFI_TIME_IN_DAYLIGHT = (1 << 2) @@ -223,14 +227,6 @@ class EFI_CONFIGURATION_TABLE(STRUCT): ('VendorTable', PTR(VOID)), ] -# TODO: to be implemented -# @see: MdePkg\Include\Protocol\SimpleTextIn.h -EFI_SIMPLE_TEXT_INPUT_PROTOCOL = STRUCT - -# TODO: to be implemented -# @see: MdePkg\Include\Protocol\SimpleTextOut.h -EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL = STRUCT - class EFI_SYSTEM_TABLE(STRUCT): _pack_ = 8 @@ -264,4 +260,4 @@ class EFI_SYSTEM_TABLE(STRUCT): 'EFI_DEVICE_PATH_PROTOCOL', 'EFI_OPEN_PROTOCOL_INFORMATION_ENTRY', 'EFI_IMAGE_UNLOAD' -] \ No newline at end of file +] diff --git a/qiling/os/uefi/fncc.py b/qiling/os/uefi/fncc.py index 83f999bf3..6294cbd74 100644 --- a/qiling/os/uefi/fncc.py +++ b/qiling/os/uefi/fncc.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 -# +# # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from typing import Any, Mapping +from typing import Any, Mapping, Optional from qiling import Qiling from qiling.const import QL_INTERCEPT -def dxeapi(params: Mapping[str, Any] = {}): +def dxeapi(params: Optional[Mapping[str, Any]] = None, passthru: bool = False): def decorator(func): def wrapper(ql: Qiling): pc = ql.arch.regs.arch_pc @@ -18,7 +18,7 @@ def wrapper(ql: Qiling): onenter = ql.os.user_defined_api[QL_INTERCEPT.ENTER].get(fname) onexit = ql.os.user_defined_api[QL_INTERCEPT.EXIT].get(fname) - return ql.os.call(pc, f, params, onenter, onexit) + return ql.os.call(pc, f, params or {}, onenter, onexit, passthru) return wrapper diff --git a/qiling/os/uefi/protocols/EfiSimpleTextInProtocol.py b/qiling/os/uefi/protocols/EfiSimpleTextInProtocol.py new file mode 100644 index 000000000..1a8e3eedd --- /dev/null +++ b/qiling/os/uefi/protocols/EfiSimpleTextInProtocol.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +from qiling.os.const import * +from qiling.os.uefi.fncc import dxeapi +from qiling.os.uefi.utils import * +from qiling.os.uefi.ProcessorBind import * +from qiling.os.uefi.UefiBaseType import EFI_STATUS, EFI_EVENT + + +# @see: MdePkg/Include/Protocol/SimpleTextIn.h +class EFI_INPUT_KEY(STRUCT): + _fields_ = [ + ('ScanCode', UINT16), + ('UnicodeChar', CHAR16) + ] + +class EFI_SIMPLE_TEXT_INPUT_PROTOCOL(STRUCT): + EFI_SIMPLE_TEXT_INPUT_PROTOCOL = STRUCT + + _fields_ = [ + ('Reset', FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_INPUT_PROTOCOL), BOOLEAN)), + ('ReadKeyStroke', FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_INPUT_PROTOCOL), PTR(EFI_INPUT_KEY))), + ('WaitForKey', EFI_EVENT) + ] + + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "ExtendedVerification": BOOL # IN BOOLEAN +}) +def hook_Input_Reset(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "Key": POINTER # OUT PTR(EFI_INPUT_KEY) +}) +def hook_Read_Key_Stroke(ql: Qiling, address: int, params): + pass + + +def initialize(ql: Qiling, gIP: int): + descriptor = { + 'struct': EFI_SIMPLE_TEXT_INPUT_PROTOCOL, + 'fields': ( + ('Reset', hook_Input_Reset), + ('ReadKeyStroke', hook_Read_Key_Stroke), + ('WaitForKey', None) + ) + } + + instance = init_struct(ql, gIP, descriptor) + instance.save_to(ql.mem, gIP) diff --git a/qiling/os/uefi/protocols/EfiSimpleTextOutProtocol.py b/qiling/os/uefi/protocols/EfiSimpleTextOutProtocol.py new file mode 100644 index 000000000..d69cd3a37 --- /dev/null +++ b/qiling/os/uefi/protocols/EfiSimpleTextOutProtocol.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +from qiling.os.const import * +from qiling.os.uefi.fncc import dxeapi +from qiling.os.uefi.utils import * +from qiling.os.uefi.ProcessorBind import * +from qiling.os.uefi.UefiBaseType import EFI_STATUS + + +# @see: MdePkg/Include/Protocol/SimpleTextOut.h +class SIMPLE_TEXT_OUTPUT_MODE(STRUCT): + _fields_ = [ + ("MaxMode", INT32), + ("Mode", INT32), + ("Attribute", INT32), + ("CursorColumn", INT32), + ("CursorRow", INT32), + ("CursorVisible", BOOLEAN), + ] + + +class EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL(STRUCT): + EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL = STRUCT + + _fields_ = [ + ("Reset", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), BOOLEAN)), + ("OutputString", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), PTR(CHAR16))), + ("TestString", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), PTR(CHAR16))), + ("QueryMode", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), UINTN, PTR(UINTN), PTR(UINTN))), + ("SetMode", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), UINTN)), + ("SetAttribute", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), UINTN)), + ("ClearScreen", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL))), + ("SetCursorPosition", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), UINTN, UINTN)), + ("EnableCursor", FUNCPTR(EFI_STATUS, PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL), BOOLEAN)), + ("Mode", PTR(SIMPLE_TEXT_OUTPUT_MODE)) + ] + + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "ExtendedVerification": BOOL # IN BOOLEAN +}) +def hook_TextReset(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "String": WSTRING # IN PTR(CHAR16) +}) +def hook_OutputString(ql: Qiling, address: int, params): + print(params['String']) + + return EFI_SUCCESS + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "String": WSTRING # IN PTR(CHAR16) +}) +def hook_TestString(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "ModeNumber": ULONGLONG, # IN UINTN + "Columns": POINTER, # OUT PTR(UINTN) + "Rows": POINTER # OUT PTR(UINTN) +}) +def hook_QueryMode(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "ModeNumber": ULONGLONG # IN UINTN +}) +def hook_SetMode(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "Attribute": ULONGLONG # IN UINTN +}) +def hook_SetAttribute(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) +}) +def hook_ClearScreen(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "Column": ULONGLONG, # IN UINTN + "Row": ULONGLONG # IN UINTN +}) +def hook_SetCursorPosition(ql: Qiling, address: int, params): + pass + +@dxeapi(params={ + "This": POINTER, # IN PTR(EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL) + "Visible": BOOL # IN BOOLEAN +}) +def hook_EnableCursor(ql: Qiling, address: int, params): + pass + + +def initialize(ql: Qiling, base: int): + descriptor = { + 'struct': EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL, + 'fields': ( + ('Reset', hook_TextReset), + ('OutputString', hook_OutputString), + ('TestString', hook_TestString), + ('QueryMode', hook_QueryMode), + ('SetMode', hook_SetMode), + ('SetAttribute', hook_SetAttribute), + ('ClearScreen', hook_ClearScreen), + ('SetCursorPosition', hook_SetCursorPosition), + ('EnableCursor', hook_EnableCursor), + ('Mode', None) + ) + } + + instance = init_struct(ql, base, descriptor) + instance.save_to(ql.mem, base) diff --git a/qiling/os/uefi/st.py b/qiling/os/uefi/st.py index b5fca9225..305c4664e 100644 --- a/qiling/os/uefi/st.py +++ b/qiling/os/uefi/st.py @@ -3,58 +3,81 @@ # Cross Platform and Multi Architecture Advanced Binary Emulation Framework # -from qiling import Qiling +from __future__ import annotations + +from typing import TYPE_CHECKING + from qiling.os.uefi import bs, rt, ds from qiling.os.uefi.context import UefiContext from qiling.os.uefi.utils import install_configuration_table -from qiling.os.uefi.UefiSpec import EFI_SYSTEM_TABLE, EFI_BOOT_SERVICES, EFI_RUNTIME_SERVICES +from qiling.os.uefi.UefiSpec import EFI_SYSTEM_TABLE, EFI_SIMPLE_TEXT_INPUT_PROTOCOL, EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL, EFI_BOOT_SERVICES, EFI_RUNTIME_SERVICES + +import qiling.os.uefi.protocols.EfiSimpleTextInProtocol as txt_in +import qiling.os.uefi.protocols.EfiSimpleTextOutProtocol as txt_out + + +if TYPE_CHECKING: + from qiling import Qiling # static mem layout: # -# +-- EFI_SYSTEM_TABLE ---------+ -# | | -# | ... | -# | RuntimeServices* -> (1) | -# | BootServices* -> (2) | -# | NumberOfTableEntries | -# | ConfigurationTable* -> (4) | -# +-----------------------------+ -# (1) +-- EFI_RUNTIME_SERVICES -----+ -# | | -# | ... | -# +-----------------------------+ -# (2) +-- EFI_BOOT_SERVICES --------+ -# | | -# | ... | -# +-----------------------------+ -# (3) +-- EFI_DXE_SERVICES ---------+ -# | | -# | ... | -# +-----------------------------+ -# (4) +-- EFI_CONFIGURATION_TABLE --+ of HOB_LIST -# | VendorGuid | -# | VendorTable* -> (5) | -# +-----------------------------+ -# +-- EFI_CONFIGURATION_TABLE --+ of DXE_SERVICE_TABLE -# | VendorGuid | -# | VendorTable* -> (3) | -# +-----------------------------+ +# +-- EFI_SYSTEM_TABLE -----------------+ +# | | +# | ... | +# | ConIn* -> (1) | +# | ConOut* -> (2) | +# | RuntimeServices* -> (3) | +# | BootServices* -> (4) | +# | NumberOfTableEntries | +# | ConfigurationTable* -> (6) | +# +-------------------------------------+ +# (1) +-- EFI_SIMPLE_TEXT_INPUT_PROTOCOL ---+ +# | | +# | ... | +# +-------------------------------------+ +# (2) +-- EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL --+ +# | | +# | ... | +# +-------------------------------------+ +# (3) +-- EFI_RUNTIME_SERVICES -------------+ +# | | +# | ... | +# +-------------------------------------+ +# (4) +-- EFI_BOOT_SERVICES ----------------+ +# | | +# | ... | +# +-------------------------------------+ +# (5) +-- EFI_DXE_SERVICES -----------------+ +# | | +# | ... | +# +-------------------------------------+ +# (6) +-- EFI_CONFIGURATION_TABLE ----------+ of HOB_LIST +# | VendorGuid | +# | VendorTable* -> (7) | +# +-------------------------------------+ +# +-- EFI_CONFIGURATION_TABLE ----------+ of DXE_SERVICE_TABLE +# | VendorGuid | +# | VendorTable* -> (5) | +# +-------------------------------------+ # # ... the remainder of the chunk may be used for additional EFI_CONFIGURATION_TABLE entries - +# # dynamically allocated (context.conf_table_data_ptr): # -# (5) +-- VOID* --------------------+ -# | ... | -# +-----------------------------+ +# (7) +-- VOID* ----------------------------+ +# | ... | +# +-------------------------------------+ + def initialize(ql: Qiling, context: UefiContext, gST: int): ql.loader.gST = gST - gBS = gST + EFI_SYSTEM_TABLE.sizeof() # boot services - gRT = gBS + EFI_BOOT_SERVICES.sizeof() # runtime services - gDS = gRT + EFI_RUNTIME_SERVICES.sizeof() # dxe services - cfg = gDS + ds.EFI_DXE_SERVICES.sizeof() # configuration tables array + sti = gST + EFI_SYSTEM_TABLE.sizeof() # input protocols + sto = sti + EFI_SIMPLE_TEXT_INPUT_PROTOCOL.sizeof() # output protocols + gRT = sto + EFI_SIMPLE_TEXT_OUTPUT_PROTOCOL.sizeof() # runtime services + gBS = gRT + EFI_RUNTIME_SERVICES.sizeof() # boot services + gDS = gBS + EFI_BOOT_SERVICES.sizeof() # dxe services + cfg = gDS + ds.EFI_DXE_SERVICES.sizeof() # configuration tables array ql.log.info(f'Global tables:') ql.log.info(f' | gST {gST:#010x}') @@ -63,11 +86,16 @@ def initialize(ql: Qiling, context: UefiContext, gST: int): ql.log.info(f' | gDS {gDS:#010x}') ql.log.info(f'') + txt_in.initialize(ql, sti) + txt_out.initialize(ql, sto) + bs.initialize(ql, gBS) rt.initialize(ql, gRT) ds.initialize(ql, gDS) EFI_SYSTEM_TABLE( + ConIn = sti, + ConOut = sto, RuntimeServices = gRT, BootServices = gBS, NumberOfTableEntries = 0, @@ -79,4 +107,4 @@ def initialize(ql: Qiling, context: UefiContext, gST: int): __all__ = [ 'initialize' -] \ No newline at end of file +]