From 1f9d58dc61ef990d3ff3bc1f05e8cde7668cf33f Mon Sep 17 00:00:00 2001 From: Mike Auty Date: Sat, 30 Mar 2024 16:18:50 +0000 Subject: [PATCH 1/2] Plugins: Windows.string speed enhancements by eve --- .../framework/plugins/windows/strings.py | 248 ++++++++++++------ 1 file changed, 173 insertions(+), 75 deletions(-) diff --git a/volatility3/framework/plugins/windows/strings.py b/volatility3/framework/plugins/windows/strings.py index 9ea4ffed08..9281847cbf 100644 --- a/volatility3/framework/plugins/windows/strings.py +++ b/volatility3/framework/plugins/windows/strings.py @@ -2,6 +2,7 @@ # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # +from dataclasses import dataclass import logging import re from typing import Dict, Generator, List, Set, Tuple, Optional @@ -15,13 +16,66 @@ vollog = logging.getLogger(__name__) +@dataclass +class MappingNode: + def __init__( + self, + physical_addr_start, + physical_addr_end, + virtual_addr_start, + virtual_addr_end, + process_id, + region, + ) -> None: + self.physical_addr_start = physical_addr_start + self.physical_addr_end = physical_addr_end + self.virtual_addr_start = virtual_addr_start + self.virtual_addr_end = virtual_addr_end + self.process_id = process_id + self.region = region + + +class MappingTree: + def __init__(self, root=None) -> None: + self.root = root + self.left = None + self.right = None + + def add(self, node): + if isinstance(node, MappingNode): + if self.root == None: + self.root = node + elif node.physical_addr_start < self.root.physical_addr_start: + if self.left == None: + self.left = MappingTree(node) + else: + self.left.add(node) + else: + if self.right == None: + self.right = MappingTree(node) + else: + self.right.add(node) + else: + raise TypeError() + + def at(self, point): + if self.root: + if self.root.physical_addr_start <= point <= self.root.physical_addr_end: + yield self.root + if point < self.root.physical_addr_start and self.left: + yield from self.left.at(point) + elif self.right: + yield from self.right.at(point) + + class Strings(interfaces.plugins.PluginInterface): """Reads output from the strings command and indicates which process(es) each string belongs to.""" _required_framework_version = (2, 0, 0) # 2.0.0 - change signature of `generate_mapping` - _version = (2, 0, 0) + # 3.0.0 - Interval mapping + _version = (3, 0, 0) strings_pattern = re.compile(rb"^(?:\W*)([0-9]+)(?:\W*)(\w[\w\W]+)\n?") @@ -46,11 +100,16 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] name="strings_file", description="Strings file" ), ] - # TODO: Make URLRequirement that can accept a file address which the framework can open def run(self): return renderers.TreeGrid( - [("String", str), ("Physical Address", format_hints.Hex), ("Result", str)], + [ + ("String", str), + ("Region", str), + ("PID", int), + ("Physical Address", format_hints.Hex), + ("Virtual Address", format_hints.Hex), + ], self._generator(), ) @@ -71,10 +130,12 @@ def _generator(self) -> Generator[Tuple, None, None]: except ValueError: vollog.error(f"Line in unrecognized format: line {count}") line = strings_fp.readline() + kernel = self.context.modules[self.config["kernel"]] - revmap = self.generate_mapping( - context=self.context, - kernel_module_name=self.config["kernel"], + revmap_tree = self.generate_mapping( + self.context, + kernel.layer_name, + kernel.symbol_table_name, progress_callback=self._progress_callback, pid_list=self.config["pid"], ) @@ -82,26 +143,39 @@ def _generator(self) -> Generator[Tuple, None, None]: last_prog: float = 0 line_count: float = 0 num_strings = len(string_list) - for offset, string in string_list: + + for phys_offset, string in string_list: line_count += 1 - try: - revmap_list = [ - name + ":" + hex(offset) for (name, offset) in revmap[offset >> 12] - ] - except (IndexError, KeyError): - revmap_list = ["FREE MEMORY"] - yield ( - 0, - ( - str(string, "latin-1"), - format_hints.Hex(offset), - ", ".join(revmap_list), - ), - ) - prog = line_count / num_strings * 100 - if round(prog, 1) > last_prog: - last_prog = round(prog, 1) - self._progress_callback(prog, "Matching strings in memory") + + matched_region = False + for node in revmap_tree.at(phys_offset): + matched_region = True + + region_offset = phys_offset - node.physical_addr_start + offset = node.virtual_addr_start + region_offset + yield ( + 0, + ( + str(string.strip(), "latin-1"), + node.region, + node.process_id, + format_hints.Hex(phys_offset), + format_hints.Hex(offset), + ), + ) + + if not matched_region: + # no maps found for this offset + yield ( + 0, + ( + str(string.strip(), "latin-1"), + "Unallocated", + -1, + format_hints.Hex(phys_offset), + format_hints.Hex(0x00), + ), + ) def _parse_line(self, line: bytes) -> Tuple[int, bytes]: """Parses a single line from a strings file. @@ -123,7 +197,8 @@ def _parse_line(self, line: bytes) -> Tuple[int, bytes]: def generate_mapping( cls, context: interfaces.context.ContextInterface, - kernel_module_name: str, + layer_name: str, + symbol_table: str, progress_callback: constants.ProgressCallback = None, pid_list: Optional[List[int]] = None, ) -> Dict[int, Set[Tuple[str, int]]]: @@ -132,7 +207,8 @@ def generate_mapping( Args: context: the context for the method to run against - kernel_module_name: the name of the module forthe kernel + layer_name: the name of the windows intel layer to be scanned + symbol_table: the name of the kernel symbol table progress_callback: an optional callable to display progress pid_list: a lit of process IDs to consider when generating the reverse map @@ -140,60 +216,82 @@ def generate_mapping( A mapping of virtual offsets to strings and physical offsets """ filter = pslist.PsList.create_pid_filter(pid_list) + revmap_tree = MappingTree() - kernel = context.modules[kernel_module_name] - - layer = context.layers[kernel.layer_name] - reverse_map: Dict[int, Set[Tuple[str, int]]] = dict() + # start with kernel mappings + layer = context.layers[layer_name] + min_kernel_addr = 2 ** (layer._maxvirtaddr - 1) if isinstance(layer, intel.Intel): # We don't care about errors, we just wanted chunks that map correctly - for mapval in layer.mapping(0x0, layer.maximum_address, ignore_errors=True): - offset, _, mapped_offset, mapped_size, maplayer = mapval - for val in range(mapped_offset, mapped_offset + mapped_size, 0x1000): - cur_set = reverse_map.get(val >> 12, set()) - cur_set.add(("kernel", offset)) - reverse_map[val >> 12] = cur_set + for mapval in layer.mapping( + min_kernel_addr, layer.maximum_address, ignore_errors=True + ): + ( + virt_offset, + virt_size, + phy_offset, + phy_mapping_size, + _phy_layer_name, + ) = mapval + + node = MappingNode( + phy_offset, + phy_offset + phy_mapping_size, + virt_offset, + virt_offset + virt_size, + -1, + "Kernel", + ) + revmap_tree.add(node) + if progress_callback: progress_callback( - (offset * 100) / layer.maximum_address, - "Creating reverse kernel map", + (virt_offset * 100) / layer.maximum_address, + f"Creating custom tree mapping for kernel", ) - # TODO: Include kernel modules + # now process normal processes, ignoring kernel addrs + for process in pslist.PsList.list_processes(context, layer_name, symbol_table): + if not filter(process): + proc_id = "Unknown" + try: + proc_id = process.UniqueProcessId + proc_layer_name = process.add_process_layer() + except exceptions.InvalidAddressException as excp: + vollog.debug( + "Process {}: invalid address {} in layer {}".format( + proc_id, excp.invalid_address, excp.layer_name + ) + ) + continue - for process in pslist.PsList.list_processes( - context=context, kernel_module_name=kernel_module_name - ): - if not filter(process): - proc_id = "Unknown" - try: - proc_id = process.UniqueProcessId - proc_layer_name = process.add_process_layer() - except exceptions.InvalidAddressException as excp: - vollog.debug( - f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}" + proc_layer = context.layers[proc_layer_name] + max_proc_addr = (2 ** (proc_layer._maxvirtaddr - 1)) - 1 + if isinstance(proc_layer, linear.LinearlyMappedLayer): + for mapval in proc_layer.mapping( + 0, max_proc_addr, ignore_errors=True + ): + ( + virt_offset, + virt_size, + phy_offset, + phy_mapping_size, + _phy_layer_name, + ) = mapval + + node = MappingNode( + phy_offset, + phy_offset + phy_mapping_size, + virt_offset, + virt_offset + virt_size, + proc_id, + "Process", ) - continue - - proc_layer = context.layers[proc_layer_name] - if isinstance(proc_layer, linear.LinearlyMappedLayer): - for mapval in proc_layer.mapping( - 0x0, proc_layer.maximum_address, ignore_errors=True - ): - mapped_offset, _, offset, mapped_size, _maplayer = mapval - for val in range( - mapped_offset, mapped_offset + mapped_size, 0x1000 - ): - cur_set = reverse_map.get(mapped_offset >> 12, set()) - cur_set.add( - (f"Process {process.UniqueProcessId}", offset) - ) - reverse_map[mapped_offset >> 12] = cur_set - # FIXME: make the progress for all processes, rather than per-process - if progress_callback: - progress_callback( - (offset * 100) / layer.maximum_address, - f"Creating mapping for task {process.UniqueProcessId}", - ) - - return reverse_map + revmap_tree.add(node) + + if progress_callback: + progress_callback( + (virt_offset * 100) / max_proc_addr, + f"Creating custom tree mapping for task {proc_id}", + ) + return revmap_tree From 4f7f3adc2ac1ff03ffc0c332d9d7087c9cc5a05b Mon Sep 17 00:00:00 2001 From: Mike Auty Date: Sat, 11 Oct 2025 23:50:27 +0100 Subject: [PATCH 2/2] Change the recursion to iteration to avoid stack depth issues --- .../framework/plugins/windows/strings.py | 108 +++++++++--------- 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/volatility3/framework/plugins/windows/strings.py b/volatility3/framework/plugins/windows/strings.py index 9281847cbf..e413688307 100644 --- a/volatility3/framework/plugins/windows/strings.py +++ b/volatility3/framework/plugins/windows/strings.py @@ -1,15 +1,16 @@ # This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0 # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # +from __future__ import annotations -from dataclasses import dataclass import logging import re -from typing import Dict, Generator, List, Set, Tuple, Optional +from dataclasses import dataclass +from typing import Generator -from volatility3.framework import interfaces, renderers, exceptions, constants +from volatility3.framework import constants, exceptions, interfaces, renderers from volatility3.framework.configuration import requirements -from volatility3.framework.layers import intel, resources, linear +from volatility3.framework.layers import intel, linear, resources from volatility3.framework.renderers import format_hints from volatility3.plugins.windows import pslist @@ -18,45 +19,41 @@ @dataclass class MappingNode: - def __init__( - self, - physical_addr_start, - physical_addr_end, - virtual_addr_start, - virtual_addr_end, - process_id, - region, - ) -> None: - self.physical_addr_start = physical_addr_start - self.physical_addr_end = physical_addr_end - self.virtual_addr_start = virtual_addr_start - self.virtual_addr_end = virtual_addr_end - self.process_id = process_id - self.region = region + physical_addr_start: int + physical_addr_end: int + virtual_addr_start: int + virtual_addr_end: int + process_id: int | str + region: str +@dataclass class MappingTree: - def __init__(self, root=None) -> None: - self.root = root - self.left = None - self.right = None - - def add(self, node): - if isinstance(node, MappingNode): - if self.root == None: - self.root = node - elif node.physical_addr_start < self.root.physical_addr_start: - if self.left == None: - self.left = MappingTree(node) + root: MappingNode | None = None + left: MappingTree | None = None + right: MappingTree | None = None + + def add(self, node: MappingNode, depth: int = 0) -> None: + # Iteratively add to avoid recursion issues + if not isinstance(node, MappingNode): + raise TypeError + parent_node: MappingTree | None = self + while parent_node is not None: + if parent_node.root is None: + parent_node.root = node + parent_node = None + elif node.physical_addr_start < parent_node.root.physical_addr_start: + if parent_node.left is None: + parent_node.left = MappingTree(node) + parent_node = None else: - self.left.add(node) + parent_node = parent_node.left else: - if self.right == None: - self.right = MappingTree(node) + if parent_node.right is None: + parent_node.right = MappingTree(node) + parent_node = None else: - self.right.add(node) - else: - raise TypeError() + parent_node = parent_node.right def at(self, point): if self.root: @@ -80,7 +77,7 @@ class Strings(interfaces.plugins.PluginInterface): strings_pattern = re.compile(rb"^(?:\W*)([0-9]+)(?:\W*)(\w[\w\W]+)\n?") @classmethod - def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + def get_requirements(cls) -> list[interfaces.configuration.RequirementInterface]: return [ requirements.ModuleRequirement( name="kernel", @@ -113,13 +110,14 @@ def run(self): self._generator(), ) - def _generator(self) -> Generator[Tuple, None, None]: + def _generator(self) -> Generator[tuple, None, None]: """Generates results from a strings file.""" - string_list: List[Tuple[int, bytes]] = [] + string_list: list[tuple[int, bytes]] = [] # Test strings file format is accurate - accessor = resources.ResourceAccessor() - strings_fp = accessor.open(self.config["strings_file"], "rb") + strings_fp = resources.ResourceAccessor().open( + self.config["strings_file"], "rb" + ) line = strings_fp.readline() count: float = 0 while line: @@ -140,9 +138,9 @@ def _generator(self) -> Generator[Tuple, None, None]: pid_list=self.config["pid"], ) - last_prog: float = 0 + _last_prog: float = 0 line_count: float = 0 - num_strings = len(string_list) + _num_strings = len(string_list) for phys_offset, string in string_list: line_count += 1 @@ -177,7 +175,7 @@ def _generator(self) -> Generator[Tuple, None, None]: ), ) - def _parse_line(self, line: bytes) -> Tuple[int, bytes]: + def _parse_line(self, line: bytes) -> tuple[int, bytes]: """Parses a single line from a strings file. Args: @@ -200,8 +198,8 @@ def generate_mapping( layer_name: str, symbol_table: str, progress_callback: constants.ProgressCallback = None, - pid_list: Optional[List[int]] = None, - ) -> Dict[int, Set[Tuple[str, int]]]: + pid_list: list[int] | None = None, + ) -> MappingTree: """Creates a reverse mapping between virtual addresses and physical addresses. @@ -219,7 +217,7 @@ def generate_mapping( revmap_tree = MappingTree() # start with kernel mappings - layer = context.layers[layer_name] + layer: intel.Intel = context.layers[layer_name] min_kernel_addr = 2 ** (layer._maxvirtaddr - 1) if isinstance(layer, intel.Intel): # We don't care about errors, we just wanted chunks that map correctly @@ -247,7 +245,7 @@ def generate_mapping( if progress_callback: progress_callback( (virt_offset * 100) / layer.maximum_address, - f"Creating custom tree mapping for kernel", + f"Creating custom tree mapping for kernel at offset : {virt_offset:x}", ) # now process normal processes, ignoring kernel addrs @@ -259,13 +257,11 @@ def generate_mapping( proc_layer_name = process.add_process_layer() except exceptions.InvalidAddressException as excp: vollog.debug( - "Process {}: invalid address {} in layer {}".format( - proc_id, excp.invalid_address, excp.layer_name - ) + f"Process {proc_id}: invalid address {excp.invalid_address} in layer {excp.layer_name}" ) continue - proc_layer = context.layers[proc_layer_name] + proc_layer: intel.Intel = context.layers[proc_layer_name] max_proc_addr = (2 ** (proc_layer._maxvirtaddr - 1)) - 1 if isinstance(proc_layer, linear.LinearlyMappedLayer): for mapval in proc_layer.mapping( @@ -284,14 +280,14 @@ def generate_mapping( phy_offset + phy_mapping_size, virt_offset, virt_offset + virt_size, - proc_id, - "Process", + process_id=proc_id, + region="Process", ) revmap_tree.add(node) if progress_callback: progress_callback( (virt_offset * 100) / max_proc_addr, - f"Creating custom tree mapping for task {proc_id}", + f"Creating custom tree mapping for task {proc_id}: {virt_offset:x}", ) return revmap_tree