Skip to content

Commit 9229c52

Browse files
committed
leverage eql to validate subquery with synthetic sequence
1 parent 32dd5f8 commit 9229c52

File tree

2 files changed

+52
-24
lines changed

2 files changed

+52
-24
lines changed

detection_rules/rule_validators.py

Lines changed: 11 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -484,7 +484,6 @@ def _validate_against_packaged_integrations(
484484
query_text: str,
485485
packaged: list[dict[str, Any]],
486486
trailer_builder: Callable[[str, str | None, str, str, str], str],
487-
join_values: list[Any] | None = None,
488487
*,
489488
accumulate_schemas: bool = True,
490489
) -> EQL_ERROR_TYPES | ValueError | None:
@@ -510,7 +509,7 @@ def _validate_against_packaged_integrations(
510509
if accumulate_schemas:
511510
package_schemas.setdefault(package, {}).update(**integration_schema)
512511

513-
# Build trailer and validate the query text
512+
# Build trailer and validate provided text (already synthetic if needed by caller)
514513
err_trailer = trailer_builder(package, integration, package_version, stack_version, ecs_version)
515514
exc = self.validate_query_text_with_schema(
516515
query_text,
@@ -534,22 +533,6 @@ def _validate_against_packaged_integrations(
534533
elif exc is not None:
535534
return exc
536535

537-
# Validate join/group-by fields exist in this integration schema (if provided)
538-
for jf in join_values or []:
539-
jf_str = str(jf)
540-
if jf_str not in integration_schema:
541-
trailer = (
542-
f"\n\tJoin field not found in schema.\n\t"
543-
f"package: {package}, integration: {integration}, package_version: {package_version}, "
544-
f"stack: {stack_version}, ecs: {ecs_version}"
545-
)
546-
error_fields[jf_str] = {
547-
"error": ValueError(f"Unknown field: {jf_str}"),
548-
"trailer": trailer,
549-
"package": package,
550-
"integration": integration,
551-
}
552-
553536
return None
554537

555538
# Function to extract the field name from an error message
@@ -568,6 +551,13 @@ def _full_query_trailer_builder(pkg: str, integ: str | None, pkg_ver: str, stk_v
568551
f"package: {pkg}, integration: {integ}, package_version: {pkg_ver}, stack: {stk_ver}, ecs: {ecs_ver}"
569552
)
570553

554+
# Function to build a minimal synthetic sequence containing the subquery
555+
def _build_synthetic_sequence_from_subquery(subquery: "ast.SubqueryBy") -> str:
556+
subquery_text = str(subquery)
557+
join_fields = [str(j) for j in (getattr(subquery, "join_values", []) or [])]
558+
dummy_by = f" by {', '.join(join_fields)}" if join_fields else ""
559+
return f"sequence\n {subquery_text}\n [any where true]{dummy_by}"
560+
571561
# Determine if this is a sequence query via rule data flag
572562
if data.is_sequence: # type: ignore[reportAttributeAccessIssue]
573563
sequence: ast.Sequence = self.ast.first # type: ignore[reportAttributeAccessIssue]
@@ -586,18 +576,17 @@ def _full_query_trailer_builder(pkg: str, integ: str | None, pkg_ver: str, stk_v
586576
# Build subquery-specific package_integrations
587577
subquery_pkg_ints = parse_datasets(list(subquery_datasets), packages_manifest)
588578

589-
# Validate the subquery's event query (without the "by" fields)
590-
subquery_query_str = subquery.query.render() # type: ignore[reportUnknownVariableType]
579+
# Validate the entire subquery by wrapping it in a minimal sequence so EQL validates any join fields
580+
synthetic_sequence: str = _build_synthetic_sequence_from_subquery(subquery) # type: ignore[reportUnknownVariableType]
591581

592582
# Only mark as validated if there are subquery-specific integrations to check
593583
if subquery_pkg_ints:
594584
did_subquery_validation = True
595585

596586
exc = _validate_against_packaged_integrations(
597-
subquery_query_str, # type: ignore[reportUnknownVariableType]
587+
synthetic_sequence, # validate as a minimal sequence to enforce join field checks
598588
subquery_pkg_ints,
599589
_subquery_trailer_builder,
600-
join_values=list(getattr(subquery, "join_values", []) or []), # type: ignore[reportUnknownVariableType]
601590
accumulate_schemas=False,
602591
)
603592
if exc is not None:
@@ -610,7 +599,6 @@ def _full_query_trailer_builder(pkg: str, integ: str | None, pkg_ver: str, stk_v
610599
self.query,
611600
package_integrations,
612601
_full_query_trailer_builder,
613-
join_values=None,
614602
)
615603
if exc is not None:
616604
return exc
@@ -637,7 +625,6 @@ def _full_query_trailer_builder(pkg: str, integ: str | None, pkg_ver: str, stk_v
637625
self.query,
638626
package_integrations,
639627
_full_query_trailer_builder,
640-
join_values=None,
641628
)
642629
if exc is not None:
643630
return exc

tests/test_python_library.py

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -133,6 +133,47 @@ def test_sequence_invalid_join_field_wrong_package(self) -> None:
133133
with self.assertRaisesRegex(ValueError, r"Error in both stack and integrations checks"):
134134
rc.load_dict(bad_rule)
135135

136+
def test_sequence_top_level_by_and_runs_across_integrations_valid(self) -> None:
137+
"""Sequence-level by and per-subquery runs; subqueries use different integrations and validate correctly."""
138+
rc = RuleCollection()
139+
query = """
140+
sequence by host.id, user.id with maxspan=1s
141+
[any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5
142+
[authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by okta.actor.id
143+
"""
144+
rule = {
145+
"metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs"),
146+
"rule": mk_rule(
147+
name="EQL sequence with top-level by and runs",
148+
rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f15",
149+
description="Validate top-level sequence by and per-subquery runs across integrations.",
150+
risk_score=42,
151+
query=query,
152+
),
153+
}
154+
rc.load_dict(rule)
155+
156+
def test_sequence_top_level_by_and_runs_across_integrations_invalid_join(self) -> None:
157+
"""Sequence-level by with runs; okta subquery incorrectly uses an azure join field causing validation failure."""
158+
rc = RuleCollection()
159+
query = """
160+
sequence by host.id, user.id with maxspan=1s
161+
[any where event.dataset == "azure.auditlogs" and event.action == "Register device"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName with runs=5
162+
[authentication where event.dataset == "okta.system" and okta.event_type == "user.mfa.okta_verify.deny_push"] by azure.auditlogs.properties.initiated_by.user.userPrincipalName
163+
"""
164+
bad_rule = {
165+
"metadata": mk_metadata(["azure", "okta"], comments="Top-level sequence by and runs invalid join"),
166+
"rule": mk_rule(
167+
name="EQL sequence with top-level by and runs invalid",
168+
rule_id="4e5f6a99-4567-4f8d-9f72-1d8e5f3e5f16",
169+
description="Invalid: okta subquery uses azure join field.",
170+
risk_score=42,
171+
query=query,
172+
),
173+
}
174+
with self.assertRaisesRegex(ValueError, r"Error in both stack and integrations checks"):
175+
rc.load_dict(bad_rule)
176+
136177
def test_sequence_okta_missing_in_metadata_but_present_in_dataset(self) -> None:
137178
"""Okta dataset appears in a subquery but is not listed in metadata; dataset should drive schema selection."""
138179
rc = RuleCollection()

0 commit comments

Comments
 (0)