-
Notifications
You must be signed in to change notification settings - Fork 640
Expand file tree
/
Copy pathproc.py
More file actions
276 lines (246 loc) · 10.4 KB
/
proc.py
File metadata and controls
276 lines (246 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
"""A module containing a collection of plugins that produce data typically
found in Linux's /proc file system."""
import logging
from typing import Callable, Generator, Type, Optional
from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.objects import utility
from volatility3.framework.renderers import format_hints
from volatility3.plugins.linux import pslist
vollog = logging.getLogger(__name__)
class Maps(plugins.PluginInterface):
"""Lists all memory maps for all processes."""
_required_framework_version = (2, 0, 0)
_version = (1, 0, 3)
MAXSIZE_DEFAULT = 1024 * 1024 * 1024 # 1 Gb
@classmethod
def get_requirements(cls):
# Since we're calling the plugin, make sure we have the plugin's requirements
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.PluginRequirement(
name="pslist", plugin=pslist.PsList, version=(4, 0, 0)
),
requirements.ListRequirement(
name="pid",
description="Filter on specific process IDs",
element_type=int,
optional=True,
),
requirements.BooleanRequirement(
name="dump",
description="Extract listed memory segments",
default=False,
optional=True,
),
requirements.ListRequirement(
name="address",
description="Process virtual memory addresses to include "
"(all other VMA sections are excluded). This can be any "
"virtual address within the VMA section.",
element_type=int,
optional=True,
),
requirements.IntRequirement(
name="maxsize",
description="Maximum size for dumped VMA sections "
"(all the bigger sections will be ignored)",
default=cls.MAXSIZE_DEFAULT,
optional=True,
),
]
@classmethod
def list_vmas(
cls,
task: interfaces.objects.ObjectInterface,
filter_func: Callable[
[interfaces.objects.ObjectInterface], bool
] = lambda _: True,
) -> Generator[interfaces.objects.ObjectInterface, None, None]:
"""Lists the Virtual Memory Areas of a specific process.
Args:
task: task object from which to list the vma
filter_func: Function to take a vma and return False if it should be filtered out
Returns:
Yields vmas based on the task and filtered based on the filter function
"""
mm_pointer = task.mm
if not mm_pointer:
vollog.debug(
f"Excluded pid {task.pid} as there is no mm member. It is likely a kernel thread"
)
return
if not mm_pointer.is_readable():
vollog.error(f"Task {task.pid} has an invalid mm member")
return
for vma in mm_pointer.get_vma_iter():
if filter_func(vma):
yield vma
else:
vollog.debug(
f"Excluded vma at offset {vma.vol.offset:#x} for pid {task.pid} due to filter_func"
)
@classmethod
def vma_dump(
cls,
context: interfaces.context.ContextInterface,
task: interfaces.objects.ObjectInterface,
vm_start: int,
vm_end: int,
open_method: Type[interfaces.plugins.FileHandlerInterface],
maxsize: int = MAXSIZE_DEFAULT,
) -> Optional[interfaces.plugins.FileHandlerInterface]:
"""Extracts the complete data for VMA as a FileInterface.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
task: an task_struct instance
vm_start: The start virtual address from the vma to dump
vm_end: The end virtual address from the vma to dump
open_method: class to provide context manager for opening the file
maxsize: Max size of VMA section (default MAXSIZE_DEFAULT)
Returns:
An open FileInterface object containing the complete data for the task or None in the case of failure
"""
pid = task.pid
try:
proc_layer_name = task.add_process_layer()
except exceptions.InvalidAddressException as excp:
vollog.debug(
f"Process {pid}: invalid address {excp.invalid_address} in layer {excp.layer_name}"
)
return None
vm_size = vm_end - vm_start
# check if vm_size is negative, this should never happen.
if vm_size < 0:
vollog.warning(
f"Skip virtual memory dump for pid {pid} between {vm_start:#x}-{vm_end:#x} as {vm_size} is negative."
)
return None
# check if vm_size is larger than the maxsize limit, and therefore is not saved out.
if maxsize <= vm_size:
vollog.warning(
f"Skip virtual memory dump for pid {pid} between {vm_start:#x}-{vm_end:#x} as {vm_size} is larger than maxsize limit of {maxsize}"
)
return None
proc_layer = context.layers[proc_layer_name]
file_name = f"pid.{pid}.vma.{vm_start:#x}-{vm_end:#x}.dmp"
try:
file_handle = open_method(file_name)
chunk_size = 1024 * 1024 * 10
offset = vm_start
while offset < vm_start + vm_size:
to_read = min(chunk_size, vm_start + vm_size - offset)
data = proc_layer.read(offset, to_read, pad=True)
file_handle.write(data)
offset += to_read
except Exception as excp:
vollog.debug(f"Unable to dump virtual memory {file_name}: {excp}")
return None
return file_handle
def _generator(self, tasks):
# build filter for addresses if required
address_list = self.config.get("address", None)
if not address_list:
# do not filter as no address_list was supplied
def vma_filter_func(_):
return True
else:
# filter for any vm_start that matches the supplied address config
def vma_filter_function(x: interfaces.objects.ObjectInterface) -> bool:
addrs_in_vma = [
addr for addr in address_list if x.vm_start <= addr <= x.vm_end
]
# if any of the user supplied addresses would fall within this vma return true
return bool(addrs_in_vma)
vma_filter_func = vma_filter_function
for task in tasks:
if not (task.mm and task.mm.is_readable()):
continue
name = utility.array_to_string(task.comm)
for vma in self.list_vmas(task, filter_func=vma_filter_func):
flags = vma.get_protection()
page_offset = vma.get_page_offset()
inode_num = None
try:
dentry = vma.vm_file.get_dentry()
inode_ptr = dentry.d_inode
inode_num = inode_ptr.i_ino
major = inode_ptr.i_sb.major
minor = inode_ptr.i_sb.minor
except exceptions.InvalidAddressException:
if not inode_num:
inode_num = 0
major = 0
minor = 0
path = vma.get_name(self.context, task)
file_output = "Disabled"
if self.config["dump"]:
file_output = "Error outputting file"
try:
vm_start = vma.vm_start
vm_end = vma.vm_end
except AttributeError:
vollog.debug(
f"Unable to find the vm_start and vm_end for vma at {vma.vol.offset:#x} for pid {task.pid}"
)
vm_start = None
vm_end = None
if vm_start and vm_end:
# only attempt to dump the memory if we have vm_start and vm_end
file_handle = self.vma_dump(
self.context,
task,
vm_start,
vm_end,
self.open,
self.config["maxsize"],
)
if file_handle:
file_handle.close()
file_output = file_handle.preferred_filename
yield (
0,
(
task.pid,
name,
format_hints.Hex(vma.vm_start),
format_hints.Hex(vma.vm_end),
flags,
format_hints.Hex(page_offset),
major,
minor,
inode_num,
path or renderers.NotAvailableValue(),
file_output,
),
)
def run(self):
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
return renderers.TreeGrid(
[
("PID", int),
("Process", str),
("Start", format_hints.Hex),
("End", format_hints.Hex),
("Flags", str),
("PgOff", format_hints.Hex),
("Major", int),
("Minor", int),
("Inode", int),
("File Path", str),
("File output", str),
],
self._generator(
pslist.PsList.list_tasks(
self.context, self.config["kernel"], filter_func=filter_func
)
),
)