|
2 | 2 | from __future__ import annotations |
3 | 3 |
|
4 | 4 | from operator import add |
5 | | -from typing import List, Set, Tuple |
| 5 | +from typing import List, Set, Tuple, Dict, Union |
6 | 6 | from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem |
7 | 7 | from sigma.collection import SigmaCollection |
8 | 8 | from sigma.correlations import SigmaCorrelationRule |
@@ -145,19 +145,22 @@ def noop(field: str) -> str: |
145 | 145 | def extract_fields_from_collection( |
146 | 146 | collection: SigmaCollection, |
147 | 147 | backend, |
| 148 | + group = False, |
148 | 149 | collect_errors: bool = True, |
149 | | -) -> Tuple[Set[str], List[SigmaError]]: |
| 150 | +) -> Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: |
150 | 151 | """Extract all unique field names from a Sigma collection. |
151 | 152 | |
152 | 153 | Args: |
153 | 154 | collection: A SigmaCollection to extract fields from |
154 | 155 | backend: A Backend instance used to escape and quote field names |
| 156 | + group: Whether to group fields by logsource. Defaults to False. |
155 | 157 | collect_errors: Whether to collect errors. Defaults to True. |
156 | 158 | |
157 | 159 | Returns: |
158 | | - Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found |
| 160 | + Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: A set of unique field names (or a dict of set if grouped) and any errors found |
159 | 161 | """ |
160 | 162 | all_fields: Set[str] = set() |
| 163 | + grouped_fields: Dict[str, Set[str]] = {} |
161 | 164 | all_errors: List[SigmaError] = [] |
162 | 165 |
|
163 | 166 | for rule in collection: |
@@ -202,6 +205,15 @@ def extract_fields_from_collection( |
202 | 205 | fields, errors = get_fields(backend, rule, collect_errors) |
203 | 206 | all_fields.update(fields) |
204 | 207 | all_errors.extend(errors) |
205 | | - |
206 | | - return all_fields, all_errors |
| 208 | + if group: |
| 209 | + if isinstance(rule, SigmaRule): # Correlations not supported, they don't have logsource |
| 210 | + logsource = f"{rule.logsource.category or ''}|{rule.logsource.product or ''}|{rule.logsource.service or ''}" |
| 211 | + if logsource not in grouped_fields: |
| 212 | + grouped_fields[logsource] = set() |
| 213 | + grouped_fields[logsource].update(fields) |
| 214 | + |
| 215 | + if group: |
| 216 | + return grouped_fields, all_errors |
| 217 | + else: |
| 218 | + return all_fields, all_errors |
207 | 219 |
|
0 commit comments