Skip to content

Commit 8fd8f7f

Browse files
committed
process_spoofing plugin
1 parent fb76bf6 commit 8fd8f7f

File tree

1 file changed

+253
-0
lines changed

1 file changed

+253
-0
lines changed
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
# This file is Copyright 2025 Volatility Foundation and licensed under the Volatility Software License 1.0
2+
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
3+
#
4+
5+
import logging
6+
from pathlib import PurePosixPath
7+
from typing import Optional, Tuple, Iterator
8+
9+
from volatility3.framework import exceptions, interfaces, renderers
10+
from volatility3.framework.configuration import requirements
11+
from volatility3.framework.interfaces import plugins
12+
from volatility3.framework.objects import utility
13+
from volatility3.framework.symbols import linux
14+
from volatility3.plugins.linux import pslist
15+
16+
vollog = logging.getLogger(__name__)
17+
18+
19+
# https://github.com/SolitudePy/linux-mal
20+
class ProcessSpoofing(plugins.PluginInterface):
21+
"""Detects process spoofing by comparing executable path to cmdline & comm fields"""
22+
23+
_required_framework_version = (2, 0, 0)
24+
_version = (1, 0, 0)
25+
26+
@classmethod
27+
def get_requirements(cls):
28+
return [
29+
requirements.ModuleRequirement(
30+
name="kernel",
31+
description="Linux kernel",
32+
architectures=["Intel32", "Intel64"],
33+
),
34+
requirements.VersionRequirement(
35+
name="pslist", component=pslist.PsList, version=(4, 0, 0)
36+
),
37+
requirements.VersionRequirement(
38+
name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
39+
),
40+
requirements.ListRequirement(
41+
name="pid",
42+
description="Filter on specific process IDs",
43+
element_type=int,
44+
optional=True,
45+
),
46+
]
47+
48+
def _get_executable_path(
49+
self, task: interfaces.objects.ObjectInterface
50+
) -> Optional[str]:
51+
"""
52+
Extract the executable path from task_struct.mm.exe_file
53+
54+
Args:
55+
task: task_struct object of the process
56+
57+
Returns:
58+
Executable path or None if not available
59+
"""
60+
try:
61+
mm = task.mm
62+
if not mm or not mm.is_readable():
63+
# Kernel threads doesn't have
64+
return None
65+
66+
exe_file = mm.exe_file
67+
if not exe_file or not exe_file.is_readable():
68+
return None
69+
70+
# Use LinuxUtilities.path_for_file to extract the path
71+
exe_path = linux.LinuxUtilities.path_for_file(self.context, task, exe_file)
72+
73+
return exe_path if exe_path else None
74+
75+
except (exceptions.InvalidAddressException, AttributeError):
76+
return None
77+
78+
def _get_cmdline_basename(
79+
self, task: interfaces.objects.ObjectInterface
80+
) -> Optional[str]:
81+
"""
82+
Extract the command line arguments and return the basename of the first argument
83+
84+
Args:
85+
task: task_struct object of the process
86+
87+
Returns:
88+
Basename of the first command line argument or None if not available
89+
"""
90+
try:
91+
mm = task.mm
92+
if not mm or not mm.is_readable():
93+
return renderers.NotAvailableValue()
94+
95+
proc_layer_name = task.add_process_layer()
96+
if proc_layer_name is None:
97+
return None
98+
99+
proc_layer = self.context.layers[proc_layer_name]
100+
101+
# Read argv from userland
102+
start = task.mm.arg_start
103+
size_to_read = task.mm.arg_end - task.mm.arg_start
104+
105+
if not (0 < size_to_read <= 4096):
106+
return None
107+
108+
# Attempt to read command line arguments
109+
try:
110+
argv = proc_layer.read(start, size_to_read)
111+
except exceptions.InvalidAddressException:
112+
return None
113+
114+
# Parse the arguments - they are null byte terminated
115+
args_str = argv.decode(encoding="utf8", errors="replace")
116+
args_list = args_str.split("\x00")
117+
if args_list and args_list[0]:
118+
basename = PurePosixPath(args_list[0]).name
119+
return basename
120+
else:
121+
return None
122+
123+
except (exceptions.InvalidAddressException, AttributeError):
124+
return None
125+
126+
def _get_comm(self, task: interfaces.objects.ObjectInterface) -> Optional[str]:
127+
"""
128+
Extract the comm field from task_struct
129+
130+
Args:
131+
task: task_struct object of the process
132+
133+
Returns:
134+
Process name from comm field or None if not available
135+
"""
136+
try:
137+
return utility.array_to_string(task.comm)
138+
except (exceptions.InvalidAddressException, AttributeError):
139+
return None
140+
141+
def _extract_process_names(
142+
self, task: interfaces.objects.ObjectInterface
143+
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
144+
"""
145+
Extract all three process name sources for comparison
146+
147+
Args:
148+
task: task_struct object of the process
149+
150+
Returns:
151+
Tuple of (exe_path_basename, cmdline_basename, comm)
152+
"""
153+
exe_path = self._get_executable_path(task)
154+
exe_basename = PurePosixPath(exe_path).name
155+
cmdline_basename = self._get_cmdline_basename(task)
156+
comm = self._get_comm(task)
157+
158+
return exe_basename, cmdline_basename, comm
159+
160+
def _detect_spoofing(
161+
self,
162+
exe_basename: Optional[str],
163+
cmdline_basename: Optional[str],
164+
comm: Optional[str],
165+
) -> Optional[list]:
166+
"""
167+
Analyze the three name sources to detect potential spoofing
168+
169+
Args:
170+
exe_basename: Basename from exe_file path
171+
cmdline_basename: Basename from command line
172+
comm: Name from comm field
173+
174+
Returns:
175+
notes: List of notes indicating potential spoofing, or None if no issues found
176+
"""
177+
notes = []
178+
179+
# Count how many name sources we have
180+
available_sources = sum(
181+
1 for name in [exe_basename, cmdline_basename, comm] if name
182+
)
183+
184+
if available_sources < 2:
185+
return None
186+
187+
if exe_basename != cmdline_basename:
188+
notes.append(
189+
f"'Potential cmdline spoofing: exe_file={exe_basename};cmdline={cmdline_basename}'"
190+
)
191+
if exe_basename[:15] != comm:
192+
notes.append(
193+
f"'Potential comm spoofing: exe_file={exe_basename};comm={comm}'"
194+
)
195+
return notes
196+
197+
def _generator(self, tasks) -> Iterator[Tuple[int, Tuple]]:
198+
"""
199+
Generate process spoofing detection results
200+
201+
Args:
202+
tasks: Iterator of task_struct objects
203+
204+
Yields:
205+
Tuple containing process information and spoofing analysis
206+
"""
207+
for task in tasks:
208+
try:
209+
pid = task.pid
210+
ppid = task.get_parent_pid()
211+
212+
exe_basename, cmdline_basename, comm = self._extract_process_names(task)
213+
214+
notes = self._detect_spoofing(exe_basename, cmdline_basename, comm)
215+
216+
exe_render = exe_basename if exe_basename else "N/A"
217+
cmdline_render = cmdline_basename if cmdline_basename else "N/A"
218+
comm_render = comm if comm else "N/A"
219+
220+
yield (
221+
0,
222+
(
223+
pid,
224+
ppid,
225+
exe_render,
226+
cmdline_render,
227+
comm_render,
228+
"[" + ", ".join(notes) + "]" if notes else "OK",
229+
),
230+
)
231+
232+
except Exception as e:
233+
vollog.debug(f"Error processing task at {task.vol.offset:#x}: {e}")
234+
continue
235+
236+
def run(self):
237+
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
238+
239+
return renderers.TreeGrid(
240+
[
241+
("PID", int),
242+
("PPID", int),
243+
("Exe_Basename", str),
244+
("Cmdline_Basename", str),
245+
("Comm", str),
246+
("Notes", str),
247+
],
248+
self._generator(
249+
pslist.PsList.list_tasks(
250+
self.context, self.config["kernel"], filter_func=filter_func
251+
)
252+
),
253+
)

0 commit comments

Comments
 (0)