Skip to content

Commit a3b150f

Browse files
committed
feat(aci): Separate Buffer for Workflows
1 parent 642d48f commit a3b150f

File tree

16 files changed

+89
-88
lines changed

16 files changed

+89
-88
lines changed

src/sentry/buffer/base.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class Buffer(Service):
3030
"incr",
3131
"process",
3232
"process_pending",
33-
"process_batch",
3433
"validate",
3534
"push_to_sorted_set",
3635
"push_to_hash",
@@ -148,9 +147,6 @@ def incr(
148147
def process_pending(self) -> None:
149148
return
150149

151-
def process_batch(self) -> None:
152-
return
153-
154150
def process(
155151
self,
156152
model: type[models.Model] | None,

src/sentry/buffer/redis.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -57,32 +57,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) -
5757
logger.exception("buffer.invalid_value", extra={"value": value, "model": model})
5858

5959

60-
class BufferHookEvent(Enum):
61-
FLUSH = "flush"
62-
63-
64-
class BufferHookRegistry:
65-
def __init__(self, *args: Any, **kwargs: Any) -> None:
66-
self._registry: dict[BufferHookEvent, Callable[..., Any]] = {}
67-
68-
def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None:
69-
self._registry[key] = func
70-
71-
def has(self, key: BufferHookEvent) -> bool:
72-
return self._registry.get(key) is not None
73-
74-
def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
75-
try:
76-
callback = self._registry[buffer_hook_event]
77-
except KeyError:
78-
logger.exception("buffer_hook_event.missing")
79-
80-
return callback()
81-
82-
83-
redis_buffer_registry = BufferHookRegistry()
84-
85-
8660
# Callable to get the queue name for the given model_key.
8761
# May return None to not assign a queue for the given model_key.
8862
ChooseQueueFunction = Callable[[str], str | None]
@@ -495,12 +469,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel
495469
key = self._make_key(model, field)
496470
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
497471

498-
def process_batch(self) -> None:
499-
try:
500-
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
501-
except Exception:
502-
logger.exception("process_batch.error")
503-
504472
def incr(
505473
self,
506474
model: type[models.Model],

src/sentry/conf/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
20882088
SENTRY_BUFFER = "sentry.buffer.Buffer"
20892089
SENTRY_BUFFER_OPTIONS: dict[str, str] = {}
20902090

2091+
# Workflow Buffer backend
2092+
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer"
2093+
SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {}
2094+
20912095
# Cache backend
20922096
# XXX: We explicitly require the cache to be configured as its not optional
20932097
# and causes serious confusion with the default django cache

src/sentry/options/defaults.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3154,6 +3154,12 @@
31543154
default=[],
31553155
flags=FLAG_AUTOMATOR_MODIFIABLE,
31563156
)
3157+
register(
3158+
"workflow_engine.buffer.use_new_buffer",
3159+
type=Bool,
3160+
default=False,
3161+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3162+
)
31573163

31583164
# Restrict uptime issue creation for specific host provider identifiers. Items
31593165
# in this list map to the `host_provider_id` column in the UptimeSubscription

src/sentry/rules/processing/buffer_processing.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99

1010
from celery import Task
1111

12-
from sentry import buffer, options
13-
from sentry.buffer.base import BufferField
14-
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
12+
from sentry import options
13+
from sentry.buffer.base import Buffer, BufferField
1514
from sentry.db import models
1615
from sentry.utils import metrics
16+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
1717
from sentry.utils.registry import NoRegistrationExistsError, Registry
1818

1919
logger = logging.getLogger("sentry.delayed_processing")
@@ -56,12 +56,19 @@ def get_buffer_keys(cls) -> list[str]:
5656
for shard in range(cls.buffer_shards)
5757
]
5858

59+
@staticmethod
60+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
61+
raise NotImplementedError
62+
5963

6064
delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
6165

6266

6367
def fetch_group_to_event_data(
64-
project_id: int, model: type[models.Model], batch_key: str | None = None
68+
buffer: LazyServiceWrapper[Buffer],
69+
project_id: int,
70+
model: type[models.Model],
71+
batch_key: str | None = None,
6572
) -> dict[str, str]:
6673
field: dict[str, models.Model | int | str] = {
6774
"project_id": project_id,
@@ -70,7 +77,7 @@ def fetch_group_to_event_data(
7077
if batch_key:
7178
field["batch_key"] = batch_key
7279

73-
return buffer.backend.get_hash(model=model, field=field)
80+
return buffer.get_hash(model=model, field=field)
7481

7582

7683
def bucket_num_groups(num_groups: int) -> str:
@@ -80,7 +87,9 @@ def bucket_num_groups(num_groups: int) -> str:
8087
return "1"
8188

8289

83-
def process_in_batches(project_id: int, processing_type: str) -> None:
90+
def process_in_batches(
91+
buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str
92+
) -> None:
8493
"""
8594
This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
8695
@@ -109,7 +118,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
109118
task = processing_info.processing_task
110119
filters: dict[str, BufferField] = asdict(hash_args.filters)
111120

112-
event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
121+
event_count = buffer.get_hash_length(model=hash_args.model, field=filters)
113122
metrics.incr(
114123
f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
115124
)
@@ -127,22 +136,22 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
127136
)
128137

129138
# if the dictionary is large, get the items and chunk them.
130-
alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
139+
alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model)
131140

132141
with metrics.timer(f"{processing_type}.process_batch.duration"):
133142
items = iter(alertgroup_to_event_data.items())
134143

135144
while batch := dict(islice(items, batch_size)):
136145
batch_key = str(uuid.uuid4())
137146

138-
buffer.backend.push_to_hash_bulk(
147+
buffer.push_to_hash_bulk(
139148
model=hash_args.model,
140149
filters={**filters, "batch_key": batch_key},
141150
data=batch,
142151
)
143152

144153
# remove the batched items from the project alertgroup_to_event_data
145-
buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
154+
buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
146155

147156
task.apply_async(
148157
kwargs={"project_id": project_id, "batch_key": batch_key},
@@ -159,6 +168,8 @@ def process_buffer() -> None:
159168
logger.info(log_name, extra={"option": handler.option})
160169
continue
161170

171+
buffer = handler.buffer_backend()
172+
162173
with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
163174
# We need to use a very fresh timestamp here; project scores (timestamps) are
164175
# updated with each relevant event, and some can be updated every few milliseconds.
@@ -167,7 +178,7 @@ def process_buffer() -> None:
167178
# retrieved and processed here.
168179
fetch_time = datetime.now(tz=timezone.utc).timestamp()
169180
buffer_keys = handler.get_buffer_keys()
170-
all_project_ids_and_timestamps = buffer.backend.bulk_get_sorted_set(
181+
all_project_ids_and_timestamps = buffer.bulk_get_sorted_set(
171182
buffer_keys,
172183
min=0,
173184
max=fetch_time,
@@ -185,12 +196,8 @@ def process_buffer() -> None:
185196
for project_id in project_ids:
186197
process_in_batches(buffer, project_id, processing_type)
187198

188-
buffer.backend.delete_keys(
199+
buffer.delete_keys(
189200
buffer_keys,
190201
min=0,
191202
max=fetch_time,
192203
)
193-
194-
195-
if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
196-
redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)

src/sentry/rules/processing/delayed_processing.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from django.db.models import OuterRef, Subquery
1313

1414
from sentry import buffer, features, nodestore
15-
from sentry.buffer.base import BufferField
15+
from sentry.buffer.base import Buffer, BufferField
1616
from sentry.db import models
1717
from sentry.issues.issue_occurrence import IssueOccurrence
1818
from sentry.models.group import Group
@@ -52,6 +52,7 @@
5252
from sentry.utils import json, metrics
5353
from sentry.utils.dates import ensure_aware
5454
from sentry.utils.iterators import chunked
55+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
5556
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
5657
from sentry.utils.safe import safe_execute
5758
from sentry.workflow_engine.processors.log_util import track_batch_performance
@@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys:
787788
@property
788789
def processing_task(self) -> Task:
789790
return apply_delayed
791+
792+
@staticmethod
793+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
794+
return buffer.backend

src/sentry/tasks/process_buffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ def process_pending_batch() -> None:
5252
"""
5353
Process pending buffers in a batch.
5454
"""
55-
from sentry import buffer
55+
from sentry.rules.processing.buffer_processing import process_buffer
5656

5757
lock = get_process_lock("process_pending_batch")
5858

5959
try:
6060
with lock.acquire():
61-
buffer.backend.process_batch()
61+
process_buffer()
6262
except UnableToAcquireLock as error:
6363
logger.warning("process_pending_batch.fail", extra={"error": error})
6464

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from django.conf import settings
2+
3+
import sentry.buffer as old_buffer
4+
from sentry import options
5+
from sentry.buffer.base import Buffer
6+
from sentry.utils.services import LazyServiceWrapper
7+
8+
# Lazily-initialized buffer service dedicated to workflow processing.
# Backend class and options come from the SENTRY_WORKFLOW_BUFFER /
# SENTRY_WORKFLOW_BUFFER_OPTIONS settings (defaults mirror the base Buffer).
_backend = LazyServiceWrapper(
    Buffer,
    settings.SENTRY_WORKFLOW_BUFFER,
    settings.SENTRY_WORKFLOW_BUFFER_OPTIONS,
)
11+
12+
13+
def validate_new_backend() -> None:
    """No-op hook reserved for validating the new workflow buffer backend."""
15+
16+
17+
def get_backend() -> LazyServiceWrapper[Buffer]:
    """Return the buffer service to use for workflow processing.

    When the ``workflow_engine.buffer.use_new_buffer`` option is enabled,
    the dedicated workflow buffer is returned; otherwise this falls back to
    the shared default buffer backend.
    """
    use_new_buffer = options.get("workflow_engine.buffer.use_new_buffer")
    return _backend if use_new_buffer else old_buffer.backend

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from django.utils import timezone
1212
from pydantic import BaseModel, validator
1313

14-
from sentry import buffer, features, nodestore, options
14+
import sentry.workflow_engine.buffer as buffer
15+
from sentry import features, nodestore, options
1516
from sentry.buffer.base import BufferField
1617
from sentry.db import models
1718
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -310,7 +311,7 @@ def fetch_group_to_event_data(
310311
if batch_key:
311312
field["batch_key"] = batch_key
312313

313-
return buffer.backend.get_hash(model=model, field=field)
314+
return buffer.get_backend().get_hash(model=model, field=field)
314315

315316

316317
def fetch_workflows_envs(
@@ -771,7 +772,7 @@ def cleanup_redis_buffer(
771772
if batch_key:
772773
filters["batch_key"] = batch_key
773774

774-
buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
775+
buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
775776

776777

777778
def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
from django.db.models import Q
1010
from django.utils import timezone
1111

12-
from sentry import buffer, features
12+
from sentry import features
1313
from sentry.eventstore.models import GroupEvent
1414
from sentry.models.activity import Activity
1515
from sentry.models.environment import Environment
1616
from sentry.utils import json
17+
from sentry.workflow_engine import buffer
1718
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
1819
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
1920
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
@@ -116,8 +117,9 @@ def enqueue_workflows(
116117
sentry_sdk.set_tag("delayed_workflow_items", items)
117118
return
118119

120+
backend = buffer.get_backend()
119121
for project_id, queue_items in items_by_project_id.items():
120-
buffer.backend.push_to_hash_bulk(
122+
backend.push_to_hash_bulk(
121123
model=Workflow,
122124
filters={"project_id": project_id},
123125
data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items},
@@ -128,7 +130,7 @@ def enqueue_workflows(
128130
sentry_sdk.set_tag("delayed_workflow_items", items)
129131

130132
sharded_key = random.choice(DelayedWorkflow.get_buffer_keys())
131-
buffer.backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
133+
backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
132134

133135
logger.debug(
134136
"workflow_engine.workflows.enqueued",

0 commit comments

Comments
 (0)