Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pySigma-validators-sigmahq"
version = "0.12.2"
version = "0.13.0"
description = "pySigma SigmaHQ validators"
authors = ["François Hubaut <frack113@users.noreply.github.com>"]
license = "LGPL-2.1-only"
Expand Down
53 changes: 53 additions & 0 deletions sigma/validators/sigmahq/correlation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from dataclasses import dataclass
from typing import ClassVar, List

from sigma.correlations import SigmaCorrelationRule, SigmaCorrelationType
from sigma.rule import SigmaRuleBase
from sigma.validators.base import (
SigmaRuleValidator,
SigmaValidationIssue,
SigmaValidationIssueSeverity,
)


@dataclass
class SigmahqCorrelationRulesMinimumIssue(SigmaValidationIssue):
description: ClassVar[str] = (
"Correlation rule must reference at least 2 rules for temporal types"
)
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.HIGH


class SigmahqCorrelationRulesMinimumValidator(SigmaRuleValidator):
"""Checks if temporal correlation rules have at least 2 rules."""

def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
if rule.type in [SigmaCorrelationType.TEMPORAL, SigmaCorrelationType.TEMPORAL_ORDERED]:
if len(rule.rules) < 2:
return [SigmahqCorrelationRulesMinimumIssue([rule])]
return []


@dataclass
class SigmahqCorrelationGroupByExistenceIssue(SigmaValidationIssue):
description: ClassVar[str] = (
"Correlation rule is missing the group-by field in correlation section"
)
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.HIGH


class SigmahqCorrelationGroupByExistenceValidator(SigmaRuleValidator):
"""Checks if a correlation rule has a group-by field for types that require it."""

def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
if isinstance(rule, SigmaCorrelationRule):
if rule.type in [
SigmaCorrelationType.EVENT_COUNT,
SigmaCorrelationType.VALUE_COUNT,
SigmaCorrelationType.TEMPORAL,
SigmaCorrelationType.TEMPORAL_ORDERED,
]:
if rule.group_by is None or len(rule.group_by) == 0:
return [SigmahqCorrelationGroupByExistenceIssue([rule])]
return []
14 changes: 11 additions & 3 deletions sigma/validators/sigmahq/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from sigma.rule import (
SigmaRule,
SigmaDetectionItem,
SigmaRuleBase,
)

from sigma.validators.base import (
SigmaValidationIssue,
SigmaRuleValidator,
SigmaValidationIssueSeverity,
SigmaDetectionItemValidator,
SigmaDetectionItem,
Expand All @@ -32,7 +32,11 @@ class SigmahqCategoryEventIdIssue(SigmaValidationIssue):
class SigmahqCategoryEventIdValidator(SigmaDetectionItemValidator):
"""Checks if a rule uses an EventID field with a windows category logsource that doesn't require it."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

if (
rule.logsource.product == "windows"
and rule.logsource.category in config.windows_no_eventid
Expand Down Expand Up @@ -61,7 +65,11 @@ class SigmahqCategoryWindowsProviderNameIssue(SigmaValidationIssue):
class SigmahqCategoryWindowsProviderNameValidator(SigmaDetectionItemValidator):
"""Checks if a rule uses a Provider_Name field with a windows category logsource that doesn't require it."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

if rule.logsource in config.windows_provider_name:
self.list_provider = config.windows_provider_name[rule.logsource]
return super().validate(rule)
Expand Down
20 changes: 16 additions & 4 deletions sigma/validators/sigmahq/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import ClassVar, Dict, List, Tuple
import re

from sigma.rule import SigmaRule, SigmaLogSource
from sigma.rule import SigmaRule, SigmaLogSource, SigmaRuleBase
from sigma.types import SigmaString
from sigma.validators.base import (
SigmaValidationIssue,
Expand Down Expand Up @@ -57,7 +57,11 @@ class SigmahqFieldnameCastIssue(SigmaValidationIssue):
class SigmahqFieldnameCastValidator(SigmaDetectionItemValidator):
"""Check field name have a cast error."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

core_logsource = SigmaLogSource(
category=rule.logsource.category,
product=rule.logsource.product,
Expand Down Expand Up @@ -95,7 +99,11 @@ class SigmahqInvalidFieldnameIssue(SigmaValidationIssue):
class SigmahqInvalidFieldnameValidator(SigmaDetectionItemValidator):
"""Check field name do not exist in the logsource."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

core_logsource = SigmaLogSource(
category=rule.logsource.category,
product=rule.logsource.product,
Expand Down Expand Up @@ -288,7 +296,11 @@ class SigmahqRedundantFieldIssue(SigmaValidationIssue):
class SigmahqRedundantFieldValidator(SigmaDetectionItemValidator):
"""Check if a field name is already covered by the logsource."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

core_logsource = SigmaLogSource(
category=rule.logsource.category,
product=rule.logsource.product,
Expand Down
57 changes: 56 additions & 1 deletion sigma/validators/sigmahq/filename.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import ClassVar, Dict, List

from sigma.rule import SigmaRule, SigmaLogSource, SigmaRuleBase
from sigma.correlations import SigmaCorrelationRule

from sigma.validators.base import (
SigmaRuleValidator,
Expand All @@ -22,6 +23,13 @@ class SigmahqFilenameConventionIssue(SigmaValidationIssue):
filename: str


@dataclass
class SigmahqCorrelationFilenamePrefixIssue(SigmaValidationIssue):
description: ClassVar[str] = "Correlation rule filename must start with 'correlation_'"
severity: ClassVar[SigmaValidationIssueSeverity] = SigmaValidationIssueSeverity.MEDIUM
filename: str


class SigmahqFilenameConventionValidator(SigmaRuleValidator):
"""Check a rule filename against SigmaHQ filename convention."""

Expand All @@ -34,6 +42,24 @@ def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
return []


class SigmahqCorrelationFilenamePrefixValidator(SigmaRuleValidator):
"""Check that correlation rule filenames start with 'correlation_'."""

def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate correlation rules
if not isinstance(rule, SigmaCorrelationRule):
return []

if rule.source is not None:
filename = rule.source.path.name

# All correlation files (pure or combined) must start with 'correlation_'
if not filename.startswith("correlation_"):
return [SigmahqCorrelationFilenamePrefixIssue([rule], filename)]

return []


@dataclass
class SigmahqFilenamePrefixIssue(SigmaValidationIssue):
description: ClassVar[str] = "The rule filename prefix doesn't match the SigmaHQ convention"
Expand All @@ -46,7 +72,36 @@ class SigmahqFilenamePrefixIssue(SigmaValidationIssue):
class SigmahqFilenamePrefixValidator(SigmaRuleValidator):
"""Check a rule filename against SigmaHQ filename prefix convention."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def _is_combined_file(self, rule: SigmaRuleBase) -> bool:
"""
Check if the file contains a combined format (both detection(s) and correlation rules).
This is determined by reading the file and checking for YAML document separator.
"""
if rule.source is None:
return False

try:
with open(rule.source.path, "r", encoding="utf-8") as f:
content = f.read()
# Check if file contains both correlation and detection/logsource sections
has_separator = "\n---\n" in content or "\n---" in content
has_correlation = "correlation:" in content
has_logsource = "logsource:" in content

# Combined if it has separator and both correlation and logsource
return has_separator and has_correlation and has_logsource
except:
return False

def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

# Skip validation for combined format files (they can have multiple logsources)
if self._is_combined_file(rule):
return []

if rule.source is not None:
filename = rule.source.path.name
logsource = SigmaLogSource(
Expand Down
8 changes: 6 additions & 2 deletions sigma/validators/sigmahq/logsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class SigmahqLogsourceUnknownIssue(SigmaValidationIssue):
class SigmahqLogsourceUnknownValidator(SigmaRuleValidator):
"""Checks if a rule uses an unknown logsource."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Ensure rule is a SigmaRule instance to access logsource
logsource = getattr(rule, "logsource", None)
if logsource is not None:
Expand Down Expand Up @@ -51,7 +51,11 @@ class SigmahqSysmonMissingEventidIssue(SigmaValidationIssue):
class SigmahqSysmonMissingEventidValidator(SigmaRuleValidator):
"""Checks if a rule uses the windows sysmon service logsource without the EventID field."""

def validate(self, rule: SigmaRule) -> List[SigmaValidationIssue]:
def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
# Only validate SigmaRule (detection rules), not correlation rules
if not isinstance(rule, SigmaRule):
return []

if rule.logsource.service == "sysmon":
find = False
for selection in rule.detection.detections.values():
Expand Down
5 changes: 5 additions & 0 deletions sigma/validators/sigmahq/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re

from sigma.rule import SigmaRuleBase, SigmaStatus
from sigma.correlations import SigmaCorrelationRule
from sigma.validators.base import (
SigmaRuleValidator,
SigmaValidationIssue,
Expand Down Expand Up @@ -275,6 +276,10 @@ def validate(self, rule: SigmaRuleBase) -> List[SigmaValidationIssue]:
custom_keys = list(rule.custom_attributes.keys())
allowed_fields = {"regression_tests_path", "simulation"}

# For correlation rules, the 'correlation' field is standard, not custom
if isinstance(rule, SigmaCorrelationRule):
allowed_fields.add("correlation")

# Find any custom attributes that are not in the allowed list
unknown_fields = [key for key in custom_keys if key not in allowed_fields]

Expand Down
33 changes: 33 additions & 0 deletions tests/files/rules-correlations/correlation_combined_format.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
title: Test Detection Rule in Combined File
id: aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
status: test
description: This is a test detection rule in a combined format file
author: Test Author
date: 2024-01-01
level: medium
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith: '\test.exe'
condition: selection
falsepositives:
- Unknown
---
title: Test Correlation Rule in Combined File
id: bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb
status: test
description: This is a test correlation rule in a combined format file
author: Test Author
date: 2024-01-01
level: high
correlation:
type: event_count
rules:
- aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
group-by:
- Computer
timespan: 5m
condition:
gte: 10
16 changes: 16 additions & 0 deletions tests/files/rules-correlations/correlation_valid_filename.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Test Correlation Rule With Correct Filename
id: 12345678-1234-1234-1234-123456789012
status: test
description: This is a test correlation rule with a correctly named file
author: Test Author
date: 2024-01-01
level: high
correlation:
type: event_count
rules:
- 87654321-4321-4321-4321-210987654321
group-by:
- Computer
timespan: 5m
condition:
gte: 10
16 changes: 16 additions & 0 deletions tests/files/rules-correlations/invalid_prefix_name.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Test Correlation Rule With Incorrect Filename
id: 98765432-9876-9876-9876-987654321098
status: test
description: This is a test correlation rule with an incorrectly named file
author: Test Author
date: 2024-01-01
level: high
correlation:
type: event_count
rules:
- 87654321-4321-4321-4321-210987654321
group-by:
- Computer
timespan: 5m
condition:
gte: 10
Loading
Loading