Skip to content

Commit 5dee3ae

Browse files
committed
Add linux.hidden_modules plugin
1 parent 6cd39c0 commit 5dee3ae

File tree

2 files changed

+369
-0
lines changed

2 files changed

+369
-0
lines changed
Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
2+
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
3+
#
4+
import re
5+
import functools
6+
import logging
7+
import contextlib
8+
from typing import List, Iterable
9+
from volatility3.framework import renderers, interfaces, exceptions, objects
10+
from volatility3.framework.constants.architectures import LINUX_ARCHS
11+
from volatility3.framework.renderers import format_hints
12+
from volatility3.framework.configuration import requirements
13+
from volatility3.plugins.linux import lsmod
14+
15+
vollog = logging.getLogger(__name__)
16+
17+
18+
class Hidden_modules(interfaces.plugins.PluginInterface):
19+
"""Carves memory to find hidden kernel modules"""
20+
21+
_required_framework_version = (2, 10, 0)
22+
23+
_version = (1, 0, 0)
24+
25+
@classmethod
26+
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
27+
return [
28+
requirements.ModuleRequirement(
29+
name="kernel",
30+
description="Linux kernel",
31+
architectures=LINUX_ARCHS,
32+
),
33+
requirements.PluginRequirement(
34+
name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0)
35+
),
36+
requirements.BooleanRequirement(
37+
name="fast",
38+
description="Fast scan method. Recommended only for kernels 4.2 and above",
39+
optional=True,
40+
default=False,
41+
),
42+
]
43+
44+
def _get_modules_memory_boundaries(self, vmlinux):
45+
if vmlinux.has_symbol("mod_tree"):
46+
mod_tree = vmlinux.object_from_symbol("mod_tree")
47+
modules_addr_min = mod_tree.addr_min
48+
modules_addr_max = mod_tree.addr_max
49+
elif vmlinux.has_symbol("module_addr_min"):
50+
modules_addr_min = vmlinux.object_from_symbol("module_addr_min")
51+
modules_addr_max = vmlinux.object_from_symbol("module_addr_max")
52+
53+
if isinstance(modules_addr_min, objects.Void):
54+
# Crap ISF! Here's my best-effort workaround
55+
vollog.warning(
56+
"Your ISF symbols are missing type information. You may need to update "
57+
"the ISF using the latest version of dwarf2json"
58+
)
59+
# See issue #1041. In the Linux kernel these are "unsigned long"
60+
for type_name in ("long unsigned int", "unsigned long"):
61+
if vmlinux.has_type(type_name):
62+
modules_addr_min = modules_addr_min.cast(type_name)
63+
modules_addr_max = modules_addr_max.cast(type_name)
64+
break
65+
else:
66+
raise exceptions.VolatilityException(
67+
"Bad ISF! Please update the ISF using the latest version of dwarf2json"
68+
)
69+
else:
70+
raise exceptions.VolatilityException(
71+
"Cannot find the module memory allocation area. Unsupported kernel"
72+
)
73+
74+
return modules_addr_min, modules_addr_max
75+
76+
def _get_module_state_live_bytes(
77+
self,
78+
context: interfaces.context.ContextInterface,
79+
vmlinux_module_name: str,
80+
) -> bytes:
81+
"""Retrieve the MODULE_STATE_LIVE value bytes by introspecting its enum type
82+
83+
Args:
84+
context: The context to retrieve required elements (layers, symbol tables) from
85+
vmlinux_module_name: The name of the kernel module on which to operate
86+
87+
Returns:
88+
The MODULE_STATE_LIVE value bytes
89+
"""
90+
vmlinux = context.modules[vmlinux_module_name]
91+
module_state_type_template = vmlinux.get_type("module").vol.members["state"][1]
92+
module_state_live_val = module_state_type_template.choices["MODULE_STATE_LIVE"]
93+
data_format = module_state_type_template.base_type.vol.data_format
94+
module_state_live_bytes = objects.convert_value_to_data(
95+
module_state_live_val, int, data_format
96+
)
97+
return module_state_live_bytes
98+
99+
def get_hidden_modules_vol2(
100+
self,
101+
context: interfaces.context.ContextInterface,
102+
vmlinux_module_name: str,
103+
known_module_addresses,
104+
modules_memory_boundaries: tuple,
105+
) -> Iterable[interfaces.objects.ObjectInterface]:
106+
"""Enumerate hidden modules using the traditional implementation.
107+
108+
This is a port of the Volatility2 plugin, with minor code improvements.
109+
110+
Args:
111+
context: The context to retrieve required elements (layers, symbol tables) from
112+
vmlinux_module_name: The name of the kernel module on which to operate
113+
Yields:
114+
module objects
115+
"""
116+
vmlinux = context.modules[vmlinux_module_name]
117+
vmlinux_layer = context.layers[vmlinux.layer_name]
118+
119+
check_nums = (
120+
3000,
121+
2800,
122+
2700,
123+
2500,
124+
2300,
125+
2100,
126+
2000,
127+
1500,
128+
1300,
129+
1200,
130+
1024,
131+
512,
132+
256,
133+
128,
134+
96,
135+
64,
136+
48,
137+
32,
138+
24,
139+
)
140+
modules_addr_min, modules_addr_max = modules_memory_boundaries
141+
modules_addr_min = modules_addr_min & ~0xFFF
142+
modules_addr_max = (modules_addr_max & ~0xFFF) + vmlinux_layer.page_size
143+
144+
check_bufs = []
145+
replace_bufs = []
146+
minus_size = vmlinux.get_type("pointer").size
147+
null_pointer_bytes = b"\x00" * minus_size
148+
for num in check_nums:
149+
check_bufs.append(b"\x00" * num)
150+
replace_bufs.append((b"\xff" * (num - minus_size)) + null_pointer_bytes)
151+
152+
all_ffs = b"\xff" * 4096
153+
scan_list = []
154+
for page_addr in range(
155+
modules_addr_min, modules_addr_max, vmlinux_layer.page_size
156+
):
157+
content_fixed = all_ffs
158+
with contextlib.suppress(
159+
exceptions.InvalidAddressException,
160+
exceptions.PagedInvalidAddressException,
161+
):
162+
content = vmlinux_layer.read(page_addr, vmlinux_layer.page_size)
163+
164+
all_nulls = all(x == 0 for x in content)
165+
if content and not all_nulls:
166+
content_fixed = content
167+
for check_bytes, replace_bytes in zip(check_bufs, replace_bufs):
168+
content_fixed = content_fixed.replace(
169+
check_bytes, replace_bytes
170+
)
171+
172+
scan_list.append(content_fixed)
173+
174+
scan_buf = b"".join(scan_list)
175+
del scan_list
176+
177+
module_state_live_bytes = self._get_module_state_live_bytes(
178+
context, vmlinux_module_name
179+
)
180+
# f'strings cannot be combined with bytes literals
181+
for cur_addr in re.finditer(b"(?=(%s))" % (module_state_live_bytes), scan_buf):
182+
module_addr = modules_addr_min + cur_addr.start()
183+
184+
if module_addr in known_module_addresses:
185+
continue
186+
187+
module = vmlinux.object("module", offset=module_addr, absolute=True)
188+
if module and module.is_valid():
189+
yield module
190+
191+
@functools.cached_property
192+
def module_address_alignment(self) -> int:
193+
"""Obtain the module memory address alignment. This is only used with the fast scan method.
194+
195+
struct module is aligned to the L1 cache line, which is typically 64 bytes for most
196+
common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this
197+
will still work.
198+
199+
Returns:
200+
The struct module alignment
201+
"""
202+
# FIXME: When dwarf2json/ISF supports type alignments. Read it directly from the type metadata
203+
# The cached_property won't provide any benefits until then
204+
return 64
205+
206+
def get_hidden_modules_fast(
207+
self,
208+
context: interfaces.context.ContextInterface,
209+
vmlinux_module_name: str,
210+
known_module_addresses,
211+
modules_memory_boundaries: tuple,
212+
) -> Iterable[interfaces.objects.ObjectInterface]:
213+
"""Enumerate hidden modules by taking advantage of memory address alignment patterns
214+
215+
This technique is much faster and uses less memory than the traditional scan method
216+
in Volatility2, but it doesn't work with older kernels.
217+
218+
From kernels 4.2 struct module allocation are aligned to the L1 cache line size.
219+
In i386/amd64/arm64 this is typically 64 bytes. However, this can be changed in
220+
the Linux kernel configuration via CONFIG_X86_L1_CACHE_SHIFT. The alignment can
221+
also be obtained from the DWARF info i.e. DW_AT_alignment<64>, but dwarf2json
222+
doesn't support this feature yet.
223+
In kernels < 4.2, alignment attributes are absent in the struct module, meaning
224+
alignment cannot be guaranteed. Therefore, for older kernels, it's better to use
225+
the traditional scan technique.
226+
227+
Args:
228+
context: The context to retrieve required elements (layers, symbol tables) from
229+
vmlinux_module_name: The name of the kernel module on which to operate
230+
Yields:
231+
module objects
232+
"""
233+
vmlinux = context.modules[vmlinux_module_name]
234+
vmlinux_layer = context.layers[vmlinux.layer_name]
235+
236+
module_addr_min, module_addr_max = modules_memory_boundaries
237+
238+
module_state_live_bytes = self._get_module_state_live_bytes(
239+
context, vmlinux_module_name
240+
)
241+
242+
for module_addr in range(
243+
module_addr_min, module_addr_max, self.module_address_alignment
244+
):
245+
if module_addr in known_module_addresses:
246+
continue
247+
248+
try:
249+
# This is just a pre-filter. Module readability and consistency are verified in module.is_valid()
250+
module_state_bytes = vmlinux_layer.read(
251+
module_addr, len(module_state_live_bytes)
252+
)
253+
if module_state_bytes != module_state_live_bytes:
254+
continue
255+
except (
256+
exceptions.PagedInvalidAddressException,
257+
exceptions.InvalidAddressException,
258+
):
259+
continue
260+
261+
module = vmlinux.object("module", offset=module_addr, absolute=True)
262+
if module and module.is_valid():
263+
yield module
264+
265+
def _validate_alignment_patterns(self, addresses: Iterable[int]) -> bool:
266+
"""Check if the memory addresses meet our alignments patterns
267+
268+
Args:
269+
addresses: Iterable with the address values
270+
271+
Returns:
272+
True if all the addresses meet the alignment
273+
"""
274+
return all(addr % self.module_address_alignment == 0 for addr in addresses)
275+
276+
def get_hidden_modules(
277+
self,
278+
context: interfaces.context.ContextInterface,
279+
vmlinux_module_name: str,
280+
) -> Iterable[interfaces.objects.ObjectInterface]:
281+
"""Enumerate hidden modules
282+
283+
Args:
284+
context: The context to retrieve required elements (layers, symbol tables) from
285+
vmlinux_module_name: The name of the kernel module on which to operate
286+
Yields:
287+
module objects
288+
"""
289+
vmlinux = context.modules[vmlinux_module_name]
290+
vmlinux_layer = context.layers[vmlinux.layer_name]
291+
292+
known_module_addresses = {
293+
vmlinux_layer.canonicalize(module.vol.offset)
294+
for module in lsmod.Lsmod.list_modules(context, vmlinux_module_name)
295+
}
296+
297+
modules_memory_boundaries = self._get_modules_memory_boundaries(vmlinux)
298+
299+
if self.config.get("fast"):
300+
if self._validate_alignment_patterns(known_module_addresses):
301+
scan_method = self.get_hidden_modules_fast
302+
else:
303+
vollog.warning(
304+
f"Module addresses aren't aligned to {self.module_address_alignment} bytes. "
305+
"Switching to the traditional scan method."
306+
)
307+
scan_method = self.get_hidden_modules_vol2
308+
else:
309+
scan_method = self.get_hidden_modules_vol2
310+
311+
yield from scan_method(
312+
context,
313+
vmlinux_module_name,
314+
known_module_addresses,
315+
modules_memory_boundaries,
316+
)
317+
318+
def _generator(self):
319+
vmlinux_module_name = self.config["kernel"]
320+
for module in self.get_hidden_modules(self.context, vmlinux_module_name):
321+
module_addr = module.vol.offset
322+
module_name = module.get_name() or renderers.NotAvailableValue()
323+
fields = (format_hints.Hex(module_addr), module_name)
324+
yield (0, fields)
325+
326+
def run(self):
327+
headers = [
328+
("Address", format_hints.Hex),
329+
("Name", str),
330+
]
331+
return renderers.TreeGrid(headers, self._generator())

volatility3/framework/symbols/linux/extensions/__init__.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,34 @@ def __init__(self, *args, **kwargs):
3535
super().__init__(*args, **kwargs)
3636
self._mod_mem_type = None # Initialize _mod_mem_type to None for memoization
3737

38+
def is_valid(self):
39+
layer = self._context.layers[self.vol.layer_name]
40+
# Make sure the entire module content is readable
41+
if not layer.is_valid(self.vol.offset, self.vol.size):
42+
return False
43+
44+
if not self.state.is_valid_choice:
45+
return False
46+
47+
core_size = self.get_core_size()
48+
if not (
49+
1 <= core_size <= 20000000
50+
and core_size + self.get_init_size() >= 4096
51+
and 1 <= self.get_core_text_size() <= 20000000
52+
):
53+
return False
54+
55+
if self.has_member("mkobj") and self.mkobj.has_member("mod"):
56+
if not (
57+
self.mkobj
58+
and self.mkobj.mod
59+
and self.mkobj.mod.is_readable()
60+
and self.mkobj.mod == self.vol.offset
61+
):
62+
return False
63+
64+
return True
65+
3866
@property
3967
def mod_mem_type(self):
4068
"""Return the mod_mem_type enum choices if available or an empty dict if not"""
@@ -112,6 +140,16 @@ def get_core_size(self):
112140

113141
raise AttributeError("Unable to determine core size of module")
114142

143+
def get_core_text_size(self):
144+
if self.has_member("mem"): # kernels 6.4+
145+
return self._get_mem_size("MOD_TEXT")
146+
elif self.has_member("core_layout"):
147+
return self.core_layout.text_size
148+
elif self.has_member("core_text_size"):
149+
return self.core_text_size
150+
151+
raise AttributeError("Unable to determine core text size of module")
152+
115153
def get_module_core(self):
116154
if self.has_member("mem"): # kernels 6.4+
117155
return self._get_mem_base("MOD_TEXT")

0 commit comments

Comments
 (0)