Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 80 additions & 70 deletions flow/record/fieldtypes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,41 +752,39 @@ def __repr__(self) -> str:


class command(FieldType):
executable: path | None = None
args: list[str] | None = None
"""The command fieldtype splits a command string into an ``executable`` and its arguments.

_path_type: type[path] = None
_posix: bool
Args:
value: the string that contains the command and arguments
path_type: When specified it forces the command to use a specific path type

def __new__(cls, value: str):
if cls is not command:
return super().__new__(cls)
Example:

if not isinstance(value, str):
raise TypeError(f"Expected a value of type 'str' not {type(value)}")
.. code-block:: text

'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info']
'/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash']

# pre checking for windows like paths
# This checks for windows like starts of a path:
# an '%' for an environment variable
# r'\\' for a UNC path
# the strip and check for ":" on the second line is for `<drive_letter>:`
stripped_value = value.lstrip("\"'")
windows = value.startswith((r"\\", "%")) or (len(stripped_value) >= 2 and stripped_value[1] == ":")
# In this situation, the executable path needs to be quoted.
'c:\\user\\John Doe\\malware.exe /all /the /things' -> windows_path('c:\\user\\John')
['Doe\\malware.exe /all /the /things']
"""

cls = windows_command if windows else posix_command
return super().__new__(cls)
__executable: path
__args: tuple[str, ...]

def __init__(self, value: str | tuple[str, tuple[str]] | None):
if value is None:
return
__path_type: type[path]

if isinstance(value, str):
self.executable, self.args = self._split(value)
return
def __init__(self, value: str = "", *, path_type: type[path] | None = None):
if not isinstance(value, str):
raise TypeError(f"Expected a value of type 'str' not {type(value)}")

raw = value.strip()

executable, self.args = value
self.executable = self._path_type(executable)
self.args = list(self.args)
# Detect the kind of path from value if not specified
self.__path_type = path_type or type(path(raw.lstrip("\"'")))

self.executable, self.args = self._split(raw)

def __repr__(self) -> str:
return f"(executable={self.executable!r}, args={self.args})"
Expand All @@ -795,66 +793,78 @@ def __eq__(self, other: object) -> bool:
if isinstance(other, command):
return self.executable == other.executable and self.args == other.args
if isinstance(other, str):
return self._join() == other
return self.raw == other
if isinstance(other, (tuple, list)):
return self.executable == other[0] and self.args == list(other[1:])
return self.executable == other[0] and self.args == (*other[1:],)

return False

def _split(self, value: str) -> tuple[str, list[str]]:
executable, *args = shlex.split(value, posix=self._posix)
executable = executable.strip("'\" ")
def _split(self, value: str) -> tuple[str, tuple[str, ...]]:
if not (value):
return "", ()

return self._path_type(executable), args
executable, *args = shlex.split(value, posix=self.__path_type is posix_path)
return executable.strip("'\" "), (*args,)

def _join(self) -> str:
return shlex.join([str(self.executable), *self.args])

def _pack(self) -> tuple[tuple[str, list], str]:
command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX
if self.executable:
_exec, _ = self.executable._pack()
return ((_exec, self.args), command_type)
return (None, command_type)
return self.raw

@classmethod
def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command:
_value, _type = data
if _type == TYPE_WINDOWS:
return windows_command(_value)

return posix_command(_value)
def _pack(self) -> tuple[str, int]:
path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX
return self.raw, path_type

@classmethod
def from_posix(cls, value: str) -> command:
return posix_command(value)
def _unpack(cls, data: tuple[str, int]) -> command:
raw_str, path_type = data
if path_type == TYPE_POSIX:
return command(raw_str, path_type=posix_path)
if path_type == TYPE_WINDOWS:
return command(raw_str, path_type=windows_path)
# default, infer type of path from str
return command(raw_str)

@classmethod
def from_windows(cls, value: str) -> command:
return windows_command(value)
@property
def executable(self) -> path:
return self.__executable

@property
def args(self) -> tuple[str, ...]:
return self.__args

class posix_command(command):
_posix = True
_path_type = posix_path
@executable.setter
def executable(self, val: str | path | None) -> None:
self.__executable = self.__path_type(val)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the latest merged PR #200 we should be able to directly initialize path with the value to determine the correct path type. Then you can als get rid of __path_type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean removing the whole __path_type in general? As in that case, there will be issues with for example hello_world.exe that will be interpreted differently depending on what system it gets run on.

If you just mean this specific case, this could still be an issue because the whole type can change from windows to posix path and vise versa. Tho I already have an idea on how to fix that specific issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah wait I thought __path_type was a function to determine the path type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, no. It is a type[path], so either a windows_path or a posix_path :)


@args.setter
def args(self, val: str | tuple[str, ...] | list[str] | None) -> None:
if val is None:
self.__args = ()
elif isinstance(val, (tuple, list)):
if val and self.__path_type is windows_path:
val = (" ".join(val),)
self.__args = (*val,)
else:
self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path))

class windows_command(command):
_posix = False
_path_type = windows_path
@property
def raw(self) -> str:
exe = str(self.executable)

def _split(self, value: str) -> tuple[str, list[str]]:
executable, args = super()._split(value)
if args:
args = [" ".join(args)]
if " " in exe:
exe = shlex.quote(exe)

return executable, args
result = [exe]
if self.__path_type is posix_path:
result.extend(shlex.quote(part) if " " in part else part for part in self.args)
else:
result.extend(self.args)

def _join(self) -> str:
arg = f" {self.args[0]}" if self.args else ""
executable_str = str(self.executable)
return " ".join(result)

if " " in executable_str:
return f"'{executable_str}'{arg}"
@classmethod
def from_posix(cls, value: str) -> command:
return command(value, path_type=posix_path)

return f"{executable_str}{arg}"
@classmethod
def from_windows(cls, value: str) -> command:
return command(value, path_type=windows_path)
5 changes: 1 addition & 4 deletions flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str:
if isinstance(obj, fieldtypes.path):
return str(obj)
if isinstance(obj, fieldtypes.command):
return {
"executable": obj.executable,
"args": obj.args,
}
return obj.raw

raise TypeError(f"Unpackable type {type(obj)}")

Expand Down
38 changes: 38 additions & 0 deletions tests/adapter/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest

from flow.record import RecordReader, RecordWriter
from flow.record.base import RecordDescriptor
from tests._utils import generate_records

if TYPE_CHECKING:
Expand Down Expand Up @@ -117,3 +118,40 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat
elif record["_type"] == "record":
assert "_recorddescriptor" in record
assert descriptor_seen == 2


def test_json_command_fieldtype(tmp_path: Path) -> None:
json_file = tmp_path.joinpath("records.json")
record_adapter_path = f"jsonfile://{json_file}"
writer = RecordWriter(record_adapter_path)

TestRecord = RecordDescriptor(
"test/command",
[
("command", "commando"),
],
)

writer.write(
TestRecord(
commando="C:\\help.exe data",
)
)
writer.write(
TestRecord(
commando="/usr/bin/env bash",
)
)
writer.write(TestRecord())
writer.flush()

reader = RecordReader(record_adapter_path)
records = list(reader)

assert records[0].commando.executable == "C:\\help.exe"
assert records[0].commando.args == ("data",)

assert records[1].commando.executable == "/usr/bin/env"
assert records[1].commando.args == ("bash",)

assert len(records) == 3
4 changes: 2 additions & 2 deletions tests/adapter/test_xlsx.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None:
fieldtypes.net.ipaddress("13.37.13.37"),
["Shaken", "Not", "Stirred"],
fieldtypes.posix_path("/home/user"),
fieldtypes.posix_command("/bin/bash -c 'echo hello world'"),
fieldtypes.command.from_posix("/bin/bash -c 'echo hello world'"),
fieldtypes.windows_path("C:\\Users\\user\\Desktop"),
fieldtypes.windows_command("C:\\Some.exe /?"),
fieldtypes.command.from_windows("C:\\Some.exe /?"),
]
)
) == [
Expand Down
Loading