Skip to content

Commit 815c4ac

Browse files
authored
chore: Add more pytest (#59)
* test: 🚨 Add Cover for config.py * test: 🚨 Add more pytest * test: 🚨 Remove test_validator_SigmahqUnsupportedRegexGroupConstruct_atomic_group
1 parent fe330e8 commit 815c4ac

File tree

9 files changed

+896
-144
lines changed

9 files changed

+896
-144
lines changed

poetry.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 108 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,146 +1,160 @@
1-
from dataclasses import dataclass
21
import json
3-
import os
4-
import pathlib
2+
from pathlib import Path
53
from typing import Dict, List, Optional
64
from sigma.rule import SigmaLogSource
75
from .sigmahq_data import (
6+
taxonomy_version,
87
ref_sigmahq_logsource_filepattern,
8+
file_pattern_version,
99
ref_sigmahq_fieldsname,
1010
ref_sigmahq_redundant_field,
1111
ref_sigmahq_logsource_definition,
12+
windows_version,
1213
ref_windows_provider_name,
1314
ref_windows_no_eventid,
1415
)
1516
import requests
1617

1718

1819
def core_logsource(source: SigmaLogSource) -> SigmaLogSource:
    """Return a stripped copy of *source* keeping only product, category and service."""
    core_fields = {
        "product": source.product,
        "category": source.category,
        "service": source.service,
    }
    return SigmaLogSource(**core_fields)
2022

2123

2224
def key_logsource(source: dict) -> str:
    """Generate a unique key for a logsource dictionary.

    Missing or falsy entries (None, "") are rendered as "none".
    """
    parts = (source.get(field) or "none" for field in ("product", "category", "service"))
    return "_".join(parts)
2730

2831

2932
class ConfigHQ:
    """Loads SigmaHQ configuration from local JSON files if available, otherwise uses reference data.

    Supports both local and remote configuration sources with caching and fallback mechanisms.
    Every attribute is first initialized from the bundled reference data, so a missing or
    broken JSON source never leaves the instance partially configured.
    """

    JSON_FOLDER: str = "validator_json"
    JSON_NAME_TAXONOMY: str = "sigmahq_taxonomy.json"
    JSON_NAME_FILENAME: str = "sigmahq_filename.json"
    JSON_NAME_WINDOWS_PROVIDER: str = "sigmahq_windows_validator.json"

    def __init__(self, data_place: Optional[str] = None):
        """Initialize from reference data, then overlay JSON configuration if available.

        :param data_place: None (use ./validator_json when it exists), an
            http(s) URL prefix, or a local directory path.
        """
        # Start from the bundled reference data so every attribute is always set.
        self.taxonomy_version = taxonomy_version
        self.sigmahq_redundant_fields = ref_sigmahq_redundant_field
        self.sigma_fieldsname = ref_sigmahq_fieldsname
        self.sigmahq_logsource_definition = ref_sigmahq_logsource_definition
        self.filename_version = file_pattern_version
        self.sigmahq_logsource_filepattern = ref_sigmahq_logsource_filepattern
        self.windows_version = windows_version
        self.windows_provider_name = ref_windows_provider_name
        self.windows_no_eventid = ref_windows_no_eventid

        # Determine the configuration source (at most one of these ends up set).
        self.config_dir: Optional[Path] = None
        self.config_url: Optional[str] = None

        if data_place is None:
            # No explicit source: fall back to the default local folder when present.
            default_path = Path.cwd() / self.JSON_FOLDER
            if default_path.exists():
                self.config_dir = default_path
        elif data_place.startswith(("http://", "https://")):
            self.config_url = data_place.rstrip("/")
        else:
            self.config_dir = Path(data_place)

        # Overlay JSON configuration only when a usable source exists.
        if (
            self.config_dir is not None and self.config_dir.exists()
        ) or self.config_url is not None:
            self._load_sigma_json()
            self._load_filename_json()
            self._load_windows_provider_json()

    def _load_json(self, filename: str) -> Optional[dict]:
        """Load JSON data from either a local file or a remote URL with error handling.

        :param filename: bare JSON file name, joined to the configured directory or URL.
        :return: parsed dict, or None when the file is absent or any error occurs.
        """
        if self.config_url:
            # Fix: interpolate the requested filename into the URL (the previous
            # code never used `filename` here, so every fetch hit the same path).
            url = f"{self.config_url}/{filename}"
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.json()
            except Exception as e:
                # Best-effort load: report and fall back to reference data.
                print(f"Error loading remote {filename}: {e}")
                return None
        elif self.config_dir:
            path = self.config_dir / filename
            if path.exists():
                try:
                    with path.open("r", encoding="UTF-8") as file:
                        return json.load(file)
                except Exception as e:
                    print(f"Error loading {filename}: {e}")
                    return None
        return None

    def _load_sigma_json(self):
        """Load taxonomy configuration (field names, redundant fields, definitions) from JSON."""
        json_dict = self._load_json(self.JSON_NAME_TAXONOMY)
        # Guard on every key read below, consistent with _load_filename_json,
        # so a malformed file cannot raise KeyError after partial mutation.
        if not json_dict or "taxonomy" not in json_dict or "version" not in json_dict:
            return

        taxonomy_info: Dict[SigmaLogSource, List[str]] = {}
        taxonomy_definition: Dict[SigmaLogSource, Optional[str]] = {}
        taxonomy_redundant: Dict[SigmaLogSource, List[str]] = {}

        # Process taxonomy entries in a stable, case-insensitive key order.
        temp = {key_logsource(v["logsource"]): v for v in json_dict["taxonomy"].values()}
        for key in sorted(temp.keys(), key=str.casefold):
            value = temp[key]
            logsource = core_logsource(SigmaLogSource.from_dict(value["logsource"]))
            fieldlist = sorted(
                value["field"]["native"] + value["field"]["custom"], key=str.casefold
            )
            taxonomy_info[logsource] = fieldlist
            taxonomy_definition[logsource] = value["logsource"].get("definition")
            taxonomy_redundant[logsource] = value["field"]["redundant"]

        self.taxonomy_version = json_dict["version"]
        self.sigma_fieldsname = taxonomy_info
        self.sigmahq_redundant_fields = taxonomy_redundant
        self.sigmahq_logsource_definition = taxonomy_definition

    def _load_filename_json(self):
        """Load filename prefix pattern configuration from JSON."""
        json_dict = self._load_json(self.JSON_NAME_FILENAME)
        if not json_dict or "pattern" not in json_dict or "version" not in json_dict:
            return

        filename_info: Dict[SigmaLogSource, str] = {}
        temp = {key_logsource(v["logsource"]): v for v in json_dict["pattern"].values()}
        for key in sorted(temp.keys(), key=str.casefold):
            value = temp[key]
            logsource = core_logsource(SigmaLogSource.from_dict(value["logsource"]))
            filename_info[logsource] = value["prefix"]

        self.filename_version = json_dict["version"]
        self.sigmahq_logsource_filepattern = filename_info

    def _load_windows_provider_json(self):
        """Load Windows provider-name and no-EventID configuration from JSON."""
        json_dict = self._load_json(self.JSON_NAME_WINDOWS_PROVIDER)
        # Guard on "version" too, consistent with the other loaders.
        if (
            not json_dict
            or "version" not in json_dict
            or "category_provider_name" not in json_dict
            or "category_no_eventid" not in json_dict
        ):
            return

        windows_provider_name: Dict[SigmaLogSource, List[str]] = {}
        for category in sorted(json_dict["category_provider_name"], key=str.casefold):
            windows_provider_name[
                SigmaLogSource(product="windows", category=category, service=None)
            ] = json_dict["category_provider_name"][category]
        windows_no_eventid = sorted(json_dict["category_no_eventid"], key=str.casefold)

        self.windows_version = json_dict["version"]
        self.windows_provider_name = windows_provider_name
        self.windows_no_eventid = windows_no_eventid
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"title": "SigmaHQ rule filename prefix pytest",
3+
"version": "20250101",
4+
"pattern": {
5+
"p1_none_none": {
6+
"logsource": {
7+
"product": "p1",
8+
"category": null,
9+
"service": null,
10+
"definition": null
11+
},
12+
"prefix": "p1_"
13+
},
14+
"none_c1_none": {
15+
"logsource": {
16+
"product": null,
17+
"category": "c1",
18+
"service": null,
19+
"definition": null
20+
},
21+
"prefix": "c1_"
22+
},
23+
"none_none_s1": {
24+
"logsource": {
25+
"product": null,
26+
"category": null,
27+
"service": "s1",
28+
"definition": null
29+
},
30+
"prefix": "s1_"
31+
}
32+
}
33+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
{
2+
"title": "SigmaHQ taxonomy pytest",
3+
"version": "20250101",
4+
"taxonomy": {
5+
"p1_none_none": {
6+
"logsource": {
7+
"product": "p1",
8+
"category": null,
9+
"service": null,
10+
"definition": null
11+
},
12+
"description": null,
13+
"field": {
14+
"native": [],
15+
"custom": [],
16+
"redundant": []
17+
}
18+
},
19+
"p2_none_none": {
20+
"logsource": {
21+
"product": "p2",
22+
"category": null,
23+
"service": null,
24+
"definition": null
25+
},
26+
"description": null,
27+
"field": {
28+
"native": ["azerty"],
29+
"custom": [],
30+
"redundant": []
31+
}
32+
},
33+
"p3_none_none": {
34+
"logsource": {
35+
"product": "p3",
36+
"category": null,
37+
"service": null,
38+
"definition": null
39+
},
40+
"description": null,
41+
"field": {
42+
"native": [],
43+
"custom": ["azerty"],
44+
"redundant": []
45+
}
46+
},
47+
"p4_none_none": {
48+
"logsource": {
49+
"product": "p4",
50+
"category": null,
51+
"service": null,
52+
"definition": null
53+
},
54+
"description": null,
55+
"field": {
56+
"native": [],
57+
"custom": [],
58+
"redundant": ["azerty"]
59+
}
60+
},
61+
"p5_none_none": {
62+
"logsource": {
63+
"product": "p5",
64+
"category": null,
65+
"service": null,
66+
"definition": null
67+
},
68+
"description": null,
69+
"field": {
70+
"native": ["azerty"],
71+
"custom": ["qwerty"],
72+
"redundant": ["hello"]
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
(0)