Commit 08d037a

aci(feat): Use propagated timestamp to track triggering latency (#97612)
Create a timestamp before process_workflows_event and use it after alert triggering to report latency.
Parent: ddced12

File tree: 14 files changed, +214 −120 lines
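The commit applies one pattern throughout: capture a single start timestamp when an event enters workflow processing, propagate it through every queue hop (task kwargs, Redis buffer payloads), and compute the latency metric only at the moment an alert actually fires. A minimal sketch of the idea, with illustrative names rather than the real Sentry API:

from datetime import datetime, timezone

def handle_event(event_payload: dict) -> None:
    # Capture the origin time once, before any processing or queueing.
    start_timestamp = datetime.now(tz=timezone.utc)
    # ... propagate start_timestamp through task payloads / Redis buffers ...
    fire_alert(event_payload, start_timestamp)

def fire_alert(event_payload: dict, start_timestamp: datetime) -> None:
    # Report end-to-end triggering latency at the point the alert fires.
    latency_seconds = (datetime.now(tz=timezone.utc) - start_timestamp).total_seconds()
    print(f"rule_fire_history.latency: {latency_seconds:.3f}s")

Measuring against a propagated origin timestamp, rather than calling now() at each stage, keeps time spent waiting in buffers inside the reported latency.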

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -644,9 +644,11 @@ module = [
     "sentry.web.frontend.cli",
     "sentry.web.frontend.csv",
     "sentry.web.frontend.mixins.*",
+    "sentry.workflow_engine.buffer.*",
     "sentry.workflow_engine.handlers.condition.*",
     "sentry.workflow_engine.migrations.*",
     "sentry.workflow_engine.processors.*",
+    "sentry.workflow_engine.tasks.*",
     "sentry.workflow_engine.typings.*",
     "sentry.workflow_engine.utils.*",
     "sentry_plugins.base",

src/sentry/rules/processing/delayed_processing.py

Lines changed: 42 additions & 19 deletions
@@ -4,7 +4,7 @@
 from collections import defaultdict
 from dataclasses import dataclass
 from datetime import datetime, timedelta, timezone
-from typing import Any, DefaultDict, NamedTuple
+from typing import Any, DefaultDict, NamedTuple, NotRequired, TypedDict

 import sentry_sdk
 from celery import Task
@@ -50,7 +50,6 @@
 from sentry.taskworker.namespaces import issues_tasks
 from sentry.taskworker.retry import Retry
 from sentry.utils import json, metrics
-from sentry.utils.dates import ensure_aware
 from sentry.utils.iterators import chunked
 from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
@@ -243,25 +242,48 @@ def bulk_fetch_events(event_ids: list[str], project_id: int) -> dict[str, Event]
     }


+class EventData(TypedDict):
+    event_id: str
+    occurrence_id: NotRequired[str | None]
+    start_timestamp: NotRequired[datetime | None]
+
+
 def parse_rulegroup_to_event_data(
     rulegroup_to_event_data: dict[str, str],
-) -> dict[tuple[int, int], dict[str, str]]:
+) -> dict[tuple[int, int], EventData]:
     parsed_rulegroup_to_event_data = {}
     for rule_group, instance_data in rulegroup_to_event_data.items():
         event_data = json.loads(instance_data)
+        if ts_string := event_data.get("start_timestamp"):
+            try:
+                # Handle ISO format with timezone info
+                event_data["start_timestamp"] = datetime.fromisoformat(ts_string)
+            except (ValueError, TypeError):
+                try:
+                    # Fallback to manual parsing if needed
+                    event_data["start_timestamp"] = datetime.strptime(
+                        ts_string, "%Y-%m-%dT%H:%M:%S.%fZ"
+                    ).replace(tzinfo=timezone.utc)
+                except ValueError:
+                    logger.exception(
+                        "delayed_processing.invalid_start_timestamp",
+                        extra={"rule_group": rule_group, "start_timestamp": ts_string},
+                    )
+                    del event_data["start_timestamp"]
+
         rule_id, group_id = rule_group.split(":")
         parsed_rulegroup_to_event_data[(int(rule_id), int(group_id))] = event_data
     return parsed_rulegroup_to_event_data


 def build_group_to_groupevent(
     log_config: LogConfig,
-    parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
+    parsed_rulegroup_to_event_data: dict[tuple[int, int], EventData],
     bulk_event_id_to_events: dict[str, Event],
     bulk_occurrence_id_to_occurrence: dict[str, IssueOccurrence],
     group_id_to_group: dict[int, Group],
     project_id: int,
-) -> dict[Group, GroupEvent]:
+) -> dict[Group, tuple[GroupEvent, datetime | None]]:

     project = fetch_project(project_id)
     if project:
@@ -278,11 +300,12 @@ def build_group_to_groupevent(
                 "project_id": project_id,
             },
         )
-    group_to_groupevent = {}
+    group_to_groupevent: dict[Group, tuple[GroupEvent, datetime | None]] = {}

     for rule_group, instance_data in parsed_rulegroup_to_event_data.items():
         event_id = instance_data.get("event_id")
         occurrence_id = instance_data.get("occurrence_id")
+        start_timestamp = instance_data.get("start_timestamp")

         if event_id is None:
             logger.info(
@@ -312,16 +335,16 @@ def build_group_to_groupevent(
         group_event = event.for_group(group)
         if occurrence_id:
             group_event.occurrence = bulk_occurrence_id_to_occurrence.get(occurrence_id)
-        group_to_groupevent[group] = group_event
+        group_to_groupevent[group] = (group_event, start_timestamp)
     return group_to_groupevent


 def get_group_to_groupevent(
     log_config: LogConfig,
-    parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
+    parsed_rulegroup_to_event_data: dict[tuple[int, int], EventData],
     project_id: int,
     group_ids: set[int],
-) -> dict[Group, GroupEvent]:
+) -> dict[Group, tuple[GroupEvent, datetime | None]]:
     groups = Group.objects.filter(id__in=group_ids)
     group_id_to_group = {group.id: group for group in groups}

@@ -491,7 +514,7 @@ def get_rules_to_fire(
 def fire_rules(
     log_config: LogConfig,
     rules_to_fire: DefaultDict[Rule, set[int]],
-    parsed_rulegroup_to_event_data: dict[tuple[int, int], dict[str, str]],
+    parsed_rulegroup_to_event_data: dict[tuple[int, int], EventData],
     alert_rules: list[Rule],
     project: Project,
 ) -> None:
@@ -511,7 +534,8 @@ def fire_rules(
     )
     if log_config.num_events_issue_debugging or log_config.workflow_engine_process_workflows:
         serialized_groups = {
-            group.id: group_event.event_id for group, group_event in group_to_groupevent.items()
+            group.id: group_event.event_id
+            for group, (group_event, _) in group_to_groupevent.items()
         }
         logger.info(
             "delayed_processing.group_to_groupevent",
@@ -574,14 +598,13 @@ def fire_rules(
                 continue

             notification_uuid = str(uuid.uuid4())
-            groupevent = group_to_groupevent[group]
-            metrics.timing(
-                "rule_fire_history.latency",
-                (
-                    datetime.now(tz=timezone.utc) - ensure_aware(groupevent.datetime)
-                ).total_seconds(),
-                tags={"delayed": True, "group_type": group.issue_type.slug},
-            )
+            groupevent, start_timestamp = group_to_groupevent[group]
+            if start_timestamp:
+                metrics.timing(
+                    "rule_fire_history.latency",
+                    (datetime.now(tz=timezone.utc) - start_timestamp).total_seconds(),
+                    tags={"delayed": True, "group_type": group.issue_type.slug},
+                )
             rule_fire_history = history.record(
                 rule, group, groupevent.event_id, notification_uuid
             )
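The parsing above is deliberately forgiving: it first tries datetime.fromisoformat, falls back to strptime for the %Y-%m-%dT%H:%M:%S.%fZ form (fromisoformat only accepts the trailing Z on Python 3.11+), and on a double failure logs and drops the field rather than failing the whole batch. The same logic as a standalone helper (a sketch, not code from this commit):

from datetime import datetime, timezone

def parse_start_timestamp(ts_string: str) -> datetime | None:
    """Parse an ISO-8601 timestamp, tolerating a trailing 'Z'; None on failure."""
    try:
        return datetime.fromisoformat(ts_string)
    except (ValueError, TypeError):
        try:
            return datetime.strptime(ts_string, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
                tzinfo=timezone.utc
            )
        except ValueError:
            return None  # the caller logs and deletes the field

print(parse_start_timestamp("2024-05-01T12:00:00.123456+00:00"))
print(parse_start_timestamp("2024-05-01T12:00:00.123456Z"))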

src/sentry/rules/processing/processor.py

Lines changed: 9 additions & 4 deletions
@@ -4,7 +4,7 @@
 import random
 import uuid
 from collections.abc import Callable, Collection, Mapping, MutableMapping, Sequence
-from datetime import timedelta
+from datetime import datetime, timedelta
 from random import randrange
 from typing import Any
@@ -28,7 +28,6 @@
 from sentry.services.eventstore.models import GroupEvent
 from sentry.types.rules import RuleFuture
 from sentry.utils import json, metrics
-from sentry.utils.dates import ensure_aware
 from sentry.utils.hashlib import hash_values
 from sentry.utils.safe import safe_execute
@@ -213,10 +212,12 @@ def __init__(
         is_new_group_environment: bool,
         has_reappeared: bool,
         has_escalated: bool = False,
+        start_timestamp: datetime | None = None,
     ) -> None:
         self.event = event
         self.group = event.group
         self.project = event.project
+        self.start_timestamp = start_timestamp or timezone.now()

         self.is_new = is_new
         self.is_regression = is_regression
@@ -282,7 +283,11 @@ def enqueue_rule(self, rule: Rule) -> None:
         buffer.backend.push_to_sorted_set(PROJECT_ID_BUFFER_LIST_KEY, rule.project.id)

         value = json.dumps(
-            {"event_id": self.event.event_id, "occurrence_id": self.event.occurrence_id}
+            {
+                "event_id": self.event.event_id,
+                "occurrence_id": self.event.occurrence_id,
+                "start_timestamp": self.start_timestamp,
+            }
         )
         buffer.backend.push_to_hash(
             model=Project,
@@ -417,7 +422,7 @@ def apply_rule(self, rule: Rule, status: GroupRuleStatus) -> None:
         notification_uuid = str(uuid.uuid4())
         metrics.timing(
             "rule_fire_history.latency",
-            (timezone.now() - ensure_aware(self.event.datetime)).total_seconds(),
+            (timezone.now() - self.start_timestamp).total_seconds(),
             tags={"delayed": False, "group_type": self.group.issue_type.slug},
         )
         rule_fire_history = history.record(rule, self.group, self.event.event_id, notification_uuid)
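Here start_timestamp is a datetime written into the Redis buffer payload via json.dumps, so this relies on sentry.utils.json serializing datetimes to strings — which the parsing in delayed_processing.py then reverses. The round trip, sketched with the standard library under the assumption of ISO-8601 encoding:

import json
from datetime import datetime, timezone

# Producer side: encode the datetime as an ISO-8601 string for the Redis hash.
start = datetime.now(tz=timezone.utc)
value = json.dumps({"event_id": "abc123", "start_timestamp": start.isoformat()})

# Consumer side: decode the payload and restore the aware datetime.
data = json.loads(value)
restored = datetime.fromisoformat(data["start_timestamp"])
assert restored == start

Defaulting to timezone.now() in __init__ keeps callers that do not yet pass a start time working, at the cost of a slightly later origin instant.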

src/sentry/tasks/post_process.py

Lines changed: 1 addition & 0 deletions
@@ -1012,6 +1012,7 @@ def process_workflow_engine(job: PostProcessJob) -> None:
             group_state=job["group_state"],
             has_reappeared=job["has_reappeared"],
             has_escalated=job["has_escalated"],
+            start_timestamp_seconds=time(),
         ),
         headers={"sentry-propagate-traces": False},
     )
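Here the task payload carries the start time as epoch seconds from time(), which is trivially JSON-serializable across the task boundary; the consumer can rebuild an aware datetime from it. A sketch of both sides (the task body is illustrative, not the actual process_workflows_event signature):

from datetime import datetime, timezone
from time import time

# Producer: capture epoch seconds just before dispatching the task.
kwargs = {"start_timestamp_seconds": time()}

# Consumer: rebuild an aware datetime inside the task, falling back to now().
def process_workflows_event(start_timestamp_seconds: float | None = None, **_: object) -> None:
    start = (
        datetime.fromtimestamp(start_timestamp_seconds, tz=timezone.utc)
        if start_timestamp_seconds is not None
        else datetime.now(tz=timezone.utc)
    )
    print(f"event_start_time = {start.isoformat()}")

process_workflows_event(**kwargs)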

src/sentry/workflow_engine/processors/delayed_workflow.py

Lines changed: 10 additions & 7 deletions
@@ -619,7 +619,7 @@ def get_group_to_groupevent(
     event_data: EventRedisData,
     groups_to_dcgs: dict[GroupId, set[DataConditionGroup]],
     project: Project,
-) -> dict[Group, GroupEvent]:
+) -> dict[Group, tuple[GroupEvent, datetime | None]]:
     groups = Group.objects.filter(id__in=event_data.group_ids)
     group_id_to_group = {group.id: group for group in groups}
@@ -638,7 +638,7 @@ def get_group_to_groupevent(
         group_id: {dcg.id for dcg in dcgs} for group_id, dcgs in groups_to_dcgs.items()
     }

-    group_to_groupevent: dict[Group, GroupEvent] = {}
+    group_to_groupevent: dict[Group, tuple[GroupEvent, datetime | None]] = {}
     for key, instance in event_data.events.items():
         if key.dcg_ids.intersection(groups_to_dcg_ids.get(key.group_id, set())):
             event = bulk_event_id_to_events.get(instance.event_id)
@@ -652,7 +652,7 @@ def get_group_to_groupevent(
                 group_event.occurrence = bulk_occurrence_id_to_occurrence.get(
                     instance.occurrence_id
                 )
-            group_to_groupevent[group] = group_event
+            group_to_groupevent[group] = (group_event, instance.timestamp)

     return group_to_groupevent
@@ -661,10 +661,10 @@
 def fire_actions_for_groups(
     organization: Organization,
     groups_to_fire: dict[GroupId, set[DataConditionGroup]],
-    group_to_groupevent: dict[Group, GroupEvent],
+    group_to_groupevent: dict[Group, tuple[GroupEvent, datetime | None]],
 ) -> None:
     serialized_groups = {
-        group.id: group_event.event_id for group, group_event in group_to_groupevent.items()
+        group.id: group_event.event_id for group, (group_event, _) in group_to_groupevent.items()
     }
     logger.info(
         "workflow_engine.delayed_workflow.fire_actions_for_groups",
@@ -675,7 +675,9 @@ def fire_actions_for_groups(
     )

     # Bulk fetch detectors
-    event_id_to_detector = get_detectors_by_groupevents_bulk(list(group_to_groupevent.values()))
+    event_id_to_detector = get_detectors_by_groupevents_bulk(
+        [group_event for group_event, _ in group_to_groupevent.values()]
+    )

     # Feature check caching to keep us within the trace budget.
     trigger_actions_ff = features.has("organizations:workflow-engine-trigger-actions", organization)
@@ -697,7 +699,7 @@ def fire_actions_for_groups(
         logger,
         threshold=timedelta(seconds=40),
     ) as tracker:
-        for group, group_event in group_to_groupevent.items():
+        for group, (group_event, start_timestamp) in group_to_groupevent.items():
             with tracker.track(str(group.id)), log_context.new_context(group_id=group.id):
                 workflow_event_data = WorkflowEventData(event=group_event, group=group)
                 detector = event_id_to_detector.get(group_event.event_id)
@@ -729,6 +731,7 @@ def fire_actions_for_groups(
                     workflow_event_data,
                     should_trigger_actions(group_event.group.type),
                     is_delayed=True,
+                    start_timestamp=start_timestamp,
                 )

                 event_id = (
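Widening the mapping's values from GroupEvent to (GroupEvent, datetime | None) forces every consumer to unpack the pair, with _ marking the call sites that ignore the timestamp. The pattern in isolation (stand-in types, since the real keys are Group model instances):

from datetime import datetime, timezone

# str stands in for Group/GroupEvent here.
group_to_groupevent: dict[str, tuple[str, datetime | None]] = {
    "group-1": ("event-a", datetime.now(tz=timezone.utc)),
    "group-2": ("event-b", None),  # older buffer entries may lack a timestamp
}

# Consumers that only need the events unpack with `_`:
events = [event for event, _ in group_to_groupevent.values()]

# Consumers that need both unpack in the loop header, guarding the None case:
for group, (event, start_timestamp) in group_to_groupevent.items():
    if start_timestamp is not None:
        print(group, event, start_timestamp.isoformat())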

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 17 additions & 10 deletions
@@ -7,7 +7,6 @@
 import sentry_sdk
 from django.db import router, transaction
 from django.db.models import Q
-from django.utils import timezone

 from sentry import features
 from sentry.models.activity import Activity
@@ -144,7 +143,9 @@ def enqueue_workflows(

 @sentry_sdk.trace
 def evaluate_workflow_triggers(
-    workflows: set[Workflow], event_data: WorkflowEventData
+    workflows: set[Workflow],
+    event_data: WorkflowEventData,
+    event_start_time: datetime,
 ) -> tuple[set[Workflow], dict[Workflow, DelayedWorkflowItem]]:
     """
     Returns a tuple of (triggered_workflows, queue_items_by_workflow)
@@ -155,7 +156,6 @@ def evaluate_workflow_triggers(
     """
     triggered_workflows: set[Workflow] = set()
     queue_items_by_workflow: dict[Workflow, DelayedWorkflowItem] = {}
-    current_time = timezone.now()

     for workflow in workflows:
         evaluation, remaining_conditions = workflow.evaluate_trigger_conditions(event_data)
@@ -168,7 +168,7 @@ def evaluate_workflow_triggers(
                 delayed_when_group_id=workflow.when_condition_group_id,
                 delayed_if_group_ids=[],
                 passing_if_group_ids=[],
-                timestamp=current_time,
+                timestamp=event_start_time,
             )
         else:
             """
@@ -226,6 +226,7 @@ def evaluate_workflows_action_filters(
     workflows: set[Workflow],
     event_data: WorkflowEventData,
     queue_items_by_workflow: dict[Workflow, DelayedWorkflowItem],
+    event_start_time: datetime,
 ) -> tuple[set[DataConditionGroup], dict[Workflow, DelayedWorkflowItem]]:
     """
     Evaluate the action filters for the given workflows.
@@ -244,7 +245,6 @@ def evaluate_workflows_action_filters(
     }

     filtered_action_groups: set[DataConditionGroup] = set()
-    current_time = timezone.now()

     for action_condition, workflow in action_conditions_to_workflow.items():
         env = (
@@ -271,7 +271,7 @@ def evaluate_workflows_action_filters(
                 delayed_if_group_ids=[action_condition.id],
                 event=event_data.event,
                 passing_if_group_ids=[],
-                timestamp=current_time,
+                timestamp=event_start_time,
             )
         else:
             # We should not include activity updates in delayed conditions,
@@ -388,7 +388,9 @@ def _get_associated_workflows(

 @log_context.root()
 def process_workflows(
-    event_data: WorkflowEventData, detector: Detector | None = None
+    event_data: WorkflowEventData,
+    event_start_time: datetime,
+    detector: Detector | None = None,
 ) -> set[Workflow]:
     """
     This method will get the detector based on the event, and then gather the associated workflows.
@@ -444,14 +446,14 @@ def process_workflows(
         return set()

     triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
-        workflows, event_data
+        workflows, event_data, event_start_time
     )
     if not triggered_workflows and not queue_items_by_workflow_id:
         # if there aren't any triggered workflows, there's no action filters to evaluate
         return set()

     actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
-        triggered_workflows, event_data, queue_items_by_workflow_id
+        triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
     )
     enqueue_workflows(queue_items_by_workflow_id)
     actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
@@ -463,7 +465,12 @@ def process_workflows(

     should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
     create_workflow_fire_histories(
-        detector, actions, event_data, should_trigger_actions, is_delayed=False
+        detector,
+        actions,
+        event_data,
+        should_trigger_actions,
+        is_delayed=False,
+        start_timestamp=event_start_time,
     )
     fire_actions(actions, detector, event_data)

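In workflow.py the two mid-pipeline timezone.now() calls are replaced by an event_start_time parameter threaded down from the caller, so delayed queue items and fire histories all reference the same origin instant instead of restarting the clock at each stage. A reduced sketch of why the threading matters (function names abbreviated, not the real signatures):

from datetime import datetime, timezone
from time import sleep

def evaluate_triggers(event_start_time: datetime) -> datetime:
    # Before: timestamp=timezone.now() here would hide time already spent.
    # After: the caller's origin instant is stamped onto delayed queue items.
    return event_start_time

def process(event_start_time: datetime) -> float:
    sleep(0.05)  # time spent before evaluation, now included in the metric
    queued_at = evaluate_triggers(event_start_time)
    return (datetime.now(tz=timezone.utc) - queued_at).total_seconds()

print(f"latency: {process(datetime.now(tz=timezone.utc)):.3f}s")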