Skip to content

Commit 7d34ef1

Browse files
committed
added utest + updates to collector for windows
1 parent c64323a commit 7d34ef1

File tree

6 files changed

+394
-45
lines changed

6 files changed

+394
-45
lines changed

nodescraper/plugins/inband/kernel_module/analyzer_args.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
# SOFTWARE.
2424
#
2525
###############################################################################
26+
import re
2627

2728
from nodescraper.models import AnalyzerArgs
2829
from nodescraper.plugins.inband.kernel_module.kernel_module_data import (
@@ -32,7 +33,8 @@
3233

3334
class KernelModuleAnalyzerArgs(AnalyzerArgs):
3435
kernel_modules: dict[str, dict] = {}
35-
modules_filter: list[str] = ["amd"]
36+
37+
regex_filter: list[str] = ["amd"]
3638
regex_match: bool = True
3739

3840
@classmethod
@@ -45,8 +47,16 @@ def build_from_model(cls, datamodel: KernelModuleDataModel) -> "KernelModuleAnal
4547
Returns:
4648
KernelModuleAnalyzerArgs: instance of analyzer args class
4749
"""
50+
51+
pattern_regex = re.compile("|".join(datamodel.regex_filter), re.IGNORECASE)
52+
filtered_mods = {
53+
name: data
54+
for name, data in datamodel.kernel_modules.items()
55+
if pattern_regex.search(name)
56+
}
57+
4858
return cls(
49-
kernel_modules=datamodel.kernel_modules,
50-
modules_filter=datamodel.modules_filter,
59+
kernel_modules=filtered_mods,
60+
regex_filter=datamodel.regex_filter,
5161
regex_match=datamodel.regex_match,
5262
)

nodescraper/plugins/inband/kernel_module/kernel_module_analyzer.py

Lines changed: 108 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,62 @@ class KernelModuleAnalyzer(DataAnalyzer[KernelModuleDataModel, KernelModuleAnaly
4040
DATA_MODEL = KernelModuleDataModel
4141

4242
def filter_modules_by_pattern(
43-
self, modules: dict[str, dict], patterns: list[str]
44-
) -> dict[str, dict]:
45-
pattern_regex = re.compile("|".join(patterns), re.IGNORECASE)
43+
self, modules: dict[str, dict], patterns: list[str] = None
44+
) -> tuple[dict[str, dict], list[str]]:
45+
if patterns is None:
46+
return modules, []
4647

47-
return {name: data for name, data in modules.items() if pattern_regex.search(name)}
48+
matched_modules = {}
49+
unmatched_patterns = []
4850

49-
def filter_modules_by_name(
50-
self, modules: dict[str, dict], names: list[str] = None
51-
) -> dict[str, dict]:
51+
pattern_match_flags = {p: False for p in patterns}
5252

53-
if not names:
54-
return {name: data for name, data in modules.items()}
53+
for mod_name in modules:
54+
for p in patterns:
55+
if re.search(p, mod_name, re.IGNORECASE):
56+
matched_modules[mod_name] = modules[mod_name]
57+
pattern_match_flags[p] = True
58+
break
5559

56-
return {name: data for name, data in modules.items() if name in names}
60+
unmatched_patterns = [p for p, matched in pattern_match_flags.items() if not matched]
61+
62+
return matched_modules, unmatched_patterns
63+
64+
def filter_modules_by_name_and_param(
65+
self, modules: dict[str, dict], to_match: dict[str, dict]
66+
) -> tuple[dict[str, dict], dict[str, dict]]:
67+
if not to_match:
68+
return modules, {}
69+
70+
filtered = {}
71+
unmatched = {}
72+
73+
for mod_name, expected_data in to_match.items():
74+
expected_params = expected_data.get("parameters", {})
75+
actual_data = modules.get(mod_name)
76+
77+
if not actual_data:
78+
# Module completely missing
79+
unmatched[mod_name] = expected_data
80+
continue
81+
82+
actual_params = actual_data.get("parameters", {})
83+
param_mismatches = {}
84+
85+
for param, expected_val in expected_params.items():
86+
actual_val = actual_params.get(param)
87+
if actual_val != expected_val:
88+
param_mismatches[param] = {
89+
"expected": expected_val,
90+
"actual": actual_val if actual_val is not None else "<missing>",
91+
}
92+
93+
if param_mismatches:
94+
unmatched[mod_name] = {"parameters": param_mismatches}
95+
else:
96+
filtered[mod_name] = actual_data
97+
98+
return filtered, unmatched
5799

58100
def analyze_data(
59101
self, data: KernelModuleDataModel, args: Optional[KernelModuleAnalyzerArgs] = None
@@ -70,32 +112,74 @@ def analyze_data(
70112
if not args:
71113
args = KernelModuleAnalyzerArgs()
72114

73-
self.result.message = "Kernel modules analyzed"
74115
self.result.status = ExecutionStatus.OK
75-
filtered_modules = {}
116+
76117
if args.regex_match:
77118
try:
78-
filtered_modules = self.filter_modules_by_pattern(
79-
data.kernel_modules, args.modules_filter
119+
filtered_modules, unmatched_pattern = self.filter_modules_by_pattern(
120+
data.kernel_modules, args.regex_filter
80121
)
81122
except re.error:
82123
self._log_event(
83124
category=EventCategory.RUNTIME,
84125
description="KernelModule regex is invalid",
85-
data=data,
126+
data={"regex_filters": {args.regex_filter}},
86127
priority=EventPriority.ERROR,
87128
)
88129
self.result.message = "Kernel modules failed to match regex"
89130
self.result.status = ExecutionStatus.ERROR
90131
return self.result
91132

92-
else:
93-
filtered_modules = self.filter_modules_by_name(data.kernel_modules, args.modules_filter)
133+
if unmatched_pattern:
134+
self._log_event(
135+
category=EventCategory.RUNTIME,
136+
description="KernelModules did not match all patterns",
137+
data={"unmatched_pattern: ": unmatched_pattern},
138+
priority=EventPriority.INFO,
139+
)
140+
self.result.message = "Kernel modules failed to match every pattern"
141+
self.result.status = ExecutionStatus.ERROR
142+
return self.result
143+
144+
self._log_event(
145+
category=EventCategory.RUNTIME,
146+
description="KernelModules analyzed",
147+
data={"filtered_modules": filtered_modules},
148+
priority=EventPriority.INFO,
149+
)
150+
return self.result
151+
152+
elif args.kernel_modules:
153+
filtered_modules, not_matched = self.filter_modules_by_name_and_param(
154+
data.kernel_modules, args.kernel_modules
155+
)
156+
157+
# no modules matched
158+
if not filtered_modules and not_matched:
159+
self._log_event(
160+
category=EventCategory.RUNTIME,
161+
description="KernelModules: no modules matched",
162+
data=args.kernel_modules,
163+
priority=EventPriority.ERROR,
164+
)
165+
self.result.message = "Kernel modules not matched"
166+
self.result.status = ExecutionStatus.ERROR
167+
return self.result
168+
# some modules matched
169+
elif filtered_modules and not_matched:
170+
171+
self._log_event(
172+
category=EventCategory.RUNTIME,
173+
description="KernelModules: not all modules matched",
174+
data=not_matched,
175+
priority=EventPriority.ERROR,
176+
)
177+
self.result.message = "Kernel modules not matched"
178+
self.result.status = ExecutionStatus.ERROR
179+
return self.result
180+
# all modules matched
181+
else:
182+
self.result.message = "Kernel modules matched"
183+
return self.result
94184

95-
self._log_event(
96-
category=EventCategory.RUNTIME,
97-
description="KernelModules analyzed",
98-
data=filtered_modules,
99-
priority=EventPriority.INFO,
100-
)
101-
return self.result
185+
return self.result

nodescraper/plugins/inband/kernel_module/kernel_module_collector.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ def parse_proc_modules(self, output):
4040
modules = {}
4141
for line in output.strip().splitlines():
4242
parts = line.split()
43-
if len(parts) < 6:
43+
if not parts:
4444
continue
45-
name, size, instances, deps, state, offset = parts[:6]
45+
name = parts[0]
4646
modules[name] = {
4747
"parameters": {},
4848
}
@@ -89,26 +89,18 @@ def collect_data(
8989
tuple[TaskResult, KernelModuleDataModel | None]: tuple containing the task result and kernel data model or None if not found.
9090
"""
9191
kernel_modules = {}
92+
km_data: KernelModuleDataModel | None = None
9293
if self.system_info.os_family == OSFamily.WINDOWS:
9394
res = self._run_sut_cmd("wmic os get Version /Value")
9495
if res.exit_code == 0:
95-
kernel_modules = [line for line in res.stdout.splitlines() if "Version=" in line][
96-
0
97-
].split("=")[1]
96+
for line in res.stdout.splitlines():
97+
if line.startswith("Version="):
98+
version = line.split("=", 1)[1]
99+
kernel_modules = {version: {"parameters": {}}}
100+
break
101+
98102
else:
99103
kernel_modules, res = self.collect_all_module_info()
100-
"""
101-
for mod, info in kernel_modules.items():
102-
print(f"Module: {mod}")
103-
for key, val in info.items():
104-
if key == "parameters":
105-
print(" Parameters:")
106-
for pname, pval in val.items():
107-
print(f" {pname} = {pval}")
108-
else:
109-
print(f" {key}: {val}")
110-
print()
111-
"""
112104

113105
if not kernel_modules:
114106
self._log_event(

nodescraper/plugins/inband/kernel_module/kernel_module_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@
2929

3030
class KernelModuleDataModel(DataModel):
3131
kernel_modules: dict
32-
modules_filter: list[str] = ["amd"]
32+
regex_filter: list[str] = ["amd"]
3333
regex_match: bool = True
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import pytest
2+
3+
from nodescraper.enums.eventcategory import EventCategory
4+
from nodescraper.enums.executionstatus import ExecutionStatus
5+
from nodescraper.models.systeminfo import OSFamily
6+
from nodescraper.plugins.inband.kernel_module.analyzer_args import (
7+
KernelModuleAnalyzerArgs,
8+
)
9+
from nodescraper.plugins.inband.kernel_module.kernel_module_analyzer import (
10+
KernelModuleAnalyzer,
11+
)
12+
from nodescraper.plugins.inband.kernel_module.kernel_module_data import (
13+
KernelModuleDataModel,
14+
)
15+
16+
17+
@pytest.fixture
18+
def sample_modules():
19+
return {
20+
"modA": {"parameters": {"p": 1}},
21+
"otherMod": {"parameters": {"p": 2}},
22+
"TESTmod": {"parameters": {"p": 3}},
23+
"amdABC": {"parameters": {"p": 3}},
24+
}
25+
26+
27+
@pytest.fixture
28+
def data_model(sample_modules):
29+
return KernelModuleDataModel(kernel_modules=sample_modules)
30+
31+
32+
@pytest.fixture
33+
def analyzer(system_info):
34+
# ensure no SystemCompatibilityError
35+
system_info.os_family = OSFamily.LINUX
36+
return KernelModuleAnalyzer(system_info=system_info)
37+
38+
39+
def test_filter_modules_by_pattern_none(sample_modules, analyzer):
40+
matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, None)
41+
assert matched == sample_modules
42+
assert unmatched == []
43+
44+
45+
def test_filter_modules_by_pattern_strict(sample_modules, analyzer):
46+
matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, [r"mod$"])
47+
assert set(matched) == {"otherMod", "TESTmod"}
48+
assert unmatched == []
49+
50+
51+
def test_filter_modules_by_pattern_unmatched(sample_modules, analyzer):
52+
# pattern "foo" never matches
53+
matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, ["foo"])
54+
assert matched == {}
55+
assert unmatched == ["foo"]
56+
57+
58+
def test_filter_name_and_param_all_match(sample_modules, analyzer):
59+
to_match = {"modA": {"parameters": {"p": 1}}}
60+
matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match)
61+
assert matched == {"modA": sample_modules["modA"]}
62+
assert unmatched == {}
63+
64+
65+
def test_filter_name_and_param_param_mismatch(sample_modules, analyzer):
66+
to_match = {"modA": {"parameters": {"p": 999}}}
67+
matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match)
68+
assert matched == {}
69+
assert "modA" in unmatched
70+
assert "p" in unmatched["modA"]["parameters"]
71+
72+
73+
def test_filter_name_and_param_missing_module(sample_modules, analyzer):
74+
to_match = {"bogus": {"parameters": {"x": 1}}}
75+
matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match)
76+
assert matched == {}
77+
assert "bogus" in unmatched
78+
79+
80+
def test_analyze_data_default(data_model, analyzer):
81+
result = analyzer.analyze_data(data_model, None)
82+
assert result.status == ExecutionStatus.OK
83+
84+
85+
def test_analyze_data_regex_success(data_model, analyzer):
86+
args = KernelModuleAnalyzerArgs(regex_match=True, regex_filter=["^TESTmod$"])
87+
result = analyzer.analyze_data(data_model, args)
88+
assert result.status == ExecutionStatus.OK
89+
ev = result.events[0]
90+
assert ev.description == "KernelModules analyzed"
91+
# only TESTmod should have been kept
92+
fm = ev.data["filtered_modules"]
93+
assert set(fm) == {"TESTmod"}
94+
95+
96+
def test_analyze_data_regex_invalid_pattern(data_model, analyzer):
97+
args = KernelModuleAnalyzerArgs(regex_match=True, regex_filter=["*invalid"])
98+
result = analyzer.analyze_data(data_model, args)
99+
assert result.status in (ExecutionStatus.ERROR, ExecutionStatus.EXECUTION_FAILURE)
100+
assert any(EventCategory.RUNTIME.value in ev.category for ev in result.events)
101+
102+
103+
def test_analyze_data_regex_unmatched_patterns(data_model, analyzer):
104+
args = KernelModuleAnalyzerArgs(regex_match=True, regex_filter=["modA", "nope"])
105+
result = analyzer.analyze_data(data_model, args)
106+
assert result.status == ExecutionStatus.ERROR
107+
assert any(ev.description == "KernelModules did not match all patterns" for ev in result.events)
108+
109+
110+
def test_analyze_data_name_only_success(data_model, analyzer):
111+
args = KernelModuleAnalyzerArgs(
112+
regex_match=False, kernel_modules={"modA": {"parameters": {"p": 1}}}
113+
)
114+
result = analyzer.analyze_data(data_model, args)
115+
assert result.status == ExecutionStatus.OK
116+
assert result.message == "Kernel modules matched"
117+
assert result.events == []
118+
119+
120+
def test_analyze_data_name_only_no_match(data_model, analyzer):
121+
args = KernelModuleAnalyzerArgs(regex_match=False, kernel_modules={"XYZ": {"parameters": {}}})
122+
result = analyzer.analyze_data(data_model, args)
123+
assert result.status == ExecutionStatus.ERROR
124+
assert any("no modules matched" in ev.description.lower() for ev in result.events)
125+
126+
127+
def test_analyze_data_name_only_partial_match(data_model, analyzer):
128+
args = KernelModuleAnalyzerArgs(
129+
regex_match=False,
130+
kernel_modules={
131+
"modA": {"parameters": {"p": 1}},
132+
"otherMod": {"parameters": {"wrong": 0}},
133+
},
134+
)
135+
result = analyzer.analyze_data(data_model, args)
136+
assert result.status == ExecutionStatus.ERROR
137+
assert any("not all modules matched" in ev.description.lower() for ev in result.events)

0 commit comments

Comments
 (0)