Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 86 additions & 101 deletions suricata_check/_suricata_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,7 @@ def __get_verified_kwarg(
if kwargs[name] is not CLI_ARGUMENTS[name]["default"]:
if not isinstance(kwargs[name], CLI_ARGUMENTS[name]["type"]):
raise click.BadParameter(
f"""Error: \
Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` \
but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""",
f"""Error: Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""",
)
return kwargs[name]

Expand Down Expand Up @@ -454,135 +452,136 @@ def __main_single_rule(
write_output(OutputReport(rules=[rule_report]), out)


# ----------------------------------------------------------------------
# NEW MULTIPROCESSING WORKER FUNCTIONS
# ----------------------------------------------------------------------
Comment on lines +455 to +457
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments should be removed.


def _worker_init(log_queue: Optional[Any]) -> None:
"""Initializer for multiprocessing pool workers to set up logging."""
if log_queue is None:
return

# Set up the worker's root logger to forward logs to the main process queue
root_logger = logging.getLogger()
if root_logger.hasHandlers():
root_logger.handlers.clear()

queue_handler = logging.handlers.QueueHandler(log_queue)
root_logger.addHandler(queue_handler)
# The QueueListener in the main process handles the actual level filtering
root_logger.setLevel(logging.DEBUG)


def _process_rule_task(args: tuple[str, int, Optional[int], bool, Sequence[CheckerInterface]]) -> Optional[RuleReport]:
    """Worker function to parse, validate, and analyze a single rule.

    Returns the `RuleReport` for the rule, or `None` when the line produces no
    report (comments, unparsable rules, invalid rules).
    """
    rule_line, number, multiline_begin_number, evaluate_disabled, checkers = args

    if rule_line.startswith("#"):
        # Commented lines are only considered when disabled rules are evaluated;
        # even then, lines that do not parse as rules are plain comments.
        if not evaluate_disabled:
            return None
        if parse(rule_line) is None:
            _logger.warning("Ignoring comment on line %i: %s", number, rule_line)
            return None

    rule: Optional[Rule]
    try:
        rule = parse(rule_line)
    except ParsingError:
        _logger.error("Internal error in parsing of rule on line %i: %s", number, rule_line)
        rule = None

    # Extract ignore directives from the rule's comment (see __parse_type_ignore);
    # handles a None rule gracefully.
    ignore = __parse_type_ignore(rule)

    if rule is None:
        _logger.error("Error parsing rule on line %i: %s", number, rule_line)
        return None

    if not is_valid_rule(rule):
        _logger.error("Invalid rule on line %i: %s", number, rule_line)
        return None

    _logger.debug("Processing rule: %s on line %i", get_rule_option(rule, "sid"), number)

    report: RuleReport = analyze_rule(rule, checkers=checkers, ignore=ignore)
    # For multiline rules the report spans the first through the last physical line.
    report.line_begin = multiline_begin_number or number
    report.line_end = number

    return report

def process_rules_file(
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a list of rules and their issues.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.

    Returns:
        A list of rules and their issues.

    Raises:
        RuntimeError: If no checkers could be automatically discovered.

    """
    if checkers is None:
        checkers = get_checkers()

    if len(checkers) == 0:
        msg = "No checkers provided for processing rules."
        _logger.error(msg)
        raise RuntimeError(msg)

    output = OutputReport()

    _logger.info("Processing rule file: %s", rules)

    # Extract the logging queue from the main process root logger so that pool
    # workers can forward their log records to it (see _worker_init).
    root_logger = logging.getLogger()
    queue_handler = next(
        (h for h in root_logger.handlers if isinstance(h, logging.handlers.QueueHandler)),
        None,
    )
    log_queue = queue_handler.queue if queue_handler is not None else None

    with (
        open(
            os.path.normpath(rules),
            buffering=io.DEFAULT_BUFFER_SIZE,
        ) as rules_fh,
        multiprocessing.Pool(
            initializer=_worker_init,
            initargs=(log_queue,),
        ) as pool,
    ):
        # imap (unlike map) consumes the task generator lazily, so workers can
        # start analyzing rules while the file is still being read; like map,
        # it preserves the original order of the rules in the file.
        results = pool.imap(
            _process_rule_task,
            _generate_rule_tasks(rules_fh, evaluate_disabled, checkers),
        )
        # Filter out None results (comments, unparsable or invalid rules).
        output.rules.extend(res for res in results if res is not None)

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = summarize_output(output, checkers)

    return output


def _generate_rule_tasks(rules_fh, evaluate_disabled: bool, checkers: Sequence[CheckerInterface]):
    """Yields one `_process_rule_task` argument tuple per potential rule line.

    Continuation lines ending in a backslash are merged into a single logical
    rule; blank lines and (unless `evaluate_disabled`) commented lines are
    skipped here to avoid sending no-op tasks to the pool.
    """
    collected_multiline_parts: Optional[str] = None
    multiline_begin_number: Optional[int] = None

    for number, line in enumerate(rules_fh, start=1):
        # First work on collecting and parsing multiline rules
        if line.rstrip("\r\n").endswith("\\"):
            multiline_part = line.rstrip("\r\n")[:-1]

            if collected_multiline_parts is None:
                collected_multiline_parts = multiline_part
                multiline_begin_number = number
            else:
                collected_multiline_parts += multiline_part.lstrip()

            continue

        # Process final part of multiline rule if one is being collected
        if collected_multiline_parts is not None:
            collected_multiline_parts += line.lstrip()
            rule_line = collected_multiline_parts.strip()
            collected_multiline_parts = None
        # If no multiline rule is being collected process as a potential single line rule
        else:
            if len(line.strip()) == 0:
                continue

            if line.strip().startswith("#"):
                if evaluate_disabled:
                    # Verify that this line is a rule and not a comment
                    if parse(line) is None:
                        # Log the comment since it may be an invalid rule
                        _logger.warning(
                            "Ignoring comment on line %i: %s",
                            number,
                            line,
                        )
                        continue
                else:
                    # Skip the disabled rule
                    continue

            rule_line = line.strip()

        yield (rule_line, number, multiline_begin_number, evaluate_disabled, checkers)
        multiline_begin_number = None


def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]:
if rule is None:
return None
Expand All @@ -599,21 +598,7 @@ def analyze_rule(
checkers: Optional[Sequence[CheckerInterface]] = None,
ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
"""Checks a rule and returns a dictionary containing the rule and a list of issues found.

Args:
rule: The rule to be checked.
checkers: The checkers to be used to check the rule.
ignore: Regular expressions to match checker codes to ignore

Returns:
A list of issues found in the rule.
Each issue is typed as a `dict`.

Raises:
InvalidRuleError: If the rule does not follow the Suricata syntax.

"""
Comment on lines -602 to -616
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment

"""Checks a rule and returns a dictionary containing the rule and a list of issues found."""
if not is_valid_rule(rule):
raise InvalidRuleError(rule.raw)

Expand Down