Skip to content

Commit 3a995ca

Browse files
kurisukunm4dh4tChris Barros
authored
Feat: elastalert backend (#95)
* First draft * Cleanup * feat(elastalert): handle priority and multi rule queries exceptions * feat(elastalert): add test on unimplemented temporal ordered correlation rules * Cleanup * fix incorrect type hinting * mdr * feat(elastalert): handle type any for basic alerts * remove useless exception covered by sigma * remove imports * feat(elastalert): invert type and priority order * feat(elastalert): add rule title in output * feat(elastalert): avoid YAML incompatibility on index * Fix tests and index selection * feat(tests): add unit tests on single and multiples indexes coverage * Fix Python 3.9 syntax --------- Co-authored-by: Yann Pellegrin <yann.pellegrin@hacknowledge.ch> Co-authored-by: Yann <yann.pellegrin@pm.me> Co-authored-by: Chris Barros <chris.barros@hacknowledge.ch>
1 parent face4ad commit 3a995ca

File tree

2 files changed

+594
-0
lines changed

2 files changed

+594
-0
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
from typing import ClassVar, Dict, List, Optional, Union
2+
3+
from sigma.rule import SigmaRule
4+
from sigma.conversion.state import ConversionState
5+
from sigma.conversion.deferred import DeferredQueryExpression
6+
from sigma.processing.pipeline import ProcessingPipeline
7+
from sigma.correlations import SigmaCorrelationConditionOperator
8+
from sigma.correlations import SigmaCorrelationRule, SigmaCorrelationTimespan
9+
from sigma.exceptions import SigmaFeatureNotSupportedByBackendError
10+
from sigma.backends.elasticsearch.elasticsearch_lucene import LuceneBackend
11+
12+
13+
class ElastalertBackend(LuceneBackend):
    """
    Elastalert backend for Sigma. Converts Sigma rule into Elastalert rule, including correlation rules.

    The Lucene query produced by the parent backend is wrapped into an
    Elastalert rule document (YAML text) by ``finalize_query_default``.
    Correlation rules are rendered through the ``*_expression`` templates
    declared below.
    """

    # A descriptive name of the backend
    name: ClassVar[str] = "Elasticsearch Elastalert"
    # Output formats provided by the backend as name -> description mapping.
    # The name should match to finalize_output_<name>.
    formats: ClassVar[Dict[str, str]] = {
        "default": "Elastalert rule",
    }
    # Does the backend requires that a processing pipeline is provided?
    requires_pipeline: ClassVar[bool] = True

    # Fallback values used when the processing state does not define a key.
    state_defaults: ClassVar[Dict[str, str]] = {
        "index": "*",
    }

    # Sigma timespan unit -> key emitted in the Elastalert time setting.
    # NOTE(review): Elastalert documents its time settings as weeks, days,
    # hours, minutes or seconds; "months" and "years" may be rejected by
    # Elastalert when the rule is loaded -- confirm before relying on them.
    timespan_mapping: ClassVar[Dict[str, str]] = {
        "s": "seconds",
        "m": "minutes",
        "h": "hours",
        "d": "days",
        "w": "weeks",
        "M": "months",
        "y": "years",
    }

    correlation_methods: ClassVar[Dict[str, str]] = {
        "default": "Elastalert correlation rule",
    }
    # Layout of a correlation query: search, then aggregation, then condition.
    default_correlation_query: ClassVar[Dict[str, str]] = {
        "default": "{search}\n{aggregate}\n{condition}"
    }

    correlation_search_single_rule_expression: ClassVar[str] = "{query}"
    # Only GT and LT have a mapping entry.
    # NOTE(review): the remaining operators are not mapped here -- confirm how
    # the parent backend behaves when a rule uses one of them.
    correlation_condition_mapping: ClassVar[
        Dict[SigmaCorrelationConditionOperator, str]
    ] = {
        SigmaCorrelationConditionOperator.GT: "max_threshold",
        SigmaCorrelationConditionOperator.LT: "min_threshold",
    }

    event_count_aggregation_expression: ClassVar[Dict[str, str]] = {
        "default": "timeframe:\n {timespan}\n{groupby}"
    }
    value_count_aggregation_expression: ClassVar[Dict[str, str]] = {
        "default": "buffer_time:\n {timespan}\n{groupby}"
    }

    groupby_expression: ClassVar[Dict[str, str]] = {"default": "query_key:\n{fields}"}
    groupby_field_expression: ClassVar[Dict[str, str]] = {"default": "- {field}"}
    groupby_field_expression_joiner: ClassVar[Dict[str, str]] = {"default": "\n"}

    event_count_condition_expression: ClassVar[Dict[str, str]] = {
        "default": "num_events: {count}\ntype: frequency"
    }
    value_count_condition_expression: ClassVar[Dict[str, str]] = {
        "default": (
            "metric_agg_type: cardinality\n"
            "metric_agg_key: {field}\n"
            "{op}: {count}\n"
            "type: metric_aggregation"
        )
    }

    def __init__(
        self,
        processing_pipeline: Optional["ProcessingPipeline"] = None,
        collect_errors: bool = False,
        **kwargs,
    ):
        """
        Initialise the backend and the Sigma level name -> priority table.

        All arguments are forwarded unchanged to the parent backend.
        """
        super().__init__(processing_pipeline, collect_errors, **kwargs)
        # Keyed by the *name* of the rule level; used for the "priority" line.
        self.severity_risk_mapping = {
            "INFORMATIONAL": 0,
            "LOW": 1,
            "MEDIUM": 2,
            "HIGH": 3,
            "CRITICAL": 4,
        }

    def convert_correlation_search(
        self,
        rule: SigmaCorrelationRule,
        **kwargs,
    ) -> str:
        """
        Render the search part of a correlation rule.

        Raises SigmaFeatureNotSupportedByBackendError unless the correlation
        rule references exactly one rule; otherwise defers to the parent
        implementation.
        """
        if len(rule.rules) != 1:
            raise SigmaFeatureNotSupportedByBackendError(
                "Multiple correlation rules are not supported by Elastalert backend"
            )

        return super().convert_correlation_search(rule, **kwargs)

    def convert_timespan(
        self,
        timespan: SigmaCorrelationTimespan,
        output_format: Optional[str] = None,
        method: Optional[str] = None,
    ) -> str:
        """
        Render a timespan as ``<unit name>: <count>``.

        ``output_format`` and ``method`` are accepted for signature
        compatibility and are not used. A unit missing from
        ``timespan_mapping`` raises ``KeyError``.
        """
        unit_name = self.timespan_mapping[timespan.unit]
        return f"{unit_name}: {timespan.count}"

    def preprocess_indices(self, indices: List[Union[str, List[str]]]) -> str:
        """
        Collapse a collection of index names into a single index string.

        Entries may be plain strings or nested collections of strings (one
        level deep); nested entries are flattened first so that every case
        yields a string.

        Returns the default index when nothing is left after flattening, the
        multi-character wildcard when any entry equals it, and otherwise the
        unique index names sorted and joined with commas.
        """
        flattened: List[str] = []
        for entry in indices:
            if isinstance(entry, str):
                flattened.append(entry)
            else:
                # One state may carry several indices; merge them in.
                flattened.extend(entry)

        if not flattened:
            return self.state_defaults["index"]

        # A wildcard already covers every other index.
        if self.wildcard_multi in flattened:
            return self.wildcard_multi

        # Deduplicate through a set, then sort because set order is arbitrary
        # and the output must be stable.
        return ",".join(sorted(set(flattened)))

    def finalize_query(
        self,
        rule: Union[SigmaRule, SigmaCorrelationRule],
        query: Union[str, DeferredQueryExpression],
        index: int,
        state: ConversionState,
        output_format: str,
    ) -> Union[str, DeferredQueryExpression]:
        """
        Resolve the target index into ``state.processing_state["index"]`` and
        delegate to the parent ``finalize_query``.

        For a plain Sigma rule the index is read from ``state``. For any other
        rule object the index values of all conversion states of the
        referenced rules are collected. A non-string result is reduced to a
        single string with ``preprocess_indices``.
        """
        default_index = self.state_defaults["index"]

        if isinstance(rule, SigmaRule):
            index_state = state.processing_state.get("index", default_index)
        else:
            # Distinct loop variable: the ``state`` parameter is still needed
            # below and must not be confused with the referenced rules' states.
            index_state = [
                referenced_state.processing_state.get("index", default_index)
                for rule_reference in rule.rules
                for referenced_state in rule_reference.rule.get_conversion_states()
            ]

        # If the non-default index is not a string, preprocess it
        if not isinstance(index_state, str):
            index_state = self.preprocess_indices(index_state)

        # Save the processed index back to the processing state
        state.processing_state["index"] = index_state
        return super().finalize_query(rule, query, index, state, output_format)

    def finalize_query_default(
        self,
        rule: Union[SigmaRule, SigmaCorrelationRule],
        query: str,
        index: int,
        state: ConversionState,
    ) -> str:
        """
        Wrap ``query`` into the text of an Elastalert rule.

        The ``type: any`` line is emitted only for non-correlation rules.
        Priority comes from ``severity_risk_mapping`` and is 1 when the rule
        has no level. Expects ``state.processing_state["index"]`` to be set.
        """
        alert_type = "" if isinstance(rule, SigmaCorrelationRule) else "type: any\n"
        description = rule.description if rule.description else ""
        title = rule.title if rule.title else ""
        if rule.level is not None:
            priority = self.severity_risk_mapping[rule.level.name]
        else:
            priority = 1

        return (
            f"description: {description}\n"
            f"name: {title}\n"
            f"index: \"{state.processing_state['index']}\"\n"
            "filter:\n"
            "- query:\n"
            # query_string must be nested under query, and query under
            # query_string, for the document to be valid YAML.
            "    query_string:\n"
            f"      query: {query}\n"
            f"{alert_type}"
            f"priority: {priority}"
        )

    def finalize_output_default(self, queries: List[str]) -> List[str]:
        """Return the finalized rules as a new list."""
        return list(queries)

0 commit comments

Comments
 (0)