|
| 1 | +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 |
| 2 | +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 |
| 3 | +# |
| 4 | +import logging |
| 5 | +from typing import List, Dict, Iterator |
| 6 | +from volatility3.plugins.linux import lsmod, check_modules, hidden_modules |
| 7 | +from volatility3.framework import interfaces |
| 8 | +from volatility3.framework.configuration import requirements |
| 9 | +from volatility3.framework.renderers import format_hints, TreeGrid, NotAvailableValue |
| 10 | +from volatility3.framework.symbols.linux import extensions |
| 11 | +from volatility3.framework.constants import architectures |
| 12 | +from volatility3.framework.symbols.linux.utilities import tainting |
| 13 | + |
| 14 | +vollog = logging.getLogger(__name__) |
| 15 | + |
| 16 | + |
| 17 | +class Modxview(interfaces.plugins.PluginInterface): |
| 18 | + """Centralize lsmod, check_modules and hidden_modules results to efficiently |
| 19 | + spot modules presence and taints.""" |
| 20 | + |
| 21 | + _version = (1, 0, 0) |
| 22 | + _required_framework_version = (2, 17, 0) |
| 23 | + |
| 24 | + @classmethod |
| 25 | + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: |
| 26 | + return [ |
| 27 | + requirements.ModuleRequirement( |
| 28 | + name="kernel", |
| 29 | + description="Linux kernel", |
| 30 | + architectures=architectures.LINUX_ARCHS, |
| 31 | + ), |
| 32 | + requirements.VersionRequirement( |
| 33 | + name="linux-tainting", component=tainting.Tainting, version=(1, 0, 0) |
| 34 | + ), |
| 35 | + requirements.PluginRequirement( |
| 36 | + name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0) |
| 37 | + ), |
| 38 | + requirements.PluginRequirement( |
| 39 | + name="check_modules", |
| 40 | + plugin=check_modules.Check_modules, |
| 41 | + version=(1, 0, 0), |
| 42 | + ), |
| 43 | + requirements.PluginRequirement( |
| 44 | + name="hidden_modules", |
| 45 | + plugin=hidden_modules.Hidden_modules, |
| 46 | + version=(1, 0, 0), |
| 47 | + ), |
| 48 | + requirements.BooleanRequirement( |
| 49 | + name="plain_taints", |
| 50 | + description="Display the plain taints string for each module.", |
| 51 | + optional=True, |
| 52 | + default=False, |
| 53 | + ), |
| 54 | + ] |
| 55 | + |
| 56 | + @classmethod |
| 57 | + def flatten_run_modules_results( |
| 58 | + cls, run_results: Dict[str, List[extensions.module]], deduplicate: bool = True |
| 59 | + ) -> Iterator[extensions.module]: |
| 60 | + """Flatten a dictionary mapping plugin names and modules list, to a single merged list. |
| 61 | + This is useful to get a generic lookup list of all the detected modules. |
| 62 | +
|
| 63 | + Args: |
| 64 | + run_results: dictionary of plugin names mapping a list of detected modules |
| 65 | + deduplicate: remove duplicate modules, based on their offsets |
| 66 | +
|
| 67 | + Returns: |
| 68 | + Iterator of modules objects |
| 69 | + """ |
| 70 | + seen_addresses = set() |
| 71 | + for modules in run_results.values(): |
| 72 | + for module in modules: |
| 73 | + if deduplicate and module.vol.offset in seen_addresses: |
| 74 | + continue |
| 75 | + seen_addresses.add(module.vol.offset) |
| 76 | + yield module |
| 77 | + |
| 78 | + @classmethod |
| 79 | + def run_modules_scanners( |
| 80 | + cls, |
| 81 | + context: interfaces.context.ContextInterface, |
| 82 | + kernel_name: str, |
| 83 | + run_hidden_modules: bool = True, |
| 84 | + ) -> Dict[str, List[extensions.module]]: |
| 85 | + """Run module scanning plugins and aggregate the results. It is designed |
| 86 | + to not operate any inter-plugin results triage. |
| 87 | +
|
| 88 | + Args: |
| 89 | + run_hidden_modules: specify if the hidden_modules plugin should be run |
| 90 | + Returns: |
| 91 | + Dictionary mapping each plugin to its corresponding result |
| 92 | + """ |
| 93 | + |
| 94 | + kernel = context.modules[kernel_name] |
| 95 | + run_results = {} |
| 96 | + # lsmod |
| 97 | + run_results["lsmod"] = list(lsmod.Lsmod.list_modules(context, kernel_name)) |
| 98 | + # check_modules |
| 99 | + sysfs_modules: dict = check_modules.Check_modules.get_kset_modules( |
| 100 | + context, kernel_name |
| 101 | + ) |
| 102 | + ## Convert get_kset_modules() offsets back to module objects |
| 103 | + run_results["check_modules"] = [ |
| 104 | + kernel.object(object_type="module", offset=m_offset, absolute=True) |
| 105 | + for m_offset in sysfs_modules.values() |
| 106 | + ] |
| 107 | + # hidden_modules |
| 108 | + if run_hidden_modules: |
| 109 | + known_modules_addresses = set( |
| 110 | + context.layers[kernel.layer_name].canonicalize(module.vol.offset) |
| 111 | + for module in run_results["lsmod"] + run_results["check_modules"] |
| 112 | + ) |
| 113 | + modules_memory_boundaries = ( |
| 114 | + hidden_modules.Hidden_modules.get_modules_memory_boundaries( |
| 115 | + context, kernel_name |
| 116 | + ) |
| 117 | + ) |
| 118 | + run_results["hidden_modules"] = list( |
| 119 | + hidden_modules.Hidden_modules.get_hidden_modules( |
| 120 | + context, |
| 121 | + kernel_name, |
| 122 | + known_modules_addresses, |
| 123 | + modules_memory_boundaries, |
| 124 | + ) |
| 125 | + ) |
| 126 | + |
| 127 | + return run_results |
| 128 | + |
| 129 | + def _generator(self): |
| 130 | + kernel_name = self.config["kernel"] |
| 131 | + run_results = self.run_modules_scanners(self.context, kernel_name) |
| 132 | + aggregated_modules = {} |
| 133 | + # We want to be explicit on the plugins results we are interested in |
| 134 | + for plugin_name in ["lsmod", "check_modules", "hidden_modules"]: |
| 135 | + # Iterate over each recovered module |
| 136 | + for module in run_results[plugin_name]: |
| 137 | + # Use offsets as unique keys, whether a module |
| 138 | + # appears in many plugin runs or not |
| 139 | + if aggregated_modules.get(module.vol.offset, None) is not None: |
| 140 | + # Append the plugin to the list of originating plugins |
| 141 | + aggregated_modules[module.vol.offset][1].append(plugin_name) |
| 142 | + else: |
| 143 | + aggregated_modules[module.vol.offset] = (module, [plugin_name]) |
| 144 | + |
| 145 | + for module_offset, (module, originating_plugins) in aggregated_modules.items(): |
| 146 | + # Tainting parsing capabilities applied to the module |
| 147 | + if self.config.get("plain_taints"): |
| 148 | + taints = tainting.Tainting.get_taints_as_plain_string( |
| 149 | + self.context, |
| 150 | + kernel_name, |
| 151 | + module.taints, |
| 152 | + True, |
| 153 | + ) |
| 154 | + else: |
| 155 | + taints = ",".join( |
| 156 | + tainting.Tainting.get_taints_parsed( |
| 157 | + self.context, |
| 158 | + kernel_name, |
| 159 | + module.taints, |
| 160 | + True, |
| 161 | + ) |
| 162 | + ) |
| 163 | + |
| 164 | + yield ( |
| 165 | + 0, |
| 166 | + ( |
| 167 | + module.get_name() or NotAvailableValue(), |
| 168 | + format_hints.Hex(module_offset), |
| 169 | + "lsmod" in originating_plugins, |
| 170 | + "check_modules" in originating_plugins, |
| 171 | + "hidden_modules" in originating_plugins, |
| 172 | + taints or NotAvailableValue(), |
| 173 | + ), |
| 174 | + ) |
| 175 | + |
| 176 | + def run(self): |
| 177 | + columns = [ |
| 178 | + ("Name", str), |
| 179 | + ("Address", format_hints.Hex), |
| 180 | + ("In procfs", bool), |
| 181 | + ("In sysfs", bool), |
| 182 | + ("Hidden", bool), |
| 183 | + ("Taints", str), |
| 184 | + ] |
| 185 | + |
| 186 | + return TreeGrid( |
| 187 | + columns, |
| 188 | + self._generator(), |
| 189 | + ) |
0 commit comments