Skip to content

Commit cf489ca

Browse files
committed
Introduce analyze command to extract field names
When converting large numbers of Sigma rules, being able to check whether a query's results contain the fields that are searched for in the converted rule is integral to validating that the query will work correctly. This introduces a new analyze subcommand, `fields`, which is given a backend, any number of pipelines, and Sigma files, and returns a list of the unique fields that appear after the pipeline transformations are taken into account.
1 parent 6026358 commit cf489ca

File tree

3 files changed

+347
-2
lines changed

3 files changed

+347
-2
lines changed

sigma/analyze/fields.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
"""Extract field names from Sigma rules."""
2+
from operator import add
3+
from typing import List, Set, Tuple
4+
from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem
5+
from sigma.collection import SigmaCollection
6+
from sigma.correlations import SigmaCorrelationRule
7+
from sigma.exceptions import SigmaError, SigmaPlaceholderError
8+
from sigma.modifiers import SigmaExpandModifier
9+
from sigma.types import SigmaString
10+
from sigma.processing.pipeline import ProcessingPipeline
11+
12+
13+
def get_fields(
    backend,
    rule: "SigmaRule | SigmaCorrelationRule",
    collect_errors: bool = True,
) -> "Tuple[List[str], List[SigmaError]]":
    """Extract field names from a Sigma rule.

    Args:
        backend: A Backend instance used to escape and quote field names
        rule: A SigmaRule or SigmaCorrelationRule to extract fields from
        collect_errors: Whether to collect errors. Defaults to True.

    Returns:
        Tuple[List[str], List[SigmaError]]: A list of fields and any errors found
    """
    fields: List[str] = []
    errors: List[SigmaError] = []

    def _identity(field: str) -> str:
        """Fallback escaper that returns the field unchanged."""
        return field

    # Resolve the backend's field escaper; fall back to the identity
    # function when the attribute is missing or not callable.
    escape_and_quote_field = getattr(backend, "escape_and_quote_field", None)
    if not callable(escape_and_quote_field):
        escape_and_quote_field = _identity

    if isinstance(rule, SigmaRule):
        if not rule.detection:
            return fields, errors

        # Iterate the detections dict directly so the resulting field
        # order is deterministic (insertion order) instead of the
        # arbitrary order produced by a frozenset of keys.
        for detection in rule.detection.detections.values():
            _fields, _errors = _get_fields_from_detection_items(
                backend,
                detection.detection_items,
                collect_errors,
            )
            fields.extend(_fields)
            errors.extend(_errors)

    elif isinstance(rule, SigmaCorrelationRule):
        # Correlation rules expose fields via group-by and alias mappings.
        if rule.group_by:
            fields.extend(escape_and_quote_field(field) for field in rule.group_by)

        if rule.aliases:
            # An alias name that already appears among the collected fields
            # is replaced by the concrete fields it maps to.
            aliases_to_remove = set()
            for field_alias in rule.aliases:
                esc_field_alias = escape_and_quote_field(field_alias.alias)
                if esc_field_alias in fields:
                    aliases_to_remove.add(esc_field_alias)
                fields.extend(
                    escape_and_quote_field(field)
                    for field in field_alias.mapping.values()
                )
            fields = [f for f in fields if f not in aliases_to_remove]

    return fields, errors
73+
74+
75+
def _get_fields_from_detection_items(
76+
backend,
77+
detection_items: List[SigmaDetectionItem | SigmaDetection],
78+
collect_errors: bool = True,
79+
) -> Tuple[List[str], List[SigmaError]]:
80+
"""Extract fields from detection items recursively.
81+
82+
Args:
83+
backend: A Backend instance used to escape and quote field names
84+
detection_items: A list of SigmaDetectionItem or SigmaDetection
85+
collect_errors: Whether to collect errors. Defaults to True.
86+
87+
Returns:
88+
Tuple[List[str], List[SigmaError]]: A list of fields and any errors found
89+
"""
90+
fields: List[str] = []
91+
errors: List[SigmaError] = []
92+
93+
def noop(field: str) -> str:
94+
"""A no-op function that returns the field as-is."""
95+
return field
96+
97+
escape_and_quote_field = getattr(backend, "escape_and_quote_field", lambda x: x)
98+
if not callable(escape_and_quote_field):
99+
escape_and_quote_field = noop
100+
101+
for di in detection_items:
102+
if isinstance(di, SigmaDetectionItem) and hasattr(di, "field") and di.field:
103+
if collect_errors:
104+
# Check for unexpanded placeholders
105+
has_placeholder_modifier = any(
106+
[
107+
is_sem
108+
for mod in di.modifiers
109+
if (is_sem := issubclass(mod, SigmaExpandModifier))
110+
]
111+
)
112+
has_placeholder_value = any(
113+
[
114+
is_placeholder
115+
for val in di.value
116+
if (
117+
is_placeholder := isinstance(val, SigmaString)
118+
and (
119+
hasattr(val, "contains_placeholder")
120+
and val.contains_placeholder()
121+
)
122+
)
123+
]
124+
)
125+
if all([has_placeholder_modifier, has_placeholder_value]):
126+
errors.append(
127+
SigmaPlaceholderError(
128+
"Cannot extract fields from Sigma rule with unexpanded placeholders."
129+
)
130+
)
131+
fields.append(escape_and_quote_field(di.field))
132+
elif isinstance(di, SigmaDetection):
133+
# Recursively extract fields from nested detections
134+
_fields, _errors = _get_fields_from_detection_items(
135+
backend, di.detection_items, collect_errors
136+
)
137+
fields.extend(_fields)
138+
errors.extend(_errors)
139+
140+
return fields, errors
141+
142+
143+
def extract_fields_from_collection(
    collection: "SigmaCollection",
    backend,
    collect_errors: bool = True,
) -> "Tuple[Set[str], List[SigmaError]]":
    """Extract all unique field names from a Sigma collection.

    Args:
        collection: A SigmaCollection to extract fields from
        backend: A Backend instance used to escape and quote field names
        collect_errors: Whether to collect errors. Defaults to True.

    Returns:
        Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found
    """
    all_fields: Set[str] = set()
    all_errors: List[SigmaError] = []

    for rule in collection:
        # Prefer a pipeline already attached to the rule by a previous
        # conversion; otherwise assemble one from the backend's parts.
        last_processing_pipeline = getattr(rule, "last_processing_pipeline", None)
        if not last_processing_pipeline:
            # Falsy attribute values are normalized to a fresh, empty pipeline.
            backend_processing_pipeline = (
                getattr(backend, "backend_processing_pipeline", None)
                or ProcessingPipeline()
            )
            processing_pipeline = (
                getattr(backend, "processing_pipeline", None) or ProcessingPipeline()
            )
            output_format_processing_pipeline = (
                getattr(backend, "output_format_processing_pipeline", None) or None
            )

            # Per-format pipelines are stored as a dict keyed by format name.
            if isinstance(output_format_processing_pipeline, dict):
                output_format_processing_pipeline = (
                    output_format_processing_pipeline.get(
                        getattr(backend, "format", "default")
                    )
                )
            if output_format_processing_pipeline is None:
                output_format_processing_pipeline = ProcessingPipeline()

            # ProcessingPipeline defines concatenation via "+"; the order
            # mirrors the order pipelines are applied during conversion.
            last_processing_pipeline = backend_processing_pipeline + (
                processing_pipeline + output_format_processing_pipeline
            )

        # Best effort: if the pipeline fails to apply, fall back to
        # extracting fields from the unprocessed rule.
        try:
            rule = last_processing_pipeline.apply(rule)
        except Exception:
            pass

        # Extract fields from the rule
        fields, errors = get_fields(backend, rule, collect_errors)
        all_fields.update(fields)
        all_errors.extend(errors)

    return all_fields, all_errors
205+

sigma/cli/analyze.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import json
22
import pathlib
33
import click
4+
from sigma.processing.resolver import SigmaPipelineNotFoundError
45

6+
from sigma.cli.convert import pipeline_resolver
57
from sigma.cli.rules import check_rule_errors, load_rules
68
from sigma.analyze.attack import score_functions, calculate_attack_scores
9+
from sigma.analyze.fields import extract_fields_from_collection
710
from sigma.data.mitre_attack import (
811
mitre_attack_techniques_tactics_mapping,
912
mitre_attack_version,
1013
)
1114
from sigma.analyze.stats import create_logsourcestats, format_row
1215
from sigma.rule import SigmaLevel, SigmaStatus
16+
from sigma.plugins import InstalledSigmaPlugins
17+
from sigma.conversion.base import Backend
1318

1419

1520
@click.group(name="analyze", help="Analyze Sigma rule sets")
@@ -207,3 +212,102 @@ def analyze_logsource(
207212
print("-+-".join("-" * width for width in column_widths), file=output)
208213
for row in rows:
209214
print(format_row(row, column_widths), file=output)
215+
216+
217+
@analyze_group.command(
    name="fields",
    help="Extract field names from Sigma rules for a given target backend and processing pipeline(s).",
)
@click.option(
    "--file-pattern",
    "-P",
    default="*.yml",
    show_default=True,
    help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
    "--target",
    "-t",
    type=str,
    required=True,
    help="Target backend to use for field name escaping and quoting.",
)
@click.option(
    "--pipeline",
    "-p",
    multiple=True,
    help="Specify processing pipelines as identifiers ("
    + click.style("sigma list pipelines", bold=True, fg="green")
    + ") or YAML files or directories",
)
@click.option(
    "--pipeline-check/--disable-pipeline-check",
    default=True,
    help="Verify if a pipeline is used that is intended for another backend.",
)
@click.argument(
    "input",
    nargs=-1,
    required=True,
    type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
    """Extract field names from Sigma rule sets.

    This command extracts and outputs all unique field names present in the given
    Sigma rule collection, formatted for the specified target backend.
    """
    # Load plugins and get available backends
    plugins = InstalledSigmaPlugins.autodiscover()
    backends = plugins.backends

    if target not in backends:
        available_targets = ", ".join(sorted(backends.keys()))
        raise click.ClickException(
            f"Unknown target '{target}'. Available targets are: {available_targets}"
        )

    # Load and validate rules before doing any backend work.
    rules = load_rules(input, file_pattern)
    check_rule_errors(rules)

    # Resolve pipelines; pipeline_check=False disables the backend
    # compatibility verification.
    try:
        processing_pipeline = pipeline_resolver.resolve(
            pipeline, target if pipeline_check else None
        )
    except SigmaPipelineNotFoundError as e:
        # All literal fragments are joined explicitly with "+" — the
        # original mixed "+" with implicit adjacent-literal concatenation.
        raise click.UsageError(
            f"The pipeline '{e.spec}' was not found.\n"
            + "List all installed processing pipelines with: "
            + click.style(f"sigma list pipelines {target}", bold=True, fg="green")
            + "\n"
            + "List pipeline plugins for installation with: "
            + click.style(
                "sigma plugin list --plugin-type pipeline", bold=True, fg="green"
            )
            + "\n"
            + "Pipelines not listed here are treated as file names."
        ) from e

    # Initialize backend
    backend_class = backends[target]
    try:
        backend: Backend = backend_class(
            processing_pipeline=processing_pipeline,
            collect_errors=True,
        )
    except Exception as e:
        raise click.ClickException(
            f"Failed to initialize backend '{target}': {str(e)}"
        ) from e

    # Extract fields
    all_fields, errors = extract_fields_from_collection(rules, backend)

    # Report non-fatal extraction warnings on stderr so stdout stays
    # machine-readable.
    if errors:
        click.echo("Warnings during field extraction:", err=True)
        for error in errors:
            click.echo(f"* {error}", err=True)

    # Output the unique fields sorted, one per line.
    click.echo("\n".join(sorted(all_fields)))

tests/test_analyze.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22
from click.testing import CliRunner
3-
from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource
3+
from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource, analyze_fields
44
from sigma.rule import (
55
SigmaRule,
66
SigmaLogSource,
@@ -208,4 +208,40 @@ def test_logsource_invalid_rule():
208208
cli = CliRunner()
209209
result = cli.invoke(analyze_logsource, ["-", "tests/files/sigma_rule_without_condition.yml"])
210210
assert result.exit_code != 0
211-
assert "at least one condition" in result.stdout
211+
assert "at least one condition" in result.stdout
212+
213+
214+
def test_fields_help():
    """The fields subcommand exposes a non-trivial --help text."""
    runner = CliRunner()
    outcome = runner.invoke(analyze_fields, ["--help"])
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 8
219+
220+
221+
def test_fields_extract():
    """Extracting fields from a directory of valid rules yields output."""
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields, ["-t", "text_query_test", "-", "tests/files/valid"]
    )
    assert outcome.exit_code == 0
    # Should have extracted at least some fields
    assert len(outcome.stdout.split()) > 0
227+
228+
229+
def test_fields_extract_correlation_rule():
    """Correlation rules also contribute extractable fields."""
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        ["-t", "text_query_test", "-", "tests/files/sigma_correlation_rules.yml"],
    )
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 0
234+
235+
236+
def test_fields_extract_with_pipelines():
    """Field extraction works with file-based and named pipelines combined."""
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        [
            "-t",
            "text_query_test",
            "-p",
            "tests/files/custom_pipeline.yml",
            "-p",
            "dummy_test",
            "-",
            "tests/files/valid",
        ],
    )
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 0
241+
242+
243+
def test_fields_invalid_rule():
    """An invalid rule aborts the command with a condition error."""
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        ["-t", "text_query_test", "-", "tests/files/sigma_rule_without_condition.yml"],
    )
    assert outcome.exit_code != 0
    assert "at least one condition" in outcome.stdout

0 commit comments

Comments
 (0)