Skip to content

Commit 3ba4c26

Browse files
committed
Add a --group argument to sigma analyze fields to group results by log source. Displays the output in a prettytable.
1 parent 15d2540 commit 3ba4c26

File tree

3 files changed

+45
-10
lines changed

3 files changed

+45
-10
lines changed

sigma/analyze/fields.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from __future__ import annotations
33

44
from operator import add
5-
from typing import List, Set, Tuple
5+
from typing import List, Set, Tuple, Dict, Union
66
from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem
77
from sigma.collection import SigmaCollection
88
from sigma.correlations import SigmaCorrelationRule
@@ -145,19 +145,22 @@ def noop(field: str) -> str:
145145
def extract_fields_from_collection(
146146
collection: SigmaCollection,
147147
backend,
148+
group = False,
148149
collect_errors: bool = True,
149-
) -> Tuple[Set[str], List[SigmaError]]:
150+
) -> Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]:
150151
"""Extract all unique field names from a Sigma collection.
151152
152153
Args:
153154
collection: A SigmaCollection to extract fields from
154155
backend: A Backend instance used to escape and quote field names
156+
group: Whether to group fields by logsource. Defaults to False.
155157
collect_errors: Whether to collect errors. Defaults to True.
156158
157159
Returns:
158-
Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found
160+
Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: A set of unique field names (or a dict of sets if grouped) and any errors found
159161
"""
160162
all_fields: Set[str] = set()
163+
grouped_fields: Dict[str, Set[str]] = {}
161164
all_errors: List[SigmaError] = []
162165

163166
for rule in collection:
@@ -202,6 +205,15 @@ def extract_fields_from_collection(
202205
fields, errors = get_fields(backend, rule, collect_errors)
203206
all_fields.update(fields)
204207
all_errors.extend(errors)
205-
206-
return all_fields, all_errors
208+
if group:
209+
if isinstance(rule, SigmaRule): # Correlations not supported, they don't have logsource
210+
logsource = f"{rule.logsource.category or ''}|{rule.logsource.product or ''}|{rule.logsource.service or ''}"
211+
if logsource not in grouped_fields:
212+
grouped_fields[logsource] = set()
213+
grouped_fields[logsource].update(fields)
214+
215+
if group:
216+
return grouped_fields, all_errors
217+
else:
218+
return all_fields, all_errors
207219

sigma/cli/analyze.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import pathlib
33
import click
4+
from prettytable import PrettyTable
45
from sigma.processing.resolver import SigmaPipelineNotFoundError
56

67
from sigma.cli.convert import pipeline_resolver
@@ -245,13 +246,18 @@ def analyze_logsource(
245246
default=True,
246247
help="Verify if a pipeline is used that is intended for another backend.",
247248
)
249+
@click.option(
250+
"--group/--no-group",
251+
default=False,
252+
help="Group fields by logsource.",
253+
)
248254
@click.argument(
249255
"input",
250256
nargs=-1,
251257
required=True,
252258
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
253259
)
254-
def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
260+
def analyze_fields(file_pattern, target, pipeline, pipeline_check, group, input):
255261
"""Extract field names from Sigma rule sets.
256262
257263
This command extracts and outputs all unique field names present in the given
@@ -301,13 +307,22 @@ def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
301307
raise click.ClickException(f"Failed to initialize backend '{target}': {str(e)}")
302308

303309
# Extract fields
304-
all_fields, errors = extract_fields_from_collection(rules, backend)
310+
all_fields, errors = extract_fields_from_collection(rules, backend, group)
305311

306312
# Handle errors
307313
if errors:
308314
click.echo("Warnings during field extraction:", err=True)
309315
for error in errors:
310316
click.echo(f"* {error}", err=True)
311-
312-
# Output fields sorted
313-
click.echo("\n".join(sorted(all_fields)))
317+
318+
if group:
319+
table = PrettyTable()
320+
table.field_names = ["Logsource", "Fields"]
321+
table.align["Logsource"] = "r"
322+
table.align["Fields"] = "l"
323+
for logsource, fields in sorted(all_fields.items()):
324+
table.add_row([logsource, "\n".join(sorted(fields))])
325+
click.echo(table)
326+
else:
327+
# Output fields sorted
328+
click.echo("\n".join(sorted(all_fields)))

tests/test_analyze.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,11 @@ def test_fields_invalid_rule():
245245
result = cli.invoke(analyze_fields, ["-t", "text_query_test", "-", "tests/files/sigma_rule_without_condition.yml"])
246246
assert result.exit_code != 0
247247
assert "at least one condition" in result.stderr
248+
249+
def test_fields_grouped_extract():
250+
cli = CliRunner()
251+
result = cli.invoke(analyze_fields, ["-t", "text_query_test", "--group", "-", "tests/files/valid"])
252+
assert result.exit_code == 0
253+
# Should have extracted at least some fields
254+
assert len(result.stdout.split()) > 0
255+
assert "+----------" in result.stdout # Check for table format

0 commit comments

Comments (0)