5
5
from collections .abc import Sequence
6
6
from copy import deepcopy
7
7
from datetime import datetime , timedelta
8
- from typing import TypeVar , cast
8
+ from typing import Literal , TypedDict , TypeVar , cast
9
9
10
10
from django .conf import settings
11
11
from django .db import router , transaction
87
87
T = TypeVar ("T" )
88
88
89
89
90
+ class MetricIssueDetectorConfig (TypedDict ):
91
+ """
92
+ Schema for the `config` dict stored on a Metric Issue Detector (i.e. the typed shape of `Detector.config`).
93
+ """
94
+
95
+ comparison_delta : int | None
96
+ detection_type : Literal ["static" , "percent" , "dynamic" ]
97
+
98
+
90
99
class SubscriptionProcessor :
91
100
"""
92
101
Class for processing subscription updates for an alert rule. Accepts a subscription
@@ -107,19 +116,20 @@ class SubscriptionProcessor:
107
116
108
117
def __init__ (self , subscription : QuerySubscription ) -> None :
109
118
self .subscription = subscription
119
+ self ._alert_rule : AlertRule | None = None
110
120
try :
111
- self .alert_rule = AlertRule .objects .get_for_subscription (subscription )
121
+ self ._alert_rule = AlertRule .objects .get_for_subscription (subscription )
112
122
except AlertRule .DoesNotExist :
113
123
return
114
124
115
- self .triggers = AlertRuleTrigger .objects .get_for_alert_rule (self .alert_rule )
125
+ self .triggers = AlertRuleTrigger .objects .get_for_alert_rule (self ._alert_rule )
116
126
self .triggers .sort (key = lambda trigger : trigger .alert_threshold )
117
127
118
128
(
119
129
self .last_update ,
120
130
self .trigger_alert_counts ,
121
131
self .trigger_resolve_counts ,
122
- ) = get_alert_rule_stats (self .alert_rule , self .subscription , self .triggers )
132
+ ) = get_alert_rule_stats (self ._alert_rule , self .subscription , self .triggers )
123
133
self .orig_trigger_alert_counts = deepcopy (self .trigger_alert_counts )
124
134
self .orig_trigger_resolve_counts = deepcopy (self .trigger_resolve_counts )
125
135
@@ -135,6 +145,14 @@ def __init__(self, subscription: QuerySubscription) -> None:
135
145
or self ._has_workflow_engine_processing_only
136
146
)
137
147
148
+ @property
149
+ def alert_rule (self ) -> AlertRule :
150
+ """
151
+ Return the AlertRule loaded in `__init__`, asserting that it is not None. Only use this in non-single processing contexts.
152
+ """
153
+ assert self ._alert_rule is not None
154
+ return self ._alert_rule
155
+
138
156
@property
139
157
def active_incident (self ) -> Incident | None :
140
158
"""
@@ -188,15 +206,15 @@ def check_trigger_matches_status(
188
206
incident_trigger = self .incident_trigger_map .get (trigger .id )
189
207
return incident_trigger is not None and incident_trigger .status == status .value
190
208
191
- def reset_trigger_counts (self ) -> None :
209
+ def reset_trigger_counts (self , alert_rule : AlertRule ) -> None :
192
210
"""
193
211
Helper method that clears both the trigger alert and the trigger resolve counts
194
212
"""
195
213
for trigger_id in self .trigger_alert_counts :
196
214
self .trigger_alert_counts [trigger_id ] = 0
197
215
for trigger_id in self .trigger_resolve_counts :
198
216
self .trigger_resolve_counts [trigger_id ] = 0
199
- self .update_alert_rule_stats ()
217
+ self .update_alert_rule_stats (alert_rule )
200
218
201
219
def calculate_resolve_threshold (self , trigger : AlertRuleTrigger ) -> float :
202
220
"""
@@ -253,8 +271,8 @@ def get_crash_rate_alert_metrics_aggregation_value(
253
271
aggregation_value = get_crash_rate_alert_metrics_aggregation_value_helper (
254
272
subscription_update
255
273
)
256
- if aggregation_value is None :
257
- self .reset_trigger_counts ()
274
+ if aggregation_value is None and self . _alert_rule is not None :
275
+ self .reset_trigger_counts (self . _alert_rule )
258
276
return aggregation_value
259
277
260
278
def get_aggregation_value (
@@ -271,7 +289,7 @@ def get_aggregation_value(
271
289
organization_id = self .subscription .project .organization .id ,
272
290
project_ids = [self .subscription .project_id ],
273
291
comparison_delta = comparison_delta ,
274
- alert_rule_id = self .alert_rule .id ,
292
+ alert_rule_id = self ._alert_rule .id if self . _alert_rule else None ,
275
293
)
276
294
277
295
return aggregation_value
@@ -300,7 +318,7 @@ def handle_trigger_anomalies(
300
318
is_resolved = False ,
301
319
)
302
320
incremented = metrics_incremented or incremented
303
- incident_trigger = self .trigger_alert_threshold (trigger , aggregation_value )
321
+ incident_trigger = self .trigger_alert_threshold (trigger )
304
322
if incident_trigger is not None :
305
323
fired_incident_triggers .append (incident_trigger )
306
324
else :
@@ -332,9 +350,12 @@ def get_comparison_delta(self, detector: Detector | None) -> int | None:
332
350
comparison_delta = None
333
351
334
352
if detector :
335
- comparison_delta = detector .config .get ("comparison_delta" )
353
+ detector_cfg : MetricIssueDetectorConfig = detector .config
354
+ comparison_delta = detector_cfg .get ("comparison_delta" )
336
355
else :
337
- comparison_delta = self .alert_rule .comparison_delta
356
+ # If we don't have a Detector, we must have an AlertRule.
357
+ assert self ._alert_rule is not None
358
+ comparison_delta = self ._alert_rule .comparison_delta
338
359
339
360
return comparison_delta
340
361
@@ -421,7 +442,7 @@ def handle_trigger_alerts(
421
442
)
422
443
incremented = metrics_incremented or incremented
423
444
# triggering a threshold will create an incident and set the status to active
424
- incident_trigger = self .trigger_alert_threshold (trigger , aggregation_value )
445
+ incident_trigger = self .trigger_alert_threshold (trigger )
425
446
if incident_trigger is not None :
426
447
fired_incident_triggers .append (incident_trigger )
427
448
else :
@@ -455,11 +476,13 @@ def handle_trigger_alerts(
455
476
456
477
def process_results_workflow_engine (
457
478
self ,
479
+ detector : Detector ,
458
480
subscription_update : QuerySubscriptionUpdate ,
459
481
aggregation_value : float ,
460
482
organization : Organization ,
461
483
) -> list [tuple [Detector , dict [DetectorGroupKey , DetectorEvaluationResult ]]]:
462
- if self .alert_rule .detection_type == AlertRuleDetectionType .DYNAMIC :
484
+ detector_cfg : MetricIssueDetectorConfig = detector .config
485
+ if detector_cfg ["detection_type" ] == AlertRuleDetectionType .DYNAMIC .value :
463
486
anomaly_detection_packet = AnomalyDetectionUpdate (
464
487
entity = subscription_update .get ("entity" , "" ),
465
488
subscription_id = subscription_update ["subscription_id" ],
@@ -499,14 +522,13 @@ def process_results_workflow_engine(
499
522
"results" : results ,
500
523
"num_results" : len (results ),
501
524
"value" : aggregation_value ,
502
- "rule_id" : self .alert_rule .id ,
525
+ "rule_id" : self ._alert_rule .id if self . _alert_rule else None ,
503
526
},
504
527
)
505
528
return results
506
529
507
530
def process_legacy_metric_alerts (
508
531
self ,
509
- subscription_update : QuerySubscriptionUpdate ,
510
532
aggregation_value : float ,
511
533
detector : Detector | None ,
512
534
results : list [tuple [Detector , dict [DetectorGroupKey , DetectorEvaluationResult ]]] | None ,
@@ -632,7 +654,7 @@ def process_legacy_metric_alerts(
632
654
# is killed here. The trade-off is that we might process an update twice. Mostly
633
655
# this will have no effect, but if someone manages to close a triggered incident
634
656
# before the next one then we might alert twice.
635
- self .update_alert_rule_stats ()
657
+ self .update_alert_rule_stats (self . alert_rule )
636
658
return fired_incident_triggers
637
659
638
660
def has_downgraded (self , dataset : str , organization : Organization ) -> bool :
@@ -677,7 +699,7 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
677
699
if self .has_downgraded (dataset , organization ):
678
700
return
679
701
680
- if not hasattr ( self , "alert_rule" ) :
702
+ if self . _alert_rule is None :
681
703
# QuerySubscriptions must _always_ have an associated AlertRule
682
704
# If the alert rule has been removed then clean up associated tables and return
683
705
metrics .incr ("incidents.alert_rules.no_alert_rule_for_subscription" , sample_rate = 1.0 )
@@ -736,8 +758,9 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
736
758
legacy_results = None
737
759
738
760
if self ._has_workflow_engine_processing :
761
+ assert detector is not None
739
762
workflow_engine_results = self .process_results_workflow_engine (
740
- subscription_update , aggregation_value , organization
763
+ detector , subscription_update , aggregation_value , organization
741
764
)
742
765
743
766
if self ._has_workflow_engine_processing_only :
@@ -756,7 +779,6 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
756
779
workflow engine "and" metric alerts.
757
780
"""
758
781
legacy_results = self .process_legacy_metric_alerts (
759
- subscription_update ,
760
782
aggregation_value ,
761
783
detector ,
762
784
workflow_engine_results ,
@@ -775,7 +797,8 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
775
797
)
776
798
777
799
def trigger_alert_threshold (
778
- self , trigger : AlertRuleTrigger , metric_value : float
800
+ self ,
801
+ trigger : AlertRuleTrigger ,
779
802
) -> IncidentTrigger | None :
780
803
"""
781
804
Called when a subscription update exceeds the value defined in the
@@ -1019,7 +1042,7 @@ def handle_incident_severity_update(self) -> None:
1019
1042
status_method = IncidentStatusMethod .RULE_TRIGGERED ,
1020
1043
)
1021
1044
1022
- def update_alert_rule_stats (self ) -> None :
1045
+ def update_alert_rule_stats (self , alert_rule : AlertRule ) -> None :
1023
1046
"""
1024
1047
Updates stats about the alert rule, if they're changed.
1025
1048
:return:
@@ -1036,7 +1059,7 @@ def update_alert_rule_stats(self) -> None:
1036
1059
}
1037
1060
1038
1061
update_alert_rule_stats (
1039
- self . alert_rule ,
1062
+ alert_rule ,
1040
1063
self .subscription ,
1041
1064
self .last_update ,
1042
1065
updated_trigger_alert_counts ,
0 commit comments