diff --git a/src/sentry/buffer/base.py b/src/sentry/buffer/base.py index bc815125de808e..e6932e8779f78a 100644 --- a/src/sentry/buffer/base.py +++ b/src/sentry/buffer/base.py @@ -30,7 +30,6 @@ class Buffer(Service): "incr", "process", "process_pending", - "process_batch", "validate", "push_to_sorted_set", "push_to_hash", @@ -148,9 +147,6 @@ def incr( def process_pending(self) -> None: return - def process_batch(self) -> None: - return - def process( self, model: type[models.Model] | None, diff --git a/src/sentry/buffer/redis.py b/src/sentry/buffer/redis.py index 7188afd0af7ea1..8349ba4aea9184 100644 --- a/src/sentry/buffer/redis.py +++ b/src/sentry/buffer/redis.py @@ -57,32 +57,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) - logger.exception("buffer.invalid_value", extra={"value": value, "model": model}) -class BufferHookEvent(Enum): - FLUSH = "flush" - - -class BufferHookRegistry: - def __init__(self, *args: Any, **kwargs: Any) -> None: - self._registry: dict[BufferHookEvent, Callable[..., Any]] = {} - - def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None: - self._registry[key] = func - - def has(self, key: BufferHookEvent) -> bool: - return self._registry.get(key) is not None - - def callback(self, buffer_hook_event: BufferHookEvent) -> bool: - try: - callback = self._registry[buffer_hook_event] - except KeyError: - logger.exception("buffer_hook_event.missing") - - return callback() - - -redis_buffer_registry = BufferHookRegistry() - - # Callable to get the queue name for the given model_key. # May return None to not assign a queue for the given model_key. ChooseQueueFunction = Callable[[str], str | None] @@ -495,12 +469,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel key = self._make_key(model, field) return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH) - def process_batch(self) -> None: - try: - redis_buffer_registry.callback(BufferHookEvent.FLUSH) - except Exception: - logger.exception("process_batch.error") - def incr( self, model: type[models.Model], diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index b2bc321239c5ba..cfe0ba38b8e6c4 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: SENTRY_BUFFER = "sentry.buffer.Buffer" SENTRY_BUFFER_OPTIONS: dict[str, str] = {} +# Workflow Buffer backend +SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer" +SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {} + # Cache backend # XXX: We explicitly require the cache to be configured as its not optional # and causes serious confusion with the default django cache diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 0a4ee79fa718a4..1ae8bd5c7e5b42 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3154,6 +3154,12 @@ default=[], flags=FLAG_AUTOMATOR_MODIFIABLE, ) +register( + "workflow_engine.buffer.use_new_buffer", + type=Bool, + default=False, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) # Restrict uptime issue creation for specific host provider identifiers. Items # in this list map to the `host_provider_id` column in the UptimeSubscription diff --git a/src/sentry/rules/processing/buffer_processing.py b/src/sentry/rules/processing/buffer_processing.py index 3430461cbd4625..97ecd3f5987cfa 100644 --- a/src/sentry/rules/processing/buffer_processing.py +++ b/src/sentry/rules/processing/buffer_processing.py @@ -9,11 +9,11 @@ from celery import Task -from sentry import buffer, options -from sentry.buffer.base import BufferField -from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry +from sentry import options +from sentry.buffer.base import Buffer, BufferField from sentry.db import models from sentry.utils import metrics +from sentry.utils.lazy_service_wrapper import LazyServiceWrapper from sentry.utils.registry import NoRegistrationExistsError, Registry logger = logging.getLogger("sentry.delayed_processing") @@ -56,12 +56,19 @@ def get_buffer_keys(cls) -> list[str]: for shard in range(cls.buffer_shards) ] + @staticmethod + def buffer_backend() -> LazyServiceWrapper[Buffer]: + raise NotImplementedError + delayed_processing_registry = Registry[type[DelayedProcessingBase]]() def fetch_group_to_event_data( - project_id: int, model: type[models.Model], batch_key: str | None = None + buffer: LazyServiceWrapper[Buffer], + project_id: int, + model: type[models.Model], + batch_key: str | None = None, ) -> dict[str, str]: field: dict[str, models.Model | int | str] = { "project_id": project_id, @@ -70,7 +77,7 @@ def fetch_group_to_event_data( if batch_key: field["batch_key"] = batch_key - return buffer.backend.get_hash(model=model, field=field) + return buffer.get_hash(model=model, field=field) def bucket_num_groups(num_groups: int) -> str: @@ -80,7 +87,9 @@ def bucket_num_groups(num_groups: int) -> str: return "1" -def process_in_batches(project_id: int, processing_type: str) -> None: +def process_in_batches( + buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str +) -> None: """ This will check the number of alertgroup_to_event_data items in the Redis buffer for a project. @@ -109,7 +118,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None: task = processing_info.processing_task filters: dict[str, BufferField] = asdict(hash_args.filters) - event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters) + event_count = buffer.get_hash_length(model=hash_args.model, field=filters) metrics.incr( f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)} ) @@ -127,7 +136,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None: ) # if the dictionary is large, get the items and chunk them. - alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model) + alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model) with metrics.timer(f"{processing_type}.process_batch.duration"): items = iter(alertgroup_to_event_data.items()) @@ -135,14 +144,14 @@ def process_in_batches(project_id: int, processing_type: str) -> None: while batch := dict(islice(items, batch_size)): batch_key = str(uuid.uuid4()) - buffer.backend.push_to_hash_bulk( + buffer.push_to_hash_bulk( model=hash_args.model, filters={**filters, "batch_key": batch_key}, data=batch, ) # remove the batched items from the project alertgroup_to_event_data - buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys())) + buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys())) task.apply_async( kwargs={"project_id": project_id, "batch_key": batch_key}, @@ -159,6 +168,8 @@ def process_buffer() -> None: logger.info(log_name, extra={"option": handler.option}) continue + buffer = handler.buffer_backend() + with metrics.timer(f"{processing_type}.process_all_conditions.duration"): # We need to use a very fresh timestamp here; project scores (timestamps) are # updated with each relevant event, and some can be updated every few milliseconds. @@ -167,7 +178,7 @@ def process_buffer() -> None: # retrieved and processed here. fetch_time = datetime.now(tz=timezone.utc).timestamp() buffer_keys = handler.get_buffer_keys() - all_project_ids_and_timestamps = buffer.backend.bulk_get_sorted_set( + all_project_ids_and_timestamps = buffer.bulk_get_sorted_set( buffer_keys, min=0, max=fetch_time, @@ -183,14 +194,10 @@ def process_buffer() -> None: project_ids = list(all_project_ids_and_timestamps.keys()) for project_id in project_ids: - process_in_batches(project_id, processing_type) + process_in_batches(buffer, project_id, processing_type) - buffer.backend.delete_keys( + buffer.delete_keys( buffer_keys, min=0, max=fetch_time, ) - - -if not redis_buffer_registry.has(BufferHookEvent.FLUSH): - redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer) diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index 3f7e9a6032110d..b1e25b5b8ffa58 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -12,7 +12,7 @@ from django.db.models import OuterRef, Subquery from sentry import buffer, features, nodestore -from sentry.buffer.base import BufferField +from sentry.buffer.base import Buffer, BufferField from sentry.db import models from sentry.issues.issue_occurrence import IssueOccurrence from sentry.models.group import Group @@ -52,6 +52,7 @@ from sentry.utils import json, metrics from sentry.utils.dates import ensure_aware from sentry.utils.iterators import chunked +from sentry.utils.lazy_service_wrapper import LazyServiceWrapper from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay from sentry.utils.safe import safe_execute from sentry.workflow_engine.processors.log_util import track_batch_performance @@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys: @property def processing_task(self) -> Task: return apply_delayed + + @staticmethod + def buffer_backend() -> LazyServiceWrapper[Buffer]: + return buffer.backend diff --git a/src/sentry/tasks/process_buffer.py b/src/sentry/tasks/process_buffer.py index 9b2d1f9e9730ca..4c187df0f05be4 100644 --- a/src/sentry/tasks/process_buffer.py +++ b/src/sentry/tasks/process_buffer.py @@ -52,13 +52,13 @@ def process_pending_batch() -> None: """ Process pending buffers in a batch. """ - from sentry import buffer + from sentry.rules.processing.buffer_processing import process_buffer lock = get_process_lock("process_pending_batch") try: with lock.acquire(): - buffer.backend.process_batch() + process_buffer() except UnableToAcquireLock as error: logger.warning("process_pending_batch.fail", extra={"error": error}) diff --git a/src/sentry/workflow_engine/buffer/__init__.py b/src/sentry/workflow_engine/buffer/__init__.py new file mode 100644 index 00000000000000..0423064b702cdd --- /dev/null +++ b/src/sentry/workflow_engine/buffer/__init__.py @@ -0,0 +1,21 @@ +from django.conf import settings + +import sentry.buffer as old_buffer +from sentry import options +from sentry.buffer.base import Buffer +from sentry.utils.services import LazyServiceWrapper + +# Workflows-specific Buffer that can be configured separately from the default Buffer. +_backend = LazyServiceWrapper( + Buffer, settings.SENTRY_WORKFLOW_BUFFER, settings.SENTRY_WORKFLOW_BUFFER_OPTIONS +) + + +def get_backend() -> LazyServiceWrapper[Buffer]: + """ + Retrieve the appropriate Buffer to use for the workflow engine. + """ + if options.get("workflow_engine.buffer.use_new_buffer"): + return _backend + else: + return old_buffer.backend diff --git a/src/sentry/workflow_engine/processors/delayed_workflow.py b/src/sentry/workflow_engine/processors/delayed_workflow.py index 619fb86ae31f72..b2d20eca54c375 100644 --- a/src/sentry/workflow_engine/processors/delayed_workflow.py +++ b/src/sentry/workflow_engine/processors/delayed_workflow.py @@ -11,7 +11,8 @@ from django.utils import timezone from pydantic import BaseModel, validator -from sentry import buffer, features, nodestore, options +import sentry.workflow_engine.buffer as buffer +from sentry import features, nodestore, options from sentry.buffer.base import BufferField from sentry.db import models from sentry.issues.issue_occurrence import IssueOccurrence @@ -312,7 +313,7 @@ def fetch_group_to_event_data( if batch_key: field["batch_key"] = batch_key - return buffer.backend.get_hash(model=model, field=field) + return buffer.get_backend().get_hash(model=model, field=field) def fetch_workflows_envs( @@ -765,7 +766,7 @@ def cleanup_redis_buffer( if batch_key: filters["batch_key"] = batch_key - buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete) + buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete) def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]: diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py index 37cfd186a975ba..955bae743fd358 100644 --- a/src/sentry/workflow_engine/processors/workflow.py +++ b/src/sentry/workflow_engine/processors/workflow.py @@ -9,11 +9,12 @@ from django.db.models import Q from django.utils import timezone -from sentry import buffer, features +from sentry import features from sentry.models.activity import Activity from sentry.models.environment import Environment from sentry.services.eventstore.models import GroupEvent from sentry.utils import json +from sentry.workflow_engine import buffer from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup from sentry.workflow_engine.processors.action import ( @@ -118,8 +119,9 @@ def enqueue_workflows( sentry_sdk.set_tag("delayed_workflow_items", items) return + backend = buffer.get_backend() for project_id, queue_items in items_by_project_id.items(): - buffer.backend.push_to_hash_bulk( + backend.push_to_hash_bulk( model=Workflow, filters={"project_id": project_id}, data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items}, @@ -130,7 +132,7 @@ def enqueue_workflows( sentry_sdk.set_tag("delayed_workflow_items", items) sharded_key = random.choice(DelayedWorkflow.get_buffer_keys()) - buffer.backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys())) + backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys())) logger.debug( "workflow_engine.workflows.enqueued", diff --git a/src/sentry/workflow_engine/tasks/delayed_workflows.py b/src/sentry/workflow_engine/tasks/delayed_workflows.py index bc8778494f140d..98a9e7e9a8840b 100644 --- a/src/sentry/workflow_engine/tasks/delayed_workflows.py +++ b/src/sentry/workflow_engine/tasks/delayed_workflows.py @@ -4,7 +4,9 @@ from celery import Task +import sentry.workflow_engine.buffer as buffer from sentry import options +from sentry.buffer.base import Buffer from sentry.rules.processing.buffer_processing import ( BufferHashKeys, DelayedProcessingBase, @@ -16,6 +18,7 @@ from sentry.taskworker.config import TaskworkerConfig from sentry.taskworker.namespaces import workflow_engine_tasks from sentry.taskworker.retry import Retry +from sentry.utils.lazy_service_wrapper import LazyServiceWrapper from sentry.workflow_engine.models import Workflow from sentry.workflow_engine.utils import log_context @@ -66,3 +69,7 @@ def processing_task(self) -> Task: if options.get("delayed_workflow.use_workflow_engine_pool"): return process_delayed_workflows_shim return process_delayed_workflows + + @staticmethod + def buffer_backend() -> LazyServiceWrapper[Buffer]: + return buffer.get_backend() diff --git a/tests/sentry/buffer/test_redis.py b/tests/sentry/buffer/test_redis.py index ac859e5ad9b136..b108f19cd2f5bb 100644 --- a/tests/sentry/buffer/test_redis.py +++ b/tests/sentry/buffer/test_redis.py @@ -5,22 +5,14 @@ from collections import defaultdict from collections.abc import Mapping from unittest import mock -from unittest.mock import MagicMock, Mock import pytest from django.utils import timezone from sentry import options -from sentry.buffer.redis import ( - BufferHookEvent, - RedisBuffer, - _get_model_key, - redis_buffer_registry, - redis_buffer_router, -) +from sentry.buffer.redis import RedisBuffer, _get_model_key, redis_buffer_router from sentry.models.group import Group from sentry.models.project import Project -from sentry.rules.processing.buffer_processing import process_buffer from sentry.rules.processing.processor import PROJECT_ID_BUFFER_LIST_KEY from sentry.testutils.helpers.datetime import freeze_time from sentry.testutils.pytest.fixtures import django_db_all @@ -330,27 +322,6 @@ def test_bulk_sorted_set_single_key(self) -> None: assert len(project_ids_and_timestamps) == 4 assert set(project_ids_and_timestamps.keys()) == set(project_ids) - def test_buffer_hook_registry(self) -> None: - """Test that we can add an event to the registry and that the callback is invoked""" - mock = Mock() - redis_buffer_registry._registry[BufferHookEvent.FLUSH] = mock - - redis_buffer_registry.callback(BufferHookEvent.FLUSH) - assert mock.call_count == 1 - - @mock.patch("sentry.rules.processing.buffer_processing.metrics.timer") - def test_callback(self, mock_metrics_timer: MagicMock) -> None: - redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer) - self.buf.process_batch() - assert mock_metrics_timer.call_count == 1 - - def test_process_batch(self) -> None: - """Test that the registry's callbacks are invoked when we process a batch""" - mock = Mock() - redis_buffer_registry._registry[BufferHookEvent.FLUSH] = mock - self.buf.process_batch() - assert mock.call_count == 1 - def test_delete_batch(self) -> None: """Test that after we add things to redis we can clean it up""" project_id = 1 diff --git a/tests/sentry/rules/processing/test_buffer_processing.py b/tests/sentry/rules/processing/test_buffer_processing.py index da14a7a5f4f534..c51e047595da12 100644 --- a/tests/sentry/rules/processing/test_buffer_processing.py +++ b/tests/sentry/rules/processing/test_buffer_processing.py @@ -272,7 +272,7 @@ def setUp(self) -> None: @patch("sentry.rules.processing.delayed_processing.apply_delayed.apply_async") def test_no_redis_data(self, mock_apply_delayed: MagicMock) -> None: - process_in_batches(self.project.id, "delayed_processing") + process_in_batches(buffer.backend, self.project.id, "delayed_processing") mock_apply_delayed.assert_called_once_with( kwargs={"project_id": self.project.id}, headers={"sentry-propagate-traces": False} ) @@ -283,7 +283,7 @@ def test_basic(self, mock_apply_delayed: MagicMock) -> None: self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) - process_in_batches(self.project.id, "delayed_processing") + process_in_batches(buffer.backend, self.project.id, "delayed_processing") mock_apply_delayed.assert_called_once_with( kwargs={"project_id": self.project.id}, headers={"sentry-propagate-traces": False} ) @@ -295,7 +295,7 @@ def test_batch(self, mock_apply_delayed: MagicMock) -> None: self.push_to_hash(self.project.id, self.rule.id, self.group_two.id) self.push_to_hash(self.project.id, self.rule.id, self.group_three.id) - process_in_batches(self.project.id, "delayed_processing") + process_in_batches(buffer.backend, self.project.id, "delayed_processing") assert mock_apply_delayed.call_count == 2 # Validate the batches are created correctly diff --git a/tests/sentry/rules/processing/test_delayed_processing.py b/tests/sentry/rules/processing/test_delayed_processing.py index 8d0b4e4c38051e..599b3c313d9f63 100644 --- a/tests/sentry/rules/processing/test_delayed_processing.py +++ b/tests/sentry/rules/processing/test_delayed_processing.py @@ -1532,7 +1532,7 @@ def test_batched_cleanup(self, mock_apply_delayed: MagicMock) -> None: rules_to_groups[self.rule.id].add(group_two.id) rules_to_groups[self.rule.id].add(group_three.id) - process_in_batches(self.project.id, "delayed_processing") + process_in_batches(buffer.backend, self.project.id, "delayed_processing") batch_one_key = mock_apply_delayed.call_args_list[0][1]["kwargs"]["batch_key"] batch_two_key = mock_apply_delayed.call_args_list[1][1]["kwargs"]["batch_key"] diff --git a/tests/sentry/tasks/test_process_buffer.py b/tests/sentry/tasks/test_process_buffer.py index edcabddd484f53..1d42caa07a4ff7 100644 --- a/tests/sentry/tasks/test_process_buffer.py +++ b/tests/sentry/tasks/test_process_buffer.py @@ -38,23 +38,15 @@ def test_nothing(self, mock_process_pending: mock.MagicMock) -> None: class ProcessPendingBatchTest(TestCase): - @mock.patch("sentry.buffer.backend.process_batch") - def test_process_pending_batch(self, mock_process_pending_batch: mock.MagicMock) -> None: - process_pending_batch() - assert len(mock_process_pending_batch.mock_calls) == 1 - mock_process_pending_batch.assert_any_call() - - @mock.patch("sentry.buffer.backend.process_batch") - def test_process_pending_batch_locked_out( - self, mock_process_pending_batch: mock.MagicMock - ) -> None: + @mock.patch("sentry.rules.processing.buffer_processing.process_buffer") + def test_process_pending_batch_locked_out(self, mock_process_buffer: mock.MagicMock) -> None: with self.assertLogs("sentry.tasks.process_buffer", level="WARNING") as logger: lock = get_process_lock("process_pending_batch") with lock.acquire(): process_pending_batch() self.assertEqual(len(logger.output), 1) - assert len(mock_process_pending_batch.mock_calls) == 0 + assert len(mock_process_buffer.mock_calls) == 0 with self.assertNoLogs("sentry.tasks.process_buffer", level="WARNING"): process_pending_batch() - assert len(mock_process_pending_batch.mock_calls) == 1 + assert len(mock_process_buffer.mock_calls) == 1 diff --git a/tests/sentry/workflow_engine/processors/test_delayed_workflow.py b/tests/sentry/workflow_engine/processors/test_delayed_workflow.py index 2ea4945b55c504..6cf0d6637a9923 100644 --- a/tests/sentry/workflow_engine/processors/test_delayed_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_delayed_workflow.py @@ -331,7 +331,7 @@ def test_parse_timestamps(self) -> None: def test_delayed_workflow_shim(self, mock_process_delayed: MagicMock) -> None: self._push_base_events() - process_in_batches(self.project.id, "delayed_workflow") + process_in_batches(buffer.backend, self.project.id, "delayed_workflow") assert mock_process_delayed.call_count == 2 @@ -993,7 +993,7 @@ def test_batched_cleanup(self, mock_process_delayed: MagicMock) -> None: self._push_base_events() all_data = buffer.backend.get_hash(Workflow, {"project_id": self.project.id}) - process_in_batches(self.project.id, "delayed_workflow") + process_in_batches(buffer.backend, self.project.id, "delayed_workflow") batch_one_key = mock_process_delayed.call_args_list[0][1]["kwargs"]["batch_key"] batch_two_key = mock_process_delayed.call_args_list[1][1]["kwargs"]["batch_key"] diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index e2d4b7fc77f97f..4d9879096107b1 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -4,7 +4,6 @@ import pytest from django.utils import timezone -from sentry import buffer from sentry.eventstream.base import GroupState from sentry.grouping.grouptype import ErrorGroupType from sentry.models.activity import Activity @@ -12,10 +11,12 @@ from sentry.services.eventstore.models import GroupEvent from sentry.testutils.factories import Factories from sentry.testutils.helpers.datetime import before_now, freeze_time +from sentry.testutils.helpers.options import override_options from sentry.testutils.helpers.redis import mock_redis_buffer from sentry.testutils.pytest.fixtures import django_db_all from sentry.types.activity import ActivityType from sentry.utils import json +from sentry.workflow_engine import buffer as workflow_buffer from sentry.workflow_engine.models import ( Action, DataConditionGroup, @@ -473,7 +474,7 @@ def test_enqueues_workflow_all_logic_type(self) -> None: process_workflows(self.event_data) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=self.buffer_timestamp, @@ -505,7 +506,7 @@ def test_enqueues_workflow_any_logic_type(self) -> None: process_workflows(self.event_data) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=self.buffer_timestamp, @@ -587,7 +588,7 @@ def test_enqueues_with_when_and_if_slow_conditions(self) -> None: process_workflows(self.event_data) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=self.buffer_timestamp, @@ -616,7 +617,7 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None: process_workflows(self.event_data) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=self.buffer_timestamp, @@ -628,7 +629,7 @@ def test_enqueues_event_if_meets_fast_conditions(self) -> None: process_workflows(self.event_data) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=self.buffer_timestamp, @@ -781,7 +782,7 @@ def test_enqueues_when_slow_conditions(self): enqueue_workflows(queue_items) - project_ids = buffer.backend.bulk_get_sorted_set( + project_ids = workflow_buffer.get_backend().bulk_get_sorted_set( self.buffer_keys, min=0, max=timezone.now().timestamp(), @@ -821,9 +822,10 @@ def setUp(self) -> None: @patch("sentry.buffer.backend.push_to_sorted_set") @patch("sentry.buffer.backend.push_to_hash_bulk") @patch("random.choice") + @override_options({"workflow_engine.buffer.use_new_buffer": False}) def test_enqueue_workflows__adds_to_workflow_engine_buffer( self, mock_randchoice, mock_push_to_hash_bulk, mock_push_to_sorted_set - ): + ) -> None: mock_randchoice.return_value = f"{DelayedWorkflow.buffer_key}:{5}" enqueue_workflows( { @@ -843,11 +845,39 @@ def test_enqueue_workflows__adds_to_workflow_engine_buffer( value=[self.group_event.project_id], ) + @patch("sentry.workflow_engine.buffer._backend.push_to_sorted_set") + @patch("sentry.workflow_engine.buffer._backend.push_to_hash_bulk") + @patch("random.choice") + @override_options({"workflow_engine.buffer.use_new_buffer": True}) + def test_enqueue_workflows__adds_to_workflow_engine_buffer_new_buffer( + self, mock_randchoice, mock_push_to_hash_bulk, mock_push_to_sorted_set + ) -> None: + key_choice = f"{DelayedWorkflow.buffer_key}:{5}" + mock_randchoice.return_value = key_choice + enqueue_workflows( + { + self.workflow: DelayedWorkflowItem( + self.workflow, + self.group_event, + self.workflow.when_condition_group_id, + [self.slow_workflow_filter_group.id], + [self.workflow_filter_group.id], + timestamp=timezone.now(), + ) + } + ) + + mock_push_to_sorted_set.assert_called_once_with( + key=key_choice, + value=[self.group_event.project_id], + ) + @patch("sentry.buffer.backend.push_to_sorted_set") @patch("sentry.buffer.backend.push_to_hash_bulk") + @override_options({"workflow_engine.buffer.use_new_buffer": False}) def test_enqueue_workflow__adds_to_workflow_engine_set( self, mock_push_to_hash_bulk, mock_push_to_sorted_set - ): + ) -> None: current_time = timezone.now() workflow_filter_group_2 = self.create_data_condition_group() self.create_workflow_data_condition_group(