Skip to content

Commit 6fab237

Browse files
Miauwkeruyunzheng
andauthored
Simplify the command type (#183)
Co-authored-by: Yun Zheng Hu <hu@fox-it.com>
1 parent cd69cf1 commit 6fab237

File tree

5 files changed

+262
-111
lines changed

5 files changed

+262
-111
lines changed

flow/record/fieldtypes/__init__.py

Lines changed: 81 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -752,41 +752,39 @@ def __repr__(self) -> str:
752752

753753

754754
class command(FieldType):
755-
executable: path | None = None
756-
args: list[str] | None = None
755+
"""The command fieldtype splits a command string into an ``executable`` and its arguments.
757756
758-
_path_type: type[path] = None
759-
_posix: bool
757+
Args:
758+
value: the string that contains the command and arguments
759+
path_type: When specified it forces the command to use a specific path type
760760
761-
def __new__(cls, value: str):
762-
if cls is not command:
763-
return super().__new__(cls)
761+
Example:
764762
765-
if not isinstance(value, str):
766-
raise TypeError(f"Expected a value of type 'str' not {type(value)}")
763+
.. code-block:: text
764+
765+
'c:\\windows\\malware.exe /info' -> windows_path('c:\\windows\\malware.exe) ['/info']
766+
'/usr/bin/env bash' -> posix_path('/usr/bin/env') ['bash']
767767
768-
# pre checking for windows like paths
769-
# This checks for windows like starts of a path:
770-
# an '%' for an environment variable
771-
# r'\\' for a UNC path
772-
# the strip and check for ":" on the second line is for `<drive_letter>:`
773-
stripped_value = value.lstrip("\"'")
774-
windows = value.startswith((r"\\", "%")) or (len(stripped_value) >= 2 and stripped_value[1] == ":")
768+
# In this situation, the executable path needs to be quoted.
769+
'c:\\user\\John Doe\\malware.exe /all /the /things' -> windows_path('c:\\user\\John')
770+
['Doe\\malware.exe /all /the /things']
771+
"""
775772

776-
cls = windows_command if windows else posix_command
777-
return super().__new__(cls)
773+
__executable: path
774+
__args: tuple[str, ...]
778775

779-
def __init__(self, value: str | tuple[str, tuple[str]] | None):
780-
if value is None:
781-
return
776+
__path_type: type[path]
782777

783-
if isinstance(value, str):
784-
self.executable, self.args = self._split(value)
785-
return
778+
def __init__(self, value: str = "", *, path_type: type[path] | None = None):
779+
if not isinstance(value, str):
780+
raise TypeError(f"Expected a value of type 'str' not {type(value)}")
781+
782+
raw = value.strip()
786783

787-
executable, self.args = value
788-
self.executable = self._path_type(executable)
789-
self.args = list(self.args)
784+
# Detect the kind of path from value if not specified
785+
self.__path_type = path_type or type(path(raw.lstrip("\"'")))
786+
787+
self.executable, self.args = self._split(raw)
790788

791789
def __repr__(self) -> str:
792790
return f"(executable={self.executable!r}, args={self.args})"
@@ -795,66 +793,77 @@ def __eq__(self, other: object) -> bool:
795793
if isinstance(other, command):
796794
return self.executable == other.executable and self.args == other.args
797795
if isinstance(other, str):
798-
return self._join() == other
796+
return self.raw == other
799797
if isinstance(other, (tuple, list)):
800-
return self.executable == other[0] and self.args == list(other[1:])
798+
return self.executable == other[0] and self.args == (*other[1:],)
801799

802800
return False
803801

804-
def _split(self, value: str) -> tuple[str, list[str]]:
805-
executable, *args = shlex.split(value, posix=self._posix)
806-
executable = executable.strip("'\" ")
807-
808-
return self._path_type(executable), args
802+
def _split(self, value: str) -> tuple[str, tuple[str, ...]]:
803+
if not value:
804+
return "", ()
809805

810-
def _join(self) -> str:
811-
return shlex.join([str(self.executable), *self.args])
806+
executable, *args = shlex.split(value, posix=self.__path_type is posix_path)
807+
return executable.strip("'\" "), (*args,)
812808

813-
def _pack(self) -> tuple[tuple[str, list], str]:
814-
command_type = TYPE_WINDOWS if isinstance(self, windows_command) else TYPE_POSIX
815-
if self.executable:
816-
_exec, _ = self.executable._pack()
817-
return ((_exec, self.args), command_type)
818-
return (None, command_type)
819-
820-
@classmethod
821-
def _unpack(cls, data: tuple[tuple[str, tuple] | None, int]) -> command:
822-
_value, _type = data
823-
if _type == TYPE_WINDOWS:
824-
return windows_command(_value)
825-
826-
return posix_command(_value)
809+
def _pack(self) -> tuple[str, int]:
810+
path_type = TYPE_WINDOWS if self.__path_type is windows_path else TYPE_POSIX
811+
return self.raw, path_type
827812

828813
@classmethod
829-
def from_posix(cls, value: str) -> command:
830-
return posix_command(value)
814+
def _unpack(cls, data: tuple[str, int]) -> command:
815+
raw_str, path_type = data
816+
if path_type == TYPE_POSIX:
817+
return command(raw_str, path_type=posix_path)
818+
if path_type == TYPE_WINDOWS:
819+
return command(raw_str, path_type=windows_path)
820+
# default, infer type of path from str
821+
return command(raw_str)
831822

832-
@classmethod
833-
def from_windows(cls, value: str) -> command:
834-
return windows_command(value)
823+
@property
824+
def executable(self) -> path:
825+
return self.__executable
835826

827+
@property
828+
def args(self) -> tuple[str, ...]:
829+
return self.__args
836830

837-
class posix_command(command):
838-
_posix = True
839-
_path_type = posix_path
831+
@executable.setter
832+
def executable(self, val: str | path | None) -> None:
833+
self.__executable = self.__path_type(val)
840834

835+
@args.setter
836+
def args(self, val: str | tuple[str, ...] | list[str] | None) -> None:
837+
if val is None:
838+
self.__args = ()
839+
return
841840

842-
class windows_command(command):
843-
_posix = False
844-
_path_type = windows_path
841+
if isinstance(val, str):
842+
self.__args = tuple(shlex.split(val, posix=self.__path_type is posix_path))
843+
elif isinstance(val, list):
844+
self.__args = tuple(val)
845+
else:
846+
self.__args = val
845847

846-
def _split(self, value: str) -> tuple[str, list[str]]:
847-
executable, args = super()._split(value)
848-
if args:
849-
args = [" ".join(args)]
848+
@property
849+
def raw(self) -> str:
850+
exe = str(self.executable)
850851

851-
return executable, args
852+
if " " in exe:
853+
exe = shlex.quote(exe)
852854

853-
def _join(self) -> str:
854-
arg = f" {self.args[0]}" if self.args else ""
855-
executable_str = str(self.executable)
855+
result = [exe]
856+
# Only quote on posix paths as shlex doesn't remove the quotes on non posix paths
857+
if self.__path_type is posix_path:
858+
result.extend(shlex.quote(part) if " " in part else part for part in self.args)
859+
else:
860+
result.extend(self.args)
861+
return " ".join(result)
856862

857-
if " " in executable_str:
858-
return f"'{executable_str}'{arg}"
863+
@classmethod
864+
def from_posix(cls, value: str) -> command:
865+
return command(value, path_type=posix_path)
859866

860-
return f"{executable_str}{arg}"
867+
@classmethod
868+
def from_windows(cls, value: str) -> command:
869+
return command(value, path_type=windows_path)

flow/record/jsonpacker.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@ def pack_obj(self, obj: Any) -> dict | str:
7575
if isinstance(obj, fieldtypes.path):
7676
return str(obj)
7777
if isinstance(obj, fieldtypes.command):
78-
return {
79-
"executable": obj.executable,
80-
"args": obj.args,
81-
}
78+
return obj.raw
8279

8380
raise TypeError(f"Unpackable type {type(obj)}")
8481

tests/adapter/test_json.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest
77

88
from flow.record import RecordReader, RecordWriter
9+
from flow.record.base import RecordDescriptor
910
from tests._utils import generate_records
1011

1112
if TYPE_CHECKING:
@@ -117,3 +118,40 @@ def test_json_adapter_with_record_descriptors(tmp_path: Path, record_adapter_pat
117118
elif record["_type"] == "record":
118119
assert "_recorddescriptor" in record
119120
assert descriptor_seen == 2
121+
122+
123+
def test_json_command_fieldtype(tmp_path: Path) -> None:
124+
json_file = tmp_path.joinpath("records.json")
125+
record_adapter_path = f"jsonfile://{json_file}"
126+
writer = RecordWriter(record_adapter_path)
127+
128+
TestRecord = RecordDescriptor(
129+
"test/command",
130+
[
131+
("command", "commando"),
132+
],
133+
)
134+
135+
writer.write(
136+
TestRecord(
137+
commando="C:\\help.exe data",
138+
)
139+
)
140+
writer.write(
141+
TestRecord(
142+
commando="/usr/bin/env bash",
143+
)
144+
)
145+
writer.write(TestRecord())
146+
writer.flush()
147+
148+
reader = RecordReader(record_adapter_path)
149+
records = list(reader)
150+
151+
assert records[0].commando.executable == "C:\\help.exe"
152+
assert records[0].commando.args == ("data",)
153+
154+
assert records[1].commando.executable == "/usr/bin/env"
155+
assert records[1].commando.args == ("bash",)
156+
157+
assert len(records) == 3

tests/adapter/test_xlsx.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ def test_sanitize_field_values(mock_openpyxl_package: MagicMock) -> None:
3939
fieldtypes.net.ipaddress("13.37.13.37"),
4040
["Shaken", "Not", "Stirred"],
4141
fieldtypes.posix_path("/home/user"),
42-
fieldtypes.posix_command("/bin/bash -c 'echo hello world'"),
42+
fieldtypes.command.from_posix("/bin/bash -c 'echo hello world'"),
4343
fieldtypes.windows_path("C:\\Users\\user\\Desktop"),
44-
fieldtypes.windows_command("C:\\Some.exe /?"),
44+
fieldtypes.command.from_windows("C:\\Some.exe /?"),
4545
]
4646
)
4747
) == [

0 commit comments

Comments
 (0)