12 changes: 12 additions & 0 deletions src/sentry/workflow_engine/processors/data_condition_group.py
@@ -62,6 +62,18 @@ def with_error(self, error: ConditionError) -> "TriggerResult":
            return self
        return TriggerResult(triggered=self.triggered, error=error)

+    @staticmethod
+    def choose_tainted(a: "TriggerResult", b: "TriggerResult") -> "TriggerResult":

Contributor: should we just make this a list of TriggerResults and return the first tainted? might be a little more reusable that way.

"""
Returns the first tainted TriggerResult, or `a` if neither is tainted.
Useful for tracking whether any evaluation in a series was tainted.
"""
if a.is_tainted():
return a
if b.is_tainted():
return b
return a
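
A minimal sketch of the reviewer's suggested list-based variant (not part of this PR): the name choose_tainted_of is hypothetical, and it assumes Sequence is imported from collections.abc alongside the module's existing Iterable import.

    @staticmethod
    def choose_tainted_of(results: Sequence["TriggerResult"]) -> "TriggerResult":
        # Hypothetical generalization: the first tainted result wins; otherwise
        # fall back to the first result, mirroring choose_tainted's `a` fallback.
        # Assumes a non-empty sequence.
        return next((r for r in results if r.is_tainted()), results[0])

The two-argument choose_tainted(a, b) would then reduce to choose_tainted_of([a, b]).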

    @staticmethod
    def any(items: Iterable["TriggerResult"]) -> "TriggerResult":
        """
117 changes: 87 additions & 30 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -1,5 +1,5 @@
-from collections.abc import Collection, Sequence
-from dataclasses import asdict, replace
+from collections.abc import Collection, Iterable, Sequence
+from dataclasses import asdict, dataclass, replace
from datetime import datetime
from enum import StrEnum
from typing import DefaultDict
@@ -27,6 +27,7 @@
    WorkflowEventContextData,
)
from sentry.workflow_engine.processors.data_condition_group import (
+    TriggerResult,
    get_data_conditions_for_group,
    process_data_condition_group,
)
@@ -48,6 +49,37 @@ class WorkflowDataConditionGroupType(StrEnum):
    WORKFLOW_TRIGGER = "workflow_trigger"


+@dataclass(frozen=True)
+class EvaluationStats:
+    """
+    Counts of fully-evaluated workflows by result reliability.
+    Tainted results may be incorrect due to errors during evaluation.
+    """
+
+    tainted: int = 0
+    untainted: int = 0
+
+    @classmethod
+    def from_results(cls, results: Iterable[TriggerResult]) -> "EvaluationStats":
+        tainted, untainted = 0, 0
+        for result in results:
+            if result.is_tainted():
+                tainted += 1
+            else:
+                untainted += 1
+        return cls(tainted=tainted, untainted=untainted)
+
+    def __add__(self, other: "EvaluationStats") -> "EvaluationStats":
+        return EvaluationStats(
+            tainted=self.tainted + other.tainted,
+            untainted=self.untainted + other.untainted,
+        )
+
+    def report_metrics(self, metric_name: str) -> None:
+        metrics_incr(metric_name, self.tainted, tags={"tainted": True})
+        metrics_incr(metric_name, self.untainted, tags={"tainted": False})

Contributor: Duplicated stats class for tainted evaluation tracking (Low Severity). The new EvaluationStats class duplicates the existing _ConditionEvaluationStats class in delayed_workflow.py. Both have identical fields (tainted: int, untainted: int) and serve the same purpose of tracking tainted vs untainted evaluation counts. The new class adds useful methods (from_results, __add__, report_metrics) that could benefit the delayed workflow code as well. These should be unified into a single class to avoid maintenance burden and ensure consistent taint tracking across both immediate and delayed evaluation paths.

Member Author: well aware, but need to make that code workflow based first.

Contributor: 🤔 should these be workflow based methods? One thing i've been thinking about is if we could compose these condition group / condition evaluation methods more, to then reuse in delayed processing as well. If we go down that approach, i'd think of these as DataCondition based.
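
As a rough illustration of the unification suggested above, the delayed path could derive the same stats from collected results instead of hand-rolled counters. A sketch only: the batch variables and metric name are hypothetical, and it assumes delayed_workflow.py can gather its per-workflow TriggerResults into lists.

    # Hypothetical: per-workflow results collected by the delayed path.
    batch_one: list[TriggerResult] = []
    batch_two: list[TriggerResult] = []
    # from_results derives both counts; __add__ merges batches before one report.
    stats = EvaluationStats.from_results(batch_one) + EvaluationStats.from_results(batch_two)
    stats.report_metrics("delayed_workflow.workflows_evaluated")  # hypothetical metric name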



def delete_workflow(workflow: Workflow) -> bool:
    with transaction.atomic(router.db_for_write(Workflow)):
        action_filters = DataConditionGroup.objects.filter(
@@ -135,15 +167,16 @@ def evaluate_workflow_triggers(
    workflows: set[Workflow],
    event_data: WorkflowEventData,
    event_start_time: datetime,
-) -> tuple[set[Workflow], dict[Workflow, DelayedWorkflowItem]]:
+) -> tuple[dict[Workflow, TriggerResult], dict[Workflow, DelayedWorkflowItem], EvaluationStats]:
    """
-    Returns a tuple of (triggered_workflows, queue_items_by_workflow)
-    - triggered_workflows: set of workflows that were triggered
+    Returns a tuple of (triggered_workflows, queue_items_by_workflow, stats)
+    - triggered_workflows: mapping of workflows that triggered to their evaluation result
    - queue_items_by_workflow: mapping of workflow to the delayed workflow item, used
      in the next step (evaluate action filters) to enqueue workflows with slow conditions
      within that function
+    - stats: tainted/untainted counts for workflows that didn't trigger (fully evaluated)
    """
-    triggered_workflows: set[Workflow] = set()
+    triggered_workflows: dict[Workflow, TriggerResult] = {}
    queue_items_by_workflow: dict[Workflow, DelayedWorkflowItem] = {}

    dcg_ids = [
@@ -160,6 +193,7 @@
        project.organization,
    )

+    tainted_untriggered, untainted_untriggered = 0, 0
    for workflow in workflows:
        when_data_conditions = None
        if dcg_id := workflow.when_condition_group_id:
@@ -180,11 +214,8 @@
                    timestamp=event_start_time,
                )
            else:
-                """
-                Tracking when we try to enqueue a slow condition for an activity.
-                Currently, we are assuming those cases are evaluating as True since
-                an activity update is meant to respond to a previous event.
-                """
+                # Activity updates with slow conditions are not enqueued because an activity
+                # update is meant to respond to a previous event.
                metrics_incr("process_workflows.enqueue_workflow.activity")
                logger.debug(
                    "workflow_engine.process_workflows.enqueue_workflow.activity",
@@ -195,7 +226,7 @@
                )
        else:
            if evaluation.triggered:
-                triggered_workflows.add(workflow)
+                triggered_workflows[workflow] = evaluation
                if dual_processing_logs_enabled:
                    try:
                        detector = WorkflowEventContext.get().detector
@@ -212,19 +243,22 @@
                        )
                    except DetectorWorkflow.DoesNotExist:
                        continue
+            else:
+                if evaluation.is_tainted():
+                    tainted_untriggered += 1
+                else:
+                    untainted_untriggered += 1

Contributor (on lines +247 to +250): it kinda feels like the tainted / untainted stuff could be encapsulated a little more. could we just add the evaluation result to a list and have it determine this information? That way we don't need to independently track this then rebuild it for the results
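
A sketch of the shape that comment describes, leaning on the EvaluationStats.from_results helper added above (untriggered_results is a hypothetical name):

    untriggered_results: list[TriggerResult] = []
    # ...inside the loop, the else branch would only record the result:
    #     untriggered_results.append(evaluation)
    # ...and after the loop, both counts fall out in one place:
    stats = EvaluationStats.from_results(untriggered_results)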


-    metrics_incr(
-        "process_workflows.triggered_workflows",
-        len(triggered_workflows),
-    )
+    stats = EvaluationStats(tainted=tainted_untriggered, untainted=untainted_untriggered)
+    metrics_incr("process_workflows.triggered_workflows", len(triggered_workflows))

    # TODO - Remove `environment` access once it's in the shared logger.
    environment = WorkflowEventContext.get().environment
    if environment is None:
        try:
            environment = get_environment_by_event(event_data)
        except Environment.DoesNotExist:
-            return set(), {}
+            return {}, {}, stats

    event_id = (
        event_data.event.event_id
@@ -243,27 +277,31 @@
        },
    )

-    return triggered_workflows, queue_items_by_workflow
+    return triggered_workflows, queue_items_by_workflow, stats


@sentry_sdk.trace
@scopedstats.timer()
def evaluate_workflows_action_filters(

Contributor: unrelated: we might want to look at decomposing this method and the trigger condition methods. it seems like we could probably compose these two a bit more and reduce code replication

-    workflows: set[Workflow],
+    triggered_workflows: dict[Workflow, TriggerResult],
    event_data: WorkflowEventData,
    queue_items_by_workflow: dict[Workflow, DelayedWorkflowItem],
    event_start_time: datetime,
-) -> tuple[set[DataConditionGroup], dict[Workflow, DelayedWorkflowItem]]:
+) -> tuple[set[DataConditionGroup], dict[Workflow, DelayedWorkflowItem], EvaluationStats]:
    """
    Evaluate the action filters for the given workflows.
-    Returns a set of DataConditionGroups that were evaluated to True.
-    Enqueues workflows with slow conditions to be evaluated in a batched task.
+    Returns a tuple of (filtered_action_groups, queue_items_by_workflow, stats)
+    - filtered_action_groups: set of DataConditionGroups that were evaluated to True
+    - queue_items_by_workflow: updated with workflows that have slow conditions
+    - stats: tainted/untainted counts for fully-evaluated workflows
    """
    # Collect all workflows, including those with pending slow condition results (queue_items_by_workflow)
    # to evaluate all fast conditions
-    all_workflows = workflows.union(set(queue_items_by_workflow.keys()))
+    all_workflows: set[Workflow] = set(triggered_workflows.keys()) | set(
+        queue_items_by_workflow.keys()
+    )

-    action_conditions_to_workflow = {
+    action_conditions_to_workflow: dict[DataConditionGroup, Workflow] = {
        wdcg.condition_group: wdcg.workflow
        for wdcg in WorkflowDataConditionGroup.objects.select_related(
            "workflow", "condition_group"
@@ -288,6 +326,9 @@
        )
    }

+    workflow_to_result: dict[int, TriggerResult] = {
+        wf.id: result for wf, result in triggered_workflows.items()
+    }
    for action_condition_group, workflow in action_conditions_to_workflow.items():
        env = env_by_id.get(workflow.environment_id) if workflow.environment_id else None
        workflow_event_data = replace(event_data, workflow_env=env)
@@ -327,6 +368,12 @@
                },
            )
        else:
+            # Only accumulate taint for triggered workflows (not those with slow WHEN conditions)
+            if workflow.id in workflow_to_result:
+                workflow_to_result[workflow.id] = TriggerResult.choose_tainted(
+                    workflow_to_result[workflow.id], group_evaluation.logic_result
+                )
+
            if group_evaluation.logic_result.triggered:
                if delayed_workflow_item := queue_items_by_workflow.get(workflow):
                    if delayed_workflow_item.delayed_when_group_id:
@@ -336,6 +383,12 @@
                else:
                    filtered_action_groups.add(action_condition_group)

+    # Count tainted/untainted only for fully-evaluated workflows (not delayed)
+    fully_evaluated_workflows = triggered_workflows.keys() - queue_items_by_workflow.keys()
+    stats = EvaluationStats.from_results(
+        workflow_to_result[wf.id] for wf in fully_evaluated_workflows
+    )
+

    event_id = (
        event_data.event.event_id
        if isinstance(event_data.event, GroupEvent)
@@ -347,7 +400,7 @@
        extra={
            "group_id": event_data.group.id,
            "event_id": event_id,
-            "workflow_ids": [workflow.id for workflow in action_conditions_to_workflow.values()],
+            "workflow_ids": [wf.id for wf in action_conditions_to_workflow.values()],
            "action_conditions": [
                action_condition_group.id
                for action_condition_group in action_conditions_to_workflow.keys()
@@ -357,7 +410,7 @@
        },
    )

-    return filtered_action_groups, queue_items_by_workflow
+    return filtered_action_groups, queue_items_by_workflow, stats


def get_environment_by_event(event_data: WorkflowEventData) -> Environment | None:
@@ -516,13 +569,14 @@ def process_workflows(
        data=workflow_evaluation_data,
    )

-    triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
+    triggered_workflows, queue_items_by_workflow_id, trigger_stats = evaluate_workflow_triggers(
        workflows, event_data, event_start_time
    )

-    workflow_evaluation_data.triggered_workflows = triggered_workflows
+    workflow_evaluation_data.triggered_workflows = set(triggered_workflows.keys())

    if not triggered_workflows and not queue_items_by_workflow_id:
+        trigger_stats.report_metrics("process_workflows.workflows_evaluated")
        # TODO - re-think tainted once the actions are removed from process_workflows.
        return WorkflowEvaluation(
            tainted=True,
@@ -532,9 +586,12 @@

    # TODO - we should probably return here and have the rest from here be
    # `process_actions`, this will take a list of "triggered_workflows"
-    actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
-        triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
+    actions_to_trigger, queue_items_by_workflow_id, action_stats = (
+        evaluate_workflows_action_filters(
+            triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
+        )
    )
+    (trigger_stats + action_stats).report_metrics("process_workflows.workflows_evaluated")

    enqueue_workflows(batch_client, queue_items_by_workflow_id)

@@ -568,3 +568,9 @@ def test_none_with_generator_preserves_error(self) -> None:
        assert TriggerResult.none(iter([FALSE, FALSE.with_error(ERR), FALSE])) == TRUE.with_error(
            ERR
        )

+    def test_choose_tainted(self) -> None:
+        assert TriggerResult.choose_tainted(TRUE.with_error(ERR), FALSE) == TRUE.with_error(ERR)
+        assert TriggerResult.choose_tainted(TRUE, FALSE.with_error(ERR)) == FALSE.with_error(ERR)
+        assert TriggerResult.choose_tainted(TRUE, FALSE) == TRUE
+        assert TriggerResult.choose_tainted(FALSE, TRUE) == FALSE
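
One case the docstring promises ("the first tainted") that these assertions don't pin down is both results being tainted. A possible companion case, reusing the same fixtures (not part of this PR):

    def test_choose_tainted_both_tainted_returns_first(self) -> None:
        # When both results are tainted, the first argument wins.
        assert TriggerResult.choose_tainted(
            FALSE.with_error(ERR), TRUE.with_error(ERR)
        ) == FALSE.with_error(ERR)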