diff --git a/suricata_check/_suricata_check.py b/suricata_check/_suricata_check.py
index b980dcf..24e9999 100644
--- a/suricata_check/_suricata_check.py
+++ b/suricata_check/_suricata_check.py
@@ -420,9 +420,7 @@ def __get_verified_kwarg(
     if kwargs[name] is not CLI_ARGUMENTS[name]["default"]:
         if not isinstance(kwargs[name], CLI_ARGUMENTS[name]["type"]):
             raise click.BadParameter(
-                f"""Error: \
-                Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` \
-                but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""",
+                f"""Error: Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""",
             )
 
     return kwargs[name]
@@ -454,43 +452,86 @@ def __main_single_rule(
     write_output(OutputReport(rules=[rule_report]), out)
 
 
+# ----------------------------------------------------------------------
+# Multiprocessing worker functions
+# ----------------------------------------------------------------------
+
+# NOTE: new imports required by the worker pool; ideally these join the
+# module's top-level import block (placed here to keep the hunk self-contained).
+import logging.handlers
+import multiprocessing
+from typing import Any
+
+
+def _worker_init(log_queue: Optional[Any]) -> None:
+    """Initializer for multiprocessing pool workers to set up logging."""
+    if log_queue is None:
+        return
+
+    # Set up the worker's root logger to forward logs to the main process queue
+    root_logger = logging.getLogger()
+    if root_logger.hasHandlers():
+        root_logger.handlers.clear()
+
+    queue_handler = logging.handlers.QueueHandler(log_queue)
+    root_logger.addHandler(queue_handler)
+    # The QueueListener in the main process handles the actual level filtering
+    root_logger.setLevel(logging.DEBUG)
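+
+
+# For reference, the main-process counterpart assumed by _worker_init (set up
+# elsewhere, e.g. in the CLI entrypoint before checking starts) would look
+# roughly like this sketch; the names below are illustrative, not part of
+# this patch:
+#
+#     log_queue = multiprocessing.Manager().Queue()
+#     logging.getLogger().addHandler(logging.handlers.QueueHandler(log_queue))
+#     listener = logging.handlers.QueueListener(
+#         log_queue, logging.StreamHandler(), respect_handler_level=True
+#     )
+#     listener.start()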
- - """ + """Processes a rule file and returns a list of rules and their issues.""" if checkers is None: checkers = get_checkers() output = OutputReport() - with ( - open( - os.path.normpath(rules), - buffering=io.DEFAULT_BUFFER_SIZE, - ) as rules_fh, - ): - if len(checkers) == 0: - msg = "No checkers provided for processing rules." - _logger.error(msg) - raise RuntimeError(msg) + if len(checkers) == 0: + msg = "No checkers provided for processing rules." + _logger.error(msg) + raise RuntimeError(msg) - _logger.info("Processing rule file: %s", rules) + _logger.info("Processing rule file: %s", rules) + tasks = [] + with open(os.path.normpath(rules), buffering=io.DEFAULT_BUFFER_SIZE) as rules_fh: collected_multiline_parts: Optional[str] = None multiline_begin_number: Optional[int] = None @@ -498,83 +539,42 @@ def process_rules_file( # noqa: C901, PLR0912, PLR0915 # First work on collecting and parsing multiline rules if line.rstrip("\r\n").endswith("\\"): multiline_part = line.rstrip("\r\n")[:-1] - if collected_multiline_parts is None: collected_multiline_parts = multiline_part multiline_begin_number = number else: collected_multiline_parts += multiline_part.lstrip() - continue # Process final part of multiline rule if one is being collected if collected_multiline_parts is not None: collected_multiline_parts += line.lstrip() - rule_line = collected_multiline_parts.strip() - collected_multiline_parts = None - # If no multiline rule is being collected process as a potential single line rule else: if len(line.strip()) == 0: continue - - if line.strip().startswith("#"): - if evaluate_disabled: - # Verify that this line is a rule and not a comment - if parse(line) is None: - # Log the comment since it may be a invalid rule - _logger.warning( - "Ignoring comment on line %i: %s", - number, - line, - ) - continue - else: - # Skip the rule - continue - rule_line = line.strip() - try: - rule: Optional[Rule] = parse(rule_line) - except ParsingError: - _logger.error( - "Internal error in parsing of rule on line %i: %s", - number, - rule_line, - ) - rule = None - - # Parse comment and potential ignore comment to ignore rules - ignore = __parse_type_ignore(rule) - - # Verify that a rule was parsed correctly. 
 
-            output.rules.append(rule_report)
+    # Spin up the process pool
+    with multiprocessing.Pool(
+        initializer=_worker_init,
+        initargs=(log_queue,)
+    ) as pool:
+        # pool.map preserves the original order of the rules in the file
+        results = pool.map(_process_rule_task, tasks)
 
-            multiline_begin_number = None
+    # Filter out None results and extend the output rules
+    output.rules.extend([res for res in results if res is not None])
 
     _logger.info("Completed processing rule file: %s", rules)
 
@@ -582,7 +582,6 @@ def process_rules_file(  # noqa: C901, PLR0912, PLR0915
     return output
 
-
 def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]:
     if rule is None:
         return None
@@ -599,21 +598,7 @@ def analyze_rule(
     checkers: Optional[Sequence[CheckerInterface]] = None,
     ignore: Optional[Sequence[str]] = None,
 ) -> RuleReport:
-    """Checks a rule and returns a dictionary containing the rule and a list of issues found.
-
-    Args:
-        rule: The rule to be checked.
-        checkers: The checkers to be used to check the rule.
-        ignore: Regular expressions to match checker codes to ignore
-
-    Returns:
-        A list of issues found in the rule.
-        Each issue is typed as a `dict`.
-
-    Raises:
-        InvalidRuleError: If the rule does not follow the Suricata syntax.
-
-    """
+    """Checks a rule and returns a dictionary containing the rule and a list of issues found."""
     if not is_valid_rule(rule):
         raise InvalidRuleError(rule.raw)
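
A minimal usage sketch for the parallelized path (assuming the public entrypoint
is otherwise unchanged; `example.rules` is a placeholder path). The `__main__`
guard matters because multiprocessing re-imports the calling module on
spawn-based platforms:

    from suricata_check._suricata_check import process_rules_file

    if __name__ == "__main__":
        report = process_rules_file("example.rules", evaluate_disabled=False)
        print(len(report.rules))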