Commit a99d167

Browse files
ref(alerts): Pull code out into smaller functions (#97135)

Redo of #94654 because this was somehow easier than rebasing. This PR pulls code out into smaller functions, since `process_update()` is very long and hard to navigate.

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
1 parent 88b0e41 · commit a99d167

File tree

1 file changed

src/sentry/incidents/subscription_processor.py

Lines changed: 144 additions & 115 deletions
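At a glance, the commit replaces two long inline passages in `process_update()` with calls to four new helper methods. The following condensed schematic is assembled from the hunks below and is not verbatim: the surrounding `with` block, the legacy and anomaly-detection branches, and the computation of the flag variables are elided.

    # Schematic of the refactored flow; all names appear in the diff below.
    detector = self.get_detector(has_metric_alert_processing)
    comparison_delta = self.get_comparison_delta(detector)
    aggregation_value = self.get_aggregation_value(subscription_update, comparison_delta)

    if aggregation_value is not None and has_metric_alert_processing:
        results = self.process_results_workflow_engine(
            subscription_update, aggregation_value, organization
        )

    for trigger in self.triggers:
        fired_incident_triggers, metrics_incremented = self.handle_trigger_alerts(
            trigger, aggregation_value, fired_incident_triggers, metrics_incremented
        )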
@@ -50,6 +50,7 @@
     ProcessedSubscriptionUpdate,
     QuerySubscriptionUpdate,
 )
+from sentry.models.organization import Organization
 from sentry.models.project import Project
 from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer_legacy
 from sentry.seer.anomaly_detection.utils import (
@@ -65,6 +66,7 @@
 from sentry.utils.memory import track_memory_usage
 from sentry.workflow_engine.models import DataPacket, Detector
 from sentry.workflow_engine.processors.data_packet import process_data_packet
+from sentry.workflow_engine.types import DetectorEvaluationResult, DetectorGroupKey

 logger = logging.getLogger(__name__)
 REDIS_TTL = int(timedelta(days=7).total_seconds())
@@ -294,6 +296,140 @@ def handle_trigger_anomalies(

         return fired_incident_triggers

+    def get_comparison_delta(self, detector: Detector | None) -> int | None:
+        comparison_delta = None
+
+        if detector:
+            comparison_delta = detector.config.get("comparison_delta")
+        else:
+            comparison_delta = self.alert_rule.comparison_delta
+
+        return comparison_delta
+
+    def get_detector(self, has_metric_alert_processing: bool) -> Detector | None:
+        detector = None
+        if has_metric_alert_processing:
+            try:
+                detector = Detector.objects.get(
+                    data_sources__source_id=str(self.subscription.id),
+                    data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
+                )
+            except Detector.DoesNotExist:
+                logger.exception(
+                    "Detector not found", extra={"subscription_id": self.subscription.id}
+                )
+        return detector
+
+    def handle_trigger_alerts(
+        self,
+        trigger: AlertRuleTrigger,
+        aggregation_value: float,
+        fired_incident_triggers: list[IncidentTrigger],
+        metrics_incremented: bool,
+    ) -> tuple[list[IncidentTrigger], bool]:
+        # OVER/UNDER value trigger
+        alert_operator, resolve_operator = self.THRESHOLD_TYPE_OPERATORS[
+            AlertRuleThresholdType(self.alert_rule.threshold_type)
+        ]
+        if alert_operator(
+            aggregation_value, trigger.alert_threshold
+        ) and not self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE):
+            # If the value has breached our threshold (above/below)
+            # And the trigger is not yet active
+            metrics.incr(
+                "incidents.alert_rules.threshold.alert",
+                tags={"detection_type": self.alert_rule.detection_type},
+            )
+            if (
+                features.has(
+                    "organizations:workflow-engine-metric-alert-dual-processing-logs",
+                    self.subscription.project.organization,
+                )
+                and not metrics_incremented
+            ):
+                metrics.incr("dual_processing.alert_rules.fire")
+                metrics_incremented = True
+            # triggering a threshold will create an incident and set the status to active
+            incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
+            if incident_trigger is not None:
+                fired_incident_triggers.append(incident_trigger)
+        else:
+            self.trigger_alert_counts[trigger.id] = 0
+
+        if (
+            resolve_operator(aggregation_value, self.calculate_resolve_threshold(trigger))
+            and self.active_incident
+            and self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
+        ):
+            metrics.incr(
+                "incidents.alert_rules.threshold.resolve",
+                tags={"detection_type": self.alert_rule.detection_type},
+            )
+            if features.has(
+                "organizations:workflow-engine-metric-alert-dual-processing-logs",
+                self.subscription.project.organization,
+            ):
+                metrics.incr("dual_processing.alert_rules.resolve")
+            incident_trigger = self.trigger_resolve_threshold(trigger, aggregation_value)
+
+            if incident_trigger is not None:
+                fired_incident_triggers.append(incident_trigger)
+        else:
+            self.trigger_resolve_counts[trigger.id] = 0
+
+        return fired_incident_triggers, metrics_incremented
+
+    def process_results_workflow_engine(
+        self,
+        subscription_update: QuerySubscriptionUpdate,
+        aggregation_value: float,
+        organization: Organization,
+    ) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
+        if self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC:
+            anomaly_detection_packet = AnomalyDetectionUpdate(
+                entity=subscription_update.get("entity", ""),
+                subscription_id=subscription_update["subscription_id"],
+                values={
+                    "value": aggregation_value,
+                    "source_id": str(self.subscription.id),
+                    "subscription_id": subscription_update["subscription_id"],
+                    "timestamp": self.last_update,
+                },
+                timestamp=self.last_update,
+            )
+            anomaly_detection_data_packet = DataPacket[AnomalyDetectionUpdate](
+                source_id=str(self.subscription.id), packet=anomaly_detection_packet
+            )
+            results = process_data_packet(
+                anomaly_detection_data_packet, DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
+            )
+        else:
+            metric_packet = ProcessedSubscriptionUpdate(
+                entity=subscription_update.get("entity", ""),
+                subscription_id=subscription_update["subscription_id"],
+                values={"value": aggregation_value},
+                timestamp=self.last_update,
+            )
+            metric_data_packet = DataPacket[ProcessedSubscriptionUpdate](
+                source_id=str(self.subscription.id), packet=metric_packet
+            )
+            results = process_data_packet(metric_data_packet, DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)

+        if features.has(
+            "organizations:workflow-engine-metric-alert-dual-processing-logs",
+            organization,
+        ):
+            logger.info(
+                "dual processing results for alert rule",
+                extra={
+                    "results": results,
+                    "num_results": len(results),
+                    "value": aggregation_value,
+                    "rule_id": self.alert_rule.id,
+                },
+            )
+        return results
+
     def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
         """
         This is the core processing method utilized when Query Subscription Consumer fetches updates from kafka
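Of the new helpers, `get_comparison_delta` is the most self-contained: given a detector it reads `comparison_delta` from the detector's config, and given `None` (feature flag off, or the `Detector` lookup failed) it falls back to the alert rule. Below is a free-standing restatement of that branching, runnable on its own; `FakeDetector` and `FakeAlertRule` are hypothetical stand-ins, not classes from this codebase.

    from dataclasses import dataclass, field

    @dataclass
    class FakeDetector:
        # Stand-in for the Detector model; only the config dict matters here.
        config: dict = field(default_factory=dict)

    @dataclass
    class FakeAlertRule:
        comparison_delta: int | None = None

    def get_comparison_delta(detector: FakeDetector | None, rule: FakeAlertRule) -> int | None:
        # Same branching as the method above: prefer the detector's config,
        # fall back to the alert rule when no detector was resolved.
        if detector:
            return detector.config.get("comparison_delta")
        return rule.comparison_delta

    assert get_comparison_delta(FakeDetector({"comparison_delta": 3600}), FakeAlertRule()) == 3600
    assert get_comparison_delta(None, FakeAlertRule(comparison_delta=900)) == 900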
@@ -369,70 +505,15 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                 tags={"dual_processing": has_metric_alert_processing},
             ),
         ):
-            if has_metric_alert_processing:
-                try:
-                    detector = Detector.objects.get(
-                        data_sources__source_id=str(self.subscription.id),
-                        data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
-                    )
-                    comparison_delta = detector.config.get("comparison_delta")
-                except Detector.DoesNotExist:
-                    logger.exception(
-                        "Detector not found", extra={"subscription_id": self.subscription.id}
-                    )
-
-            else:
-                comparison_delta = self.alert_rule.comparison_delta
-
+            detector = self.get_detector(has_metric_alert_processing)
+            comparison_delta = self.get_comparison_delta(detector)
             aggregation_value = self.get_aggregation_value(subscription_update, comparison_delta)

             if aggregation_value is not None:
                 if has_metric_alert_processing:
-                    if self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC:
-                        anomaly_detection_packet = AnomalyDetectionUpdate(
-                            entity=subscription_update.get("entity", ""),
-                            subscription_id=subscription_update["subscription_id"],
-                            values={
-                                "value": aggregation_value,
-                                "source_id": str(self.subscription.id),
-                                "subscription_id": subscription_update["subscription_id"],
-                                "timestamp": self.last_update,
-                            },
-                            timestamp=self.last_update,
-                        )
-                        anomaly_detection_data_packet = DataPacket[AnomalyDetectionUpdate](
-                            source_id=str(self.subscription.id), packet=anomaly_detection_packet
-                        )
-                        results = process_data_packet(
-                            anomaly_detection_data_packet, DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
-                        )
-                    else:
-                        metric_packet = ProcessedSubscriptionUpdate(
-                            entity=subscription_update.get("entity", ""),
-                            subscription_id=subscription_update["subscription_id"],
-                            values={"value": aggregation_value},
-                            timestamp=self.last_update,
-                        )
-                        metric_data_packet = DataPacket[ProcessedSubscriptionUpdate](
-                            source_id=str(self.subscription.id), packet=metric_packet
-                        )
-                        results = process_data_packet(
-                            metric_data_packet, DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
-                        )
-
-                    if features.has(
-                        "organizations:workflow-engine-metric-alert-dual-processing-logs",
-                        organization,
-                    ):
-                        logger.info(
-                            "dual processing results for alert rule",
-                            extra={
-                                "results": results,
-                                "num_results": len(results),
-                                "value": aggregation_value,
-                                "rule_id": self.alert_rule.id,
-                            },
-                        )
+                    results = self.process_results_workflow_engine(
+                        subscription_update, aggregation_value, organization
+                    )

                 if has_metric_issue_single_processing:
                     # don't go through the legacy system
@@ -531,61 +612,9 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                     )
                     return

-                # OVER/UNDER value trigger
-                alert_operator, resolve_operator = self.THRESHOLD_TYPE_OPERATORS[
-                    AlertRuleThresholdType(self.alert_rule.threshold_type)
-                ]
-                if alert_operator(
-                    aggregation_value, trigger.alert_threshold
-                ) and not self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE):
-                    # If the value has breached our threshold (above/below)
-                    # And the trigger is not yet active
-                    metrics.incr(
-                        "incidents.alert_rules.threshold.alert",
-                        tags={"detection_type": self.alert_rule.detection_type},
-                    )
-                    if (
-                        features.has(
-                            "organizations:workflow-engine-metric-alert-dual-processing-logs",
-                            self.subscription.project.organization,
-                        )
-                        and not metrics_incremented
-                    ):
-                        metrics.incr("dual_processing.alert_rules.fire")
-                        metrics_incremented = True
-                    # triggering a threshold will create an incident and set the status to active
-                    incident_trigger = self.trigger_alert_threshold(
-                        trigger, aggregation_value
-                    )
-                    if incident_trigger is not None:
-                        fired_incident_triggers.append(incident_trigger)
-                else:
-                    self.trigger_alert_counts[trigger.id] = 0
-
-                if (
-                    resolve_operator(
-                        aggregation_value, self.calculate_resolve_threshold(trigger)
-                    )
-                    and self.active_incident
-                    and self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
-                ):
-                    metrics.incr(
-                        "incidents.alert_rules.threshold.resolve",
-                        tags={"detection_type": self.alert_rule.detection_type},
-                    )
-                    if features.has(
-                        "organizations:workflow-engine-metric-alert-dual-processing-logs",
-                        self.subscription.project.organization,
-                    ):
-                        metrics.incr("dual_processing.alert_rules.resolve")
-                    incident_trigger = self.trigger_resolve_threshold(
-                        trigger, aggregation_value
-                    )
-
-                    if incident_trigger is not None:
-                        fired_incident_triggers.append(incident_trigger)
-                else:
-                    self.trigger_resolve_counts[trigger.id] = 0
+                fired_incident_triggers, metrics_incremented = self.handle_trigger_alerts(
+                    trigger, aggregation_value, fired_incident_triggers, metrics_incremented
+                )

             if fired_incident_triggers:
                 # For all the newly created incidents
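The call site in this last hunk also explains the slightly unusual signature of `handle_trigger_alerts`: it both receives and returns `fired_incident_triggers` and `metrics_incremented` because it runs once per trigger, and threading the flag through each call keeps the `dual_processing.alert_rules.fire` counter from being incremented more than once per update. A sketch of the accumulation pattern (the initialization shown here is illustrative; see the hunk above for the actual call):

    fired_incident_triggers: list[IncidentTrigger] = []
    metrics_incremented = False
    for trigger in self.triggers:
        fired_incident_triggers, metrics_incremented = self.handle_trigger_alerts(
            trigger, aggregation_value, fired_incident_triggers, metrics_incremented
        )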
