diff --git a/pyplugins/wrappers/ptregs_wrap.py b/pyplugins/wrappers/ptregs_wrap.py index f3d0d9e4f..a66345fed 100644 --- a/pyplugins/wrappers/ptregs_wrap.py +++ b/pyplugins/wrappers/ptregs_wrap.py @@ -383,6 +383,51 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number. Subclasses must implement.""" return None + def get_retaddr(self) -> Optional[int]: + """ + Get the return address (best guess for this architecture). + Supports both generator and non-generator implementations in subclasses. + """ + try: + ret = self._get_retaddr() + return ret + except PandaMemReadFail: + pass + + def get_retaddr_portal(self) -> Generator[Optional[int], Any, Optional[int]]: + """ + Coroutine/generator version of get_retaddr for portal/coroutine use. + """ + try: + ret = self._get_retaddr() + return ret + except PandaMemReadFail as e: + if e.size == 4: + val = yield from plugins.mem.read_int(e.addr) + else: + val = yield from plugins.mem.read_long(e.addr) + return val + + def get_return_address(self) -> Optional[int]: + """ + Alias for get_retaddr. + """ + return self.get_retaddr() + + def _get_retaddr(self): + """ + Subclasses should override this to implement return address logic. + Can be a generator or a regular function. + """ + return None + + def in_kernel(self) -> bool: + """ + Returns True if the pt_regs represents kernel mode, False otherwise. + Subclasses should override for architecture-specific logic. + """ + return False + class X86PtRegsWrapper(PtRegsWrapper): """Wrapper for x86 (32-bit) pt_regs""" @@ -440,6 +485,19 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from EAX register""" return self.get_register("orig_eax") + def _get_retaddr(self): + # On x86, return address is at [esp] (top of stack) + sp = self.get_sp() + if sp is not None: + return self._read_memory(sp, 4, 'ptr') + return None + + def in_kernel(self) -> bool: + # On x86, check CPL from eflags. CPL is in bits 12-13. + # CPL=0 is kernel mode. + eflags = self.get_register("eflags") + return eflags is not None and ((eflags >> 12) & 3) == 0 + class X86_64PtRegsWrapper(PtRegsWrapper): """Wrapper for x86_64 pt_regs""" @@ -602,6 +660,19 @@ def get_syscall_number(self) -> Optional[int]: return self._get_x86_delegate().get_syscall_number() return self.get_register("orig_rax") + def _get_retaddr(self): + # On x86_64, return address is at [rsp] (top of stack) + sp = self.get_sp() + if sp is not None: + return self._read_memory(sp, 8, 'ptr') + return None + + def in_kernel(self) -> bool: + # On x86_64, check CPL from eflags. CPL is in bits 12-13. + # CPL=0 is kernel mode. + eflags = self.get_register("eflags") + return eflags is not None and ((eflags >> 12) & 3) == 0 + class ArmPtRegsWrapper(PtRegsWrapper): """Wrapper for ARM pt_regs""" @@ -643,6 +714,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from r7 register""" return self.get_register("r7") + def _get_retaddr(self): + # On ARM, link register (lr/r14) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On ARM, check CPSR mode bits (lowest 5 bits) + cpsr = self.get_register("cpsr") + # User mode is 0b10000 (0x10). Any other mode is privileged. + return cpsr is not None and (cpsr & 0x1F) != 0x10 + class AArch64PtRegsWrapper(PtRegsWrapper): """Wrapper for AArch64 pt_regs""" @@ -774,6 +855,16 @@ def get_syscall_number(self) -> Optional[int]: return self._get_arm_delegate().get_syscall_number() return self.get_register("syscallno") + def _get_retaddr(self): + # On AArch64, link register (x30/lr) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On AArch64, check PSTATE Exception Level bits (bits 2-3) + pstate = self.get_register("pstate") + # EL0 (user) is 0. EL1, EL2, EL3 are kernel/hypervisor. + return pstate is not None and ((pstate >> 2) & 3) != 0 + class MipsPtRegsWrapper(PtRegsWrapper): """Wrapper for MIPS pt_regs""" @@ -830,6 +921,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from v0 register""" return self.get_register("v0") + def _get_retaddr(self): + # On MIPS, ra (r31) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On MIPS, check status register KUc bit (bit 1) + status = self.get_register("cp0_status") + # KUc == 0 means kernel mode. + return status is not None and ((status >> 1) & 1) == 0 + class Mips64PtRegsWrapper(MipsPtRegsWrapper): """Wrapper for MIPS64 pt_regs""" @@ -867,6 +968,10 @@ def get_userland_arg(self, num: int) -> Optional[int]: return self._read_memory(addr, 8, 'ptr') + def _get_retaddr(self): + # On MIPS64, ra (r31) holds return address + return self.get_register("ra") + class PowerPCPtRegsWrapper(PtRegsWrapper): """Wrapper for PowerPC pt_regs""" @@ -940,6 +1045,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from r0 register""" return self.get_register("r0") + def _get_retaddr(self): + # On PowerPC, link register (lr) holds return address + return self.get_register("lr") + + def in_kernel(self) -> bool: + # On PowerPC, check MSR PR bit (bit 14). + msr = self.get_register("msr") + # PR (Problem state) == 0 means supervisor (kernel) mode. + return msr is not None and ((msr >> 14) & 1) == 0 + class PowerPC64PtRegsWrapper(PowerPCPtRegsWrapper): """Wrapper for PowerPC64 pt_regs""" @@ -1001,6 +1116,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from a7 register""" return self.get_register("a7") + def _get_retaddr(self): + # On LoongArch64, ra (r1) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On LoongArch64, check CSR_PRMD's PPLV field (bits 0-1). + prmd = self.get_register("csr_prmd") + # PPLV == 0 means kernel mode (PLV0). + return prmd is not None and (prmd & 0x3) == 0 + class Riscv32PtRegsWrapper(PtRegsWrapper): """Wrapper for RISC-V 32-bit pt_regs""" @@ -1066,6 +1191,16 @@ def get_syscall_number(self) -> Optional[int]: """Get syscall number from a7 register""" return self.get_register("a7") + def _get_retaddr(self): + # On RISC-V, ra (x1) holds return address + return self.get_register("ra") + + def in_kernel(self) -> bool: + # On RISC-V, check status register SPP bit (bit 8). + status = self.get_register("status") + # If SPP is 1, previous mode was Supervisor (kernel). + return status is not None and ((status >> 8) & 1) == 1 + class Riscv64PtRegsWrapper(Riscv32PtRegsWrapper): """Wrapper for RISC-V 64-bit pt_regs - same structure as 32-bit but with 64-bit registers""" @@ -1087,6 +1222,10 @@ def get_userland_arg(self, num: int) -> Optional[int]: addr = sp + ((num - 8) * 8) return self._read_memory(addr, 8, 'ptr') + def _get_retaddr(self): + # On RISC-V 64, ra (x1) holds return address + return self.get_register("ra") + # --- Caching Factory ---