Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 260 additions & 0 deletions pymhf/core/hashing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
import ctypes
import ctypes.wintypes as wintypes
import hashlib
import os
from io import BufferedReader

import psutil
import pymem
from pymem.ressources.structure import (
MEMORY_BASIC_INFORMATION,
MEMORY_BASIC_INFORMATION32,
MEMORY_BASIC_INFORMATION64,
MEMORY_PROTECTION,
MEMORY_STATE,
MEMORY_TYPES,
MODULEINFO,
SYSTEM_INFO,
)
from typing_extensions import TYPE_CHECKING, Any, TypeAlias, Union

from pymhf.utils.winapi import (
IMAGE_DOS_HEADER,
IMAGE_DOS_SIGNATURE,
IMAGE_FILE_HEADER,
IMAGE_NT_SIGNATURE,
IMAGE_SCN_MEM_EXECUTE,
IMAGE_SCN_MEM_WRITE,
IMAGE_SECTION_HEADER,
GetSystemInfo,
VirtualQueryEx,
)

if TYPE_CHECKING:
    from ctypes import _CData, _Pointer, _SimpleCData

    # Spelled with `Union[...]` instead of the `X | Y` operator: the operator form is
    # Python 3.10+ syntax, while the rest of this module sticks to `Union` for annotations.
    CDataLike: TypeAlias = Union[
        _CData,
        _SimpleCData,
        _Pointer[Any],
        ctypes.Structure,
        ctypes.Union,
        ctypes.Array[Any],
    ]
else:
    # The precise alias is only evaluated by type checkers; at runtime any object is accepted.
    CDataLike = Any


def _is_hashable_page(mbi: Union[MEMORY_BASIC_INFORMATION32, MEMORY_BASIC_INFORMATION64]) -> bool:
    """Tell whether the memory region described by `mbi` may be fed into the hash.

    A region qualifies only when it is committed, image-backed, carries none of the
    guard / copy-on-write / writable protection flags, and is executable
    (`PAGE_EXECUTE` or `PAGE_EXECUTE_READ`). Anything else is rejected because its
    contents may change during runtime and/or between runs.
    """
    if mbi.State != MEMORY_STATE.MEM_COMMIT or mbi.Type != MEMORY_TYPES.MEM_IMAGE:
        return False

    protect = mbi.Protect

    # Any of these flags disqualifies the region outright.
    rejected_flags = (
        MEMORY_PROTECTION.PAGE_GUARD
        | MEMORY_PROTECTION.PAGE_WRITECOPY
        | MEMORY_PROTECTION.PAGE_EXECUTE_WRITECOPY
        | MEMORY_PROTECTION.PAGE_READWRITE
        | MEMORY_PROTECTION.PAGE_EXECUTE_READWRITE
    )
    if protect & rejected_flags:
        return False

    # At least one of these flags has to be present.
    required_flags = MEMORY_PROTECTION.PAGE_EXECUTE | MEMORY_PROTECTION.PAGE_EXECUTE_READ
    return bool(protect & required_flags)


def _read_bytes_into(
    pm_binary: pymem.Pymem,
    address: int,
    out_obj: CDataLike,
    size: Union[int, None] = None,
    out_bytes_read: Union[ctypes.c_size_t, None] = None,
    raise_on_err: bool = True,
) -> bool:
    """Read bytes from the process memory into a `ctypes` object.

    Args:
        pm_binary: Object used to read the target process (its `read_bytes` is called).
        address: Address in the target process to start reading from.
        out_obj: `ctypes` instance that receives the bytes.
        size: Number of bytes to read. Defaults to `ctypes.sizeof(out_obj)`.
        out_bytes_read: Optional counter that receives the number of bytes copied
            (set to 0 on failure). A fresh counter is created per call when omitted.
        raise_on_err: When true, failures raise `OSError`; otherwise `False` is returned.

    Returns:
        `True` when the read succeeded, `False` when it failed and `raise_on_err` is false.

    Raises:
        OSError: If the read fails and `raise_on_err` is true.
    """
    # NOTE(review): the PR reviewer suspects pymem's `read_ctype` could replace this helper.

    # A default of `ctypes.c_size_t()` in the signature would be a single instance shared
    # by every call, so the counter is created here instead.
    if out_bytes_read is None:
        out_bytes_read = ctypes.c_size_t()

    try:
        capacity = ctypes.sizeof(out_obj)
        if size is None:
            size = capacity
        elif size > capacity:
            # Copying more than `out_obj` can hold would overwrite unrelated memory.
            raise ValueError(f"Cannot read {size} bytes into an object of {capacity} bytes")

        data = pm_binary.read_bytes(address, size)
        # `memmove` accepts the bytes object directly; no intermediate ctypes buffer needed.
        ctypes.memmove(ctypes.byref(out_obj), data, len(data))

        out_bytes_read.value = len(data)
        return True
    except Exception as e:
        out_bytes_read.value = 0
        if raise_on_err:
            raise OSError(f"Failed to read memory at 0x{address:X}") from e
        return False


def _get_page_size() -> int:
    """Return the page size reported by `GetSystemInfo`, or 4096 when it reports zero."""
    info = SYSTEM_INFO()
    GetSystemInfo(ctypes.byref(info))

    reported = info.dwPageSize
    if not reported:
        return 4096
    return reported


def _get_main_module(pm_binary: pymem.Pymem) -> MODULEINFO:
    """Find the module of the process that corresponds to its executable.

    A module matches when its file name equals the executable path of the process or
    its name equals the executable's base name (both compared case-insensitively).
    When nothing matches, the first listed module is returned instead, since it is
    usually the main module.

    Raises:
        OSError: If the process reports no modules at all.
    """
    # NOTE(review): the PR reviewer suggests helpers like this one could live in `winapi.py`,
    # leaving only the two public hashing functions in this module.
    binary_path = psutil.Process(pm_binary.process_id).exe().lower()
    binary_exe = os.path.basename(binary_path)  # already lower-cased via `binary_path`

    first_module = None
    for module in pm_binary.list_modules():
        if first_module is None:
            first_module = module
        if module.filename.lower() == binary_path or module.name.lower() == binary_exe:
            return module

    if first_module is None:
        # Indexing an empty module list would surface as an unhelpful `IndexError`.
        raise OSError(f"Could not find main module for process {pm_binary.process_id}")

    # Usually the first module is the main module.
    return first_module


def _get_sections_info(pm_binary: pymem.Pymem, address: int) -> tuple[int, int]:
    """Locate the section table of the PE image mapped at `address`.

    Reads the DOS header, follows `e_lfanew` to the PE signature, then reads the file
    header that comes right after it.

    Returns:
        A `(sections_base, num_sections)` tuple: the address of the first section
        header and the number of section headers.

    Raises:
        ValueError: If the DOS magic or the PE signature does not match.
    """
    dos = IMAGE_DOS_HEADER()
    _read_bytes_into(pm_binary, address, dos)
    if dos.e_magic != IMAGE_DOS_SIGNATURE:
        raise ValueError(f"Invalid DOS header magic for address 0x{address:X}")

    # `e_lfanew` is the offset from the image base to the PE signature.
    address = address + dos.e_lfanew
    pe_signature = wintypes.DWORD()
    _read_bytes_into(pm_binary, address, pe_signature)
    if pe_signature.value != IMAGE_NT_SIGNATURE:
        raise ValueError(f"Invalid PE header signature for address 0x{address:X}")

    # The file header follows the 4-byte signature.
    address = address + ctypes.sizeof(pe_signature)
    coff = IMAGE_FILE_HEADER()
    _read_bytes_into(pm_binary, address, coff)

    # The section table starts after the file header and the optional header.
    section_table = address + ctypes.sizeof(coff) + int(coff.SizeOfOptionalHeader)
    return section_table, int(coff.NumberOfSections)


def _get_read_only_sections(
    pm_binary: pymem.Pymem,
    sections_base: int,
    num_sections: int,
    max_module_size: int,
):
    """Collect the executable, non-writable sections from a PE section table.

    Reads `num_sections` consecutive section headers starting at `sections_base` and
    keeps those flagged `IMAGE_SCN_MEM_EXECUTE` without `IMAGE_SCN_MEM_WRITE`.

    Returns:
        A list of `(rva, size, name)` tuples in section-table order. `size` is clamped
        so that the section does not extend past `max_module_size`.
    """
    header_size = ctypes.sizeof(IMAGE_SECTION_HEADER)
    kept = []

    for index in range(num_sections):
        header = IMAGE_SECTION_HEADER()
        _read_bytes_into(pm_binary, sections_base + index * header_size, header)

        flags = header.Characteristics
        is_executable = bool(flags & IMAGE_SCN_MEM_EXECUTE)
        is_writable = bool(flags & IMAGE_SCN_MEM_WRITE)
        if is_writable or not is_executable:
            continue

        rva = int(header.VirtualAddress)
        # Fall back to the raw data size when the virtual size is zero.
        length = int(header.Misc.VirtualSize) or int(header.SizeOfRawData)
        if rva == 0 or length == 0:
            continue

        clamped_end = min(rva + length, max_module_size)
        if clamped_end <= rva:
            continue

        # Section names are NUL-padded fixed-width fields.
        label = bytes(bytearray(header.Name)).rstrip(b"\x00").decode(errors="ignore")
        kept.append((rva, clamped_end - rva, label))

    return kept


def hash_bytes_from_file(fileobj: BufferedReader, _bufsize: int = 2**18) -> str:
    """Return the SHA-1 hex digest of everything remaining in `fileobj`.

    The file is consumed in chunks of at most `_bufsize` bytes through `readinto`.
    """
    # Essentially implement hashlib.file_digest since it's python 3.11+
    # cf. https://github.com/python/cpython/blob/main/Lib/hashlib.py#L195
    hasher = hashlib.sha1()
    chunk = bytearray(_bufsize)  # One buffer reused for every read.
    window = memoryview(chunk)
    # `readinto` returning 0 signals EOF and ends the iteration.
    for count in iter(lambda: fileobj.readinto(chunk), 0):
        hasher.update(window[:count])
    return hasher.hexdigest()


def hash_bytes_from_memory(pm_binary: pymem.Pymem, _bufsize: int = 2**18) -> str:
    """Hash the bytes of the main module of the given `pymem.Pymem` instance.

    In order to ensure that the hash is stable across runs, this only reads from sections
    that are not expected to change between runs: the sections selected by
    `_get_read_only_sections`, restricted to the memory regions accepted by
    `_is_hashable_page`. Regions that cannot be queried or read are skipped.

    Args:
        pm_binary: Pymem instance attached to the process to hash.
        _bufsize: Maximum number of bytes read from the process per chunk.

    Returns:
        The SHA-1 hex digest of the selected bytes, in ascending address order.

    Raises:
        ValueError: If the instance has no process handle / id, or no suitable section
            is found in the main module.
        OSError: If the main module, its base address or its size cannot be resolved.
    """
    process_handle = pm_binary.process_handle
    pid = pm_binary.process_id
    if not process_handle or not pid:
        raise ValueError("Pymem instance does not have a valid process handle")

    main_module = _get_main_module(pm_binary)
    if not main_module:
        raise OSError(f"Could not find main module for process {pid}")

    base_address = main_module.lpBaseOfDll
    module_size = main_module.SizeOfImage
    if not base_address or not module_size:
        raise OSError("Failed to resolve main module base/size")

    sections_base, num_sections = _get_sections_info(pm_binary, base_address)
    sections = _get_read_only_sections(pm_binary, sections_base, num_sections, module_size)
    if not sections:
        raise ValueError("No read-only sections found in the main module")
    # Hash in ascending address order so the digest does not depend on table order.
    sections.sort(key=lambda s: s[0])

    page_size = _get_page_size()
    page_mask = ~(page_size - 1)
    digest = hashlib.sha1()
    buffer = (ctypes.c_ubyte * _bufsize)()
    view = memoryview(buffer)  # Built once; sliced per chunk without copying.
    bytes_read = ctypes.c_size_t()
    for rva, size, _name in sections:
        address = base_address + rva
        end = address + size

        while address < end:
            page = MEMORY_BASIC_INFORMATION()
            if not VirtualQueryEx(
                process_handle,
                ctypes.c_void_p(address),
                ctypes.byref(page),
                ctypes.sizeof(page),
            ):
                # The query failed: step over one page and try again.
                address += page_size
                continue

            # `RegionSize` is measured from the region's base address (the queried address
            # rounded down to a page boundary), not from the queried address itself.
            region_base = page.BaseAddress or address
            region_end = min(end, region_base + page.RegionSize)
            if region_end <= address:
                # Guarantee forward progress even if the query reports an empty region.
                address += page_size
                continue
            if not _is_hashable_page(page):
                address = region_end
                continue

            current = address
            while current < region_end:
                to_read = min(_bufsize, region_end - current)
                # NOTE(review): the PR reviewer notes pymem's `read_bytes` could be used here.
                res = _read_bytes_into(
                    pm_binary,
                    current,
                    buffer,
                    to_read,
                    bytes_read,
                    raise_on_err=False,
                )
                if not res or bytes_read.value == 0:
                    # Unreadable chunk: resume at the start of the next page.
                    current = (current + page_size) & page_mask
                    continue

                digest.update(view[: bytes_read.value])
                current += bytes_read.value

            address = region_end

    return digest.hexdigest()
16 changes: 0 additions & 16 deletions pymhf/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import hashlib
import logging
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from ctypes import byref, c_ulong, create_unicode_buffer, windll
from functools import wraps
from io import BufferedReader
from typing import Optional

import psutil
Expand Down Expand Up @@ -124,20 +122,6 @@ def does_pid_have_focus(pid: int) -> bool:
return pid == get_foreground_pid()


def hash_bytes(fileobj: BufferedReader, _bufsize: int = 2**18) -> str:
    """Compute the SHA-1 hex digest of the remaining contents of `fileobj`.

    Data is pulled through `readinto` in chunks of at most `_bufsize` bytes.
    """
    # Essentially implement hashlib.file_digest since it's python 3.11+
    # cf. https://github.com/python/cpython/blob/main/Lib/hashlib.py#L195
    sha = hashlib.sha1()
    scratch = bytearray(_bufsize)  # Reused for every read to reduce allocations.
    scratch_view = memoryview(scratch)
    filled = fileobj.readinto(scratch)
    while filled != 0:  # 0 means EOF
        sha.update(scratch_view[:filled])
        filled = fileobj.readinto(scratch)
    return sha.hexdigest()


# TODO: Do something about this...
# class AutosavingConfig(ConfigParser):
# def __init__(self, *args, **kwargs):
Expand Down
10 changes: 7 additions & 3 deletions pymhf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pymhf.core.log_handling import open_log_console
from pymhf.core.process import start_process
from pymhf.core.protocols import ESCAPE_SEQUENCE, TerminalProtocol
from pymhf.core.utils import hash_bytes
from pymhf.core.hashing import hash_bytes_from_file, hash_bytes_from_memory
from pymhf.utils.config import canonicalize_setting
from pymhf.utils.parse_toml import read_pymhf_settings
from pymhf.utils.winapi import get_exe_path_from_pid
Expand Down Expand Up @@ -319,8 +319,12 @@ def kill_injected_code(loop: asyncio.AbstractEventLoop):
# Have a small nap just to give it some time.
time.sleep(0.5)
if binary_path:
with open(binary_path, "rb") as f:
binary_hash = hash_bytes(f)
try:
with open(binary_path, "rb") as f:
binary_hash = hash_bytes_from_file(f)
except PermissionError:
print(f"Cannot open {binary_path!r} to hash it. Trying to read from memory...")
binary_hash = hash_bytes_from_memory(pm_binary)
print(f"Exe hash is: {binary_hash}")
else:
binary_hash = 0
Expand Down
Loading
Loading