Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,4 @@ result
.direnv

scratchpad.py
injection_dll.dll
6 changes: 3 additions & 3 deletions memobj/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def post_hook(self):
pass

def hook(self) -> Any:
raise NotImplemented()
raise NotImplementedError()

def unhook(self):
pass
Expand All @@ -94,7 +94,7 @@ def active(self) -> bool:
def get_code(
self,
) -> list[Instruction]:
raise NotImplemented()
raise NotImplementedError()

def allocate_variable(self, name: str, size: int) -> Allocation:
if self._variables.get("name") is not None:
Expand Down Expand Up @@ -326,7 +326,7 @@ def get_jump_code(self, hook_address: int, noops_needed: int) -> list[Instructio
return jump_instructions

def get_code(self) -> list[Instruction]:
raise NotImplemented()
raise NotImplementedError()


# NOTE: this registertype is kinda mid, we may need to provide our own
Expand Down
2 changes: 2 additions & 0 deletions memobj/process/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
from .base import Process
from .module import Module
from .windows.process import WindowsProcess
from .windows.module import WindowsModule
11 changes: 10 additions & 1 deletion memobj/process/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import struct
import typing
from pathlib import Path
from typing import Any, Self, Literal, TypeAlias
from typing import Any, Self, Literal, TypeAlias, Iterator

import regex

Expand All @@ -22,6 +22,13 @@
class Process:
"""A connected process"""

@functools.cached_property
def process_id(self) -> int:
"""
The process id
"""
raise NotImplementedError()

@functools.cached_property
def process_64_bit(self) -> bool:
"""
Expand Down Expand Up @@ -91,6 +98,8 @@ def from_id(cls, pid: int) -> Self:
"""
raise NotImplementedError()

def itererate_modules(self): ...

def allocate_memory(self, size: int) -> int:
"""
Allocate <size> amount of memory in the process
Expand Down
20 changes: 20 additions & 0 deletions memobj/process/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass

from .base import Process


@dataclass
class Module:
name: str
base_address: int
executable_path: str
size: int
process: Process

def get_symbols(self) -> dict[str, int]:
"""Get the module's symbols

Returns:
dict[str, int]: A mapping of module symbol to address
"""
raise NotImplementedError()
Empty file.
127 changes: 127 additions & 0 deletions memobj/process/windows/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""


process.get_symbol_address("user32.dll", "GetCursorPos") -> int
process.get_module("user32.dll").get_symbols()["GetCursorPos"] -> int

Module(...)
.name
.get_symbols()
.base_address
.executable_path
.size

.handle (windows only)
.process (multi-process helper)

"""
import ctypes

from typing import Self, Iterator, TYPE_CHECKING
from memobj.process import Module

from .utils import ModuleEntry32, CheckWindowsOsError

if TYPE_CHECKING:
from .process import WindowsProcess


INVALID_HANDLE_VALUE: int = -1
TH32CS_SNAPMODULE: int = 0x8


class WindowsModule(Module):
_symbols: dict[str, int] | None = None

# TODO: make a user facing iterface to this copying the object so it isnt changed while they're using it
# TODO: get wide character variants working
# adapted to python from https://learn.microsoft.com/en-us/windows/win32/toolhelp/traversing-the-module-list
@staticmethod
def _iter_modules(process: "WindowsProcess") -> Iterator[ModuleEntry32]:
"""
Note that the yielded modules are only valid for one iteration, i.e. references to them should not
be stored
"""
with CheckWindowsOsError():
# https://learn.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-createtoolhelp32snapshot
module_snapshot = ctypes.windll.kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPMODULE, process.process_id)

if module_snapshot == INVALID_HANDLE_VALUE:
raise ValueError("Creating module snapshot failed")

module_entry = ModuleEntry32()
module_entry.dwSize = ctypes.sizeof(ModuleEntry32)

# https://learn.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-module32first
success = ctypes.windll.kernel32.Module32First(module_snapshot, ctypes.byref(module_entry))

if success == 0:
raise ValueError("Get first module failed")

yield module_entry

# https://learn.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-module32next
while ctypes.windll.kernel32.Module32Next(module_snapshot, ctypes.byref(module_entry)) != 0:
yield module_entry

# https://learn.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-closehandle
ctypes.windll.kernel32.CloseHandle(module_snapshot)

@classmethod
def from_name(cls, process: "WindowsProcess", name: str, *, ignore_case: bool = True) -> Self:
if ignore_case:
name = name.lower()

for module in cls._iter_modules(process):
module_name = module.szModule.decode()

# use another variable to preserve case in WindowsModule object
if ignore_case:
compare_name = module_name.lower()
else:
compare_name = module_name

if compare_name == name:
return cls(
name=module_name,
base_address=module.modBaseAddr,
executable_path=module.szExePath.decode(),
size=module.modBaseSize,
process=process,
)

raise ValueError(f"No modules named {name}")

def get_symbol_with_name(self, name: str) -> int:
try:
return self.get_symbols()[name]
except KeyError:
raise ValueError(f"No symbol named {name}")

def get_symbols(self) -> dict[str, int]:
if self._symbols is not None:
return self._symbols

# lazy import windows only library
import pefile

portable_executable = pefile.PE(self.executable_path)

# this api is really bad
if not hasattr(portable_executable, "DIRECTORY_ENTRY_EXPORT"):
self._symbols = {}
return {}

symbols: dict[str, int] = {}

for export in portable_executable.DIRECTORY_ENTRY_EXPORT.symbols: # type: ignore
if export.name:
symbols[export.name.decode()] = export.address + self.base_address

else:
symbols[f"Ordinal {export.ordinal}"] = export.address + self.base_address

self._symbols = symbols
return symbols


98 changes: 96 additions & 2 deletions memobj/process/windows/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import regex

from memobj.allocation import Allocator
from memobj.process.windows.module import WindowsModule
from memobj.process.windows.utils import (
CheckWindowsOsError,
WindowsModuleInfo,
Expand All @@ -21,10 +23,12 @@
from memobj.process import Process


# TODO: update everything that uses modules to use the new WindowsModule
class WindowsProcess(Process):
def __init__(self, process_handle: int):
self.process_handle = process_handle

# TODO: why are we getting debug privileges for our own process?
@staticmethod
def _get_debug_privileges():
with CheckWindowsOsError():
Expand Down Expand Up @@ -83,6 +87,15 @@ def _get_debug_privileges():
if adjust_token_success == 0:
raise RuntimeError("AdjustTokenPrivileges failed")

@functools.cached_property
def process_id(self) -> int:
# https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-getprocessid
maybe_process_id = ctypes.windll.kernel32.GetProcessId(self.process_handle)
if maybe_process_id == 0:
raise ValueError("GetProcessId failed")

return maybe_process_id

@functools.cached_property
def process_64_bit(self) -> bool:
# True = system 64 bit process 32 bit
Expand Down Expand Up @@ -119,7 +132,7 @@ def executable_path(self) -> Path:
return Path(file_name.value)

@classmethod
def from_name(cls, name: str, *, require_debug: bool = True) -> Self:
def from_name(cls, name: str, *, require_debug: bool = True, ignore_case: bool = True) -> Self:
with CheckWindowsOsError():
# https://learn.microsoft.com/en-us/windows/win32/api/tlhelp32/nf-tlhelp32-createtoolhelp32snapshot
snapshot = ctypes.windll.kernel32.CreateToolhelp32Snapshot(
Expand Down Expand Up @@ -148,8 +161,16 @@ def from_name(cls, name: str, *, require_debug: bool = True) -> Self:
if process_32_success == 0:
raise RuntimeError("Process32First failed")

if ignore_case:
name = name.lower()

while process_32_success:
if process_entry.szExeFile.decode() == name:
if ignore_case:
compare_name = process_entry.szExeFile.decode().lower()
else:
compare_name = process_entry.szExeFile.decode()

if compare_name == name:
return cls.from_id(process_entry.th32ProcessID, require_debug=require_debug)
process_32_success = ctypes.windll.kernel32.Process32Next(
snapshot,
Expand Down Expand Up @@ -182,6 +203,9 @@ def from_id(cls, pid: int, *, require_debug: bool = True) -> Self:

return cls(process_handle)

def create_allocator(self) -> Allocator:
return Allocator(self)

def allocate_memory(self, size: int, *, preferred_start: int | None = None) -> int: # type: ignore
"""
Allocate <size> amount of memory in the process
Expand Down Expand Up @@ -446,3 +470,73 @@ def get_module_named(self, name: str) -> WindowsModuleInfo:
return module

raise ValueError(f"No modules named {name}")

# note: platform dependent
def create_remote_thread(self, address: int, *, param_pointer: ctypes.c_void_p | None = None, thread_wait_time: int = 0) -> int:
"""Create a remote thread in the process

Args:
address (int): Address to call
param_pointer (ctypes.c_void_p | None, optional): Pointer to param to pass (must be allocated in process). Defaults to None.
thread_wait_time (int, optional): Amount of time to wait for thread to finish in milliseconds (pass -1 to wait forever). Defaults to 0.

Raises:
ValueError: If CreateRemoteThread
TimeoutError: If waiting for the thread times out

Returns:
int: A handle to the thread
"""
# https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createremotethread
thread_handle = ctypes.windll.kernel32.CreateRemoteThread(
self.process_handle, # hProcess
0, # lpThreadAttributes
0, # dwStackSize
ctypes.c_void_p(address), # lpStartAddress
param_pointer if param_pointer is not None else 0, # lpParameter
0, # dwCreationFlags
0 # lpThreadId
)

if thread_handle == 0:
raise ValueError(f"CreateRemoteThread failed for address {hex(address)}")

# https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject
thread_status = ctypes.windll.kernel32.WaitForSingleObject(
thread_handle,
thread_wait_time
)

if thread_wait_time != 0 and thread_status != 0:
raise TimeoutError(f"Waiting for injected dll thread to finish failed: {thread_status}")

return thread_handle

# note: platform dependent
def inject_dll(self, path: Path | str) -> WindowsModule:
"""Inject a dll into process

Args:
path (Path | str): Filepath to the dll to inject

Returns:
WindowsModule: The injected module
"""

if isinstance(path, str):
path = Path(path)

encoded_path = str(path.absolute()).encode("utf-16le")

path_allocator = self.create_allocator()
path_allocation = path_allocator.allocate(len(encoded_path))

self.write_memory(path_allocation.address, encoded_path)

kernel32 = WindowsModule.from_name(self, "kernel32.dll")
LoadLibraryW = kernel32.get_symbol_with_name("LoadLibraryW")
# https://learn.microsoft.com/en-us/windows/win32/api/libloaderapi/nf-libloaderapi-loadlibraryw
self.create_remote_thread(LoadLibraryW, param_pointer=ctypes.c_void_p(path_allocation.address), thread_wait_time=-1)

path_allocator.close()
return WindowsModule.from_name(self, path.name)
Loading