24
24
)
25
25
from sentry .workflow_engine .models .data_condition import Condition
26
26
from sentry .workflow_engine .processors .workflow import (
27
- WORKFLOW_ENGINE_BUFFER_LIST_KEY ,
28
27
DelayedWorkflowItem ,
29
28
delete_workflow ,
30
29
enqueue_workflows ,
31
30
evaluate_workflow_triggers ,
32
31
evaluate_workflows_action_filters ,
33
32
process_workflows ,
34
33
)
34
+ from sentry .workflow_engine .tasks .delayed_workflows import DelayedWorkflow
35
35
from sentry .workflow_engine .types import WorkflowEventData
36
36
from tests .sentry .workflow_engine .test_base import BaseWorkflowTest
37
37
@@ -414,6 +414,8 @@ def setUp(self) -> None:
414
414
)
415
415
self .event_data = WorkflowEventData (event = self .group_event , group = self .group )
416
416
self .action_group , _ = self .create_workflow_action (self .workflow )
417
+
418
+ self .buffer_keys = DelayedWorkflow .get_buffer_keys ()
417
419
self .mock_redis_buffer = mock_redis_buffer ()
418
420
self .mock_redis_buffer .__enter__ ()
419
421
@@ -438,11 +440,12 @@ def test_enqueues_workflow_all_logic_type(self) -> None:
438
440
439
441
process_workflows (self .event_data )
440
442
441
- project_ids = buffer .backend .get_sorted_set (
442
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , self .buffer_timestamp
443
+ project_ids = buffer .backend .bulk_get_sorted_set (
444
+ self .buffer_keys ,
445
+ min = 0 ,
446
+ max = self .buffer_timestamp ,
443
447
)
444
- assert project_ids
445
- assert project_ids [0 ][0 ] == self .project .id
448
+ assert list (project_ids .keys ()) == [self .project .id ]
446
449
447
450
def test_enqueues_workflow_any_logic_type (self ) -> None :
448
451
assert self .workflow .when_condition_group
@@ -469,10 +472,12 @@ def test_enqueues_workflow_any_logic_type(self) -> None:
469
472
470
473
process_workflows (self .event_data )
471
474
472
- project_ids = buffer .backend .get_sorted_set (
473
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , self .buffer_timestamp
475
+ project_ids = buffer .backend .bulk_get_sorted_set (
476
+ self .buffer_keys ,
477
+ min = 0 ,
478
+ max = self .buffer_timestamp ,
474
479
)
475
- assert project_ids [ 0 ][ 0 ] == self .project .id
480
+ assert list ( project_ids . keys ()) == [ self .project .id ]
476
481
477
482
def test_skips_enqueuing_any (self ) -> None :
478
483
# skips slow conditions if the condition group evaluates to True without evaluating them
@@ -549,10 +554,12 @@ def test_enqueues_with_when_and_if_slow_conditions(self) -> None:
549
554
550
555
process_workflows (self .event_data )
551
556
552
- project_ids = buffer .backend .get_sorted_set (
553
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , self .buffer_timestamp
557
+ project_ids = buffer .backend .bulk_get_sorted_set (
558
+ self .buffer_keys ,
559
+ min = 0 ,
560
+ max = self .buffer_timestamp ,
554
561
)
555
- assert project_ids [ 0 ][ 0 ] == self .project .id
562
+ assert list ( project_ids . keys ()) == [ self .project .id ]
556
563
557
564
def test_enqueues_event_if_meets_fast_conditions (self ) -> None :
558
565
assert self .workflow .when_condition_group
@@ -576,8 +583,10 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None:
576
583
577
584
process_workflows (self .event_data )
578
585
579
- project_ids = buffer .backend .get_sorted_set (
580
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , self .buffer_timestamp
586
+ project_ids = buffer .backend .bulk_get_sorted_set (
587
+ self .buffer_keys ,
588
+ min = 0 ,
589
+ max = self .buffer_timestamp ,
581
590
)
582
591
assert not project_ids
583
592
@@ -586,10 +595,12 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None:
586
595
587
596
process_workflows (self .event_data )
588
597
589
- project_ids = buffer .backend .get_sorted_set (
590
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , self .buffer_timestamp
598
+ project_ids = buffer .backend .bulk_get_sorted_set (
599
+ self .buffer_keys ,
600
+ min = 0 ,
601
+ max = self .buffer_timestamp ,
591
602
)
592
- assert project_ids [ 0 ][ 0 ] == self .project .id
603
+ assert list ( project_ids . keys ()) == [ self .project .id ]
593
604
594
605
595
606
@freeze_time (FROZEN_TIME )
@@ -609,6 +620,7 @@ def setUp(self) -> None:
609
620
occurrence = self .build_occurrence (evidence_data = {"detector_id" : self .detector .id })
610
621
)
611
622
self .event_data = WorkflowEventData (event = self .group_event , group = self .group )
623
+ self .buffer_keys = DelayedWorkflow .get_buffer_keys ()
612
624
613
625
@patch ("sentry.utils.metrics.incr" )
614
626
def test_metrics_issue_dual_processing_metrics (self , mock_incr : MagicMock ) -> None :
@@ -736,11 +748,12 @@ def test_enqueues_when_slow_conditions(self):
736
748
737
749
enqueue_workflows (queue_items )
738
750
739
- project_ids = buffer .backend .get_sorted_set (
740
- WORKFLOW_ENGINE_BUFFER_LIST_KEY , 0 , timezone .now ().timestamp ()
751
+ project_ids = buffer .backend .bulk_get_sorted_set (
752
+ self .buffer_keys ,
753
+ min = 0 ,
754
+ max = timezone .now ().timestamp (),
741
755
)
742
- assert project_ids
743
- assert project_ids [0 ][0 ] == self .project .id
756
+ assert list (project_ids .keys ()) == [self .project .id ]
744
757
745
758
746
759
class TestEnqueueWorkflows (BaseWorkflowTest ):
@@ -774,9 +787,11 @@ def setUp(self) -> None:
774
787
775
788
@patch ("sentry.buffer.backend.push_to_sorted_set" )
776
789
@patch ("sentry.buffer.backend.push_to_hash_bulk" )
790
+ @patch ("random.choice" )
777
791
def test_enqueue_workflows__adds_to_workflow_engine_buffer (
778
- self , mock_push_to_hash_bulk , mock_push_to_sorted_set
792
+ self , mock_randchoice , mock_push_to_hash_bulk , mock_push_to_sorted_set
779
793
):
794
+ mock_randchoice .return_value = f"{ DelayedWorkflow .buffer_key } :{ 5 } "
780
795
enqueue_workflows (
781
796
{
782
797
self .workflow : DelayedWorkflowItem (
@@ -791,7 +806,7 @@ def test_enqueue_workflows__adds_to_workflow_engine_buffer(
791
806
)
792
807
793
808
mock_push_to_sorted_set .assert_called_once_with (
794
- key = WORKFLOW_ENGINE_BUFFER_LIST_KEY ,
809
+ key = f" { DelayedWorkflow . buffer_key } : { 5 } " ,
795
810
value = [self .group_event .project_id ],
796
811
)
797
812
0 commit comments