Skip to content

Commit 7f7d827

Browse files
committed
WIP
1 parent 0c110e1 commit 7f7d827

File tree

4 files changed

+241
-1
lines changed

4 files changed

+241
-1
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from datetime import timedelta
2+
from functools import cached_property
3+
from random import randint
4+
from unittest import mock
5+
from uuid import uuid4
6+
7+
import pytest
8+
from django.utils import timezone
9+
10+
from sentry.incidents.grouptype import MetricIssue
11+
from sentry.incidents.subscription_processor import SubscriptionProcessor
12+
from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
13+
from sentry.incidents.utils.types import DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
14+
from sentry.snuba.dataset import Dataset
15+
from sentry.snuba.entity_subscription import SnubaQuery
16+
from sentry.snuba.models import QuerySubscription, SnubaQueryEventType
17+
from sentry.snuba.subscriptions import create_snuba_query, create_snuba_subscription
18+
from sentry.testutils.cases import SnubaTestCase, SpanTestCase, TestCase
19+
from sentry.testutils.fixtures import DetectorPriorityLevel
20+
from sentry.testutils.helpers.datetime import freeze_time
21+
from sentry.workflow_engine.models.data_condition import Condition, DataCondition
22+
from sentry.workflow_engine.models.detector import Detector
23+
24+
# Sentinel distinguishing "caller passed no value" from an explicit falsy
# value (None, 0) in build_subscription_update(value=...).
EMPTY = object()
25+
26+
27+
@freeze_time()
class ProcessUpdateBaseClass(TestCase, SpanTestCase, SnubaTestCase):
    """Shared fixture/helper base for subscription-processor tests.

    Builds a metric detector wired to a real Snuba query subscription, plus
    the data conditions (critical / warning / resolve) the processor needs,
    and provides helpers to fabricate and feed subscription updates through
    ``SubscriptionProcessor``.
    """

    @pytest.fixture(autouse=True)
    def _setup_metrics_patch(self):
        # Expose the patched metrics module as ``self.metrics`` so tests can
        # assert on incr() calls made by the subscription processor.
        with mock.patch("sentry.incidents.subscription_processor.metrics") as self.metrics:
            yield

    def setUp(self):
        super().setUp()
        # Keep the eager-task context open for the whole test; closed in tearDown.
        self._run_tasks = self.tasks()
        self._run_tasks.__enter__()

    def tearDown(self):
        super().tearDown()
        self._run_tasks.__exit__(None, None, None)

    @cached_property
    def sub(self):
        # The detector's data source stores the QuerySubscription pk as a string.
        source_pk = int(self.metric_detector.data_sources.first().source_id)
        return QuerySubscription.objects.get(id=source_pk)

    def create_detector_data_source_and_data_conditions(self):
        """Create a metric detector + snuba subscription + data conditions."""
        detector = self.create_detector(
            project=self.project,
            workflow_condition_group=self.create_data_condition_group(),
            type=MetricIssue.slug,
            created_by_id=self.user.id,
        )
        with self.tasks():
            snuba_query = create_snuba_query(
                query_type=SnubaQuery.Type.ERROR,
                dataset=Dataset.Events,
                query="",
                aggregate="count()",
                time_window=timedelta(minutes=1),
                resolution=timedelta(minutes=1),
                environment=self.environment,
                event_types=[
                    SnubaQueryEventType.EventType.ERROR,
                    SnubaQueryEventType.EventType.DEFAULT,
                ],
            )
            query_subscription = create_snuba_subscription(
                project=detector.project,
                subscription_type=INCIDENTS_SNUBA_SUBSCRIPTION_TYPE,
                snuba_query=snuba_query,
            )
        source = self.create_data_source(
            organization=self.organization,
            source_id=str(query_subscription.id),
            type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
        )
        self.create_data_source_detector(source, detector)

        self.set_up_data_conditions(detector, Condition.GREATER, 100, None, 10)

        # XXX: while we're in the rollout phase of workflow engine, the subscription processor still requires a rule.
        # Create one here and delete it when we can.
        rule = self.create_alert_rule()
        rule.snuba_query = snuba_query
        rule.save()

        return detector

    def set_up_data_conditions(
        self,
        detector: Detector,
        threshold_type: Condition,
        critical_threshold: int,
        warning_threshold: int | None = None,
        resolve_threshold: int | None = None,
    ):
        """Attach critical / optional-warning / resolve conditions to *detector*.

        When no explicit resolve threshold is given, resolution mirrors the
        lowest configured trigger (warning if present, else critical).
        """
        if resolve_threshold is None:
            if warning_threshold is None:
                resolve_threshold = critical_threshold
            else:
                resolve_threshold = warning_threshold
        # Resolve fires on the opposite side of the trigger comparison.
        if threshold_type == Condition.GREATER:
            resolve_threshold_type = Condition.LESS_OR_EQUAL
        else:
            resolve_threshold_type = Condition.GREATER_OR_EQUAL

        self.create_data_condition(
            type=threshold_type,
            comparison=critical_threshold,
            condition_result=DetectorPriorityLevel.HIGH,
            condition_group=detector.workflow_condition_group,
        )
        if warning_threshold is not None:
            self.create_data_condition(
                type=threshold_type,
                comparison=warning_threshold,
                condition_result=DetectorPriorityLevel.MEDIUM,
                condition_group=detector.workflow_condition_group,
            )
        self.create_data_condition(
            type=resolve_threshold_type,
            comparison=resolve_threshold,
            condition_result=DetectorPriorityLevel.OK,
            condition_group=detector.workflow_condition_group,
        )

    @cached_property
    def metric_detector(self):
        return self.create_detector_data_source_and_data_conditions()

    @cached_property
    def critical_threshold(self):
        trigger = DataCondition.objects.get(
            condition_group=self.metric_detector.workflow_condition_group,
            condition_result=DetectorPriorityLevel.HIGH,
        )
        return trigger.comparison

    def build_subscription_update(self, subscription, time_delta=None, value=EMPTY):
        """Fabricate a raw subscription-update payload for the processor."""
        if time_delta is None:
            timestamp = timezone.now()
        else:
            timestamp = timezone.now() + time_delta
        # Snuba timestamps carry no sub-second precision.
        timestamp = timestamp.replace(microsecond=0)

        data = {}

        if subscription:
            # A random value stands in when the caller didn't pin one.
            data = {"some_col_name": randint(0, 100) if value is EMPTY else value}
        values = {"data": [data]}
        return {
            "subscription_id": subscription.subscription_id if subscription else uuid4().hex,
            "values": values,
            "timestamp": timestamp,
            "interval": 1,
            "partition": 1,
            "offset": 1,
        }

    def send_update(self, value, time_delta=None, subscription=None):
        """Run one fabricated update through the processor; return the processor."""
        time_delta = timedelta() if time_delta is None else time_delta
        subscription = self.sub if subscription is None else subscription
        processor = SubscriptionProcessor(subscription)
        message = self.build_subscription_update(subscription, value=value, time_delta=time_delta)
        with (
            self.feature(["organizations:incidents", "organizations:performance-view"]),
            self.capture_on_commit_callbacks(execute=True),
        ):
            processor.process_update(message)
        return processor
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from datetime import timedelta
2+
3+
from sentry.constants import ObjectStatus
4+
from sentry.incidents.subscription_processor import SubscriptionProcessor
5+
from sentry.snuba.models import QuerySubscription, SnubaQuery
6+
from tests.sentry.incidents.subscription_processor.test_subscription_processor_base import (
7+
ProcessUpdateBaseClass,
8+
)
9+
10+
11+
class ProcessUpdateEarlyQuitTest(ProcessUpdateBaseClass):
    """Cases where the subscription processor should bail out early."""

    def test_removed_detector(self):
        message = self.build_subscription_update(self.sub)
        self.metric_detector.delete()
        # Snapshot ids before processing: the subscription and its query
        # should be cleaned up once the detector is gone.
        subscription_id = self.sub.id
        snuba_query = self.sub.snuba_query
        features = ["organizations:incidents", "organizations:performance-view"]
        with (
            self.feature(features),
            self.tasks(),
        ):
            SubscriptionProcessor(self.sub).process_update(message)
        # TODO: replace the metric that early quits if there's no detector for the subscription processor
        assert not QuerySubscription.objects.filter(id=subscription_id).exists()
        assert not SnubaQuery.objects.filter(id=snuba_query.id).exists()

    def test_removed_project(self):
        message = self.build_subscription_update(self.sub)
        self.project.delete()
        with self.feature(["organizations:incidents", "organizations:performance-view"]):
            SubscriptionProcessor(self.sub).process_update(message)
        self.metrics.incr.assert_called_once_with("incidents.alert_rules.ignore_deleted_project")

    def test_pending_deletion_project(self):
        message = self.build_subscription_update(self.sub)
        self.project.update(status=ObjectStatus.DELETION_IN_PROGRESS)
        with self.feature(["organizations:incidents", "organizations:performance-view"]):
            SubscriptionProcessor(self.sub).process_update(message)
        self.metrics.incr.assert_called_once_with("incidents.alert_rules.ignore_deleted_project")

    # TODO: test_no_feature will need to be updated with the new logic for gating metric detectors

    def test_skip_already_processed_update(self):
        # First update at the critical threshold is processed normally.
        self.send_update(self.critical_threshold)
        self.metrics.incr.reset_mock()
        # An identical (same-timestamp) update must be skipped.
        self.send_update(self.critical_threshold)
        self.metrics.incr.assert_called_once_with(
            "incidents.alert_rules.skipping_already_processed_update"
        )
        self.metrics.incr.reset_mock()
        # An older update must also be skipped.
        self.send_update(self.critical_threshold, timedelta(hours=-1))
        self.metrics.incr.assert_called_once_with(
            "incidents.alert_rules.skipping_already_processed_update"
        )
        self.metrics.incr.reset_mock()
        # A strictly newer update is processed again (no skip metric).
        self.send_update(self.critical_threshold, timedelta(hours=1))
        assert self.metrics.incr.call_count == 0

tests/sentry/incidents/subscription_processor/test_subscription_processor.py renamed to tests/sentry/incidents/subscription_processor/test_subscription_processor_legacy.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
"""
2+
Subscription processor tests for the legacy metric alerting system. This file will
3+
be cleaned up after we fully migrate away from metric alerts.
4+
"""
5+
16
import copy
27
import inspect
38
import unittest

tests/sentry/incidents/subscription_processor/test_subscription_processor_workflow_engine.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
"""
2+
Dual processing tests for the workflow engine/legacy system. This file will be cleaned up
3+
after we fully migrate away from metric alerts.
4+
"""
5+
16
from datetime import timedelta
27
from unittest import mock
38
from unittest.mock import MagicMock, call, patch
@@ -37,7 +42,7 @@
3742
from sentry.utils import json
3843
from sentry.workflow_engine.models.data_condition import Condition
3944
from sentry.workflow_engine.types import DetectorPriorityLevel
40-
from tests.sentry.incidents.subscription_processor.test_subscription_processor import (
45+
from tests.sentry.incidents.subscription_processor.test_subscription_processor_legacy import (
4146
ProcessUpdateAnomalyDetectionTest,
4247
ProcessUpdateComparisonAlertTest,
4348
)

0 commit comments

Comments
 (0)