
feat(aci): Separate Buffer for Workflows #97549

Open · wants to merge 5 commits into base: master
4 changes: 0 additions & 4 deletions src/sentry/buffer/base.py
@@ -30,7 +30,6 @@ class Buffer(Service):
"incr",
"process",
"process_pending",
"process_batch",
"validate",
"push_to_sorted_set",
"push_to_hash",
@@ -148,9 +147,6 @@ def incr(
def process_pending(self) -> None:
return

def process_batch(self) -> None:
return

def process(
self,
model: type[models.Model] | None,
32 changes: 0 additions & 32 deletions src/sentry/buffer/redis.py
@@ -57,32 +57,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) -
logger.exception("buffer.invalid_value", extra={"value": value, "model": model})


class BufferHookEvent(Enum):
FLUSH = "flush"


class BufferHookRegistry:
Contributor:

Can we remove this yet? I'm mostly concerned about the legacy delayed_processing code.

Member (Author):

I think this should be okay, though it is hard to reason about.
The task now calls process_buffer directly, and process_buffer uses the delayed_processing_registry to pick the right backend for each delayed processor.
Though, I didn't think too hard about what the registry was trying to accomplish, so I may be missing something. 😬
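For illustration, a minimal sketch of that dispatch path, condensed from this PR's buffer_processing.py (the `registrations` attribute and the elided loop header are assumptions based on the surrounding diff; option checks, metrics, and logging are omitted):

```python
from datetime import datetime, timezone

# Sketch: process_buffer iterates the delayed_processing_registry, and each
# registered handler supplies its own Buffer backend via buffer_backend().
def process_buffer() -> None:
    for processing_type, handler in delayed_processing_registry.registrations.items():
        buffer = handler.buffer_backend()  # per-handler backend selection
        fetch_time = datetime.now(tz=timezone.utc).timestamp()
        buffer_keys = handler.get_buffer_keys()
        project_ids_and_timestamps = buffer.bulk_get_sorted_set(
            buffer_keys, min=0, max=fetch_time
        )
        for project_id in project_ids_and_timestamps:
            process_in_batches(buffer, project_id, processing_type)
        buffer.delete_keys(buffer_keys, min=0, max=fetch_time)
```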

def __init__(self, *args: Any, **kwargs: Any) -> None:
self._registry: dict[BufferHookEvent, Callable[..., Any]] = {}

def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None:
self._registry[key] = func

def has(self, key: BufferHookEvent) -> bool:
return self._registry.get(key) is not None

def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
try:
callback = self._registry[buffer_hook_event]
except KeyError:
logger.exception("buffer_hook_event.missing")

return callback()


redis_buffer_registry = BufferHookRegistry()


# Callable to get the queue name for the given model_key.
# May return None to not assign a queue for the given model_key.
ChooseQueueFunction = Callable[[str], str | None]
@@ -495,12 +469,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel
key = self._make_key(model, field)
return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)

def process_batch(self) -> None:
try:
redis_buffer_registry.callback(BufferHookEvent.FLUSH)
except Exception:
logger.exception("process_batch.error")

def incr(
self,
model: type[models.Model],
4 changes: 4 additions & 0 deletions src/sentry/conf/server.py
@@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
SENTRY_BUFFER = "sentry.buffer.Buffer"
SENTRY_BUFFER_OPTIONS: dict[str, str] = {}

# Workflow Buffer backend
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer"
SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {}

# Cache backend
# XXX: We explicitly require the cache to be configured as its not optional
# and causes serious confusion with the default django cache
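For context, a hypothetical deployment snippet showing how the two settings could diverge; this is not part of the PR, and the `cluster` option name is an assumption about what a Redis-backed Buffer accepts:

```python
# sentry.conf.py (deployment config), hypothetical example.
# The default buffer keeps whatever backend the install already uses.
SENTRY_BUFFER = "sentry.buffer.redis.RedisBuffer"
SENTRY_BUFFER_OPTIONS: dict[str, str] = {}

# Workflows get their own Buffer instance, configured independently;
# "cluster" is an assumed option naming a dedicated Redis cluster.
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.redis.RedisBuffer"
SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {"cluster": "workflows"}
```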
6 changes: 6 additions & 0 deletions src/sentry/options/defaults.py
@@ -3154,6 +3154,12 @@
default=[],
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"workflow_engine.buffer.use_new_buffer",
type=Bool,
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)

# Restrict uptime issue creation for specific host provider identifiers. Items
# in this list map to the `host_provider_id` column in the UptimeSubscription
41 changes: 24 additions & 17 deletions src/sentry/rules/processing/buffer_processing.py
@@ -9,11 +9,11 @@

from celery import Task

from sentry import buffer, options
from sentry.buffer.base import BufferField
from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
from sentry import options
from sentry.buffer.base import Buffer, BufferField
from sentry.db import models
from sentry.utils import metrics
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
from sentry.utils.registry import NoRegistrationExistsError, Registry

logger = logging.getLogger("sentry.delayed_processing")
@@ -56,12 +56,19 @@ def get_buffer_keys(cls) -> list[str]:
for shard in range(cls.buffer_shards)
]

@staticmethod
def buffer_backend() -> LazyServiceWrapper[Buffer]:
raise NotImplementedError


delayed_processing_registry = Registry[type[DelayedProcessingBase]]()


def fetch_group_to_event_data(
project_id: int, model: type[models.Model], batch_key: str | None = None
buffer: LazyServiceWrapper[Buffer],
project_id: int,
model: type[models.Model],
batch_key: str | None = None,
) -> dict[str, str]:
field: dict[str, models.Model | int | str] = {
"project_id": project_id,
@@ -70,7 +77,7 @@ def fetch_group_to_event_data(
if batch_key:
field["batch_key"] = batch_key

return buffer.backend.get_hash(model=model, field=field)
return buffer.get_hash(model=model, field=field)


def bucket_num_groups(num_groups: int) -> str:
@@ -80,7 +87,9 @@ def bucket_num_groups(num_groups: int) -> str:
return "1"


def process_in_batches(project_id: int, processing_type: str) -> None:
def process_in_batches(
buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str
) -> None:
"""
This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
@@ -109,7 +118,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
task = processing_info.processing_task
filters: dict[str, BufferField] = asdict(hash_args.filters)

event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
event_count = buffer.get_hash_length(model=hash_args.model, field=filters)
metrics.incr(
f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
)
@@ -127,22 +136,22 @@
)

# if the dictionary is large, get the items and chunk them.
alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model)

with metrics.timer(f"{processing_type}.process_batch.duration"):
items = iter(alertgroup_to_event_data.items())

while batch := dict(islice(items, batch_size)):
batch_key = str(uuid.uuid4())

buffer.backend.push_to_hash_bulk(
buffer.push_to_hash_bulk(
model=hash_args.model,
filters={**filters, "batch_key": batch_key},
data=batch,
)

# remove the batched items from the project alertgroup_to_event_data
buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys()))

task.apply_async(
kwargs={"project_id": project_id, "batch_key": batch_key},
@@ -159,6 +168,8 @@ def process_buffer() -> None:
logger.info(log_name, extra={"option": handler.option})
continue

buffer = handler.buffer_backend()

with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
# We need to use a very fresh timestamp here; project scores (timestamps) are
# updated with each relevant event, and some can be updated every few milliseconds.
Expand All @@ -167,7 +178,7 @@ def process_buffer() -> None:
# retrieved and processed here.
fetch_time = datetime.now(tz=timezone.utc).timestamp()
buffer_keys = handler.get_buffer_keys()
all_project_ids_and_timestamps = buffer.backend.bulk_get_sorted_set(
all_project_ids_and_timestamps = buffer.bulk_get_sorted_set(
buffer_keys,
min=0,
max=fetch_time,
Expand All @@ -183,14 +194,10 @@ def process_buffer() -> None:

project_ids = list(all_project_ids_and_timestamps.keys())
for project_id in project_ids:
process_in_batches(project_id, processing_type)
process_in_batches(buffer, project_id, processing_type)

buffer.backend.delete_keys(
buffer.delete_keys(
buffer_keys,
min=0,
max=fetch_time,
)


if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
Contributor:

🎉 - I never liked this code. Just curious: if we want to add another handler when the buffer is flushed, would we have to manually add it to tasks/process_buffer.py::process_pending_batch? Should we have a registry hook there, or is this uncommon enough that we can be explicit about it?

Member (Author):

I think we've made it so there isn't just one Buffer anymore, and dispatching to the appropriate buffer is now handled by the config in delayed_processing_registry.
For delayed-processing-style batch scheduling, this seems sufficiently extensible (presumably, in a few months it'll only be dispatching to workflows, and we may want to split "Buffer" up, since our method set is fairly independent and could pretty much work against a bare Redis client), so I'm not too concerned with generalizing the pattern.

redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
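Answering the question above under this PR's design: a new batch consumer would register with delayed_processing_registry rather than hook the flush event. A hedged sketch; the decorator-style registration mirrors how the existing processors appear to be wired up, and every `my_*` name, plus the FilterKeys import, is a hypothetical assumption:

```python
from celery import Task

import sentry.workflow_engine.buffer as buffer
from sentry.buffer.base import Buffer
from sentry.rules.processing.buffer_processing import (
    BufferHashKeys,
    DelayedProcessingBase,
    FilterKeys,  # assumed to live alongside BufferHashKeys
    delayed_processing_registry,
)
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper


# Hypothetical sketch: plugging a new batch consumer into the registry
# instead of the removed BufferHookEvent.FLUSH hook.
@delayed_processing_registry.register("my_feature")
class DelayedMyFeature(DelayedProcessingBase):
    option = None  # or an option name that gates this processor

    @property
    def hash_args(self) -> BufferHashKeys:
        # MyModel and the project_id attribute are illustrative.
        return BufferHashKeys(model=MyModel, filters=FilterKeys(project_id=self.project_id))

    @property
    def processing_task(self) -> Task:
        return apply_my_feature_batch  # hypothetical celery task

    @staticmethod
    def buffer_backend() -> LazyServiceWrapper[Buffer]:
        return buffer.get_backend()  # or sentry.buffer.backend for the default
```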
7 changes: 6 additions & 1 deletion src/sentry/rules/processing/delayed_processing.py
@@ -12,7 +12,7 @@
from django.db.models import OuterRef, Subquery

from sentry import buffer, features, nodestore
from sentry.buffer.base import BufferField
from sentry.buffer.base import Buffer, BufferField
from sentry.db import models
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.models.group import Group
@@ -52,6 +52,7 @@
from sentry.utils import json, metrics
from sentry.utils.dates import ensure_aware
from sentry.utils.iterators import chunked
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
from sentry.utils.safe import safe_execute
from sentry.workflow_engine.processors.log_util import track_batch_performance
@@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys:
@property
def processing_task(self) -> Task:
return apply_delayed

@staticmethod
def buffer_backend() -> LazyServiceWrapper[Buffer]:
return buffer.backend
4 changes: 2 additions & 2 deletions src/sentry/tasks/process_buffer.py
@@ -52,13 +52,13 @@ def process_pending_batch() -> None:
"""
Process pending buffers in a batch.
"""
from sentry import buffer
from sentry.rules.processing.buffer_processing import process_buffer

lock = get_process_lock("process_pending_batch")

try:
with lock.acquire():
buffer.backend.process_batch()
process_buffer()
except UnableToAcquireLock as error:
logger.warning("process_pending_batch.fail", extra={"error": error})

21 changes: 21 additions & 0 deletions src/sentry/workflow_engine/buffer/__init__.py
@@ -0,0 +1,21 @@
from django.conf import settings

import sentry.buffer as old_buffer
from sentry import options
from sentry.buffer.base import Buffer
from sentry.utils.services import LazyServiceWrapper

# Workflows-specific Buffer that can be configured separately from the default Buffer.
_backend = LazyServiceWrapper(
Buffer, settings.SENTRY_WORKFLOW_BUFFER, settings.SENTRY_WORKFLOW_BUFFER_OPTIONS
)


def get_backend() -> LazyServiceWrapper[Buffer]:
"""
Retrieve the appropriate Buffer to use for the workflow engine.
"""
if options.get("workflow_engine.buffer.use_new_buffer"):
return _backend
else:
return old_buffer.backend
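A short usage sketch mirroring the call sites in this PR; the filter and payload values are illustrative:

```python
# Callers resolve the backend once, then use the normal Buffer API.
# workflow_engine.buffer.use_new_buffer controls which backend they get.
import sentry.workflow_engine.buffer as buffer
from sentry.workflow_engine.models import Workflow

backend = buffer.get_backend()  # sentry.buffer.backend unless the option is on
backend.push_to_hash_bulk(
    model=Workflow,
    filters={"project_id": 123},  # illustrative project id
    data={"buffer-key": "buffer-value"},  # illustrative batch payload
)
```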
7 changes: 4 additions & 3 deletions src/sentry/workflow_engine/processors/delayed_workflow.py
@@ -11,7 +11,8 @@
from django.utils import timezone
from pydantic import BaseModel, validator

from sentry import buffer, features, nodestore, options
import sentry.workflow_engine.buffer as buffer
from sentry import features, nodestore, options
from sentry.buffer.base import BufferField
from sentry.db import models
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -312,7 +313,7 @@ def fetch_group_to_event_data(
if batch_key:
field["batch_key"] = batch_key

return buffer.backend.get_hash(model=model, field=field)
return buffer.get_backend().get_hash(model=model, field=field)


def fetch_workflows_envs(
@@ -765,7 +766,7 @@ def cleanup_redis_buffer(
if batch_key:
filters["batch_key"] = batch_key

buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)


def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:
8 changes: 5 additions & 3 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -9,11 +9,12 @@
from django.db.models import Q
from django.utils import timezone

from sentry import buffer, features
from sentry import features
from sentry.models.activity import Activity
from sentry.models.environment import Environment
from sentry.services.eventstore.models import GroupEvent
from sentry.utils import json
from sentry.workflow_engine import buffer
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
from sentry.workflow_engine.processors.action import (
@@ -118,8 +119,9 @@ def enqueue_workflows(
sentry_sdk.set_tag("delayed_workflow_items", items)
return

backend = buffer.get_backend()
for project_id, queue_items in items_by_project_id.items():
buffer.backend.push_to_hash_bulk(
backend.push_to_hash_bulk(
model=Workflow,
filters={"project_id": project_id},
data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items},
@@ -130,7 +132,7 @@
sentry_sdk.set_tag("delayed_workflow_items", items)

sharded_key = random.choice(DelayedWorkflow.get_buffer_keys())
buffer.backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))
backend.push_to_sorted_set(key=sharded_key, value=list(items_by_project_id.keys()))

logger.debug(
"workflow_engine.workflows.enqueued",
7 changes: 7 additions & 0 deletions src/sentry/workflow_engine/tasks/delayed_workflows.py
@@ -4,7 +4,9 @@

from celery import Task

import sentry.workflow_engine.buffer as buffer
from sentry import options
from sentry.buffer.base import Buffer
from sentry.rules.processing.buffer_processing import (
BufferHashKeys,
DelayedProcessingBase,
@@ -16,6 +18,7 @@
from sentry.taskworker.config import TaskworkerConfig
from sentry.taskworker.namespaces import workflow_engine_tasks
from sentry.taskworker.retry import Retry
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
from sentry.workflow_engine.models import Workflow
from sentry.workflow_engine.utils import log_context

@@ -66,3 +69,7 @@ def processing_task(self) -> Task:
if options.get("delayed_workflow.use_workflow_engine_pool"):
return process_delayed_workflows_shim
return process_delayed_workflows

@staticmethod
def buffer_backend() -> LazyServiceWrapper[Buffer]:
return buffer.get_backend()