Skip to content

Commit e990888

Browse files
authored
ref(aci): enqueue project ids to sharded list in process_workflows (#97641)
1 parent 037153b commit e990888

File tree

5 files changed

+69
-50
lines changed

5 files changed

+69
-50
lines changed

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
from dataclasses import asdict, dataclass, replace
23
from datetime import datetime
34
from enum import StrEnum
@@ -9,9 +10,9 @@
910
from django.utils import timezone
1011

1112
from sentry import buffer, features
13+
from sentry.eventstore.models import GroupEvent
1214
from sentry.models.activity import Activity
1315
from sentry.models.environment import Environment
14-
from sentry.services.eventstore.models import GroupEvent
1516
from sentry.utils import json
1617
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
1718
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
@@ -30,7 +31,6 @@
3031

3132
logger = log_context.get_logger(__name__)
3233

33-
WORKFLOW_ENGINE_BUFFER_LIST_KEY = "workflow_engine_delayed_processing_buffer"
3434
DetectorId = int | None
3535
DataConditionGroupId = int
3636

@@ -99,6 +99,8 @@ def buffer_value(self) -> str:
9999
def enqueue_workflows(
100100
items_by_workflow: dict[Workflow, DelayedWorkflowItem],
101101
) -> None:
102+
from sentry.workflow_engine.tasks.delayed_workflows import DelayedWorkflow
103+
102104
items_by_project_id = DefaultDict[int, list[DelayedWorkflowItem]](list)
103105
for queue_item in items_by_workflow.values():
104106
if not queue_item.delayed_if_group_ids and not queue_item.passing_if_group_ids:
@@ -125,9 +127,8 @@ def enqueue_workflows(
125127

126128
sentry_sdk.set_tag("delayed_workflow_items", items)
127129

128-
buffer.backend.push_to_sorted_set(
129-
key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=list(items_by_project_id.keys())
130-
)
130+
sharded_key = random.choice(DelayedWorkflow.get_buffer_keys())
131+
buffer.backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
131132

132133
logger.debug(
133134
"workflow_engine.workflows.enqueued",

src/sentry/workflow_engine/tasks/delayed_workflows.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from sentry.taskworker.namespaces import workflow_engine_tasks
1818
from sentry.taskworker.retry import Retry
1919
from sentry.workflow_engine.models import Workflow
20-
from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY
2120
from sentry.workflow_engine.utils import log_context
2221

2322
logger = log_context.get_logger("sentry.workflow_engine.tasks.delayed_workflows")
@@ -52,7 +51,8 @@ def process_delayed_workflows_shim(
5251

5352
@delayed_processing_registry.register("delayed_workflow")
5453
class DelayedWorkflow(DelayedProcessingBase):
55-
buffer_key = WORKFLOW_ENGINE_BUFFER_LIST_KEY
54+
buffer_key = "workflow_engine_delayed_processing_buffer"
55+
buffer_shards = 8
5656
option = "delayed_workflow.rollout"
5757

5858
@property

tests/sentry/workflow_engine/processors/test_delayed_workflow.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
get_group_to_groupevent,
6161
get_groups_to_fire,
6262
)
63-
from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY
63+
from sentry.workflow_engine.tasks.delayed_workflows import DelayedWorkflow
6464
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
6565
from tests.snuba.rules.conditions.test_event_frequency import BaseEventFrequencyPercentTest
6666

@@ -119,12 +119,8 @@ def setUp(self) -> None:
119119
self.mock_redis_buffer = mock_redis_buffer()
120120
self.mock_redis_buffer.__enter__()
121121

122-
buffer.backend.push_to_sorted_set(
123-
key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=self.project.id
124-
)
125-
buffer.backend.push_to_sorted_set(
126-
key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=self.project2.id
127-
)
122+
buffer.backend.push_to_sorted_set(key=DelayedWorkflow.buffer_key, value=self.project.id)
123+
buffer.backend.push_to_sorted_set(key=DelayedWorkflow.buffer_key, value=self.project2.id)
128124

129125
def tearDown(self):
130126
self.mock_redis_buffer.__exit__(None, None, None)

tests/sentry/workflow_engine/processors/test_workflow.py

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
)
2525
from sentry.workflow_engine.models.data_condition import Condition
2626
from sentry.workflow_engine.processors.workflow import (
27-
WORKFLOW_ENGINE_BUFFER_LIST_KEY,
2827
DelayedWorkflowItem,
2928
delete_workflow,
3029
enqueue_workflows,
3130
evaluate_workflow_triggers,
3231
evaluate_workflows_action_filters,
3332
process_workflows,
3433
)
34+
from sentry.workflow_engine.tasks.delayed_workflows import DelayedWorkflow
3535
from sentry.workflow_engine.types import WorkflowEventData
3636
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
3737

@@ -414,6 +414,8 @@ def setUp(self) -> None:
414414
)
415415
self.event_data = WorkflowEventData(event=self.group_event, group=self.group)
416416
self.action_group, _ = self.create_workflow_action(self.workflow)
417+
418+
self.buffer_keys = DelayedWorkflow.get_buffer_keys()
417419
self.mock_redis_buffer = mock_redis_buffer()
418420
self.mock_redis_buffer.__enter__()
419421

@@ -438,11 +440,12 @@ def test_enqueues_workflow_all_logic_type(self) -> None:
438440

439441
process_workflows(self.event_data)
440442

441-
project_ids = buffer.backend.get_sorted_set(
442-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
443+
project_ids = buffer.backend.bulk_get_sorted_set(
444+
self.buffer_keys,
445+
min=0,
446+
max=self.buffer_timestamp,
443447
)
444-
assert project_ids
445-
assert project_ids[0][0] == self.project.id
448+
assert list(project_ids.keys()) == [self.project.id]
446449

447450
def test_enqueues_workflow_any_logic_type(self) -> None:
448451
assert self.workflow.when_condition_group
@@ -469,10 +472,12 @@ def test_enqueues_workflow_any_logic_type(self) -> None:
469472

470473
process_workflows(self.event_data)
471474

472-
project_ids = buffer.backend.get_sorted_set(
473-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
475+
project_ids = buffer.backend.bulk_get_sorted_set(
476+
self.buffer_keys,
477+
min=0,
478+
max=self.buffer_timestamp,
474479
)
475-
assert project_ids[0][0] == self.project.id
480+
assert list(project_ids.keys()) == [self.project.id]
476481

477482
def test_skips_enqueuing_any(self) -> None:
478483
# skips slow conditions if the condition group evaluates to True without evaluating them
@@ -549,10 +554,12 @@ def test_enqueues_with_when_and_if_slow_conditions(self) -> None:
549554

550555
process_workflows(self.event_data)
551556

552-
project_ids = buffer.backend.get_sorted_set(
553-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
557+
project_ids = buffer.backend.bulk_get_sorted_set(
558+
self.buffer_keys,
559+
min=0,
560+
max=self.buffer_timestamp,
554561
)
555-
assert project_ids[0][0] == self.project.id
562+
assert list(project_ids.keys()) == [self.project.id]
556563

557564
def test_enqueues_event_if_meets_fast_conditions(self) -> None:
558565
assert self.workflow.when_condition_group
@@ -576,8 +583,10 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None:
576583

577584
process_workflows(self.event_data)
578585

579-
project_ids = buffer.backend.get_sorted_set(
580-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
586+
project_ids = buffer.backend.bulk_get_sorted_set(
587+
self.buffer_keys,
588+
min=0,
589+
max=self.buffer_timestamp,
581590
)
582591
assert not project_ids
583592

@@ -586,10 +595,12 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None:
586595

587596
process_workflows(self.event_data)
588597

589-
project_ids = buffer.backend.get_sorted_set(
590-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, self.buffer_timestamp
598+
project_ids = buffer.backend.bulk_get_sorted_set(
599+
self.buffer_keys,
600+
min=0,
601+
max=self.buffer_timestamp,
591602
)
592-
assert project_ids[0][0] == self.project.id
603+
assert list(project_ids.keys()) == [self.project.id]
593604

594605

595606
@freeze_time(FROZEN_TIME)
@@ -609,6 +620,7 @@ def setUp(self) -> None:
609620
occurrence=self.build_occurrence(evidence_data={"detector_id": self.detector.id})
610621
)
611622
self.event_data = WorkflowEventData(event=self.group_event, group=self.group)
623+
self.buffer_keys = DelayedWorkflow.get_buffer_keys()
612624

613625
@patch("sentry.utils.metrics.incr")
614626
def test_metrics_issue_dual_processing_metrics(self, mock_incr: MagicMock) -> None:
@@ -736,11 +748,12 @@ def test_enqueues_when_slow_conditions(self):
736748

737749
enqueue_workflows(queue_items)
738750

739-
project_ids = buffer.backend.get_sorted_set(
740-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
751+
project_ids = buffer.backend.bulk_get_sorted_set(
752+
self.buffer_keys,
753+
min=0,
754+
max=timezone.now().timestamp(),
741755
)
742-
assert project_ids
743-
assert project_ids[0][0] == self.project.id
756+
assert list(project_ids.keys()) == [self.project.id]
744757

745758

746759
class TestEnqueueWorkflows(BaseWorkflowTest):
@@ -774,9 +787,11 @@ def setUp(self) -> None:
774787

775788
@patch("sentry.buffer.backend.push_to_sorted_set")
776789
@patch("sentry.buffer.backend.push_to_hash_bulk")
790+
@patch("random.choice")
777791
def test_enqueue_workflows__adds_to_workflow_engine_buffer(
778-
self, mock_push_to_hash_bulk, mock_push_to_sorted_set
792+
self, mock_randchoice, mock_push_to_hash_bulk, mock_push_to_sorted_set
779793
):
794+
mock_randchoice.return_value = f"{DelayedWorkflow.buffer_key}:{5}"
780795
enqueue_workflows(
781796
{
782797
self.workflow: DelayedWorkflowItem(
@@ -791,7 +806,7 @@ def test_enqueue_workflows__adds_to_workflow_engine_buffer(
791806
)
792807

793808
mock_push_to_sorted_set.assert_called_once_with(
794-
key=WORKFLOW_ENGINE_BUFFER_LIST_KEY,
809+
key=f"{DelayedWorkflow.buffer_key}:{5}",
795810
value=[self.group_event.project_id],
796811
)
797812

tests/sentry/workflow_engine/test_integration.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from sentry.workflow_engine.models.data_condition import Condition
2525
from sentry.workflow_engine.processors import process_data_source, process_detectors
2626
from sentry.workflow_engine.processors.delayed_workflow import process_delayed_workflows
27-
from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY
27+
from sentry.workflow_engine.tasks.delayed_workflows import DelayedWorkflow
2828
from sentry.workflow_engine.types import DetectorPriorityLevel
2929
from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
3030

@@ -213,6 +213,7 @@ def setUp(self) -> None:
213213
)
214214
self.workflow_triggers.conditions.all().delete()
215215
self.action_group, self.action = self.create_workflow_action(workflow=self.workflow)
216+
self.buffer_keys = DelayedWorkflow.get_buffer_keys()
216217

217218
@pytest.fixture(autouse=True)
218219
def with_feature_flags(self):
@@ -409,8 +410,8 @@ def test_slow_condition_workflow_with_conditions(self, mock_trigger: MagicMock)
409410
self.post_process_error(event_1, is_new=True)
410411
assert not mock_trigger.called
411412

412-
project_ids = buffer.backend.get_sorted_set(
413-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
413+
project_ids = buffer.backend.bulk_get_sorted_set(
414+
self.buffer_keys, 0, timezone.now().timestamp()
414415
)
415416
assert not project_ids
416417

@@ -419,8 +420,8 @@ def test_slow_condition_workflow_with_conditions(self, mock_trigger: MagicMock)
419420
self.post_process_error(event_2, is_new=True)
420421
assert not mock_trigger.called
421422

422-
project_ids = buffer.backend.get_sorted_set(
423-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
423+
project_ids = buffer.backend.bulk_get_sorted_set(
424+
self.buffer_keys, 0, timezone.now().timestamp()
424425
)
425426
assert not project_ids
426427

@@ -433,11 +434,13 @@ def test_slow_condition_workflow_with_conditions(self, mock_trigger: MagicMock)
433434
self.post_process_error(event_5)
434435
assert not mock_trigger.called
435436

436-
project_ids = buffer.backend.get_sorted_set(
437-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
437+
project_ids = buffer.backend.bulk_get_sorted_set(
438+
self.buffer_keys,
439+
min=0,
440+
max=timezone.now().timestamp(),
438441
)
439442

440-
process_delayed_workflows(project_ids[0][0])
443+
process_delayed_workflows(list(project_ids.keys())[0])
441444
mock_trigger.assert_called_once()
442445

443446
def test_slow_condition_subqueries(self, mock_trigger: MagicMock) -> None:
@@ -474,11 +477,13 @@ def test_slow_condition_subqueries(self, mock_trigger: MagicMock) -> None:
474477
self.post_process_error(event_3)
475478
assert not mock_trigger.called
476479

477-
project_ids = buffer.backend.get_sorted_set(
478-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
480+
project_ids = buffer.backend.bulk_get_sorted_set(
481+
self.buffer_keys,
482+
min=0,
483+
max=timezone.now().timestamp(),
479484
)
480485

481-
process_delayed_workflows(project_ids[0][0])
486+
process_delayed_workflows(list(project_ids.keys())[0])
482487
assert not mock_trigger.called
483488

484489
with freeze_time(now + timedelta(minutes=1)):
@@ -487,9 +492,11 @@ def test_slow_condition_subqueries(self, mock_trigger: MagicMock) -> None:
487492
self.post_process_error(event_4)
488493
assert not mock_trigger.called
489494

490-
project_ids = buffer.backend.get_sorted_set(
491-
WORKFLOW_ENGINE_BUFFER_LIST_KEY, 0, timezone.now().timestamp()
495+
project_ids = buffer.backend.bulk_get_sorted_set(
496+
self.buffer_keys,
497+
min=0,
498+
max=timezone.now().timestamp(),
492499
)
493500

494-
process_delayed_workflows(project_ids[0][0])
501+
process_delayed_workflows(list(project_ids.keys())[0])
495502
mock_trigger.assert_called_once()

0 commit comments

Comments
 (0)