Skip to content
97 changes: 97 additions & 0 deletions tests/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytest
from docutils.core import publish_string
from docutils.utils import SystemMessage
from flow.record import RecordDescriptor

from dissect.target.exceptions import PluginError, UnsupportedPluginError
from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension
Expand Down Expand Up @@ -1341,6 +1342,102 @@ def test_exported_plugin_format(descriptor: FunctionDescriptor) -> None:
)


def test_plugin_record_field_consistency() -> None:
"""Test if exported plugin functions yielding records do not have conflicting field names and types.

For example, take the following TargetRecordDescriptors for plugin X, Y and Z::

RecordX = TargetRecordDescriptor("record/x", [("varint", "my_field")])
RecordY = TargetRecordDescriptor("record/y", [("path", "my_field")])
RecordZ = TargetRecordDescriptor("record/y", [("string", "my_field")])

The ``RecordX`` descriptor will fail in this test, since the field ``my_field`` cannot be of type ``varint``
while also being used as ``string`` (and ``path``). The ``RecordY`` and ``RecordZ`` descriptors do not conflict,
since the types ``path`` and ``string`` translate to the same ``wildcard`` type.

Uses ``FIELD_TYPES_MAP`` which is loosely based on flow.record and ElasticSearch field types.

Resources:
- https://elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
- https://github.com/fox-it/flow.record/tree/main/flow/record/fieldtypes
- https://github.com/JSCU-NL/dissect-elastic
"""

seen_field_names: set[str] = set()
seen_field_types: dict[str, tuple[str | None, RecordDescriptor]] = {}
inconsistencies: set[str] = set()

FIELD_TYPES_MAP = {
# strings
"string": "string",
"string[]": "string",
"stringlist": "string",
"wstring": "string",
"path": "string",
"path[]": "string",
"uri": "string",
"command": "string",
"dynamic": "string",
# ints
"varint": "int",
"varint[]": "int",
"filesize": "int",
"uint32": "int",
"uint16": "int",
"float": "float",
# ip / cidr
"net.ipaddress": "ip",
"net.ipaddress[]": "ip",
"net.ipnetwork": "ip_range",
"net.ipnetwork[]": "ip_range",
"net.ipinterface": "ip_range",
"net.ipinterface[]": "ip_range",
# dates
"datetime": "datetime",
"datetime[]": "datetime",
# other
"boolean": "boolean",
"bytes": "binary",
"digest": "keyword",
}

for descriptor in find_functions("*", Target(), compatibility=False, show_hidden=True)[0]:
# Test if plugin function record fields make sense and do not conflict with other records.
if descriptor.output == "record" and hasattr(descriptor, "record"):
# Functions can yield a single record or a list of records.
records = descriptor.record if isinstance(descriptor.record, list) else [descriptor.record]

for record in records:
assert isinstance(record, RecordDescriptor), (
f"{record!r} of function {descriptor!r} is not of type RecordDescriptor"
)
if record.name != "empty":
assert record.fields, f"{record!r} has no fields"

for name, field in record.fields.items():
# Make sure field names have the same type when translated. This check does not save multiple field
# name and typenames, this is a bare-minumum check only.
assert field.typename in FIELD_TYPES_MAP, (
f"Field type {field.typename} is not mapped in FIELD_TYPES_MAP, please add it manually."
)

if name in seen_field_names:
seen_typename, seen_record = seen_field_types[name]
if FIELD_TYPES_MAP[seen_typename] != FIELD_TYPES_MAP[field.typename]:
inconsistencies.add(
f"<{record.name} ({field.typename!r}, '{name}')> is duplicate mismatch of <{seen_record.name} ({seen_typename!r}, '{name}')>" # noqa: E501
)

else:
seen_field_names.add(name)
seen_field_types[name] = (field.typename, record)

if inconsistencies:
pytest.fail(
f"Found {len(inconsistencies)} inconsistencies in RecordDescriptors:\n" + "\n".join(inconsistencies)
)


def assert_valid_rst(src: str) -> None:
"""Attempts to compile the given string to rst."""

Expand Down
Loading