Skip to content

Commit 73d312e

Browse files
authored
feat(aci): Separate Buffer for Workflows (#97549)
Buffer reliability is critical and has been proven to be a potential stability risk as we scale up. To allow Workflows infrastructure to fail without impacting the legacy system, and to enable the use of newer backends without migrating live load, it's helpful to give Workflows its own Buffer.
1 parent 826bfb5 commit 73d312e

File tree

17 files changed

+129
-119
lines changed

17 files changed

+129
-119
lines changed

src/sentry/buffer/base.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class Buffer(Service):
3030
"incr",
3131
"process",
3232
"process_pending",
33-
"process_batch",
3433
"validate",
3534
"push_to_sorted_set",
3635
"push_to_hash",
@@ -148,9 +147,6 @@ def incr(
148147
def process_pending(self) -> None:
149148
return
150149

151-
def process_batch(self) -> None:
152-
return
153-
154150
def process(
155151
self,
156152
model: type[models.Model] | None,

src/sentry/buffer/redis.py

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -57,32 +57,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) -
5757
logger.exception("buffer.invalid_value", extra={"value": value, "model": model})
5858

5959

60-
class BufferHookEvent(Enum):
61-
FLUSH = "flush"
62-
63-
64-
class BufferHookRegistry:
65-
def __init__(self, *args: Any, **kwargs: Any) -> None:
66-
self._registry: dict[BufferHookEvent, Callable[..., Any]] = {}
67-
68-
def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None:
69-
self._registry[key] = func
70-
71-
def has(self, key: BufferHookEvent) -> bool:
72-
return self._registry.get(key) is not None
73-
74-
def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
75-
try:
76-
callback = self._registry[buffer_hook_event]
77-
except KeyError:
78-
logger.exception("buffer_hook_event.missing")
79-
80-
return callback()
81-
82-
83-
redis_buffer_registry = BufferHookRegistry()
84-
85-
8660
# Callable to get the queue name for the given model_key.
8761
# May return None to not assign a queue for the given model_key.
8862
ChooseQueueFunction = Callable[[str], str | None]
@@ -495,12 +469,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel
495469
key = self._make_key(model, field)
496470
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
497471

498-
def process_batch(self) -> None:
499-
try:
500-
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
501-
except Exception:
502-
logger.exception("process_batch.error")
503-
504472
def incr(
505473
self,
506474
model: type[models.Model],

src/sentry/conf/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
20882088
SENTRY_BUFFER = "sentry.buffer.Buffer"
20892089
SENTRY_BUFFER_OPTIONS: dict[str, str] = {}
20902090

2091+
# Workflow Buffer backend
2092+
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer"
2093+
SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {}
2094+
20912095
# Cache backend
20922096
# XXX: We explicitly require the cache to be configured as its not optional
20932097
# and causes serious confusion with the default django cache

src/sentry/options/defaults.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3154,6 +3154,12 @@
31543154
default=[],
31553155
flags=FLAG_AUTOMATOR_MODIFIABLE,
31563156
)
3157+
register(
3158+
"workflow_engine.buffer.use_new_buffer",
3159+
type=Bool,
3160+
default=False,
3161+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3162+
)
31573163

31583164
# Restrict uptime issue creation for specific host provider identifiers. Items
31593165
# in this list map to the `host_provider_id` column in the UptimeSubscription

src/sentry/rules/processing/buffer_processing.py

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99

1010
from celery import Task
1111

12-
from sentry import buffer, options
13-
from sentry.buffer.base import BufferField
14-
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
12+
from sentry import options
13+
from sentry.buffer.base import Buffer, BufferField
1514
from sentry.db import models
1615
from sentry.utils import metrics
16+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
1717
from sentry.utils.registry import NoRegistrationExistsError, Registry
1818

1919
logger = logging.getLogger("sentry.delayed_processing")
@@ -56,12 +56,19 @@ def get_buffer_keys(cls) -> list[str]:
5656
for shard in range(cls.buffer_shards)
5757
]
5858

59+
@staticmethod
60+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
61+
raise NotImplementedError
62+
5963

6064
delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
6165

6266

6367
def fetch_group_to_event_data(
64-
project_id: int, model: type[models.Model], batch_key: str | None = None
68+
buffer: LazyServiceWrapper[Buffer],
69+
project_id: int,
70+
model: type[models.Model],
71+
batch_key: str | None = None,
6572
) -> dict[str, str]:
6673
field: dict[str, models.Model | int | str] = {
6774
"project_id": project_id,
@@ -70,7 +77,7 @@ def fetch_group_to_event_data(
7077
if batch_key:
7178
field["batch_key"] = batch_key
7279

73-
return buffer.backend.get_hash(model=model, field=field)
80+
return buffer.get_hash(model=model, field=field)
7481

7582

7683
def bucket_num_groups(num_groups: int) -> str:
@@ -80,7 +87,9 @@ def bucket_num_groups(num_groups: int) -> str:
8087
return "1"
8188

8289

83-
def process_in_batches(project_id: int, processing_type: str) -> None:
90+
def process_in_batches(
91+
buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str
92+
) -> None:
8493
"""
8594
This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
8695
@@ -109,7 +118,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
109118
task = processing_info.processing_task
110119
filters: dict[str, BufferField] = asdict(hash_args.filters)
111120

112-
event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
121+
event_count = buffer.get_hash_length(model=hash_args.model, field=filters)
113122
metrics.incr(
114123
f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
115124
)
@@ -127,22 +136,22 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
127136
)
128137

129138
# if the dictionary is large, get the items and chunk them.
130-
alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
139+
alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model)
131140

132141
with metrics.timer(f"{processing_type}.process_batch.duration"):
133142
items = iter(alertgroup_to_event_data.items())
134143

135144
while batch := dict(islice(items, batch_size)):
136145
batch_key = str(uuid.uuid4())
137146

138-
buffer.backend.push_to_hash_bulk(
147+
buffer.push_to_hash_bulk(
139148
model=hash_args.model,
140149
filters={**filters, "batch_key": batch_key},
141150
data=batch,
142151
)
143152

144153
# remove the batched items from the project alertgroup_to_event_data
145-
buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
154+
buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
146155

147156
task.apply_async(
148157
kwargs={"project_id": project_id, "batch_key": batch_key},
@@ -159,6 +168,8 @@ def process_buffer() -> None:
159168
logger.info(log_name, extra={"option": handler.option})
160169
continue
161170

171+
buffer = handler.buffer_backend()
172+
162173
with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
163174
# We need to use a very fresh timestamp here; project scores (timestamps) are
164175
# updated with each relevant event, and some can be updated every few milliseconds.
@@ -167,7 +178,7 @@ def process_buffer() -> None:
167178
# retrieved and processed here.
168179
fetch_time = datetime.now(tz=timezone.utc).timestamp()
169180
buffer_keys = handler.get_buffer_keys()
170-
all_project_ids_and_timestamps = buffer.backend.bulk_get_sorted_set(
181+
all_project_ids_and_timestamps = buffer.bulk_get_sorted_set(
171182
buffer_keys,
172183
min=0,
173184
max=fetch_time,
@@ -183,14 +194,10 @@ def process_buffer() -> None:
183194

184195
project_ids = list(all_project_ids_and_timestamps.keys())
185196
for project_id in project_ids:
186-
process_in_batches(project_id, processing_type)
197+
process_in_batches(buffer, project_id, processing_type)
187198

188-
buffer.backend.delete_keys(
199+
buffer.delete_keys(
189200
buffer_keys,
190201
min=0,
191202
max=fetch_time,
192203
)
193-
194-
195-
if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
196-
redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)

src/sentry/rules/processing/delayed_processing.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from django.db.models import OuterRef, Subquery
1313

1414
from sentry import buffer, features, nodestore
15-
from sentry.buffer.base import BufferField
15+
from sentry.buffer.base import Buffer, BufferField
1616
from sentry.db import models
1717
from sentry.issues.issue_occurrence import IssueOccurrence
1818
from sentry.models.group import Group
@@ -52,6 +52,7 @@
5252
from sentry.utils import json, metrics
5353
from sentry.utils.dates import ensure_aware
5454
from sentry.utils.iterators import chunked
55+
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
5556
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
5657
from sentry.utils.safe import safe_execute
5758
from sentry.workflow_engine.processors.log_util import track_batch_performance
@@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys:
787788
@property
788789
def processing_task(self) -> Task:
789790
return apply_delayed
791+
792+
@staticmethod
793+
def buffer_backend() -> LazyServiceWrapper[Buffer]:
794+
return buffer.backend

src/sentry/tasks/process_buffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ def process_pending_batch() -> None:
5252
"""
5353
Process pending buffers in a batch.
5454
"""
55-
from sentry import buffer
55+
from sentry.rules.processing.buffer_processing import process_buffer
5656

5757
lock = get_process_lock("process_pending_batch")
5858

5959
try:
6060
with lock.acquire():
61-
buffer.backend.process_batch()
61+
process_buffer()
6262
except UnableToAcquireLock as error:
6363
logger.warning("process_pending_batch.fail", extra={"error": error})
6464

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from django.conf import settings
2+
3+
import sentry.buffer as old_buffer
4+
from sentry import options
5+
from sentry.buffer.base import Buffer
6+
from sentry.utils.services import LazyServiceWrapper
7+
8+
# Workflows-specific Buffer that can be configured separately from the default Buffer.
9+
_backend = LazyServiceWrapper(
10+
Buffer, settings.SENTRY_WORKFLOW_BUFFER, settings.SENTRY_WORKFLOW_BUFFER_OPTIONS
11+
)
12+
13+
def get_backend() -> LazyServiceWrapper[Buffer]:
    """
    Return the Buffer the workflow engine should use.

    Selects the dedicated workflow Buffer when the
    ``workflow_engine.buffer.use_new_buffer`` option is enabled; otherwise
    falls back to the default shared Buffer backend.
    """
    use_new = options.get("workflow_engine.buffer.use_new_buffer")
    if use_new:
        return _backend
    return old_buffer.backend

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
from django.utils import timezone
1212
from pydantic import BaseModel, validator
1313

14-
from sentry import buffer, features, nodestore, options
14+
import sentry.workflow_engine.buffer as buffer
15+
from sentry import features, nodestore, options
1516
from sentry.buffer.base import BufferField
1617
from sentry.db import models
1718
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -312,7 +313,7 @@ def fetch_group_to_event_data(
312313
if batch_key:
313314
field["batch_key"] = batch_key
314315

315-
return buffer.backend.get_hash(model=model, field=field)
316+
return buffer.get_backend().get_hash(model=model, field=field)
316317

317318

318319
def fetch_workflows_envs(
@@ -765,7 +766,7 @@ def cleanup_redis_buffer(
765766
if batch_key:
766767
filters["batch_key"] = batch_key
767768

768-
buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
769+
buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
769770

770771

771772
def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
from django.db.models import Q
1010
from django.utils import timezone
1111

12-
from sentry import buffer, features
12+
from sentry import features
1313
from sentry.models.activity import Activity
1414
from sentry.models.environment import Environment
1515
from sentry.services.eventstore.models import GroupEvent
1616
from sentry.utils import json
17+
from sentry.workflow_engine import buffer
1718
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
1819
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
1920
from sentry.workflow_engine.processors.action import (
@@ -118,8 +119,9 @@ def enqueue_workflows(
118119
sentry_sdk.set_tag("delayed_workflow_items", items)
119120
return
120121

122+
backend = buffer.get_backend()
121123
for project_id, queue_items in items_by_project_id.items():
122-
buffer.backend.push_to_hash_bulk(
124+
backend.push_to_hash_bulk(
123125
model=Workflow,
124126
filters={"project_id": project_id},
125127
data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items},
@@ -130,7 +132,7 @@ def enqueue_workflows(
130132
sentry_sdk.set_tag("delayed_workflow_items", items)
131133

132134
sharded_key = random.choice(DelayedWorkflow.get_buffer_keys())
133-
buffer.backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
135+
backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
134136

135137
logger.debug(
136138
"workflow_engine.workflows.enqueued",

0 commit comments

Comments
 (0)