Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions volatility3/cli/volshell/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@
#

from typing import Any, List, Optional, Tuple, Union
from enum import Enum

from volatility3.cli.volshell import generic
from volatility3.framework import constants, interfaces
from volatility3.framework.configuration import requirements
from volatility3.plugins.linux import pslist


# Could import the enum from psscan.py to avoid code duplication
class DescExitStateEnum(Enum):
"""Enum for linux task exit_state as defined in include/linux/sched.h"""

TASK_RUNNING = 0x00000000
EXIT_DEAD = 0x00000010
EXIT_ZOMBIE = 0x00000020
EXIT_TRACE = EXIT_ZOMBIE | EXIT_DEAD


class Volshell(generic.Volshell):
"""Shell environment to directly interact with a linux memory image."""

Expand Down Expand Up @@ -40,6 +51,71 @@ def change_task(self, pid=None):
return None
print(f"No task with task ID {pid} found")

def get_process(self, pid=None, virtaddr=None, physaddr=None):
"""Return the task_struct object that matches the pid. If a physical or a virtual address is provided, construct the task_struct object at said address. Only one parameter is allowed.

Args:
pid (int, optional): PID to search for
virtaddr (int, optional): Virtual address to construct object at
physaddr (int, optional): Physical address to construct object at

Returns:
ObjectInterface: task_struct Object
"""

if sum(1 if x is not None else 0 for x in [pid, virtaddr, physaddr]) != 1:
print("Only one parameter is accepted")
return None

vmlinux_module_name = self.config["kernel"]
vmlinux = self.context.modules[vmlinux_module_name]

kernel_layer_name = vmlinux.layer_name
kernel_layer = self.context.layers[kernel_layer_name]

memory_layer_name = kernel_layer.dependencies[0]

task_struct_symbol = vmlinux.symbol_table_name + constants.BANG + "task_struct"

if virtaddr is not None:
task = self.context.object(
task_struct_symbol,
layer_name=kernel_layer_name,
offset=virtaddr,
)

if physaddr is not None:
task = self.context.object(
task_struct_symbol,
layer_name=memory_layer_name,
offset=physaddr,
native_layer_name=kernel_layer_name,
)

if physaddr is not None or virtaddr is not None:
try:
DescExitStateEnum(task.exit_state)
except ValueError:
print(
f"task_struct @ {hex(task.vol.offset)} as exit_state {task.exit_state} is likely not valid"
)

if not (0 < task.pid < 65535):
print(
f"task_struct @ {hex(task.vol.offset)} as pid {task.pid} is likely not valid"
)

return task

if pid is not None:
tasks = self.list_tasks()
for task in tasks:
if task.pid == pid:
return task
print(f"No task with task ID {pid} found")

return None

def list_tasks(self):
"""Returns a list of task objects from the primary layer"""
# We always use the main kernel memory and associated symbols
Expand All @@ -50,6 +126,7 @@ def construct_locals(self) -> List[Tuple[List[str], Any]]:
result += [
(["ct", "change_task", "cp"], self.change_task),
(["lt", "list_tasks", "ps"], self.list_tasks),
(["gp", "get_process", "get_task"], self.get_process),
(["symbols"], self.context.symbol_space[self.current_symbol_table]),
]
if self.config.get("pid", None) is not None:
Expand Down
56 changes: 56 additions & 0 deletions volatility3/cli/volshell/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,67 @@ def list_processes(self):
)
)

def get_process(self, pid=None, virtaddr=None, physaddr=None):
"""Returns the _EPROCESS object that matches the pid. If a physical or a virtual address is provided, construct the _EPROCESS object at said address. Only one parameter is allowed.

Args:
pid (int, optional): PID / UniqueProcessId to search for.
virtaddr (int, optional): Virtual address to construct object at
physaddr (int, optional): Physical address to construct object at

Returns:
ObjectInterface: _EPROCESS Object
"""

if sum(1 if x is not None else 0 for x in [pid, virtaddr, physaddr]) != 1:
print("Only one parameter is accepted")
return None

kernel_name = self.config["kernel"]
kernel = self.context.modules[kernel_name]

kernel_layer_name = kernel.layer_name

kernel_layer = self.context.layers[kernel_layer_name]
memory_layer_name = kernel_layer.dependencies[0]

eprocess_symbol = kernel.symbol_table_name + constants.BANG + "_EPROCESS"

if virtaddr is not None:
eproc = self.context.object(
eprocess_symbol,
layer_name=kernel_layer_name,
offset=virtaddr,
)

return eproc

if physaddr is not None:
eproc = self.context.object(
eprocess_symbol,
layer_name=memory_layer_name,
offset=physaddr,
native_layer_name=kernel_layer_name,
)

return eproc

if pid is not None:
processes = self.list_processes()
for process in processes:
if process.UniqueProcessId == pid:
return process
print(f"No process with process ID {pid} found")
return None

return None

def construct_locals(self) -> List[Tuple[List[str], Any]]:
result = super().construct_locals()
result += [
(["cp", "change_process"], self.change_process),
(["lp", "list_processes", "ps"], self.list_processes),
(["gp", "get_process"], self.get_process),
(["symbols"], self.context.symbol_space[self.current_symbol_table]),
]
if self.config.get("pid", None) is not None:
Expand Down
Loading