Skip to content

Commit af5a743

Browse files
authored
Merge pull request #1219 from qpalzmz112/develop
Added psxview plugin
2 parents 16cde44 + 98c0094 commit af5a743

File tree

1 file changed

+251
-0
lines changed

1 file changed

+251
-0
lines changed
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
import datetime, logging, string
2+
3+
from volatility3.framework import constants, exceptions
4+
from volatility3.framework.interfaces import plugins
5+
from volatility3.framework.configuration import requirements
6+
from volatility3.framework.renderers import format_hints, TreeGrid
7+
from volatility3.plugins.windows import (
8+
handles,
9+
info,
10+
pslist,
11+
psscan,
12+
sessions,
13+
thrdscan,
14+
)
15+
16+
vollog = logging.getLogger(__name__)
17+
18+
19+
class PsXView(plugins.PluginInterface):
20+
"""Lists all processes found via four of the methods described in \"The Art of Memory Forensics,\" which may help
21+
identify processes that are trying to hide themselves. I recommend using -r pretty if you are looking at this
22+
plugin's output in a terminal."""
23+
24+
# I've omitted the desktop thread scanning method because Volatility3 doesn't appear to have the funcitonality
25+
# which the original plugin used to do it.
26+
27+
# The sessions method is omitted because it begins with the list of processes found by Pslist anyway.
28+
29+
# Lastly, I've omitted the pspcid method because I could not for the life of me get it to work. I saved the
30+
# code I do have from it, and will happily share it if anyone else wants to add it.
31+
32+
_required_framework_version = (2, 0, 0)
33+
_version = (1, 0, 0)
34+
35+
valid_proc_name_chars = set(
36+
string.ascii_lowercase + string.ascii_uppercase + "." + " "
37+
)
38+
39+
@classmethod
40+
def get_requirements(cls):
41+
return [
42+
requirements.ModuleRequirement(
43+
name="kernel",
44+
description="Windows kernel",
45+
architectures=["Intel32", "Intel64"],
46+
),
47+
requirements.VersionRequirement(
48+
name="info", component=info.Info, version=(1, 0, 0)
49+
),
50+
requirements.VersionRequirement(
51+
name="pslist", component=pslist.PsList, version=(2, 0, 0)
52+
),
53+
requirements.VersionRequirement(
54+
name="psscan", component=psscan.PsScan, version=(1, 0, 0)
55+
),
56+
requirements.VersionRequirement(
57+
name="thrdscan", component=thrdscan.ThrdScan, version=(1, 0, 0)
58+
),
59+
requirements.VersionRequirement(
60+
name="handles", component=handles.Handles, version=(1, 0, 0)
61+
),
62+
requirements.BooleanRequirement(
63+
name="physical-offsets",
64+
description="List processes with physical offsets instead of virtual offsets.",
65+
optional=True,
66+
),
67+
]
68+
69+
def _proc_name_to_string(self, proc):
70+
return proc.ImageFileName.cast(
71+
"string", max_length=proc.ImageFileName.vol.count, errors="replace"
72+
)
73+
74+
def _is_valid_proc_name(self, str):
75+
for c in str:
76+
if not c in self.valid_proc_name_chars:
77+
return False
78+
return True
79+
80+
def _filter_garbage_procs(self, proc_list):
81+
return [
82+
p
83+
for p in proc_list
84+
if p.is_valid() and self._is_valid_proc_name(self._proc_name_to_string(p))
85+
]
86+
87+
def _translate_offset(self, offset):
88+
if not self.config["physical-offsets"]:
89+
return offset
90+
91+
kernel = self.context.modules[self.config["kernel"]]
92+
layer_name = kernel.layer_name
93+
94+
try:
95+
_original_offset, _original_length, offset, _length, _layer_name = list(
96+
self.context.layers[layer_name].mapping(offset=offset, length=0)
97+
)[0]
98+
except exceptions.PagedInvalidAddressException:
99+
vollog.debug(f"Page fault: unable to translate {offset:0x}")
100+
101+
return offset
102+
103+
def _proc_list_to_dict(self, tasks):
104+
tasks = self._filter_garbage_procs(tasks)
105+
return {self._translate_offset(proc.vol.offset): proc for proc in tasks}
106+
107+
def _check_pslist(self, tasks):
108+
return self._proc_list_to_dict(tasks)
109+
110+
def _check_psscan(self, layer_name, symbol_table):
111+
res = psscan.PsScan.scan_processes(
112+
context=self.context, layer_name=layer_name, symbol_table=symbol_table
113+
)
114+
115+
return self._proc_list_to_dict(res)
116+
117+
def _check_thrdscan(self):
118+
ret = []
119+
120+
for ethread in thrdscan.ThrdScan.scan_threads(
121+
self.context, module_name="kernel"
122+
):
123+
process = None
124+
try:
125+
process = ethread.owning_process()
126+
if not process.is_valid():
127+
continue
128+
129+
ret.append(process)
130+
except AttributeError:
131+
vollog.log(
132+
constants.LOGLEVEL_VVV,
133+
"Unable to find the owning process of ethread",
134+
)
135+
136+
return self._proc_list_to_dict(ret)
137+
138+
def _check_csrss_handles(self, tasks, layer_name, symbol_table):
139+
ret = []
140+
141+
for p in tasks:
142+
name = self._proc_name_to_string(p)
143+
if name == "csrss.exe":
144+
try:
145+
if p.has_member("ObjectTable"):
146+
handles_plugin = handles.Handles(
147+
context=self.context, config_path=self.config_path
148+
)
149+
hndls = list(handles_plugin.handles(p.ObjectTable))
150+
for h in hndls:
151+
if (
152+
h.get_object_type(
153+
handles_plugin.get_type_map(
154+
self.context, layer_name, symbol_table
155+
)
156+
)
157+
== "Process"
158+
):
159+
ret.append(h.Body.cast("_EPROCESS"))
160+
161+
except exceptions.InvalidAddressException:
162+
vollog.log(
163+
constants.LOGLEVEL_VVV, "Cannot access eprocess object table"
164+
)
165+
166+
return self._proc_list_to_dict(ret)
167+
168+
def _generator(self):
169+
kernel = self.context.modules[self.config["kernel"]]
170+
171+
layer_name = kernel.layer_name
172+
symbol_table = kernel.symbol_table_name
173+
174+
kdbg_list_processes = list(
175+
pslist.PsList.list_processes(
176+
context=self.context, layer_name=layer_name, symbol_table=symbol_table
177+
)
178+
)
179+
180+
# get processes from each source
181+
processes = {}
182+
183+
processes["pslist"] = self._check_pslist(kdbg_list_processes)
184+
processes["psscan"] = self._check_psscan(layer_name, symbol_table)
185+
processes["thrdscan"] = self._check_thrdscan()
186+
processes["csrss"] = self._check_csrss_handles(
187+
kdbg_list_processes, layer_name, symbol_table
188+
)
189+
190+
# print results
191+
192+
# list of lists of offsets
193+
offsets = [list(processes[source].keys()) for source in processes]
194+
195+
# flatten to one list
196+
offsets = sum(offsets, [])
197+
198+
# remove duplicates
199+
offsets = set(offsets)
200+
201+
for offset in offsets:
202+
proc = None
203+
204+
in_sources = {src: False for src in processes}
205+
206+
for source in processes:
207+
if offset in processes[source]:
208+
in_sources[source] = True
209+
if not proc:
210+
proc = processes[source][offset]
211+
212+
pid = proc.UniqueProcessId
213+
name = self._proc_name_to_string(proc)
214+
215+
exit_time = proc.get_exit_time()
216+
if type(exit_time) != datetime.datetime:
217+
exit_time = ""
218+
else:
219+
exit_time = str(exit_time)
220+
221+
yield (
222+
0,
223+
(
224+
format_hints.Hex(offset),
225+
name,
226+
pid,
227+
in_sources["pslist"],
228+
in_sources["psscan"],
229+
in_sources["thrdscan"],
230+
in_sources["csrss"],
231+
exit_time,
232+
),
233+
)
234+
235+
def run(self):
236+
offset_type = "(Physical)" if self.config["physical-offsets"] else "(Virtual)"
237+
offset_str = "Offset" + offset_type
238+
239+
return TreeGrid(
240+
[
241+
(offset_str, format_hints.Hex),
242+
("Name", str),
243+
("PID", int),
244+
("pslist", bool),
245+
("psscan", bool),
246+
("thrdscan", bool),
247+
("csrss", bool),
248+
("Exit Time", str),
249+
],
250+
self._generator(),
251+
)

0 commit comments

Comments
 (0)