|
23 | 23 | # SOFTWARE. |
24 | 24 | # |
25 | 25 | ############################################################################### |
| 26 | +from nodescraper.base import InBandDataCollector |
| 27 | +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily |
| 28 | +from nodescraper.models import TaskResult |
26 | 29 |
|
27 | | -from nodescraper.models import AnalyzerArgs |
28 | | -from nodescraper.plugins.inband.kernel_module.kernel_module_data import ( |
29 | | - KernelModuleDataModel, |
30 | | -) |
| 30 | +from .kernel_module_data import KernelModuleDataModel |
31 | 31 |
|
32 | 32 |
|
33 | | -class KernelModuleAnalyzerArgs(AnalyzerArgs): |
34 | | - modules: dict = {} |
35 | | - modules_filter: list[str] = ["amd", "Amd"] |
36 | | - regex_match: bool = False |
| 33 | +class KernelModuleCollector(InBandDataCollector[KernelModuleDataModel, None]): |
| 34 | + """Read kernel modules and associated parameters""" |
37 | 35 |
|
38 | | - @classmethod |
39 | | - def build_from_model(cls, datamodel: KernelModuleDataModel) -> "KernelModuleAnalyzerArgs": |
40 | | - """build analyzer args from data model |
| 36 | + DATA_MODEL = KernelModuleDataModel |
41 | 37 |
|
42 | | - Args: |
43 | | - datamodel (KernelModuleDataModel): data model for plugin |
| 38 | + def parse_proc_modules(self, output): |
| 39 | + """Parses the output of /proc/modules into a dictionary.""" |
| 40 | + modules = {} |
| 41 | + for line in output.strip().splitlines(): |
| 42 | + parts = line.split() |
| 43 | + if len(parts) < 6: |
| 44 | + continue |
| 45 | + name, size, instances, deps, state, offset = parts[:6] |
| 46 | + modules[name] = { |
| 47 | + "size": int(size), |
| 48 | + "instances": int(instances), |
| 49 | + "dependencies": [] if deps == "-" else deps.split(","), |
| 50 | + "state": state, |
| 51 | + "offset": offset, |
| 52 | + "parameters": {}, |
| 53 | + } |
| 54 | + return modules |
| 55 | + |
| 56 | + def get_module_parameters(self, module_name): |
| 57 | + """Fetches parameter names and values for a given kernel module using _run_sut_cmd.""" |
| 58 | + param_dict = {} |
| 59 | + param_dir = f"/sys/module/{module_name}/parameters" |
| 60 | + |
| 61 | + list_params_cmd = f"ls {param_dir}" |
| 62 | + res = self._run_sut_cmd(list_params_cmd) |
| 63 | + if res.exit_code != 0: |
| 64 | + return param_dict |
| 65 | + |
| 66 | + for param in res.stdout.strip().splitlines(): |
| 67 | + param_path = f"{param_dir}/{param}" |
| 68 | + value_res = self._run_sut_cmd(f"cat {param_path}") |
| 69 | + value = value_res.stdout.strip() if value_res.exit_code == 0 else "<unreadable>" |
| 70 | + param_dict[param] = value |
| 71 | + |
| 72 | + return param_dict |
| 73 | + |
| 74 | + def collect_all_module_info(self): |
| 75 | + res = self._run_sut_cmd("cat /proc/modules") |
| 76 | + if res.exit_code != 0: |
| 77 | + raise RuntimeError("Failed to read /proc/modules") |
| 78 | + |
| 79 | + modules = self.parse_proc_modules(res.stdout) |
| 80 | + |
| 81 | + for mod in modules: |
| 82 | + modules[mod]["parameters"] = self.get_module_parameters(mod) |
| 83 | + |
| 84 | + return modules, res |
| 85 | + |
| 86 | + def collect_data( |
| 87 | + self, |
| 88 | + args=None, |
| 89 | + ) -> tuple[TaskResult, KernelModuleDataModel | None]: |
| 90 | + """ |
| 91 | + Collect kernel modules data. |
44 | 92 |
|
45 | 93 | Returns: |
46 | | - KernelModuleAnalyzerArgs: instance of analyzer args class |
| 94 | + tuple[TaskResult, KernelModuleDataModel | None]: tuple containing the task result and kernel data model or None if not found. |
47 | 95 | """ |
48 | | - return cls(modules=datamodel.modules) |
| 96 | + kernel_modules = {} |
| 97 | + if self.system_info.os_family == OSFamily.WINDOWS: |
| 98 | + res = self._run_sut_cmd("wmic os get Version /Value") |
| 99 | + if res.exit_code == 0: |
| 100 | + kernel_modules = [line for line in res.stdout.splitlines() if "Version=" in line][ |
| 101 | + 0 |
| 102 | + ].split("=")[1] |
| 103 | + else: |
| 104 | + kernel_modules, res = self.collect_all_module_info() |
| 105 | + """ |
| 106 | + for mod, info in kernel_modules.items(): |
| 107 | + print(f"Module: {mod}") |
| 108 | + for key, val in info.items(): |
| 109 | + if key == "parameters": |
| 110 | + print(" Parameters:") |
| 111 | + for pname, pval in val.items(): |
| 112 | + print(f" {pname} = {pval}") |
| 113 | + else: |
| 114 | + print(f" {key}: {val}") |
| 115 | + print() |
| 116 | + """ |
| 117 | + |
| 118 | + if not kernel_modules: |
| 119 | + self._log_event( |
| 120 | + category=EventCategory.OS, |
| 121 | + description="Error checking kernel modules", |
| 122 | + data={"command": res.command, "exit_code": res.exit_code}, |
| 123 | + priority=EventPriority.ERROR, |
| 124 | + console_log=True, |
| 125 | + ) |
| 126 | + |
| 127 | + if kernel_modules: |
| 128 | + km_data = KernelModuleDataModel(kernel_modules=kernel_modules) |
| 129 | + self._log_event( |
| 130 | + category="KERNEL_READ", |
| 131 | + description="Kernel modules read", |
| 132 | + data=km_data.model_dump(), |
| 133 | + priority=EventPriority.INFO, |
| 134 | + ) |
| 135 | + self.result.message = f"{len(km_data.kernel_modules)} kernel modules collected" |
| 136 | + self.result.status = ExecutionStatus.OK |
| 137 | + else: |
| 138 | + kernel_modules = None |
| 139 | + |
| 140 | + self.result.message = ( |
| 141 | + "Kernel modules collected" if kernel_modules else "Kernel modules not found" |
| 142 | + ) |
| 143 | + self.result.status = ExecutionStatus.OK if kernel_modules else ExecutionStatus.ERROR |
| 144 | + return self.result, km_data |
0 commit comments