Skip to content
27 changes: 25 additions & 2 deletions keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import re
from typing import List, Optional
from copy import deepcopy

import celpy
import celpy.c7nlib
Expand Down Expand Up @@ -455,11 +456,12 @@ def _check_if_rule_apply(self, rule: Rule, event: AlertDto) -> List[str]:
Evaluates if a rule applies to an event using CEL. Handles type coercion for ==/!= between int and str.
"""
sub_rules = self._extract_subrules(rule.definition_cel)
payload = event.dict()
payload = deepcopy(event.dict())
# workaround since source is a list
# todo: fix this in the future
payload["source"] = payload["source"][0]
payload = RulesEngine.sanitize_cel_payload(payload)
normalized_payload = None

# what we do here is to compile the CEL rule and evaluate it
# https://github.com/cloud-custodian/cel-python
Expand All @@ -477,7 +479,8 @@ def _check_if_rule_apply(self, rule: Rule, event: AlertDto) -> List[str]:
sub_rule = sub_rule.replace("null", '""')
ast = self.env.compile(sub_rule)
prgm = self.env.program(ast)
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
current_payload = normalized_payload or payload
activation = celpy.json_to_cel(json.loads(json.dumps(current_payload, default=str)))
try:
r = prgm.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
Expand All @@ -490,6 +493,26 @@ def _check_if_rule_apply(self, rule: Rule, event: AlertDto) -> List[str]:
e
):
try:
if normalized_payload is None and "StringType" in str(e) and "BoolType" in str(e):
normalized_payload = deepcopy(payload)
# Normilize boolean strings to actual booleans base on AlertDTO
for field_name, model_field in AlertDto.__fields__.items():
val = normalized_payload.get(field_name)
if issubclass(model_field.type_, bool) and isinstance(payload.get(field_name), str):
if val is not None:
lower = val.lower()
if lower == "true":
normalized_payload[field_name] = True
elif lower == "false":
normalized_payload[field_name] = False
activation = celpy.json_to_cel(json.loads(json.dumps(normalized_payload, default=str)))
try:
r = prgm.evaluate(activation)
if r:
sub_rules_matched.append(sub_rule)
continue
except celpy.evaluation.CELEvalError:
pass
coerced = self._coerce_eq_type_error(
sub_rule, prgm, activation, event
)
Expand Down
53 changes: 39 additions & 14 deletions keep/workflowmanager/workflowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,24 +428,49 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto | IncidentDto]):
except (ValueError, AttributeError):
# If severity conversion fails, keep original value
pass

activation = celpy.json_to_cel(event_payload)
try:
should_run = program.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
self.logger.exception(
"Error evaluating CEL for event in insert_events",
extra={
"exception": e,
"event": event,
"trigger": trigger,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"cel": trigger["cel"],
"deprecated_filters": trigger.get("filters"),
},
)
continue
if "StringType" in str(e) and "BoolType" in str(e):
# Normilize boolean strings to actual booleans base on AlertDTO
for field_name, model_field in AlertDto.__fields__.items():
if issubclass(model_field.type_, bool) and isinstance(event_payload.get(field_name), str):
if event_payload[field_name].lower() == "true":
event_payload[field_name] = True
elif event_payload[field_name].lower() == "false":
event_payload[field_name] = False
activation = celpy.json_to_cel(event_payload)
try:
should_run = program.evaluate(activation)
except celpy.evaluation.CELEvalError as exc:
self.logger.exception(
"Error evaluating CEL for event in insert_events after normalizing boolean strings",
extra={
"exception": exc,
"event": event,
"trigger": trigger,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"cel": trigger["cel"],
"deprecated_filters": trigger.get("filters"),
},
)
continue
else:
self.logger.exception(
"Error evaluating CEL for event in insert_events",
extra={
"exception": e,
"event": event,
"trigger": trigger,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"cel": trigger["cel"],
"deprecated_filters": trigger.get("filters"),
},
)
continue

if bool(should_run) is False:
self.logger.debug(
Expand Down
152 changes: 151 additions & 1 deletion tests/test_alert_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
# Shahar: since js2py is not secured, I've commented out this tests
# TODO: fix js2py and uncomment the tests

from datetime import timedelta
from datetime import timedelta, datetime

import pytest
from freezegun import freeze_time

from keep.api.core.db import get_incidents_by_alert_fingerprint
from keep.api.core.dependencies import SINGLE_TENANT_UUID
from keep.api.models.alert import AlertStatus
from keep.api.models.db.mapping import MappingRule
from keep.api.models.db.rule import Rule
from keep.contextmanager.contextmanager import ContextManager
from keep.providers.keep_provider.keep_provider import KeepProvider
from keep.searchengine.searchengine import SearchEngine
from keep.workflowmanager.workflowmanager import WorkflowManager
from keep.api.models.db.workflow import Workflow as WorkflowDB

steps_dict = {
# this is the step that will be used to trigger the alert
Expand Down Expand Up @@ -1074,3 +1080,147 @@ def test_check_if_rule_apply_int_str_type_coercion(db_session):
assert (
len(matched_rules4) == 1
), "Rule with 'field == \"2\"' should match alert with field='2'"

@pytest.mark.parametrize(
"enrich_mapping_value, rule_value_activation, should_be_executed",
[
("true", "true", True),
("false", "true", False),
("true", "false", False),
("false", "false", True),
]
)
def test_check_if_rule_apply_dismissed_incident(
db_session,
create_alert,
enrich_mapping_value,
rule_value_activation,
should_be_executed
):
"""
Feature: Dismissed Alerts Handling with CEL
Scenario: Using Mapping feature to dismiss alerts,
CEL expresion should recognice the dismissed status.
"""
#GIVEN The mapping rule modify the "dismissed" attribute
mapping_data = [
{"service": "app1", "dismissed": enrich_mapping_value},
]

mapping_rule = MappingRule(
tenant_id=SINGLE_TENANT_UUID,
name="Service Mapping",
description="Map service to additional attributes",
type="csv",
matchers=[["service"]],
rows=mapping_data,
file_name="service_mapping.csv",
priority=1,
created_by=SINGLE_TENANT_UUID,
)
db_session.add(mapping_rule)
db_session.commit()

#AND The rule use CEL expression to check the "dismissed" attribute
rule = Rule(
id="test-rule-1",
tenant_id=SINGLE_TENANT_UUID,
name="Test Rule - Dismissed Alerts",
definition_cel=f'dismissed == {rule_value_activation} && service == "app1"',
definition={},
timeframe=60,
timeunit="seconds",
created_by="[email protected]",
creation_time=datetime.utcnow(),
grouping_criteria=[],
threshold=1,
)
db_session.add(rule)
db_session.commit()
#AND An alert coming to be enriched by mapping rule
create_alert(
"fpw1",
AlertStatus.FIRING,
datetime.utcnow(),
{"service": "app1"}
)
#WHEN The rules engine process the alert
total_execs = len(get_incidents_by_alert_fingerprint(
SINGLE_TENANT_UUID, "fpw1"
))

#THEN The incidents should be executed or not depending on the values
assert total_execs == (1 if should_be_executed else 0)

@pytest.mark.parametrize(
"enrich_mapping_value, wf_value_activation, should_be_executed",
[
("true", "true", True),
("false", "true", False),
("true", "false", False),
("false", "false", True),
]
)
def test_check_if_rule_apply_dismissed_workflow(
db_session,
create_alert,
enrich_mapping_value,
wf_value_activation,
should_be_executed
):
"""
Feature: Dismissed Alerts Handling with CEL
Scenario: Using Mapping feature to dismiss alerts,
CEL expresion should recognice the dismissed status.
"""
#GIVEN The mapping rule modify the "dismissed" attribute
mapping_data = [
{"service": "app1", "dismissed": enrich_mapping_value},
]

#AND The workflow is filtering using CEL expression on "dismissed" attribute
workflow_definition = f"""workflow:
id: service-check
triggers:
- type: alert
cel: dismissed=={wf_value_activation}
"""

mapping_rule = MappingRule(
tenant_id=SINGLE_TENANT_UUID,
name="Service Mapping",
description="Map service to additional attributes",
type="csv",
matchers=[["service"]],
rows=mapping_data,
file_name="service_mapping.csv",
priority=1,
created_by=SINGLE_TENANT_UUID,
)
db_session.add(mapping_rule)
db_session.commit()

workflow = WorkflowDB(
id="dimissed-cel-wf",
name="dimissed-cel-wf",
tenant_id=SINGLE_TENANT_UUID,
description="Handle alerts for specific services",
created_by="[email protected]",
interval=0,
workflow_raw=workflow_definition,
)
db_session.add(workflow)
db_session.commit()
#AND An alert coming to be enriched by mapping rule
create_alert(
"fpw1",
AlertStatus.FIRING,
datetime.utcnow(),
{"service": "app1"}
)
#WHEN The workflow evaluates CEL Workflow vs Alert values enriched
total_execs = len(WorkflowManager.get_instance().scheduler.workflows_to_run)

WorkflowManager.get_instance().scheduler.workflows_to_run.clear()
#THEN The workflow should be executed or not depending on the values
assert total_execs == (1 if should_be_executed else 0)
1 change: 1 addition & 0 deletions tests/test_workflow_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,3 +1150,4 @@ def test_cel_expression_filter(db_session):
assert all(a.severity == "critical" for a in triggered_alerts)
assert not any(a.id == "alert-3" for a in triggered_alerts)
assert not any(a.id == "alert-4" for a in triggered_alerts)

Loading