Skip to content

Commit ef15d42

Browse files
committed
feat(aci): Separate Buffer for Workflows
1 parent f17ceaf commit ef15d42

File tree

16 files changed

+90
-110
lines changed

16 files changed

+90
-110
lines changed

src/sentry/buffer/base.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class Buffer(Service):
3030
"incr",
3131
"process",
3232
"process_pending",
33-
"process_batch",
3433
"validate",
3534
"push_to_sorted_set",
3635
"push_to_hash",
@@ -140,9 +139,6 @@ def incr(
140139
def process_pending(self) -> None:
141140
return
142141

143-
def process_batch(self) -> None:
144-
return
145-
146142
def process(
147143
self,
148144
model: type[models.Model] | None,

src/sentry/buffer/redis.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -56,32 +56,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) -
5656
logger.exception("buffer.invalid_value", extra={"value": value, "model": model})
5757

5858

59-
class BufferHookEvent(Enum):
60-
FLUSH = "flush"
61-
62-
63-
class BufferHookRegistry:
64-
def __init__(self, *args: Any, **kwargs: Any) -> None:
65-
self._registry: dict[BufferHookEvent, Callable[..., Any]] = {}
66-
67-
def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None:
68-
self._registry[key] = func
69-
70-
def has(self, key: BufferHookEvent) -> bool:
71-
return self._registry.get(key) is not None
72-
73-
def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
74-
try:
75-
callback = self._registry[buffer_hook_event]
76-
except KeyError:
77-
logger.exception("buffer_hook_event.missing")
78-
79-
return callback()
80-
81-
82-
redis_buffer_registry = BufferHookRegistry()
83-
84-
8559
# Callable to get the queue name for the given model_key.
8660
# May return None to not assign a queue for the given model_key.
8761
ChooseQueueFunction = Callable[[str], str | None]
@@ -445,12 +419,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel
445419
key = self._make_key(model, field)
446420
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
447421

448-
def process_batch(self) -> None:
449-
try:
450-
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
451-
except Exception:
452-
logger.exception("process_batch.error")
453-
454422
def incr(
455423
self,
456424
model: type[models.Model],

src/sentry/conf/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
20882088
SENTRY_BUFFER = "sentry.buffer.Buffer"
20892089
SENTRY_BUFFER_OPTIONS: dict[str, str] = {}
20902090

2091+
# Workflow Buffer backend
2092+
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer"
2093+
SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {}
2094+
20912095
# Cache backend
20922096
# XXX: We explicitly require the cache to be configured as its not optional
20932097
# and causes serious confusion with the default django cache

src/sentry/options/defaults.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3147,6 +3147,12 @@
31473147
default=[],
31483148
flags=FLAG_AUTOMATOR_MODIFIABLE,
31493149
)
3150+
register(
3151+
"workflow_engine.buffer.use_new_buffer",
3152+
type=Bool,
3153+
default=False,
3154+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3155+
)
31503156

31513157
# Restrict uptime issue creation for specific host provider identifiers. Items
31523158
# in this list map to the `host_provider_id` column in the UptimeSubscription

src/sentry/rules/processing/buffer_processing.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99

1010
from celery import Task
1111

12-
from sentry import buffer, options
13-
from sentry.buffer.base import BufferField
14-
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
12+
from sentry import options
13+
from sentry.buffer.base import Buffer, BufferField
1514
from sentry.db import models
1615
from sentry.utils import metrics
16+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
1717
from sentry.utils.registry import NoRegistrationExistsError, Registry
1818

1919
logger = logging.getLogger("sentry.delayed_processing")
@@ -47,12 +47,19 @@ def hash_args(self) -> BufferHashKeys:
4747
def processing_task(self) -> Task:
4848
raise NotImplementedError
4949

50+
@staticmethod
51+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
52+
raise NotImplementedError
53+
5054

5155
delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
5256

5357

5458
def fetch_group_to_event_data(
55-
project_id: int, model: type[models.Model], batch_key: str | None = None
59+
buffer: LazyServiceWrapper[Buffer],
60+
project_id: int,
61+
model: type[models.Model],
62+
batch_key: str | None = None,
5663
) -> dict[str, str]:
5764
field: dict[str, models.Model | int | str] = {
5865
"project_id": project_id,
@@ -61,7 +68,7 @@ def fetch_group_to_event_data(
6168
if batch_key:
6269
field["batch_key"] = batch_key
6370

64-
return buffer.backend.get_hash(model=model, field=field)
71+
return buffer.get_hash(model=model, field=field)
6572

6673

6774
def bucket_num_groups(num_groups: int) -> str:
@@ -71,7 +78,9 @@ def bucket_num_groups(num_groups: int) -> str:
7178
return "1"
7279

7380

74-
def process_in_batches(project_id: int, processing_type: str) -> None:
81+
def process_in_batches(
82+
buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str
83+
) -> None:
7584
"""
7685
This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
7786
@@ -100,7 +109,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
100109
task = processing_info.processing_task
101110
filters: dict[str, BufferField] = asdict(hash_args.filters)
102111

103-
event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
112+
event_count = buffer.get_hash_length(model=hash_args.model, field=filters)
104113
metrics.incr(
105114
f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
106115
)
@@ -118,22 +127,22 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
118127
)
119128

120129
# if the dictionary is large, get the items and chunk them.
121-
alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
130+
alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model)
122131

123132
with metrics.timer(f"{processing_type}.process_batch.duration"):
124133
items = iter(alertgroup_to_event_data.items())
125134

126135
while batch := dict(islice(items, batch_size)):
127136
batch_key = str(uuid.uuid4())
128137

129-
buffer.backend.push_to_hash_bulk(
138+
buffer.push_to_hash_bulk(
130139
model=hash_args.model,
131140
filters={**filters, "batch_key": batch_key},
132141
data=batch,
133142
)
134143

135144
# remove the batched items from the project alertgroup_to_event_data
136-
buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
145+
buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
137146

138147
task.apply_async(
139148
kwargs={"project_id": project_id, "batch_key": batch_key},
@@ -150,14 +159,16 @@ def process_buffer() -> None:
150159
logger.info(log_name, extra={"option": handler.option})
151160
continue
152161

162+
buffer = handler.buffer_backend()
163+
153164
with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
154165
# We need to use a very fresh timestamp here; project scores (timestamps) are
155166
# updated with each relevant event, and some can be updated every few milliseconds.
156167
# The staler this timestamp, the more likely it'll miss some recently updated projects,
157168
# and the more likely we'll have frequently updated projects that are never actually
158169
# retrieved and processed here.
159170
fetch_time = datetime.now(tz=timezone.utc)
160-
project_ids = buffer.backend.get_sorted_set(
171+
project_ids = buffer.get_sorted_set(
161172
handler.buffer_key, min=0, max=fetch_time.timestamp()
162173
)
163174
if should_emit_logs:
@@ -168,10 +179,6 @@ def process_buffer() -> None:
168179
logger.info(log_name, extra={"project_ids": log_str})
169180

170181
for project_id, _ in project_ids:
171-
process_in_batches(project_id, processing_type)
172-
173-
buffer.backend.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
174-
182+
process_in_batches(buffer, project_id, processing_type)
175183

176-
if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
177-
redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
184+
buffer.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())

src/sentry/rules/processing/delayed_processing.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from django.db.models import OuterRef, Subquery
1313

1414
from sentry import buffer, features, nodestore
15-
from sentry.buffer.base import BufferField
15+
from sentry.buffer.base import Buffer, BufferField
1616
from sentry.db import models
1717
from sentry.eventstore.models import Event, GroupEvent
1818
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -52,6 +52,7 @@
5252
from sentry.utils import json, metrics
5353
from sentry.utils.dates import ensure_aware
5454
from sentry.utils.iterators import chunked
55+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
5556
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
5657
from sentry.utils.safe import safe_execute
5758
from sentry.workflow_engine.processors.log_util import track_batch_performance
@@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys:
787788
@property
788789
def processing_task(self) -> Task:
789790
return apply_delayed
791+
792+
@staticmethod
793+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
794+
return buffer.backend

src/sentry/tasks/process_buffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ def process_pending_batch() -> None:
5252
"""
5353
Process pending buffers in a batch.
5454
"""
55-
from sentry import buffer
55+
from sentry.rules.processing.buffer_processing import process_buffer
5656

5757
lock = get_process_lock("process_pending_batch")
5858

5959
try:
6060
with lock.acquire():
61-
buffer.backend.process_batch()
61+
process_buffer()
6262
except UnableToAcquireLock as error:
6363
logger.warning("process_pending_batch.fail", extra={"error": error})
6464

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# New module (added in this commit) — from the surrounding imports this is
# `src/sentry/workflow_engine/buffer/__init__.py`; TODO confirm the path,
# the scraped page omits the filename header for this file.
from django.conf import settings

import sentry.buffer as old_buffer
from sentry import options
from sentry.buffer.base import Buffer
from sentry.utils.services import LazyServiceWrapper

# Dedicated buffer backend for the workflow engine, configured independently
# of the default `sentry.buffer` backend via the SENTRY_WORKFLOW_BUFFER /
# SENTRY_WORKFLOW_BUFFER_OPTIONS settings (both added in this same commit,
# defaulting to the base "sentry.buffer.Buffer" service).
_backend = LazyServiceWrapper(
    Buffer, settings.SENTRY_WORKFLOW_BUFFER, settings.SENTRY_WORKFLOW_BUFFER_OPTIONS
)


def validate_new_backend() -> None:
    """Validate the new workflow buffer backend.

    Currently a no-op placeholder; presumably kept so a validation hook
    exists for service-setup wiring — TODO confirm against callers.
    """
    pass


def get_backend() -> LazyServiceWrapper[Buffer]:
    """Return the buffer backend the workflow engine should use.

    Selects the new dedicated workflow buffer when the
    ``workflow_engine.buffer.use_new_buffer`` option (registered in this
    commit, default ``False``) is enabled; otherwise falls back to the
    legacy shared ``sentry.buffer`` backend, preserving prior behavior.
    """
    if options.get("workflow_engine.buffer.use_new_buffer"):
        return _backend
    else:
        return old_buffer.backend

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from django.utils import timezone
1212
from pydantic import BaseModel, validator
1313

14-
from sentry import buffer, features, nodestore, options
14+
import sentry.workflow_engine.buffer as buffer
15+
from sentry import features, nodestore, options
1516
from sentry.buffer.base import BufferField
1617
from sentry.db import models
1718
from sentry.eventstore.models import Event, GroupEvent
@@ -310,7 +311,7 @@ def fetch_group_to_event_data(
310311
if batch_key:
311312
field["batch_key"] = batch_key
312313

313-
return buffer.backend.get_hash(model=model, field=field)
314+
return buffer.get_backend().get_hash(model=model, field=field)
314315

315316

316317
def fetch_workflows_envs(
@@ -771,7 +772,7 @@ def cleanup_redis_buffer(
771772
if batch_key:
772773
filters["batch_key"] = batch_key
773774

774-
buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
775+
buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
775776

776777

777778
def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88
from django.db.models import Q
99
from django.utils import timezone
1010

11-
from sentry import buffer, features
11+
from sentry import features
1212
from sentry.eventstore.models import GroupEvent
1313
from sentry.models.activity import Activity
1414
from sentry.models.environment import Environment
1515
from sentry.utils import json
16+
from sentry.workflow_engine import buffer
1617
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
1718
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
1819
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
@@ -114,8 +115,9 @@ def enqueue_workflows(
114115
sentry_sdk.set_tag("delayed_workflow_items", items)
115116
return
116117

118+
backend = buffer.get_backend()
117119
for project_id, queue_items in items_by_project_id.items():
118-
buffer.backend.push_to_hash_bulk(
120+
backend.push_to_hash_bulk(
119121
model=Workflow,
120122
filters={"project_id": project_id},
121123
data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items},
@@ -125,7 +127,7 @@ def enqueue_workflows(
125127

126128
sentry_sdk.set_tag("delayed_workflow_items", items)
127129

128-
buffer.backend.push_to_sorted_set(
130+
backend.push_to_sorted_set(
129131
key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=list(items_by_project_id.keys())
130132
)
131133

0 commit comments

Comments
 (0)