From abca79831e3780736c25b6f0c06132de08bc2767 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sun, 25 May 2025 21:31:36 +0300 Subject: [PATCH 1/9] inlinehooks plugin --- .../framework/plugins/windows/inlinehooks.py | 400 ++++++++++++++++++ 1 file changed, 400 insertions(+) create mode 100644 volatility3/framework/plugins/windows/inlinehooks.py diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py new file mode 100644 index 0000000000..226a238e03 --- /dev/null +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -0,0 +1,400 @@ +import logging +from typing import List, Tuple, Iterable, Optional +from enum import Enum +import time + +from volatility3.framework import interfaces, renderers, exceptions +from volatility3.framework.configuration import requirements +from volatility3.framework.renderers import format_hints +from volatility3.plugins.windows import pslist, pe_symbols, modules + +vollog = logging.getLogger(__name__) + + +class HookType(Enum): + """Enum defining different types of API hooks""" + + INLINE = "Inline Hook" + IAT = "Import Address Table Hook" + EAT = "Export Address Table Hook" + + +# https://github.com/SolitudePy/Inline-Hooks +class InlineHooks(interfaces.plugins.PluginInterface): + """Detect various types of Inline hooks in process memory""" + + _version = (1, 0, 0) + _required_framework_version = (2, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="modules", + component=modules.Modules, + version=(3, 0, 0), # Updated from 2.0.0 to 3.0.0 + ), + requirements.ListRequirement( + name="pid", + description="Filter on specific process IDs", + element_type=int, + optional=True, + ), + ] + + def _count_until_padding(self, data: bytes) -> int: + # Check for padding sequences + for i in range(len(data)): + # Double int3 (CC CC) + if i + 1 < len(data) and data[i] == 0xCC and data[i + 1] == 0xCC: + return i + # 11-byte NOP (66 66 66 0f 1f 84 00 00 00 00 00) + if ( + i + 11 <= len(data) + and data[i : i + 11] == b"\x66\x66\x66\x0f\x1f\x84\x00\x00\x00\x00\x00" + ): + return i + # 2-byte NOP (66 66) + if i + 2 <= len(data) and data[i : i + 2] == b"\x66\x66": + return i + return len(data) + + def check_inline_hook( + self, data: bytes, addr: int = 0 + ) -> Optional[Tuple[bytes, str]]: + """Check for inline API hooks by analyzing instructions""" + + # Constants for minimum function sizes + # This is important to avoid false positives on stub functions + MIN_FUNC_SIZE_FOR_JMP = 2 + MIN_FUNC_SIZE_FOR_RET = 2 + MIN_FUNC_SIZE_FOR_CALL = 2 + + if len(data) < 1: + return None + + try: + import capstone + except ImportError: + vollog.warning( + "Capstone not available - inline hook detection will be limited" + ) + return None + + data = data[: self._count_until_padding(data)] + + try: + # Set up disassembler + md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) + md.detail = True + + disasm = list(md.disasm(data, addr)) + func_insn_count = len(disasm) + + if func_insn_count == 0: + return None + + # Check for NOP sled + if ( + func_insn_count >= 3 + and disasm[0].mnemonic == "nop" + and disasm[1].mnemonic == "nop" + and disasm[2].mnemonic == "nop" + ): + return (data, "NOP sled") + + # Check for Early RET + if func_insn_count >= MIN_FUNC_SIZE_FOR_RET: + if ( + (disasm[0].id == capstone.x86.X86_INS_RET) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "push" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "call" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "jmp" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + ): + return (data, "Early RET") + + # Check for JMP relative hooks + if disasm[0].bytes[0] == 0xE9 and func_insn_count >= MIN_FUNC_SIZE_FOR_JMP: + return (data, "JMP relative") + + # Check for Early CALL hooks + if ( + func_insn_count >= MIN_FUNC_SIZE_FOR_CALL + and disasm[0].mnemonic == "call" + ): + return (data, "Early CALL") + + except Exception as e: + vollog.debug(f"Error during disassembly at {addr:#x}: {e}") + + return None + + """def _check_iat_hooks_for_process(self, proc) -> List[Tuple[str, int, str]]: + #Check for Import Address Table hooks for an entire process using the existing IAT plugin + hooked_imports = [] + + try: + # Create IAT plugin instance to reuse its functionality + iat_plugin = iat.IAT(self.context, self.config_path) + + # Get IAT entries for this specific process + iat_entries = list(iat_plugin._generator([proc])) + # print(f"Found {len(iat_entries)} IAT entries for process {proc.UniqueProcessId}") # Commented out debug print + + # Get process layer for module bounds checking + proc_layer_name = proc.add_process_layer() + proc_layer = self.context.layers[proc_layer_name] + + # Build a map of loaded modules and their bounds + module_bounds = {} + for mod in proc.load_order_modules(): + try: + module_name = mod.BaseDllName.get_string().lower() + module_bounds[module_name] = (mod.DllBase, mod.DllBase + mod.SizeOfImage) + except: + continue + + # Check each IAT entry for suspicious redirections + for _, (pid, proc_name, dll_name, bound, function_name, function_address) in iat_entries: + try: + if function_address: + addr = int(function_address) if hasattr(function_address, '__int__') else function_address + + # Check if this address is within the expected module bounds + is_within_bounds = False + + # Check against all loaded modules (not just the declaring DLL) + for mod_name, (mod_start, mod_end) in module_bounds.items(): + if mod_start <= addr < mod_end: + is_within_bounds = True + break + + if not is_within_bounds: + hooked_imports.append(( + f"{dll_name}::{function_name}", + addr, + "IAT entry points outside any loaded module (hooked)" + )) + + except (exceptions.InvalidAddressException, ValueError, TypeError): + continue + + except Exception as e: + vollog.debug(f"Error in IAT hook detection: {e}") + + return hooked_imports""" + + """def _check_eat_hook(self, proc_layer, module, export_dir) -> List[Tuple[str, int, str]]: + #Check for Export Address Table hooks + hooked_exports = [] + + try: + module_start = module.DllBase + module_end = module.DllBase + module.SizeOfImage + + # Walk the EAT + for export in export_dir.entries(): + try: + expected_rva = export.rva + # Read pointer value properly using struct.unpack + pointer_size = proc_layer.context.symbol_space.get_type("pointer").size + pointer_data = proc_layer.read(module.DllBase + expected_rva, pointer_size) + actual_addr = struct.unpack(" Iterable[Tuple[HookType, str, int, str, str, bytes]]: + """Scan modules in a process for different types of API hooks""" + try: + proc_layer_name = proc.add_process_layer() + proc_layer = self.context.layers[proc_layer_name] + except Exception as e: + vollog.debug( + f"Error getting process layer for PID {proc.UniqueProcessId}: {e}" + ) + return + + # Scan each loaded module + for mod in proc.load_order_modules(): + try: + module_name = mod.BaseDllName.get_string() + except exceptions.InvalidAddressException: + vollog.debug(f"Invalid module name for {mod.DllBase:#x}") + continue + + # Get exported symbols + symbol_table_name = mod.get_symbol_table_name() + + if not symbol_table_name: + continue + + try: + # Get symbol finders for this module + export_finders = list( + pe_symbols.PESymbols._find_symbols_through_exports( + self.context, + self.config_path, + [(proc_layer_name, mod.DllBase, mod.SizeOfImage)], + module_name.lower(), + ) + ) + + if not export_finders: + continue + + # Check exports across all export finders + for finder in export_finders: + for export in finder._symbol_module: + try: + export_name = finder._get_name(export) + if not export_name: + continue + + export_addr = finder._module_start + export.address + if not export_addr: + continue + + try: + vollog.debug( + f"Analyzing {module_name}::{export_name} at {export_addr:#x}" + ) + + # Read data for analysis + data = proc_layer.read(export_addr, 24) + + disasm = renderers.Disassembly(data, export_addr) + + inline_hook_check = self.check_inline_hook( + data=data, addr=export_addr + ) + if inline_hook_check: + vollog.info( + f"Inline hook detected in {module_name}::{export_name} at {export_addr:#x}: {inline_hook_check[1]}" + ) + yield ( + HookType.INLINE, + module_name, + export_addr, + export_name, + inline_hook_check[1], + inline_hook_check[0], + disasm, + ) + + except exceptions.InvalidAddressException: + vollog.debug( + f"Invalid address for {module_name}::{export_name} at {export_addr:#x}" + ) + continue + + except Exception as e: + vollog.debug( + f"Error processing export {export_name if 'export_name' in locals() else 'unknown'}: {e}" + ) + continue + + except Exception as e: + vollog.debug(f"Error processing module {module_name}: {e}") + continue + + def _generator(self): + start = time.time() + filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) + + hook_count = 0 + + for proc in pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=filter_func, + ): + proc_id = proc.UniqueProcessId + proc_name = proc.ImageFileName.cast( + "string", max_length=proc.ImageFileName.vol.count, errors="replace" + ) + + # Check for all hook types + for ( + hook_type, + module_name, + hook_addr, + function_name, + hook_info, + hook_data, + disasm, + ) in self._scan_process_modules(proc): + hook_count += 1 + yield ( + 0, + ( + proc_id, + proc_name, + hook_type.value, + module_name, + format_hints.Hex(hook_addr), + function_name, + hook_info, + ( + format_hints.HexBytes(hook_data) + if hook_data + else format_hints.HexBytes(b"") + ), + disasm, + ), + ) + + end = time.time() + vollog.info( + f"InlineHooks plugin completed in {end - start:.2f} seconds, found {hook_count} hooks" + ) + + def run(self): + return renderers.TreeGrid( + [ + ("PID", int), + ("Process", str), + ("Hook Type", str), + ("Module", str), + ("Hook Address", format_hints.Hex), + ("Function", str), + ("Hook Info", str), + ("Hook Hexdump", format_hints.HexBytes), + ("Disasm", renderers.Disassembly), + ], + self._generator(), + ) From 2d7830d2888b0936cb75ec99617871de61a8b2f9 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sun, 25 May 2025 21:46:12 +0300 Subject: [PATCH 2/9] pe_symbols req --- volatility3/framework/plugins/windows/inlinehooks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py index 226a238e03..ffddc1c228 100644 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -42,6 +42,9 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] component=modules.Modules, version=(3, 0, 0), # Updated from 2.0.0 to 3.0.0 ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", From 4ff0a4274e550af1ab5e6c2c02c4aa07f97e4553 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sun, 25 May 2025 21:50:12 +0300 Subject: [PATCH 3/9] ruff hates comments --- .../framework/plugins/windows/inlinehooks.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py index ffddc1c228..48318338fb 100644 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -159,15 +159,15 @@ def check_inline_hook( try: # Create IAT plugin instance to reuse its functionality iat_plugin = iat.IAT(self.context, self.config_path) - - # Get IAT entries for this specific process + + # Get IAT entries for this specific process iat_entries = list(iat_plugin._generator([proc])) # print(f"Found {len(iat_entries)} IAT entries for process {proc.UniqueProcessId}") # Commented out debug print - + # Get process layer for module bounds checking proc_layer_name = proc.add_process_layer() proc_layer = self.context.layers[proc_layer_name] - + # Build a map of loaded modules and their bounds module_bounds = {} for mod in proc.load_order_modules(): @@ -176,32 +176,32 @@ def check_inline_hook( module_bounds[module_name] = (mod.DllBase, mod.DllBase + mod.SizeOfImage) except: continue - + # Check each IAT entry for suspicious redirections for _, (pid, proc_name, dll_name, bound, function_name, function_address) in iat_entries: try: if function_address: addr = int(function_address) if hasattr(function_address, '__int__') else function_address - + # Check if this address is within the expected module bounds is_within_bounds = False - + # Check against all loaded modules (not just the declaring DLL) for mod_name, (mod_start, mod_end) in module_bounds.items(): if mod_start <= addr < mod_end: is_within_bounds = True break - + if not is_within_bounds: hooked_imports.append(( f"{dll_name}::{function_name}", addr, "IAT entry points outside any loaded module (hooked)" )) - + except (exceptions.InvalidAddressException, ValueError, TypeError): continue - + except Exception as e: vollog.debug(f"Error in IAT hook detection: {e}") @@ -214,7 +214,7 @@ def check_inline_hook( try: module_start = module.DllBase module_end = module.DllBase + module.SizeOfImage - + # Walk the EAT for export in export_dir.entries(): try: @@ -233,7 +233,7 @@ def check_inline_hook( )) except exceptions.InvalidAddressException: continue - + except exceptions.InvalidAddressException: pass From bb61515619a0ec13b9dba3820acab625a1c8bf41 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sun, 25 May 2025 22:26:38 +0300 Subject: [PATCH 4/9] VSL comment --- volatility3/framework/plugins/windows/inlinehooks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py index 48318338fb..6e619eebcd 100644 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -1,3 +1,6 @@ +# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# import logging from typing import List, Tuple, Iterable, Optional from enum import Enum From bd01702e002213364b4f0e9e343e09782b43f786 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Wed, 28 May 2025 20:36:37 +0300 Subject: [PATCH 5/9] call/jmp to immediate value in reg --- .../framework/plugins/windows/inlinehooks.py | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py index 6e619eebcd..efca67b6aa 100644 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -43,14 +43,14 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] requirements.VersionRequirement( name="modules", component=modules.Modules, - version=(3, 0, 0), # Updated from 2.0.0 to 3.0.0 + version=(3, 0, 0), ), requirements.VersionRequirement( name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) ), requirements.ListRequirement( name="pid", - description="Filter on specific process IDs", + description="Process IDs to include (all other processes are excluded)", element_type=int, optional=True, ), @@ -59,15 +59,18 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] def _count_until_padding(self, data: bytes) -> int: # Check for padding sequences for i in range(len(data)): + # Double int3 (CC CC) if i + 1 < len(data) and data[i] == 0xCC and data[i + 1] == 0xCC: return i + # 11-byte NOP (66 66 66 0f 1f 84 00 00 00 00 00) if ( i + 11 <= len(data) and data[i : i + 11] == b"\x66\x66\x66\x0f\x1f\x84\x00\x00\x00\x00\x00" ): return i + # 2-byte NOP (66 66) if i + 2 <= len(data) and data[i : i + 2] == b"\x66\x66": return i @@ -139,16 +142,31 @@ def check_inline_hook( ): return (data, "Early RET") - # Check for JMP relative hooks - if disasm[0].bytes[0] == 0xE9 and func_insn_count >= MIN_FUNC_SIZE_FOR_JMP: - return (data, "JMP relative") - - # Check for Early CALL hooks - if ( - func_insn_count >= MIN_FUNC_SIZE_FOR_CALL - and disasm[0].mnemonic == "call" - ): - return (data, "Early CALL") + # Check for JMP relative/Register JMP hooks + if func_insn_count >= MIN_FUNC_SIZE_FOR_JMP: + if (disasm[0].bytes[0] == 0xE9) or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "mov" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM + and disasm[1].mnemonic == "jmp" + and disasm[1].operands[0].type == capstone.x86.X86_OP_REG + and disasm[1].operands[0].reg == disasm[0].operands[0].reg + ): + return (data, "Early JMP") + + # Check for Early CALL/Register CALL hooks + if func_insn_count >= MIN_FUNC_SIZE_FOR_CALL: + if (disasm[0].mnemonic == "call") or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "mov" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM + and disasm[1].mnemonic == "call" + and disasm[1].operands[0].type == capstone.x86.X86_OP_REG + and disasm[1].operands[0].reg == disasm[0].operands[0].reg + ): + return (data, "Early CALL") except Exception as e: vollog.debug(f"Error during disassembly at {addr:#x}: {e}") From ad85d2eee26e2849e38cc653afdfa278e95e45c0 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Fri, 30 May 2025 20:55:41 +0300 Subject: [PATCH 6/9] added xor,ret patch technique to inlinehooks --- volatility3/framework/plugins/windows/inlinehooks.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py index efca67b6aa..b4a3a404f9 100644 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ b/volatility3/framework/plugins/windows/inlinehooks.py @@ -85,6 +85,7 @@ def check_inline_hook( # This is important to avoid false positives on stub functions MIN_FUNC_SIZE_FOR_JMP = 2 MIN_FUNC_SIZE_FOR_RET = 2 + MIN_FUNC_SIZE_FOR_XOR_RET = 3 MIN_FUNC_SIZE_FOR_CALL = 2 if len(data) < 1: @@ -139,6 +140,13 @@ def check_inline_hook( and disasm[0].mnemonic == "jmp" and disasm[1].id == capstone.x86.X86_INS_RET ) + or ( + func_insn_count >= MIN_FUNC_SIZE_FOR_XOR_RET + and disasm[0].mnemonic == "xor" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_REG + and disasm[1].id == capstone.x86.X86_INS_RET + ) ): return (data, "Early RET") From 2369950a59f9fb7f05068a856710aa481e55ae9c Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Fri, 30 May 2025 20:56:06 +0300 Subject: [PATCH 7/9] reimplemented as avpatch (check_inline_hook + amsi) --- .../windows/{etwpatch.py => avpatch.py} | 69 ++++++++++++------- 1 file changed, 43 insertions(+), 26 deletions(-) rename volatility3/framework/plugins/windows/{etwpatch.py => avpatch.py} (62%) diff --git a/volatility3/framework/plugins/windows/etwpatch.py b/volatility3/framework/plugins/windows/avpatch.py similarity index 62% rename from volatility3/framework/plugins/windows/etwpatch.py rename to volatility3/framework/plugins/windows/avpatch.py index 476d0eea72..ee66f6d524 100644 --- a/volatility3/framework/plugins/windows/etwpatch.py +++ b/volatility3/framework/plugins/windows/avpatch.py @@ -8,25 +8,22 @@ from volatility3.framework.objects import utility from volatility3.framework.renderers import format_hints from volatility3.plugins.windows import pslist, pe_symbols +from volatility3.plugins.windows import inlinehooks vollog = logging.getLogger(__name__) # EtwpEventWriteFull -> https://github.com/SolitudePy/Stealthy-ETW-Patch -# CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml -class EtwPatch(interfaces.plugins.PluginInterface): - """Identifies ETW (Event Tracing for Windows) patching techniques used by malware to evade detection. - - This plugin examines the first opcode of key ETW functions in ntdll.dll and advapi32.dll - to detect common ETW bypass techniques such as return pointer manipulation (RET) or function - redirection (JMP). Attackers often patch these functions to prevent security tools from - receiving telemetry about process execution, API calls, and other system events. - """ +# ETW CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml +# AMSI CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-antimalware-scan-interface-function.yml +# AMSI patch -> https://github.com/okankurtuluss/AMSIBypassPatch +class AvPatch(interfaces.plugins.PluginInterface): + """Detects ETW & AMSI in-memory patching used by malware for defense evasion.""" _version = (1, 0, 0) _required_framework_version = (2, 26, 0) - etw_functions = { + av_functions = { "ntdll.dll": { pe_symbols.wanted_names_identifier: [ "EtwEventWrite", @@ -41,6 +38,14 @@ class EtwPatch(interfaces.plugins.PluginInterface): "advapi32.dll": { pe_symbols.wanted_names_identifier: ["EventWrite", "TraceEvent"], }, + "amsi.dll": { + pe_symbols.wanted_names_identifier: [ + "AmsiScanBuffer", + "AmsiScanString", + "AmsiInitialize", + "AmsiOpenSession", + ], + }, } @classmethod @@ -57,6 +62,9 @@ def get_requirements(cls): requirements.VersionRequirement( name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) ), + requirements.VersionRequirement( + name="inlinehooks", component=inlinehooks.InlineHooks, version=(1, 0, 0) + ), requirements.ListRequirement( name="pid", description="Filter on specific process IDs", @@ -66,16 +74,18 @@ def get_requirements(cls): ] def _generator(self): - # Get all ETW function addresses before looping through processes + # Get all ETW & AMSI function addresses before looping through processes found_symbols = pe_symbols.PESymbols.addresses_for_process_symbols( context=self.context, config_path=self.config_path, kernel_module_name=self.config["kernel"], - symbols=self.etw_functions, + symbols=self.av_functions, ) filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) + inlineHooks = inlinehooks.InlineHooks(self.context, self.config_path) + for proc in pslist.PsList.list_processes( context=self.context, kernel_module_name=self.config["kernel"], @@ -89,20 +99,19 @@ def _generator(self): vollog.debug(f"Unable to create process layer for PID {proc_id}") continue - # Map of opcodes to their instruction names - opcode_map = { - 0xC3: "RET", - 0xE9: "JMP", - } - for dll_name, functions in found_symbols.items(): for func_name, func_addr in functions: try: - opcode = self.context.layers[proc_layer_name].read( - func_addr, 1 - )[0] - if opcode in opcode_map: - instruction = opcode_map[opcode] + data = self.context.layers[proc_layer_name].read(func_addr, 24) + disasm = renderers.Disassembly(data, func_addr) + inline_hook_check = inlineHooks.check_inline_hook( + data=data, addr=func_addr + ) + + if inline_hook_check: + vollog.debug( + f"Inline hook detected at {func_addr:#x} in process {proc_id} ({proc_name}) for function {func_name}" + ) yield ( 0, ( @@ -111,7 +120,13 @@ def _generator(self): dll_name, func_name, format_hints.Hex(func_addr), - f"{opcode:02x} ({instruction})", + inline_hook_check[1], + ( + format_hints.HexBytes(inline_hook_check[0]) + if inline_hook_check[0] + else format_hints.HexBytes(b"") + ), + disasm, ), ) except exceptions.InvalidAddressException: @@ -126,8 +141,10 @@ def run(self): ("Process", str), ("DLL", str), ("Function", str), - ("Offset", format_hints.Hex), - ("Opcode", str), + ("Hook Address", format_hints.Hex), + ("Hook Info", str), + ("Hook Hexdump", format_hints.HexBytes), + ("Disasm", renderers.Disassembly), ], self._generator(), ) From b2f136214a4a6aa9e158b5f0e800c720db78ce95 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sat, 7 Jun 2025 13:46:07 +0300 Subject: [PATCH 8/9] categorize as malware plugins --- .../framework/plugins/windows/avpatch.py | 150 ------ .../framework/plugins/windows/inlinehooks.py | 432 ------------------ 2 files changed, 582 deletions(-) delete mode 100644 volatility3/framework/plugins/windows/avpatch.py delete mode 100644 volatility3/framework/plugins/windows/inlinehooks.py diff --git a/volatility3/framework/plugins/windows/avpatch.py b/volatility3/framework/plugins/windows/avpatch.py deleted file mode 100644 index ee66f6d524..0000000000 --- a/volatility3/framework/plugins/windows/avpatch.py +++ /dev/null @@ -1,150 +0,0 @@ -# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 -# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 -# -import logging - -from volatility3.framework import exceptions, interfaces, renderers -from volatility3.framework.configuration import requirements -from volatility3.framework.objects import utility -from volatility3.framework.renderers import format_hints -from volatility3.plugins.windows import pslist, pe_symbols -from volatility3.plugins.windows import inlinehooks - -vollog = logging.getLogger(__name__) - - -# EtwpEventWriteFull -> https://github.com/SolitudePy/Stealthy-ETW-Patch -# ETW CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml -# AMSI CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-antimalware-scan-interface-function.yml -# AMSI patch -> https://github.com/okankurtuluss/AMSIBypassPatch -class AvPatch(interfaces.plugins.PluginInterface): - """Detects ETW & AMSI in-memory patching used by malware for defense evasion.""" - - _version = (1, 0, 0) - _required_framework_version = (2, 26, 0) - - av_functions = { - "ntdll.dll": { - pe_symbols.wanted_names_identifier: [ - "EtwEventWrite", - "EtwEventWriteFull", - "NtTraceEvent", - "ZwTraceEvent", - "NtTraceControl", - "ZwTraceControl", - "EtwpEventWriteFull", - ], - }, - "advapi32.dll": { - pe_symbols.wanted_names_identifier: ["EventWrite", "TraceEvent"], - }, - "amsi.dll": { - pe_symbols.wanted_names_identifier: [ - "AmsiScanBuffer", - "AmsiScanString", - "AmsiInitialize", - "AmsiOpenSession", - ], - }, - } - - @classmethod - def get_requirements(cls): - return [ - requirements.ModuleRequirement( - name="kernel", - description="Windows kernel", - architectures=["Intel32", "Intel64"], - ), - requirements.VersionRequirement( - name="pslist", component=pslist.PsList, version=(3, 0, 0) - ), - requirements.VersionRequirement( - name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) - ), - requirements.VersionRequirement( - name="inlinehooks", component=inlinehooks.InlineHooks, version=(1, 0, 0) - ), - requirements.ListRequirement( - name="pid", - description="Filter on specific process IDs", - element_type=int, - optional=True, - ), - ] - - def _generator(self): - # Get all ETW & AMSI function addresses before looping through processes - found_symbols = pe_symbols.PESymbols.addresses_for_process_symbols( - context=self.context, - config_path=self.config_path, - kernel_module_name=self.config["kernel"], - symbols=self.av_functions, - ) - - filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) - - inlineHooks = inlinehooks.InlineHooks(self.context, self.config_path) - - for proc in pslist.PsList.list_processes( - context=self.context, - kernel_module_name=self.config["kernel"], - filter_func=filter_func, - ): - try: - proc_id = proc.UniqueProcessId - proc_name = utility.array_to_string(proc.ImageFileName) - proc_layer_name = proc.add_process_layer() - except exceptions.InvalidAddressException: - vollog.debug(f"Unable to create process layer for PID {proc_id}") - continue - - for dll_name, functions in found_symbols.items(): - for func_name, func_addr in functions: - try: - data = self.context.layers[proc_layer_name].read(func_addr, 24) - disasm = renderers.Disassembly(data, func_addr) - inline_hook_check = inlineHooks.check_inline_hook( - data=data, addr=func_addr - ) - - if inline_hook_check: - vollog.debug( - f"Inline hook detected at {func_addr:#x} in process {proc_id} ({proc_name}) for function {func_name}" - ) - yield ( - 0, - ( - proc_id, - proc_name, - dll_name, - func_name, - format_hints.Hex(func_addr), - inline_hook_check[1], - ( - format_hints.HexBytes(inline_hook_check[0]) - if inline_hook_check[0] - else format_hints.HexBytes(b"") - ), - disasm, - ), - ) - except exceptions.InvalidAddressException: - vollog.debug( - f"Invalid address when reading function {func_name} at {func_addr:#x} in process {proc_id}" - ) - - def run(self): - return renderers.TreeGrid( - [ - ("PID", int), - ("Process", str), - ("DLL", str), - ("Function", str), - ("Hook Address", format_hints.Hex), - ("Hook Info", str), - ("Hook Hexdump", format_hints.HexBytes), - ("Disasm", renderers.Disassembly), - ], - self._generator(), - ) diff --git a/volatility3/framework/plugins/windows/inlinehooks.py b/volatility3/framework/plugins/windows/inlinehooks.py deleted file mode 100644 index b4a3a404f9..0000000000 --- a/volatility3/framework/plugins/windows/inlinehooks.py +++ /dev/null @@ -1,432 +0,0 @@ -# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 -# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 -# -import logging -from typing import List, Tuple, Iterable, Optional -from enum import Enum -import time - -from volatility3.framework import interfaces, renderers, exceptions -from volatility3.framework.configuration import requirements -from volatility3.framework.renderers import format_hints -from volatility3.plugins.windows import pslist, pe_symbols, modules - -vollog = logging.getLogger(__name__) - - -class HookType(Enum): - """Enum defining different types of API hooks""" - - INLINE = "Inline Hook" - IAT = "Import Address Table Hook" - EAT = "Export Address Table Hook" - - -# https://github.com/SolitudePy/Inline-Hooks -class InlineHooks(interfaces.plugins.PluginInterface): - """Detect various types of Inline hooks in process memory""" - - _version = (1, 0, 0) - _required_framework_version = (2, 0, 0) - - @classmethod - def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: - return [ - requirements.ModuleRequirement( - name="kernel", - description="Windows kernel", - architectures=["Intel32", "Intel64"], - ), - requirements.VersionRequirement( - name="pslist", component=pslist.PsList, version=(3, 0, 0) - ), - requirements.VersionRequirement( - name="modules", - component=modules.Modules, - version=(3, 0, 0), - ), - requirements.VersionRequirement( - name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) - ), - requirements.ListRequirement( - name="pid", - description="Process IDs to include (all other processes are excluded)", - element_type=int, - optional=True, - ), - ] - - def _count_until_padding(self, data: bytes) -> int: - # Check for padding sequences - for i in range(len(data)): - - # Double int3 (CC CC) - if i + 1 < len(data) and data[i] == 0xCC and data[i + 1] == 0xCC: - return i - - # 11-byte NOP (66 66 66 0f 1f 84 00 00 00 00 00) - if ( - i + 11 <= len(data) - and data[i : i + 11] == b"\x66\x66\x66\x0f\x1f\x84\x00\x00\x00\x00\x00" - ): - return i - - # 2-byte NOP (66 66) - if i + 2 <= len(data) and data[i : i + 2] == b"\x66\x66": - return i - return len(data) - - def check_inline_hook( - self, data: bytes, addr: int = 0 - ) -> Optional[Tuple[bytes, str]]: - """Check for inline API hooks by analyzing instructions""" - - # Constants for minimum function sizes - # This is important to avoid false positives on stub functions - MIN_FUNC_SIZE_FOR_JMP = 2 - MIN_FUNC_SIZE_FOR_RET = 2 - MIN_FUNC_SIZE_FOR_XOR_RET = 3 - MIN_FUNC_SIZE_FOR_CALL = 2 - - if len(data) < 1: - return None - - try: - import capstone - except ImportError: - vollog.warning( - "Capstone not available - inline hook detection will be limited" - ) - return None - - data = data[: self._count_until_padding(data)] - - try: - # Set up disassembler - md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) - md.detail = True - - disasm = list(md.disasm(data, addr)) - func_insn_count = len(disasm) - - if func_insn_count == 0: - return None - - # Check for NOP sled - if ( - func_insn_count >= 3 - and disasm[0].mnemonic == "nop" - and disasm[1].mnemonic == "nop" - and disasm[2].mnemonic == "nop" - ): - return (data, "NOP sled") - - # Check for Early RET - if func_insn_count >= MIN_FUNC_SIZE_FOR_RET: - if ( - (disasm[0].id == capstone.x86.X86_INS_RET) - or ( - func_insn_count >= 2 - and disasm[0].mnemonic == "push" - and disasm[1].id == capstone.x86.X86_INS_RET - ) - or ( - func_insn_count >= 2 - and disasm[0].mnemonic == "call" - and disasm[1].id == capstone.x86.X86_INS_RET - ) - or ( - func_insn_count >= 2 - and disasm[0].mnemonic == "jmp" - and disasm[1].id == capstone.x86.X86_INS_RET - ) - or ( - func_insn_count >= MIN_FUNC_SIZE_FOR_XOR_RET - and disasm[0].mnemonic == "xor" - and disasm[0].operands[0].type == capstone.x86.X86_OP_REG - and disasm[0].operands[1].type == capstone.x86.X86_OP_REG - and disasm[1].id == capstone.x86.X86_INS_RET - ) - ): - return (data, "Early RET") - - # Check for JMP relative/Register JMP hooks - if func_insn_count >= MIN_FUNC_SIZE_FOR_JMP: - if (disasm[0].bytes[0] == 0xE9) or ( - func_insn_count >= 2 - and disasm[0].mnemonic == "mov" - and disasm[0].operands[0].type == capstone.x86.X86_OP_REG - and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM - and disasm[1].mnemonic == "jmp" - and disasm[1].operands[0].type == capstone.x86.X86_OP_REG - and disasm[1].operands[0].reg == disasm[0].operands[0].reg - ): - return (data, "Early JMP") - - # Check for Early CALL/Register CALL hooks - if func_insn_count >= MIN_FUNC_SIZE_FOR_CALL: - if (disasm[0].mnemonic == "call") or ( - func_insn_count >= 2 - and disasm[0].mnemonic == "mov" - and disasm[0].operands[0].type == capstone.x86.X86_OP_REG - and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM - and disasm[1].mnemonic == "call" - and disasm[1].operands[0].type == capstone.x86.X86_OP_REG - and disasm[1].operands[0].reg == disasm[0].operands[0].reg - ): - return (data, "Early CALL") - - except Exception as e: - vollog.debug(f"Error during disassembly at {addr:#x}: {e}") - - return None - - """def _check_iat_hooks_for_process(self, proc) -> List[Tuple[str, int, str]]: - #Check for Import Address Table hooks for an entire process using the existing IAT plugin - hooked_imports = [] - - try: - # Create IAT plugin instance to reuse its functionality - iat_plugin = iat.IAT(self.context, self.config_path) - - # Get IAT entries for this specific process - iat_entries = list(iat_plugin._generator([proc])) - # print(f"Found {len(iat_entries)} IAT entries for process {proc.UniqueProcessId}") # Commented out debug print - - # Get process layer for module bounds checking - proc_layer_name = proc.add_process_layer() - proc_layer = self.context.layers[proc_layer_name] - - # Build a map of loaded modules and their bounds - module_bounds = {} - for mod in proc.load_order_modules(): - try: - module_name = mod.BaseDllName.get_string().lower() - module_bounds[module_name] = (mod.DllBase, mod.DllBase + mod.SizeOfImage) - except: - continue - - # Check each IAT entry for suspicious redirections - for _, (pid, proc_name, dll_name, bound, function_name, function_address) in iat_entries: - try: - if function_address: - addr = int(function_address) if hasattr(function_address, '__int__') else function_address - - # Check if this address is within the expected module bounds - is_within_bounds = False - - # Check against all loaded modules (not just the declaring DLL) - for mod_name, (mod_start, mod_end) in module_bounds.items(): - if mod_start <= addr < mod_end: - is_within_bounds = True - break - - if not is_within_bounds: - hooked_imports.append(( - f"{dll_name}::{function_name}", - addr, - "IAT entry points outside any loaded module (hooked)" - )) - - except (exceptions.InvalidAddressException, ValueError, TypeError): - continue - - except Exception as e: - vollog.debug(f"Error in IAT hook detection: {e}") - - return hooked_imports""" - - """def _check_eat_hook(self, proc_layer, module, export_dir) -> List[Tuple[str, int, str]]: - #Check for Export Address Table hooks - hooked_exports = [] - - try: - module_start = module.DllBase - module_end = module.DllBase + module.SizeOfImage - - # Walk the EAT - for export in export_dir.entries(): - try: - expected_rva = export.rva - # Read pointer value properly using struct.unpack - pointer_size = proc_layer.context.symbol_space.get_type("pointer").size - pointer_data = proc_layer.read(module.DllBase + expected_rva, pointer_size) - actual_addr = struct.unpack(" Iterable[Tuple[HookType, str, int, str, str, bytes]]: - """Scan modules in a process for different types of API hooks""" - try: - proc_layer_name = proc.add_process_layer() - proc_layer = self.context.layers[proc_layer_name] - except Exception as e: - vollog.debug( - f"Error getting process layer for PID {proc.UniqueProcessId}: {e}" - ) - return - - # Scan each loaded module - for mod in proc.load_order_modules(): - try: - module_name = mod.BaseDllName.get_string() - except exceptions.InvalidAddressException: - vollog.debug(f"Invalid module name for {mod.DllBase:#x}") - continue - - # Get exported symbols - symbol_table_name = mod.get_symbol_table_name() - - if not symbol_table_name: - continue - - try: - # Get symbol finders for this module - export_finders = list( - pe_symbols.PESymbols._find_symbols_through_exports( - self.context, - self.config_path, - [(proc_layer_name, mod.DllBase, mod.SizeOfImage)], - module_name.lower(), - ) - ) - - if not export_finders: - continue - - # Check exports across all export finders - for finder in export_finders: - for export in finder._symbol_module: - try: - export_name = finder._get_name(export) - if not export_name: - continue - - export_addr = finder._module_start + export.address - if not export_addr: - continue - - try: - vollog.debug( - f"Analyzing {module_name}::{export_name} at {export_addr:#x}" - ) - - # Read data for analysis - data = proc_layer.read(export_addr, 24) - - disasm = renderers.Disassembly(data, export_addr) - - inline_hook_check = self.check_inline_hook( - data=data, addr=export_addr - ) - if inline_hook_check: - vollog.info( - f"Inline hook detected in {module_name}::{export_name} at {export_addr:#x}: {inline_hook_check[1]}" - ) - yield ( - HookType.INLINE, - module_name, - export_addr, - export_name, - inline_hook_check[1], - inline_hook_check[0], - disasm, - ) - - except exceptions.InvalidAddressException: - vollog.debug( - f"Invalid address for {module_name}::{export_name} at {export_addr:#x}" - ) - continue - - except Exception as e: - vollog.debug( - f"Error processing export {export_name if 'export_name' in locals() else 'unknown'}: {e}" - ) - continue - - except Exception as e: - vollog.debug(f"Error processing module {module_name}: {e}") - continue - - def _generator(self): - start = time.time() - filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) - - hook_count = 0 - - for proc in pslist.PsList.list_processes( - context=self.context, - kernel_module_name=self.config["kernel"], - filter_func=filter_func, - ): - proc_id = proc.UniqueProcessId - proc_name = proc.ImageFileName.cast( - "string", max_length=proc.ImageFileName.vol.count, errors="replace" - ) - - # Check for all hook types - for ( - hook_type, - module_name, - hook_addr, - function_name, - hook_info, - hook_data, - disasm, - ) in self._scan_process_modules(proc): - hook_count += 1 - yield ( - 0, - ( - proc_id, - proc_name, - hook_type.value, - module_name, - format_hints.Hex(hook_addr), - function_name, - hook_info, - ( - format_hints.HexBytes(hook_data) - if hook_data - else format_hints.HexBytes(b"") - ), - disasm, - ), - ) - - end = time.time() - vollog.info( - f"InlineHooks plugin completed in {end - start:.2f} seconds, found {hook_count} hooks" - ) - - def run(self): - return renderers.TreeGrid( - [ - ("PID", int), - ("Process", str), - ("Hook Type", str), - ("Module", str), - ("Hook Address", format_hints.Hex), - ("Function", str), - ("Hook Info", str), - ("Hook Hexdump", format_hints.HexBytes), - ("Disasm", renderers.Disassembly), - ], - self._generator(), - ) From 4cfc6396118db3d49051f96648dbb3c0b8d96d61 Mon Sep 17 00:00:00 2001 From: SolitudePy <47316655+SolitudePy@users.noreply.github.com> Date: Sat, 7 Jun 2025 13:46:32 +0300 Subject: [PATCH 9/9] move to correct dir --- .../plugins/windows/malware/avpatch.py | 150 ++++++ .../plugins/windows/malware/inlinehooks.py | 432 ++++++++++++++++++ 2 files changed, 582 insertions(+) create mode 100644 volatility3/framework/plugins/windows/malware/avpatch.py create mode 100644 volatility3/framework/plugins/windows/malware/inlinehooks.py diff --git a/volatility3/framework/plugins/windows/malware/avpatch.py b/volatility3/framework/plugins/windows/malware/avpatch.py new file mode 100644 index 0000000000..b98a59b8a7 --- /dev/null +++ b/volatility3/framework/plugins/windows/malware/avpatch.py @@ -0,0 +1,150 @@ +# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# +import logging + +from volatility3.framework import exceptions, interfaces, renderers +from volatility3.framework.configuration import requirements +from volatility3.framework.objects import utility +from volatility3.framework.renderers import format_hints +from volatility3.plugins.windows import pslist, pe_symbols +from volatility3.plugins.windows.malware import inlinehooks + +vollog = logging.getLogger(__name__) + + +# EtwpEventWriteFull -> https://github.com/SolitudePy/Stealthy-ETW-Patch +# ETW CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-event-tracing-for-windows-function.yml +# AMSI CAPA rule -> https://github.com/mandiant/capa-rules/blob/master/anti-analysis/anti-av/patch-antimalware-scan-interface-function.yml +# AMSI patch -> https://github.com/okankurtuluss/AMSIBypassPatch +class AvPatch(interfaces.plugins.PluginInterface): + """Detects ETW & AMSI in-memory patching used by malware for defense evasion.""" + + _version = (1, 0, 0) + _required_framework_version = (2, 26, 0) + + av_functions = { + "ntdll.dll": { + pe_symbols.wanted_names_identifier: [ + "EtwEventWrite", + "EtwEventWriteFull", + "NtTraceEvent", + "ZwTraceEvent", + "NtTraceControl", + "ZwTraceControl", + "EtwpEventWriteFull", + ], + }, + "advapi32.dll": { + pe_symbols.wanted_names_identifier: ["EventWrite", "TraceEvent"], + }, + "amsi.dll": { + pe_symbols.wanted_names_identifier: [ + "AmsiScanBuffer", + "AmsiScanString", + "AmsiInitialize", + "AmsiOpenSession", + ], + }, + } + + @classmethod + def get_requirements(cls): + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="inlinehooks", component=inlinehooks.InlineHooks, version=(1, 0, 0) + ), + requirements.ListRequirement( + name="pid", + description="Filter on specific process IDs", + element_type=int, + optional=True, + ), + ] + + def _generator(self): + # Get all ETW & AMSI function addresses before looping through processes + found_symbols = pe_symbols.PESymbols.addresses_for_process_symbols( + context=self.context, + config_path=self.config_path, + kernel_module_name=self.config["kernel"], + symbols=self.av_functions, + ) + + filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) + + inlineHooks = inlinehooks.InlineHooks(self.context, self.config_path) + + for proc in pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=filter_func, + ): + try: + proc_id = proc.UniqueProcessId + proc_name = utility.array_to_string(proc.ImageFileName) + proc_layer_name = proc.add_process_layer() + except exceptions.InvalidAddressException: + vollog.debug(f"Unable to create process layer for PID {proc_id}") + continue + + for dll_name, functions in found_symbols.items(): + for func_name, func_addr in functions: + try: + data = self.context.layers[proc_layer_name].read(func_addr, 24) + disasm = renderers.Disassembly(data, func_addr) + inline_hook_check = inlineHooks.check_inline_hook( + data=data, addr=func_addr + ) + + if inline_hook_check: + vollog.debug( + f"Inline hook detected at {func_addr:#x} in process {proc_id} ({proc_name}) for function {func_name}" + ) + yield ( + 0, + ( + proc_id, + proc_name, + dll_name, + func_name, + format_hints.Hex(func_addr), + inline_hook_check[1], + ( + format_hints.HexBytes(inline_hook_check[0]) + if inline_hook_check[0] + else format_hints.HexBytes(b"") + ), + disasm, + ), + ) + except exceptions.InvalidAddressException: + vollog.debug( + f"Invalid address when reading function {func_name} at {func_addr:#x} in process {proc_id}" + ) + + def run(self): + return renderers.TreeGrid( + [ + ("PID", int), + ("Process", str), + ("DLL", str), + ("Function", str), + ("Hook Address", format_hints.Hex), + ("Hook Info", str), + ("Hook Hexdump", format_hints.HexBytes), + ("Disasm", renderers.Disassembly), + ], + self._generator(), + ) diff --git a/volatility3/framework/plugins/windows/malware/inlinehooks.py b/volatility3/framework/plugins/windows/malware/inlinehooks.py new file mode 100644 index 0000000000..b4a3a404f9 --- /dev/null +++ b/volatility3/framework/plugins/windows/malware/inlinehooks.py @@ -0,0 +1,432 @@ +# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# +import logging +from typing import List, Tuple, Iterable, Optional +from enum import Enum +import time + +from volatility3.framework import interfaces, renderers, exceptions +from volatility3.framework.configuration import requirements +from volatility3.framework.renderers import format_hints +from volatility3.plugins.windows import pslist, pe_symbols, modules + +vollog = logging.getLogger(__name__) + + +class HookType(Enum): + """Enum defining different types of API hooks""" + + INLINE = "Inline Hook" + IAT = "Import Address Table Hook" + EAT = "Export Address Table Hook" + + +# https://github.com/SolitudePy/Inline-Hooks +class InlineHooks(interfaces.plugins.PluginInterface): + """Detect various types of Inline hooks in process memory""" + + _version = (1, 0, 0) + _required_framework_version = (2, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(3, 0, 0) + ), + requirements.VersionRequirement( + name="modules", + component=modules.Modules, + version=(3, 0, 0), + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(3, 0, 0) + ), + requirements.ListRequirement( + name="pid", + description="Process IDs to include (all other processes are excluded)", + element_type=int, + optional=True, + ), + ] + + def _count_until_padding(self, data: bytes) -> int: + # Check for padding sequences + for i in range(len(data)): + + # Double int3 (CC CC) + if i + 1 < len(data) and data[i] == 0xCC and data[i + 1] == 0xCC: + return i + + # 11-byte NOP (66 66 66 0f 1f 84 00 00 00 00 00) + if ( + i + 11 <= len(data) + and data[i : i + 11] == b"\x66\x66\x66\x0f\x1f\x84\x00\x00\x00\x00\x00" + ): + return i + + # 2-byte NOP (66 66) + if i + 2 <= len(data) and data[i : i + 2] == b"\x66\x66": + return i + return len(data) + + def check_inline_hook( + self, data: bytes, addr: int = 0 + ) -> Optional[Tuple[bytes, str]]: + """Check for inline API hooks by analyzing instructions""" + + # Constants for minimum function sizes + # This is important to avoid false positives on stub functions + MIN_FUNC_SIZE_FOR_JMP = 2 + MIN_FUNC_SIZE_FOR_RET = 2 + MIN_FUNC_SIZE_FOR_XOR_RET = 3 + MIN_FUNC_SIZE_FOR_CALL = 2 + + if len(data) < 1: + return None + + try: + import capstone + except ImportError: + vollog.warning( + "Capstone not available - inline hook detection will be limited" + ) + return None + + data = data[: self._count_until_padding(data)] + + try: + # Set up disassembler + md = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64) + md.detail = True + + disasm = list(md.disasm(data, addr)) + func_insn_count = len(disasm) + + if func_insn_count == 0: + return None + + # Check for NOP sled + if ( + func_insn_count >= 3 + and disasm[0].mnemonic == "nop" + and disasm[1].mnemonic == "nop" + and disasm[2].mnemonic == "nop" + ): + return (data, "NOP sled") + + # Check for Early RET + if func_insn_count >= MIN_FUNC_SIZE_FOR_RET: + if ( + (disasm[0].id == capstone.x86.X86_INS_RET) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "push" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "call" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "jmp" + and disasm[1].id == capstone.x86.X86_INS_RET + ) + or ( + func_insn_count >= MIN_FUNC_SIZE_FOR_XOR_RET + and disasm[0].mnemonic == "xor" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_REG + and disasm[1].id == capstone.x86.X86_INS_RET + ) + ): + return (data, "Early RET") + + # Check for JMP relative/Register JMP hooks + if func_insn_count >= MIN_FUNC_SIZE_FOR_JMP: + if (disasm[0].bytes[0] == 0xE9) or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "mov" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM + and disasm[1].mnemonic == "jmp" + and disasm[1].operands[0].type == capstone.x86.X86_OP_REG + and disasm[1].operands[0].reg == disasm[0].operands[0].reg + ): + return (data, "Early JMP") + + # Check for Early CALL/Register CALL hooks + if func_insn_count >= MIN_FUNC_SIZE_FOR_CALL: + if (disasm[0].mnemonic == "call") or ( + func_insn_count >= 2 + and disasm[0].mnemonic == "mov" + and disasm[0].operands[0].type == capstone.x86.X86_OP_REG + and disasm[0].operands[1].type == capstone.x86.X86_OP_IMM + and disasm[1].mnemonic == "call" + and disasm[1].operands[0].type == capstone.x86.X86_OP_REG + and disasm[1].operands[0].reg == disasm[0].operands[0].reg + ): + return (data, "Early CALL") + + except Exception as e: + vollog.debug(f"Error during disassembly at {addr:#x}: {e}") + + return None + + """def _check_iat_hooks_for_process(self, proc) -> List[Tuple[str, int, str]]: + #Check for Import Address Table hooks for an entire process using the existing IAT plugin + hooked_imports = [] + + try: + # Create IAT plugin instance to reuse its functionality + iat_plugin = iat.IAT(self.context, self.config_path) + + # Get IAT entries for this specific process + iat_entries = list(iat_plugin._generator([proc])) + # print(f"Found {len(iat_entries)} IAT entries for process {proc.UniqueProcessId}") # Commented out debug print + + # Get process layer for module bounds checking + proc_layer_name = proc.add_process_layer() + proc_layer = self.context.layers[proc_layer_name] + + # Build a map of loaded modules and their bounds + module_bounds = {} + for mod in proc.load_order_modules(): + try: + module_name = mod.BaseDllName.get_string().lower() + module_bounds[module_name] = (mod.DllBase, mod.DllBase + mod.SizeOfImage) + except: + continue + + # Check each IAT entry for suspicious redirections + for _, (pid, proc_name, dll_name, bound, function_name, function_address) in iat_entries: + try: + if function_address: + addr = int(function_address) if hasattr(function_address, '__int__') else function_address + + # Check if this address is within the expected module bounds + is_within_bounds = False + + # Check against all loaded modules (not just the declaring DLL) + for mod_name, (mod_start, mod_end) in module_bounds.items(): + if mod_start <= addr < mod_end: + is_within_bounds = True + break + + if not is_within_bounds: + hooked_imports.append(( + f"{dll_name}::{function_name}", + addr, + "IAT entry points outside any loaded module (hooked)" + )) + + except (exceptions.InvalidAddressException, ValueError, TypeError): + continue + + except Exception as e: + vollog.debug(f"Error in IAT hook detection: {e}") + + return hooked_imports""" + + """def _check_eat_hook(self, proc_layer, module, export_dir) -> List[Tuple[str, int, str]]: + #Check for Export Address Table hooks + hooked_exports = [] + + try: + module_start = module.DllBase + module_end = module.DllBase + module.SizeOfImage + + # Walk the EAT + for export in export_dir.entries(): + try: + expected_rva = export.rva + # Read pointer value properly using struct.unpack + pointer_size = proc_layer.context.symbol_space.get_type("pointer").size + pointer_data = proc_layer.read(module.DllBase + expected_rva, pointer_size) + actual_addr = struct.unpack(" Iterable[Tuple[HookType, str, int, str, str, bytes]]: + """Scan modules in a process for different types of API hooks""" + try: + proc_layer_name = proc.add_process_layer() + proc_layer = self.context.layers[proc_layer_name] + except Exception as e: + vollog.debug( + f"Error getting process layer for PID {proc.UniqueProcessId}: {e}" + ) + return + + # Scan each loaded module + for mod in proc.load_order_modules(): + try: + module_name = mod.BaseDllName.get_string() + except exceptions.InvalidAddressException: + vollog.debug(f"Invalid module name for {mod.DllBase:#x}") + continue + + # Get exported symbols + symbol_table_name = mod.get_symbol_table_name() + + if not symbol_table_name: + continue + + try: + # Get symbol finders for this module + export_finders = list( + pe_symbols.PESymbols._find_symbols_through_exports( + self.context, + self.config_path, + [(proc_layer_name, mod.DllBase, mod.SizeOfImage)], + module_name.lower(), + ) + ) + + if not export_finders: + continue + + # Check exports across all export finders + for finder in export_finders: + for export in finder._symbol_module: + try: + export_name = finder._get_name(export) + if not export_name: + continue + + export_addr = finder._module_start + export.address + if not export_addr: + continue + + try: + vollog.debug( + f"Analyzing {module_name}::{export_name} at {export_addr:#x}" + ) + + # Read data for analysis + data = proc_layer.read(export_addr, 24) + + disasm = renderers.Disassembly(data, export_addr) + + inline_hook_check = self.check_inline_hook( + data=data, addr=export_addr + ) + if inline_hook_check: + vollog.info( + f"Inline hook detected in {module_name}::{export_name} at {export_addr:#x}: {inline_hook_check[1]}" + ) + yield ( + HookType.INLINE, + module_name, + export_addr, + export_name, + inline_hook_check[1], + inline_hook_check[0], + disasm, + ) + + except exceptions.InvalidAddressException: + vollog.debug( + f"Invalid address for {module_name}::{export_name} at {export_addr:#x}" + ) + continue + + except Exception as e: + vollog.debug( + f"Error processing export {export_name if 'export_name' in locals() else 'unknown'}: {e}" + ) + continue + + except Exception as e: + vollog.debug(f"Error processing module {module_name}: {e}") + continue + + def _generator(self): + start = time.time() + filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) + + hook_count = 0 + + for proc in pslist.PsList.list_processes( + context=self.context, + kernel_module_name=self.config["kernel"], + filter_func=filter_func, + ): + proc_id = proc.UniqueProcessId + proc_name = proc.ImageFileName.cast( + "string", max_length=proc.ImageFileName.vol.count, errors="replace" + ) + + # Check for all hook types + for ( + hook_type, + module_name, + hook_addr, + function_name, + hook_info, + hook_data, + disasm, + ) in self._scan_process_modules(proc): + hook_count += 1 + yield ( + 0, + ( + proc_id, + proc_name, + hook_type.value, + module_name, + format_hints.Hex(hook_addr), + function_name, + hook_info, + ( + format_hints.HexBytes(hook_data) + if hook_data + else format_hints.HexBytes(b"") + ), + disasm, + ), + ) + + end = time.time() + vollog.info( + f"InlineHooks plugin completed in {end - start:.2f} seconds, found {hook_count} hooks" + ) + + def run(self): + return renderers.TreeGrid( + [ + ("PID", int), + ("Process", str), + ("Hook Type", str), + ("Module", str), + ("Hook Address", format_hints.Hex), + ("Function", str), + ("Hook Info", str), + ("Hook Hexdump", format_hints.HexBytes), + ("Disasm", renderers.Disassembly), + ], + self._generator(), + )