Skip to content

Commit bc3ea56

Browse files
authored
Merge pull request #74 from kelnage/analyze-field-names-rule
Introduce analyze command to extract field names
2 parents 6026358 + e617529 commit bc3ea56

File tree

3 files changed

+349
-2
lines changed

3 files changed

+349
-2
lines changed

sigma/analyze/fields.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
"""Extract field names from Sigma rules."""
2+
from __future__ import annotations
3+
4+
from operator import add
5+
from typing import List, Set, Tuple
6+
from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem
7+
from sigma.collection import SigmaCollection
8+
from sigma.correlations import SigmaCorrelationRule
9+
from sigma.exceptions import SigmaError, SigmaPlaceholderError
10+
from sigma.modifiers import SigmaExpandModifier
11+
from sigma.types import SigmaString
12+
from sigma.processing.pipeline import ProcessingPipeline
13+
14+
15+
def get_fields(
    backend,
    rule: SigmaRule | SigmaCorrelationRule,
    collect_errors: bool = True,
) -> Tuple[List[str], List[SigmaError]]:
    """Collect the field names referenced by a single Sigma rule.

    Args:
        backend: A Backend instance used to escape and quote field names
        rule: A SigmaRule or SigmaCorrelationRule to extract fields from
        collect_errors: Whether to collect errors. Defaults to True.

    Returns:
        Tuple[List[str], List[SigmaError]]: A list of fields and any errors found
    """
    fields: List[str] = []
    errors: List[SigmaError] = []

    def noop(field: str) -> str:
        """Identity fallback used when the backend has no usable escaper."""
        return field

    # Prefer the backend's field escaper; fall back to the identity function
    # when the attribute is missing or not callable.
    escape_and_quote_field = getattr(backend, "escape_and_quote_field", lambda x: x)
    if not callable(escape_and_quote_field):
        escape_and_quote_field = noop

    if isinstance(rule, SigmaRule):
        if not rule.detection:
            return fields, errors

        # Walk every named detection and gather the fields from its items.
        for key in frozenset(rule.detection.detections.keys()):
            sub_fields, sub_errors = _get_fields_from_detection_items(
                backend,
                rule.detection.detections[key].detection_items,
                collect_errors,
            )
            fields.extend(sub_fields)
            errors.extend(sub_errors)

    elif isinstance(rule, SigmaCorrelationRule):
        # Correlation rules expose fields through group-by and alias mappings.
        if rule.group_by:
            for group_field in rule.group_by:
                fields.append(escape_and_quote_field(group_field))

        if rule.aliases:
            aliases_to_remove = set()
            for field_alias in rule.aliases:
                esc_field_alias = escape_and_quote_field(field_alias.alias)
                if esc_field_alias in fields:
                    aliases_to_remove.add(esc_field_alias)
                for mapped_field in field_alias.mapping.values():
                    fields.append(escape_and_quote_field(mapped_field))
            # Alias names that appeared in the field list are replaced by
            # their mapped concrete fields, so drop the alias names here.
            fields = [f for f in fields if f not in aliases_to_remove]

    return fields, errors
75+
76+
77+
def _get_fields_from_detection_items(
78+
backend,
79+
detection_items: List[SigmaDetectionItem | SigmaDetection],
80+
collect_errors: bool = True,
81+
) -> Tuple[List[str], List[SigmaError]]:
82+
"""Extract fields from detection items recursively.
83+
84+
Args:
85+
backend: A Backend instance used to escape and quote field names
86+
detection_items: A list of SigmaDetectionItem or SigmaDetection
87+
collect_errors: Whether to collect errors. Defaults to True.
88+
89+
Returns:
90+
Tuple[List[str], List[SigmaError]]: A list of fields and any errors found
91+
"""
92+
fields: List[str] = []
93+
errors: List[SigmaError] = []
94+
95+
def noop(field: str) -> str:
96+
"""A no-op function that returns the field as-is."""
97+
return field
98+
99+
escape_and_quote_field = getattr(backend, "escape_and_quote_field", lambda x: x)
100+
if not callable(escape_and_quote_field):
101+
escape_and_quote_field = noop
102+
103+
for di in detection_items:
104+
if isinstance(di, SigmaDetectionItem) and hasattr(di, "field") and di.field:
105+
if collect_errors:
106+
# Check for unexpanded placeholders
107+
has_placeholder_modifier = any(
108+
[
109+
is_sem
110+
for mod in di.modifiers
111+
if (is_sem := issubclass(mod, SigmaExpandModifier))
112+
]
113+
)
114+
has_placeholder_value = any(
115+
[
116+
is_placeholder
117+
for val in di.value
118+
if (
119+
is_placeholder := isinstance(val, SigmaString)
120+
and (
121+
hasattr(val, "contains_placeholder")
122+
and val.contains_placeholder()
123+
)
124+
)
125+
]
126+
)
127+
if all([has_placeholder_modifier, has_placeholder_value]):
128+
errors.append(
129+
SigmaPlaceholderError(
130+
"Cannot extract fields from Sigma rule with unexpanded placeholders."
131+
)
132+
)
133+
fields.append(escape_and_quote_field(di.field))
134+
elif isinstance(di, SigmaDetection):
135+
# Recursively extract fields from nested detections
136+
_fields, _errors = _get_fields_from_detection_items(
137+
backend, di.detection_items, collect_errors
138+
)
139+
fields.extend(_fields)
140+
errors.extend(_errors)
141+
142+
return fields, errors
143+
144+
145+
def extract_fields_from_collection(
    collection: SigmaCollection,
    backend,
    collect_errors: bool = True,
) -> Tuple[Set[str], List[SigmaError]]:
    """Extract all unique field names from a Sigma collection.

    Args:
        collection: A SigmaCollection to extract fields from
        backend: A Backend instance used to escape and quote field names
        collect_errors: Whether to collect errors. Defaults to True.

    Returns:
        Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found
    """
    all_fields: Set[str] = set()
    all_errors: List[SigmaError] = []

    for rule in collection:
        # Reuse the pipeline already applied to the rule when available;
        # otherwise assemble one from the backend's pipeline attributes.
        pipeline = getattr(rule, "last_processing_pipeline", None)
        if not pipeline:
            backend_pipeline = (
                getattr(backend, "backend_processing_pipeline", None) or None
            )
            base_pipeline = getattr(backend, "processing_pipeline", None) or None
            format_pipeline = (
                getattr(backend, "output_format_processing_pipeline", None) or None
            )

            # Per-output-format pipelines may be stored as a dict keyed by
            # the backend's output format name.
            if format_pipeline and isinstance(format_pipeline, dict):
                format_pipeline = format_pipeline.get(
                    getattr(backend, "format", "default")
                )

            # Substitute empty pipelines for anything still unresolved.
            if backend_pipeline is None:
                backend_pipeline = ProcessingPipeline()
            if base_pipeline is None:
                base_pipeline = ProcessingPipeline()
            if format_pipeline is None:
                format_pipeline = ProcessingPipeline()

            # Concatenate in the same grouping as the original:
            # backend + (base + format).
            pipeline = add(backend_pipeline, add(base_pipeline, format_pipeline))

        # Best effort: if applying the pipeline fails, continue with the
        # rule untransformed rather than aborting the whole extraction.
        try:
            rule = pipeline.apply(rule)
        except Exception:
            pass

        rule_fields, rule_errors = get_fields(backend, rule, collect_errors)
        all_fields.update(rule_fields)
        all_errors.extend(rule_errors)

    return all_fields, all_errors
207+

sigma/cli/analyze.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import json
22
import pathlib
33
import click
4+
from sigma.processing.resolver import SigmaPipelineNotFoundError
45

6+
from sigma.cli.convert import pipeline_resolver
57
from sigma.cli.rules import check_rule_errors, load_rules
68
from sigma.analyze.attack import score_functions, calculate_attack_scores
9+
from sigma.analyze.fields import extract_fields_from_collection
710
from sigma.data.mitre_attack import (
811
mitre_attack_techniques_tactics_mapping,
912
mitre_attack_version,
1013
)
1114
from sigma.analyze.stats import create_logsourcestats, format_row
1215
from sigma.rule import SigmaLevel, SigmaStatus
16+
from sigma.plugins import InstalledSigmaPlugins
17+
from sigma.conversion.base import Backend
1318

1419

1520
@click.group(name="analyze", help="Analyze Sigma rule sets")
@@ -207,3 +212,102 @@ def analyze_logsource(
207212
print("-+-".join("-" * width for width in column_widths), file=output)
208213
for row in rows:
209214
print(format_row(row, column_widths), file=output)
215+
216+
217+
@analyze_group.command(
    name="fields",
    help="Extract field names from Sigma rules for a given target backend and processing pipeline(s).",
)
@click.option(
    "--file-pattern",
    "-P",
    default="*.yml",
    show_default=True,
    help="Pattern for file names to be included in recursion into directories.",
)
@click.option(
    "--target",
    "-t",
    type=str,
    required=True,
    help="Target backend to use for field name escaping and quoting.",
)
@click.option(
    "--pipeline",
    "-p",
    multiple=True,
    help="Specify processing pipelines as identifiers ("
    + click.style("sigma list pipelines", bold=True, fg="green")
    + ") or YAML files or directories",
)
@click.option(
    "--pipeline-check/--disable-pipeline-check",
    default=True,
    help="Verify if a pipeline is used that is intended for another backend.",
)
@click.argument(
    "input",
    nargs=-1,
    required=True,
    type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
    """Extract field names from Sigma rule sets.

    This command extracts and outputs all unique field names present in the given
    Sigma rule collection, formatted for the specified target backend.
    """
    # Load plugins and get available backends.
    plugins = InstalledSigmaPlugins.autodiscover()
    backends = plugins.backends

    if target not in backends:
        available_targets = ", ".join(sorted(backends.keys()))
        raise click.ClickException(
            f"Unknown target '{target}'. Available targets are: {available_targets}"
        )

    # Load rules and abort early on rules with errors.
    rules = load_rules(input, file_pattern)
    check_rule_errors(rules)

    # Resolve pipelines; passing None as the target disables the check that a
    # pipeline is intended for this backend.
    try:
        processing_pipeline = pipeline_resolver.resolve(
            pipeline, target if pipeline_check else None
        )
    except SigmaPipelineNotFoundError as e:
        raise click.UsageError(
            f"The pipeline '{e.spec}' was not found.\n"
            + "List all installed processing pipelines with: "
            + click.style(f"sigma list pipelines {target}", bold=True, fg="green")
            + "\n"
            + "List pipeline plugins for installation with: "
            # FIX: was an f-string without placeholders; plain literal suffices.
            + click.style(
                "sigma plugin list --plugin-type pipeline", bold=True, fg="green"
            )
            + "\n"
            + "Pipelines not listed here are treated as file names."
        )

    # Initialize the backend with error collection enabled so conversion-time
    # errors do not abort field extraction.
    backend_class = backends[target]
    try:
        backend: Backend = backend_class(
            processing_pipeline=processing_pipeline,
            collect_errors=True,
        )
    except Exception as e:
        raise click.ClickException(f"Failed to initialize backend '{target}': {str(e)}")

    # Extract fields from the whole collection.
    all_fields, errors = extract_fields_from_collection(rules, backend)

    # Report extraction warnings on stderr so stdout stays machine-readable.
    if errors:
        click.echo("Warnings during field extraction:", err=True)
        for error in errors:
            click.echo(f"* {error}", err=True)

    # Output fields sorted, one per line.
    click.echo("\n".join(sorted(all_fields)))

tests/test_analyze.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22
from click.testing import CliRunner
3-
from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource
3+
from sigma.cli.analyze import analyze_group, analyze_attack, analyze_logsource, analyze_fields
44
from sigma.rule import (
55
SigmaRule,
66
SigmaLogSource,
@@ -208,4 +208,40 @@ def test_logsource_invalid_rule():
208208
cli = CliRunner()
209209
result = cli.invoke(analyze_logsource, ["-", "tests/files/sigma_rule_without_condition.yml"])
210210
assert result.exit_code != 0
211-
assert "at least one condition" in result.stdout
211+
assert "at least one condition" in result.stdout
212+
213+
214+
def test_fields_help():
    # The help page must render successfully and contain non-trivial text.
    runner = CliRunner()
    outcome = runner.invoke(analyze_fields, ["--help"])
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 8
219+
220+
221+
def test_fields_extract():
    # Extracting fields from the valid rule set should succeed and emit
    # at least one field name.
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields, ["-t", "text_query_test", "-", "tests/files/valid"]
    )
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 0
227+
228+
229+
def test_fields_extract_correlation_rule():
    # Correlation rules contribute fields via group-by and aliases.
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        ["-t", "text_query_test", "-", "tests/files/sigma_correlation_rules.yml"],
    )
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 0
234+
235+
236+
def test_fields_extract_with_pipelines():
    # Field extraction should also work when processing pipelines are
    # supplied both as a YAML file and as an installed identifier.
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        [
            "-t",
            "text_query_test",
            "-p",
            "tests/files/custom_pipeline.yml",
            "-p",
            "dummy_test",
            "-",
            "tests/files/valid",
        ],
    )
    assert outcome.exit_code == 0
    assert len(outcome.stdout.split()) > 0
241+
242+
243+
def test_fields_invalid_rule():
    # A rule without a condition must fail rule validation before extraction.
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        ["-t", "text_query_test", "-", "tests/files/sigma_rule_without_condition.yml"],
    )
    assert outcome.exit_code != 0
    assert "at least one condition" in outcome.stdout

0 commit comments

Comments
 (0)