Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions volatility3/cli/volshell/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@
#

from typing import Any, List, Optional, Tuple, Union
from enum import Enum

from volatility3.cli.volshell import generic
from volatility3.framework import constants, interfaces
from volatility3.framework.configuration import requirements
from volatility3.plugins.linux import pslist


# Could import the enum from psscan.py to avoid code duplication
class DescExitStateEnum(Enum):
"""Enum for linux task exit_state as defined in include/linux/sched.h"""

TASK_RUNNING = 0x00000000
EXIT_DEAD = 0x00000010
EXIT_ZOMBIE = 0x00000020
EXIT_TRACE = EXIT_ZOMBIE | EXIT_DEAD


class Volshell(generic.Volshell):
"""Shell environment to directly interact with a linux memory image."""

Expand Down Expand Up @@ -40,6 +51,52 @@ def change_task(self, pid=None):
return None
print(f"No task with task ID {pid} found")

def get_process(self, pid=None, offset=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implies offset is always a physical offset. Were you going to split that into physical_offset and virtual_offset or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did split the code, basically the same as the windows side, but wanted to get your input on the is_valid() thing before committing

"""Get Task based on a process ID. Does not retrieve the layer, to change layer use the .pid attribute. The offset argument can be used both for physical or virtual offsets"""

if pid is not None and offset is not None:
print("Only one parameter is accepted")
return None

if offset is not None:
vmlinux_module_name = self.config["kernel"]
vmlinux = self.context.modules[vmlinux_module_name]

kernel_layer_name = vmlinux.layer_name
kernel_layer = self.context.layers[kernel_layer_name]

memory_layer_name = kernel_layer.dependencies[0]

ptask = self.context.object(
vmlinux.symbol_table_name + constants.BANG + "task_struct",
layer_name=memory_layer_name,
offset=offset,
native_layer_name=kernel_layer_name,
)

try:
DescExitStateEnum(ptask.exit_state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this statement to test for offset validity? If so I think something like is_valid would work better, so ptask.exit_state.is_valid()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I yanked this from the psscan where it checks for a valid exit state psscan.py. Also it seems that is_valid() is not a method of the members of the task_struct but rather the struct's itself pslist.py

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, fair enough. It will work, just sticks out a little to me...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall I use is_valid() then on all proc/tasks? On windows since dt() breaks if the object is not valid, returning None makes more sense, than just printing a warning

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's alright, it just stands out to me as a method of testing (and getting a second example in the code base will make it harder to prevent if more people add it that way). There's currently a bit of a shift in the linux codebase where they're implementing check validity first rather than throwing exceptions. If you'd like to and can get it to work, it might be better in keeping with the rest of the codebase, but if you don't want to or it's tricky to achieve then that's fine...

except ValueError:
print(
f"task_struct @ {hex(ptask.vol.offset)} as exit_state {ptask.exit_state} is likely not valid"
)

if not (0 < ptask.pid < 65535):
print(
f"task_struct @ {hex(ptask.vol.offset)} as pid {ptask.pid} is likely not valid"
)

return ptask

if pid is not None:
tasks = self.list_tasks()
for task in tasks:
if task.pid == pid:
return task
print(f"No task with task ID {pid} found")

return None

def list_tasks(self):
"""Returns a list of task objects from the primary layer"""
# We always use the main kernel memory and associated symbols
Expand All @@ -50,6 +107,7 @@ def construct_locals(self) -> List[Tuple[List[str], Any]]:
result += [
(["ct", "change_task", "cp"], self.change_task),
(["lt", "list_tasks", "ps"], self.list_tasks),
(["gp", "get_process"], self.get_process),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add "get_task" to the end of that list too please? gt won't work, but get_task then would... Can't tell if it's worth adding that alias to windows too, I'll leave that one up to you...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't particularly like that we needed to mix process/task, so for the windows side I think i'd leave it just with the process suffix

(["symbols"], self.context.symbol_space[self.current_symbol_table]),
]
if self.config.get("pid", None) is not None:
Expand Down
47 changes: 47 additions & 0 deletions volatility3/cli/volshell/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,58 @@ def list_processes(self):
)
)

def get_process(self, pid=None, v_offset=None, p_offset=None):
"""Returns the EPROCESS object that matches the pid. If v_offset/p_offset is provided, construct the EPROCESS object at the provided address. Only one parameter is allowed."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here with regards to Args: and Returns:. Also I kinda prefer variables have more descriptive names (although I'll allow it if you really want, since it is likely to be typed be developers quite often).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall I use virtaddr and physaddr on both?


if sum(1 if x is not None else 0 for x in [pid, v_offset, p_offset]) != 1:
print("Only one parameter is accepted")
return None

kernel_name = self.config["kernel"]
kernel = self.context.modules[kernel_name]

kernel_layer_name = kernel.layer_name

kernel_layer = self.context.layers[kernel_layer_name]
memory_layer_name = kernel_layer.dependencies[0]

eprocess_symbol = kernel.symbol_table_name + constants.BANG + "_EPROCESS"

if v_offset is not None:
eproc = self.context.object(
eprocess_symbol,
layer_name=kernel_layer_name,
offset=v_offset,
)

return eproc

if p_offset is not None:
eproc = self.context.object(
eprocess_symbol,
layer_name=memory_layer_name,
offset=p_offset,
native_layer_name=kernel_layer_name,
)

return eproc

if pid is not None:
processes = self.list_processes()
for process in processes:
if process.UniqueProcessId == pid:
return process
print(f"No process with process ID {pid} found")
return None

return None

def construct_locals(self) -> List[Tuple[List[str], Any]]:
result = super().construct_locals()
result += [
(["cp", "change_process"], self.change_process),
(["lp", "list_processes", "ps"], self.list_processes),
(["gp", "get_process"], self.get_process),
(["symbols"], self.context.symbol_space[self.current_symbol_table]),
]
if self.config.get("pid", None) is not None:
Expand Down
Loading