|
57 | 57 | from os import path |
58 | 58 | from pathlib import Path |
59 | 59 | from pprint import pprint |
60 | | -from typing import TYPE_CHECKING, Union |
| 60 | +from typing import TYPE_CHECKING, Union, Any |
61 | 61 |
|
62 | 62 | from logprep.util.ansi import Fore |
63 | 63 | from more_itertools import nth |
@@ -89,8 +89,8 @@ class ProcessorExtensions: |
89 | 89 |
|
90 | 90 | @staticmethod |
91 | 91 | def _get_errors(processor: "Processor", extra_output: list): |
92 | | - pd_errors = [] |
93 | | - pd_warnings = [] |
| 92 | + pd_errors: list[str] = [] |
| 93 | + pd_warnings: list[str] = [] |
94 | 94 | if isinstance(processor, PreDetector): |
95 | 95 | if not extra_output: |
96 | 96 | return pd_errors, pd_warnings |
@@ -204,7 +204,8 @@ def color_based_print(item): |
204 | 204 | else: |
205 | 205 | print_fcolor(Fore.CYAN, item) |
206 | 206 |
|
207 | | - def load_json_or_yaml(self, file_path) -> Union[list, dict]: |
| 207 | + @staticmethod |
| 208 | + def load_json_or_yaml(file_path) -> Union[list, dict]: |
208 | 209 | """load json or yaml depending on suffix |
209 | 210 |
|
210 | 211 | Parameters |
@@ -250,14 +251,14 @@ def __init__(self, config_path: str): |
250 | 251 |
|
251 | 252 | self._success = True |
252 | 253 |
|
253 | | - self._result = { |
| 254 | + self._result: dict[str, Union[int, float]] = { |
254 | 255 | "+ Successful Tests": 0, |
255 | 256 | "- Failed Tests": 0, |
256 | 257 | "~ Warning": 0, |
257 | 258 | "Rule Test Coverage": 0.0, |
258 | 259 | "Total Tests": 0, |
259 | 260 | } |
260 | | - self._problems = {"warnings": [], "errors": []} |
| 261 | + self._problems: dict[str, list] = {"warnings": [], "errors": []} |
261 | 262 |
|
262 | 263 | self._pd_extra = ProcessorExtensions() |
263 | 264 | self._gpr = GrokPatternReplacer(self._config_yml) |
@@ -313,7 +314,7 @@ def _run_tests_for_rules(self, rules_pn: dict) -> None: |
313 | 314 | self._check_which_rule_files_miss_tests(rules_pn) |
314 | 315 | processors_no_ct = self._get_processors() |
315 | 316 | self.check_run_rule_tests(processors_no_ct, rules_pn) |
316 | | - self._result["~ Warning"] += len(self._problems.get("warnings")) |
| 317 | + self._result["~ Warning"] += len(self._problems["warnings"]) |
317 | 318 | self._pd_extra.print_rules(self._result) |
318 | 319 |
|
319 | 320 | if not self._success: |
@@ -456,24 +457,24 @@ def _eval_file_rule_test(self, rule_test: dict, processor: "Processor", r_idx: i |
456 | 457 |
|
457 | 458 | if ( |
458 | 459 | print_diff |
459 | | - or nth(self._problems.get("warnings"), self._rule_cnt) is not None |
| 460 | + or nth(self._problems["warnings"], self._rule_cnt) is not None |
460 | 461 | or nth(self._problems.get("errors"), self._rule_cnt) is not None |
461 | 462 | ): |
462 | 463 | self._pd_extra.color_based_print( |
463 | 464 | f"> RULE FILE {rule_test['file']} & " |
464 | 465 | f"RULE TEST {t_idx + 1}/{len(rule_test['tests'])}:" |
465 | 466 | ) |
466 | | - if nth(self._problems.get("warnings"), self._rule_cnt) is not None: |
| 467 | + if nth(self._problems["warnings"], self._rule_cnt) is not None: |
467 | 468 | self._pd_extra.color_based_print( |
468 | | - f"~ {self._problems.get('warnings')[self._result['~ Warning']]}" |
| 469 | + f"~ {self._problems['warnings'][int(self._result['~ Warning'])]}" |
469 | 470 | ) |
470 | 471 |
|
471 | 472 | if print_diff or nth(self._problems.get("errors"), self._rule_cnt) is not None: |
472 | 473 | if nth(self._problems.get("errors"), self._rule_cnt) is not None: |
473 | 474 | self._pd_extra.color_based_print( |
474 | | - f"- {self._problems.get('errors')[self._result['- Failed Tests']]}" |
| 475 | + f"- {self._problems['errors'][int(self._result['- Failed Tests'])]}" |
475 | 476 | ) |
476 | | - self._pd_extra.print_diff_test("", diff) # print_rules({"DIFF": diff}) |
| 477 | + self._pd_extra.print_diff_test("", diff) |
477 | 478 | self._success = False |
478 | 479 | self._result["- Failed Tests"] += 1 |
479 | 480 |
|
@@ -534,7 +535,7 @@ def _check_which_rule_files_miss_tests(self, rules_pn) -> None: |
534 | 535 | rules_pn : dict |
535 | 536 | accumulated rules for each processor to operate on |
536 | 537 | """ |
537 | | - rule_tests = {"with tests": [], "without tests": []} |
| 538 | + rule_tests: dict[str, list] = {"with tests": [], "without tests": []} |
538 | 539 | for _, processor_test_cfg in rules_pn.items(): |
539 | 540 | rules = processor_test_cfg["rules"] |
540 | 541 |
|
@@ -591,7 +592,7 @@ def _set_rules_dirs_to_empty(self) -> None: |
591 | 592 | processor_cfg["rules"] = self._empty_rules_dirs |
592 | 593 |
|
593 | 594 | def _get_rules_per_processor_name(self, rules_dirs: dict) -> defaultdict: |
594 | | - rules_pn = defaultdict(dict) |
| 595 | + rules_pn: defaultdict[Any, dict] = defaultdict(dict) |
595 | 596 |
|
596 | 597 | for processor_name, proc_rules_dirs in rules_dirs.items(): |
597 | 598 | self._get_rules_for_processor(processor_name, proc_rules_dirs, rules_pn) |
@@ -648,7 +649,7 @@ def _get_rule_dict(self, file, root, processor_name, rules_pn) -> None: |
648 | 649 | Exception |
649 | 650 | Target_rule_idx is now mandatory, throw exception if not found for each rule |
650 | 651 | """ |
651 | | - rule_tests = [] |
| 652 | + rule_tests: list[Any] | dict[Any, Any] = [] |
652 | 653 | test_path = path.join(root, "".join([file.rsplit(".", maxsplit=1)[0], "_test.json"])) |
653 | 654 |
|
654 | 655 | if path.isfile(test_path): |
@@ -691,7 +692,7 @@ def _is_valid_rule_name(file_name: str) -> bool: |
691 | 692 | ) |
692 | 693 |
|
693 | 694 | def _get_rule_dirs_by_processor_name(self) -> defaultdict: |
694 | | - rules_dirs = defaultdict(dict) |
| 695 | + rules_dirs: defaultdict[Any, dict] = defaultdict(dict) |
695 | 696 | for processor in self._config_yml["pipeline"]: |
696 | 697 | processor_name, processor_cfg = next(iter(processor.items())) |
697 | 698 |
|
@@ -729,8 +730,8 @@ def _get_rules_to_add(processor_cfg) -> list: |
729 | 730 | """ |
730 | 731 | rules_to_add = [] |
731 | 732 |
|
732 | | - rule_path_lists = processor_cfg.get("rules") |
733 | | - if rule_path_lists: |
734 | | - for rule_path_list in rule_path_lists: |
735 | | - rules_to_add.append(rule_path_list) |
| 733 | + rules_list = processor_cfg.get("rules") |
| 734 | + rules_path_list = (rule_path for rule_path in rules_list if isinstance(rule_path, str)) |
| 735 | + for rule_path_list in rules_path_list: |
| 736 | + rules_to_add.append(rule_path_list) |
736 | 737 | return rules_to_add |
0 commit comments