Skip to content

Commit 3b43533

Browse files
authored
fix(aci): record WorkflowFireHistory before deduping actions (#97746)
1 parent 64a1848 commit 3b43533

File tree

5 files changed

+59
-21
lines changed

5 files changed

+59
-21
lines changed

src/sentry/workflow_engine/processors/action.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
Workflow,
2929
WorkflowActionGroupStatus,
3030
)
31+
from sentry.workflow_engine.models.detector import Detector
3132
from sentry.workflow_engine.registry import action_handler_registry
33+
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
3234
from sentry.workflow_engine.types import WorkflowEventData
3335

3436
logger = logging.getLogger(__name__)
@@ -137,6 +139,15 @@ def deduplicate_actions(
137139
return actions_queryset.filter(id__in=dedup_key_to_action_id.values())
138140

139141

142+
def fire_actions(
143+
actions: BaseQuerySet[Action], detector: Detector, event_data: WorkflowEventData
144+
) -> None:
145+
deduped_actions = deduplicate_actions(actions)
146+
for action in deduped_actions:
147+
task_params = build_trigger_action_task_params(action, detector, event_data)
148+
trigger_action.apply_async(kwargs=task_params, headers={"sentry-propagate-traces": False})
149+
150+
140151
def filter_recently_fired_workflow_actions(
141152
filtered_action_groups: set[DataConditionGroup], event_data: WorkflowEventData
142153
) -> BaseQuerySet[Action]:
@@ -182,12 +193,10 @@ def filter_recently_fired_workflow_actions(
182193
for action_id, workflow_id in action_to_workflow_ids.items()
183194
]
184195

185-
decorated_actions = actions_queryset.annotate(
196+
return actions_queryset.annotate(
186197
workflow_id=Case(*workflow_id_cases, output_field=models.IntegerField()),
187198
)
188199

189-
return deduplicate_actions(decorated_actions)
190-
191200

192201
def get_available_action_integrations_for_org(organization: Organization) -> list[RpcIntegration]:
193202
providers = [

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,17 @@
4444
SLOW_CONDITIONS,
4545
Condition,
4646
)
47-
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
47+
from sentry.workflow_engine.processors.action import (
48+
filter_recently_fired_workflow_actions,
49+
fire_actions,
50+
)
4851
from sentry.workflow_engine.processors.data_condition_group import (
4952
evaluate_data_conditions,
5053
get_slow_conditions_for_groups,
5154
)
5255
from sentry.workflow_engine.processors.detector import get_detectors_by_groupevents_bulk
5356
from sentry.workflow_engine.processors.log_util import track_batch_performance
5457
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
55-
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
5658
from sentry.workflow_engine.types import WorkflowEventData
5759
from sentry.workflow_engine.utils import log_context
5860

@@ -747,14 +749,7 @@ def fire_actions_for_groups(
747749
total_actions += len(filtered_actions)
748750

749751
if should_trigger_actions(group_event.group.type):
750-
for action in filtered_actions:
751-
# TODO: populate workflow env in WorkflowEventData correctly
752-
task_params = build_trigger_action_task_params(
753-
action, detector, workflow_event_data
754-
)
755-
trigger_action.apply_async(
756-
kwargs=task_params, headers={"sentry-propagate-traces": False}
757-
)
752+
fire_actions(filtered_actions, detector, workflow_event_data)
758753

759754
logger.info(
760755
"workflow_engine.delayed_workflow.triggered_actions_summary",

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616
from sentry.utils import json
1717
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
1818
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
19-
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
19+
from sentry.workflow_engine.processors.action import (
20+
filter_recently_fired_workflow_actions,
21+
fire_actions,
22+
)
2023
from sentry.workflow_engine.processors.contexts.workflow_event_context import (
2124
WorkflowEventContext,
2225
WorkflowEventContextData,
2326
)
2427
from sentry.workflow_engine.processors.data_condition_group import process_data_condition_group
2528
from sentry.workflow_engine.processors.detector import get_detector_by_event
2629
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
27-
from sentry.workflow_engine.tasks.actions import build_trigger_action_task_params, trigger_action
2830
from sentry.workflow_engine.types import WorkflowEventData
2931
from sentry.workflow_engine.utils import log_context
3032
from sentry.workflow_engine.utils.metrics import metrics_incr
@@ -458,13 +460,9 @@ def process_workflows(
458460
return triggered_workflows
459461

460462
should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
461-
462463
create_workflow_fire_histories(
463464
detector, actions, event_data, should_trigger_actions, is_delayed=False
464465
)
465-
466-
for action in actions:
467-
task_params = build_trigger_action_task_params(action, detector, event_data)
468-
trigger_action.apply_async(kwargs=task_params, headers={"sentry-propagate-traces": False})
466+
fire_actions(actions, detector, event_data)
469467

470468
return triggered_workflows

tests/sentry/workflow_engine/processors/test_workflow.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
Workflow,
2424
)
2525
from sentry.workflow_engine.models.data_condition import Condition
26+
from sentry.workflow_engine.models.workflow_fire_history import WorkflowFireHistory
2627
from sentry.workflow_engine.processors.workflow import (
2728
DelayedWorkflowItem,
2829
delete_workflow,
@@ -271,6 +272,41 @@ def test_metrics_triggered_workflows(self, mock_incr: MagicMock) -> None:
271272
tags={"detector_type": self.error_detector.type},
272273
)
273274

275+
@patch("sentry.workflow_engine.processors.action.trigger_action.apply_async")
276+
def test_workflow_fire_history_with_action_deduping(
277+
self, mock_trigger_action: MagicMock
278+
) -> None:
279+
"""Fire a single action, but record that it was fired for multiple workflows"""
280+
281+
self.action_group, self.action = self.create_workflow_action(workflow=self.error_workflow)
282+
283+
error_workflow_2 = self.create_workflow(
284+
name="error_workflow_2",
285+
when_condition_group=self.create_data_condition_group(),
286+
)
287+
self.create_detector_workflow(
288+
detector=self.error_detector,
289+
workflow=error_workflow_2,
290+
)
291+
self.action_group_2, self.action_2 = self.create_workflow_action(workflow=error_workflow_2)
292+
293+
error_workflow_3 = self.create_workflow(
294+
name="error_workflow_3",
295+
when_condition_group=self.create_data_condition_group(),
296+
)
297+
self.create_detector_workflow(
298+
detector=self.error_detector,
299+
workflow=error_workflow_3,
300+
)
301+
self.action_group_3, self.action_3 = self.create_workflow_action(workflow=error_workflow_3)
302+
303+
process_workflows(self.event_data)
304+
305+
assert WorkflowFireHistory.objects.count() == 3
306+
assert (
307+
mock_trigger_action.call_count == 1
308+
) # TODO: determine if this is ideal, the actions are duped but not in the same workflows
309+
274310

275311
@mock_redis_buffer()
276312
class TestEvaluateWorkflowTriggers(BaseWorkflowTest):

tests/sentry/workflow_engine/test_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def test_workflow_engine__workflows__no_flag(self) -> None:
198198
mock_process_workflow.assert_not_called()
199199

200200

201-
@mock.patch("sentry.workflow_engine.processors.workflow.trigger_action.apply_async")
201+
@mock.patch("sentry.workflow_engine.processors.action.trigger_action.apply_async")
202202
@mock_redis_buffer()
203203
class TestWorkflowEngineIntegrationFromErrorPostProcess(BaseWorkflowIntegrationTest):
204204
def setUp(self) -> None:

0 commit comments

Comments
 (0)