diff --git a/src/sentry/incidents/subscription_processor.py b/src/sentry/incidents/subscription_processor.py
index 64693c020830bc..4d85ddfc03fc32 100644
--- a/src/sentry/incidents/subscription_processor.py
+++ b/src/sentry/incidents/subscription_processor.py
@@ -514,6 +514,10 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             results = self.process_results_workflow_engine(
                 subscription_update, aggregation_value, organization
             )
+        else:
+            # XXX: after we fully migrate to single processing we can return early here
+            # this just preserves test functionality for now
+            metrics.incr("incidents.alert_rules.skipping_update_invalid_aggregation_value")
 
         if has_metric_issue_single_processing:
             # don't go through the legacy system
@@ -538,7 +542,6 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             return
 
         if aggregation_value is None:
-            metrics.incr("incidents.alert_rules.skipping_update_invalid_aggregation_value")
             return
 
         fired_incident_triggers: list[IncidentTrigger] = []
diff --git a/tests/sentry/incidents/subscription_processor/test_subscription_processor_aci.py b/tests/sentry/incidents/subscription_processor/test_subscription_processor_aci.py
new file mode 100644
index 00000000000000..7f55033c138e77
--- /dev/null
+++ b/tests/sentry/incidents/subscription_processor/test_subscription_processor_aci.py
@@ -0,0 +1,336 @@
+import copy
+from datetime import timedelta
+from functools import cached_property
+from unittest.mock import call, patch
+
+from django.utils import timezone
+
+from sentry.testutils.factories import DEFAULT_EVENT_DATA
+from sentry.workflow_engine.models.data_condition import Condition, DataCondition
+from sentry.workflow_engine.types import DetectorPriorityLevel
+from tests.sentry.incidents.subscription_processor.test_subscription_processor_base import (
+    ProcessUpdateBaseClass,
+)
+
+
+class ProcessUpdateTest(ProcessUpdateBaseClass):
+    """
+    Test early return scenarios + simple cases.
+    """
+
+    # TODO: tests for early return scenarios. These will need to be added once
+    # we've decoupled the subscription processor from the alert rule model.
+
+    def test_simple(self) -> None:
+        """
+        Verify that an alert can trigger.
+        """
+        self.send_update(self.critical_threshold + 1)
+        assert self.get_detector_state(self.metric_detector) == DetectorPriorityLevel.HIGH
+
+    def test_resolve(self) -> None:
+        detector = self.metric_detector
+        self.send_update(self.critical_threshold + 1, timedelta(minutes=-2))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        self.send_update(self.resolve_threshold - 1, timedelta(minutes=-1))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+    def test_resolve_percent_boundary(self) -> None:
+        detector = self.metric_detector
+        self.update_threshold(detector, DetectorPriorityLevel.HIGH, 0.5)
+        self.update_threshold(detector, DetectorPriorityLevel.OK, 0.5)
+        self.send_update(self.critical_threshold + 0.1, timedelta(minutes=-2))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        self.send_update(self.resolve_threshold, timedelta(minutes=-1))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+    def test_reversed(self) -> None:
+        """
+        Test that resolutions work when the threshold is reversed.
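+        (With Condition.LESS the detector fires when the value drops below the
+        threshold and resolves once it climbs back to or above it.)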
+ """ + detector = self.metric_detector + DataCondition.objects.filter(condition_group=detector.workflow_condition_group).delete() + self.set_up_data_conditions(detector, Condition.LESS, 100, None, 100) + self.send_update(self.critical_threshold - 1, timedelta(minutes=-2)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH + + self.send_update(self.resolve_threshold, timedelta(minutes=-1)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.OK + + def test_multiple_triggers(self) -> None: + detector = self.metric_detector + DataCondition.objects.filter(condition_group=detector.workflow_condition_group).delete() + self.set_up_data_conditions(detector, Condition.GREATER, 100, 50, 50) + + self.send_update(self.warning_threshold + 1, timedelta(minutes=-5)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.MEDIUM + + self.send_update(self.critical_threshold + 1, timedelta(minutes=-4)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH + + self.send_update(self.critical_threshold - 1, timedelta(minutes=-3)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.MEDIUM + + self.send_update(self.warning_threshold - 1, timedelta(minutes=-2)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.OK + + def test_multiple_triggers_reversed(self) -> None: + detector = self.metric_detector + DataCondition.objects.filter(condition_group=detector.workflow_condition_group).delete() + self.set_up_data_conditions(detector, Condition.LESS, 50, 100, 100) + + self.send_update(self.warning_threshold - 1, timedelta(minutes=-5)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.MEDIUM + + self.send_update(self.critical_threshold - 1, timedelta(minutes=-4)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH + + self.send_update(self.critical_threshold + 1, timedelta(minutes=-3)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.MEDIUM + + self.send_update(self.warning_threshold + 1, timedelta(minutes=-2)) + assert self.get_detector_state(detector) == DetectorPriorityLevel.OK + + # TODO: the subscription processor has a 10 minute cooldown period for creating new incidents + # We probably need similar logic within workflow engine. 
+
+
+class ProcessUpdateComparisonAlertTest(ProcessUpdateBaseClass):
+    @cached_property
+    def comparison_detector_above(self):
+        detector = self.metric_detector
+        detector.config.update({"comparison_delta": 60 * 60})
+        detector.save()
+        self.update_threshold(detector, DetectorPriorityLevel.HIGH, 150)
+        self.update_threshold(detector, DetectorPriorityLevel.OK, 150)
+        snuba_query = self.get_snuba_query(detector)
+        snuba_query.update(time_window=60 * 60)
+        return detector
+
+    @cached_property
+    def comparison_detector_below(self):
+        detector = self.metric_detector
+        detector.config.update({"comparison_delta": 60 * 60})
+        detector.save()
+        DataCondition.objects.filter(condition_group=detector.workflow_condition_group).delete()
+        self.set_up_data_conditions(detector, Condition.LESS, 50, None, 50)
+        snuba_query = self.get_snuba_query(detector)
+        snuba_query.update(time_window=60 * 60)
+        return detector
+
+    @patch("sentry.incidents.utils.process_update_helpers.metrics")
+    def test_comparison_alert_above(self, helper_metrics):
+        detector = self.comparison_detector_above
+        comparison_delta = timedelta(seconds=detector.config["comparison_delta"])
+        self.send_update(self.critical_threshold + 1, timedelta(minutes=-10))
+
+        # Shouldn't trigger, since there should be no data in the comparison period
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+        helper_metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_comparison_value_invalid"),
+            ]
+        )
+        self.metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_invalid_aggregation_value"),
+            ]
+        )
+        comparison_date = timezone.now() - comparison_delta
+
+        for i in range(4):
+            self.store_event(
+                data={
+                    "timestamp": (comparison_date - timedelta(minutes=30 + i)).isoformat(),
+                    "environment": self.environment.name,
+                },
+                project_id=self.project.id,
+            )
+        self.metrics.incr.reset_mock()
+        self.send_update(2, timedelta(minutes=-9))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 2/4 == 50%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(4, timedelta(minutes=-8))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 4/4 == 100%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(6, timedelta(minutes=-7))
+        # Shouldn't trigger: 6/4 == 150%, but we want > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(7, timedelta(minutes=-6))
+        # Should trigger: 7/4 == 175% > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        # Check that we successfully resolve
+        self.send_update(6, timedelta(minutes=-5))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+    @patch("sentry.incidents.utils.process_update_helpers.metrics")
+    def test_comparison_alert_below(self, helper_metrics):
+        detector = self.comparison_detector_below
+        comparison_delta = timedelta(seconds=detector.config["comparison_delta"])
+        self.send_update(self.critical_threshold - 1, timedelta(minutes=-10))
+
+        # Shouldn't trigger, since there should be no data in the comparison period
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+        helper_metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_comparison_value_invalid"),
+            ]
+        )
+        self.metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_invalid_aggregation_value"),
+            ]
+        )
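+        # Backfill 4 events into the comparison window (one hour earlier) so the
+        # updates below have a non-zero baseline to compare against.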
+        comparison_date = timezone.now() - comparison_delta
+
+        for i in range(4):
+            self.store_event(
+                data={
+                    "timestamp": (comparison_date - timedelta(minutes=30 + i)).isoformat(),
+                    "environment": self.environment.name,
+                },
+                project_id=self.project.id,
+            )
+
+        self.metrics.incr.reset_mock()
+        self.send_update(6, timedelta(minutes=-9))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 6/4 == 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(4, timedelta(minutes=-8))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 4/4 == 100%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(2, timedelta(minutes=-7))
+        # Shouldn't trigger: 2/4 == 50%, but we want < 50%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(1, timedelta(minutes=-6))
+        # Should trigger: 1/4 == 25% < 50%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        # Check that we successfully resolve
+        self.send_update(2, timedelta(minutes=-5))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+    @patch("sentry.incidents.utils.process_update_helpers.metrics")
+    def test_is_unresolved_comparison_query(self, helper_metrics):
+        """
+        Test that uses the ErrorsQueryBuilder (because of the specific query)
+        """
+        detector = self.comparison_detector_above
+        comparison_delta = timedelta(seconds=detector.config["comparison_delta"])
+        snuba_query = self.get_snuba_query(detector)
+        snuba_query.update(query="(event.type:error) AND (is:unresolved)")
+
+        self.send_update(self.critical_threshold + 1, timedelta(minutes=-10), subscription=self.sub)
+        helper_metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_comparison_value_invalid"),
+            ]
+        )
+        self.metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_invalid_aggregation_value"),
+            ]
+        )
+        comparison_date = timezone.now() - comparison_delta
+
+        for i in range(4):
+            data = {
+                "timestamp": (comparison_date - timedelta(minutes=30 + i)).isoformat(),
+                "environment": self.environment.name,
+                "stacktrace": copy.deepcopy(DEFAULT_EVENT_DATA["stacktrace"]),
+                "fingerprint": ["group2"],
+                "level": "error",
+                "exception": {
+                    "values": [
+                        {
+                            "type": "IntegrationError",
+                            "value": "Identity not found.",
+                        }
+                    ]
+                },
+            }
+            self.store_event(
+                data=data,
+                project_id=self.project.id,
+            )
+
+        self.metrics.incr.reset_mock()
+        self.send_update(2, timedelta(minutes=-9))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 2/4 == 50%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(4, timedelta(minutes=-8))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 4/4 == 100%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(6, timedelta(minutes=-7))
+        # Shouldn't trigger: 6/4 == 150%, but we want > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(7, timedelta(minutes=-6))
+        # Should trigger: 7/4 == 175% > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        # Check that we successfully resolve
+        self.send_update(6, timedelta(minutes=-5))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+    @patch("sentry.incidents.utils.process_update_helpers.metrics")
+    def test_is_unresolved_different_aggregate(self, helper_metrics):
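+        """
+        Same comparison flow as above, but with a count_unique() aggregate
+        instead of count().
+        """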
+        detector = self.comparison_detector_above
+        comparison_delta = timedelta(seconds=detector.config["comparison_delta"])
+        snuba_query = self.get_snuba_query(detector)
+        snuba_query.update(aggregate="count_unique(tags[sentry:user])")
+
+        self.send_update(self.critical_threshold + 1, timedelta(minutes=-10), subscription=self.sub)
+        helper_metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_comparison_value_invalid"),
+            ]
+        )
+        self.metrics.incr.assert_has_calls(
+            [
+                call("incidents.alert_rules.skipping_update_invalid_aggregation_value"),
+            ]
+        )
+        comparison_date = timezone.now() - comparison_delta
+
+        for i in range(4):
+            self.store_event(
+                data={
+                    "timestamp": (comparison_date - timedelta(minutes=30 + i)).isoformat(),
+                    "environment": self.environment.name,
+                    "tags": {"sentry:user": i},
+                },
+                project_id=self.project.id,
+            )
+
+        self.metrics.incr.reset_mock()
+        self.send_update(2, timedelta(minutes=-9))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 2/4 == 50%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(4, timedelta(minutes=-8))
+        # Shouldn't trigger, since there are 4 events in the comparison period, and 4/4 == 100%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(6, timedelta(minutes=-7))
+        # Shouldn't trigger: 6/4 == 150%, but we want > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
+
+        self.send_update(7, timedelta(minutes=-6))
+        # Should trigger: 7/4 == 175% > 150%
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.HIGH
+
+        # Check that we successfully resolve
+        self.send_update(6, timedelta(minutes=-5))
+        assert self.get_detector_state(detector) == DetectorPriorityLevel.OK
diff --git a/tests/sentry/incidents/subscription_processor/test_subscription_processor_base.py b/tests/sentry/incidents/subscription_processor/test_subscription_processor_base.py
new file mode 100644
index 00000000000000..fd502c0aa0f531
--- /dev/null
+++ b/tests/sentry/incidents/subscription_processor/test_subscription_processor_base.py
@@ -0,0 +1,222 @@
+from datetime import timedelta
+from functools import cached_property
+from random import randint
+from unittest import mock
+from uuid import uuid4
+
+import pytest
+from django.utils import timezone
+
+from sentry.incidents.grouptype import MetricIssue
+from sentry.incidents.subscription_processor import SubscriptionProcessor
+from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
+from sentry.incidents.utils.types import DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION
+from sentry.snuba.dataset import Dataset
+from sentry.snuba.models import QuerySubscription, SnubaQuery, SnubaQueryEventType
+from sentry.snuba.subscriptions import create_snuba_query, create_snuba_subscription
+from sentry.testutils.cases import SnubaTestCase, SpanTestCase, TestCase
+from sentry.testutils.helpers.datetime import freeze_time
+from sentry.testutils.helpers.features import with_feature
+from sentry.workflow_engine.models import DataSource, DataSourceDetector, DetectorState
+from sentry.workflow_engine.models.data_condition import Condition, DataCondition
+from sentry.workflow_engine.models.detector import Detector
+from sentry.workflow_engine.types import DetectorPriorityLevel
+
+EMPTY = object()
+
+
+@freeze_time()
+@with_feature("organizations:workflow-engine-single-process-metric-issues")
+class ProcessUpdateBaseClass(TestCase, SpanTestCase, SnubaTestCase):
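+    """
+    Base class for subscription processor tests: wires a metric issue detector
+    to a real snuba query subscription and provides helpers for building and
+    sending subscription updates through the SubscriptionProcessor.
+    """
+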
+    @pytest.fixture(autouse=True)
+    def _setup_metrics_patch(self):
+        with mock.patch("sentry.incidents.subscription_processor.metrics") as self.metrics:
+            yield
+
+    def setUp(self):
+        super().setUp()
+        self._run_tasks = self.tasks()
+        self._run_tasks.__enter__()
+
+    def tearDown(self):
+        super().tearDown()
+        self._run_tasks.__exit__(None, None, None)
+
+    @cached_property
+    def sub(self):
+        subscription_id = int(self.metric_detector.data_sources.first().source_id)
+        return QuerySubscription.objects.get(id=subscription_id)
+
+    def create_detector_data_source_and_data_conditions(self):
+        detector = self.create_detector(
+            project=self.project,
+            workflow_condition_group=self.create_data_condition_group(),
+            type=MetricIssue.slug,
+            created_by_id=self.user.id,
+        )
+        self.create_detector_state(detector=detector)
+        with self.tasks():
+            snuba_query = create_snuba_query(
+                query_type=SnubaQuery.Type.ERROR,
+                dataset=Dataset.Events,
+                query="",
+                aggregate="count()",
+                time_window=timedelta(minutes=1),
+                resolution=timedelta(minutes=1),
+                environment=self.environment,
+                event_types=[
+                    SnubaQueryEventType.EventType.ERROR,
+                    SnubaQueryEventType.EventType.DEFAULT,
+                ],
+            )
+            query_subscription = create_snuba_subscription(
+                project=detector.project,
+                subscription_type=INCIDENTS_SNUBA_SUBSCRIPTION_TYPE,
+                snuba_query=snuba_query,
+            )
+        data_source = self.create_data_source(
+            organization=self.organization,
+            source_id=str(query_subscription.id),
+            type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
+        )
+        self.create_data_source_detector(data_source, detector)
+
+        self.set_up_data_conditions(detector, Condition.GREATER, 100, None, 10)
+
+        # XXX: while we're in the rollout phase of workflow engine, the subscription processor still requires a rule.
+        # Create one here and delete it when we can.
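+        # Attach the rule to the same snuba query as the detector's subscription so
+        # the processor can resolve it during dual processing.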
+        rule = self.create_alert_rule()
+        rule.snuba_query = snuba_query
+        rule.save()
+
+        return detector
+
+    def set_up_data_conditions(
+        self,
+        detector: Detector,
+        threshold_type: Condition,
+        critical_threshold: int,
+        warning_threshold: int | None = None,
+        resolve_threshold: int | None = None,
+    ):
+        if resolve_threshold is None:
+            resolve_threshold = (
+                critical_threshold if warning_threshold is None else warning_threshold
+            )
+        resolve_threshold_type = (
+            Condition.LESS_OR_EQUAL
+            if threshold_type == Condition.GREATER
+            else Condition.GREATER_OR_EQUAL
+        )
+
+        self.create_data_condition(
+            type=threshold_type,
+            comparison=critical_threshold,
+            condition_result=DetectorPriorityLevel.HIGH,
+            condition_group=detector.workflow_condition_group,
+        )
+        if warning_threshold is not None:
+            self.create_data_condition(
+                type=threshold_type,
+                comparison=warning_threshold,
+                condition_result=DetectorPriorityLevel.MEDIUM,
+                condition_group=detector.workflow_condition_group,
+            )
+        self.create_data_condition(
+            type=resolve_threshold_type,
+            comparison=resolve_threshold,
+            condition_result=DetectorPriorityLevel.OK,
+            condition_group=detector.workflow_condition_group,
+        )
+
+    @cached_property
+    def metric_detector(self):
+        return self.create_detector_data_source_and_data_conditions()
+
+    @cached_property
+    def critical_threshold(self):
+        critical_detector_trigger = DataCondition.objects.get(
+            condition_group=self.metric_detector.workflow_condition_group,
+            condition_result=DetectorPriorityLevel.HIGH,
+        )
+        return critical_detector_trigger.comparison
+
+    @cached_property
+    def warning_threshold(self):
+        warning_detector_trigger = DataCondition.objects.get(
+            condition_group=self.metric_detector.workflow_condition_group,
+            condition_result=DetectorPriorityLevel.MEDIUM,
+        )
+        return warning_detector_trigger.comparison
+
+    @cached_property
+    def resolve_threshold(self):
+        resolve_detector_trigger = DataCondition.objects.get(
+            condition_group=self.metric_detector.workflow_condition_group,
+            condition_result=DetectorPriorityLevel.OK,
+        )
+        return resolve_detector_trigger.comparison
+
+    def get_snuba_query(self, detector: Detector):
+        data_source_detector = DataSourceDetector.objects.get(detector=detector)
+        data_source = DataSource.objects.get(id=data_source_detector.data_source.id)
+        query_subscription = QuerySubscription.objects.get(id=data_source.source_id)
+        snuba_query = SnubaQuery.objects.get(id=query_subscription.snuba_query.id)
+        return snuba_query
+
+    def update_threshold(
+        self, detector: Detector, priority_level: DetectorPriorityLevel, new_threshold: float
+    ) -> None:
+        detector_trigger = DataCondition.objects.get(
+            condition_group=detector.workflow_condition_group,
+            condition_result=priority_level,
+        )
+        detector_trigger.comparison = new_threshold
+        detector_trigger.save()
+
+    def build_subscription_update(self, subscription, time_delta=None, value=EMPTY):
+        if time_delta is not None:
+            timestamp = timezone.now() + time_delta
+        else:
+            timestamp = timezone.now()
+        timestamp = timestamp.replace(microsecond=0)
+
+        data = {}
+
+        if subscription:
+            data = {"some_col_name": randint(0, 100) if value is EMPTY else value}
+        values = {"data": [data]}
+        return {
+            "subscription_id": subscription.subscription_id if subscription else uuid4().hex,
+            "values": values,
+            "timestamp": timestamp,
+            "interval": 1,
+            "partition": 1,
+            "offset": 1,
+        }
+
+    def send_update(self, value, time_delta=None, subscription=None):
+        if time_delta is None:
+            time_delta = timedelta()
+        if subscription is None:
+            subscription = self.sub
+        processor = SubscriptionProcessor(subscription)
+        message = self.build_subscription_update(subscription, value=value, time_delta=time_delta)
+        with (
+            self.feature(["organizations:incidents", "organizations:performance-view"]),
+            self.capture_on_commit_callbacks(execute=True),
+        ):
+            processor.process_update(message)
+        return processor
+
+    def assert_no_open_period(self, rule, subscription=None):
+        # TODO: inverse of below
+        pass
+
+    def assert_open_period(self, rule, subscription=None):
+        # TODO: once we are writing to IncidentGroupOpenPeriod look up the GroupOpenPeriod
+        pass
+
+    def get_detector_state(self, detector: Detector) -> int:
+        detector_state = DetectorState.objects.get(detector=detector)
+        return int(detector_state.state)
diff --git a/tests/sentry/incidents/subscription_processor/test_subscription_processor_workflow_engine.py b/tests/sentry/incidents/subscription_processor/test_subscription_processor_workflow_engine.py
index 47c35c7a2f229a..714b8d6c20291d 100644
--- a/tests/sentry/incidents/subscription_processor/test_subscription_processor_workflow_engine.py
+++ b/tests/sentry/incidents/subscription_processor/test_subscription_processor_workflow_engine.py
@@ -1,3 +1,8 @@
+"""
+Dual processing tests for the workflow engine/legacy system. This file will be cleaned up
+after we fully migrate away from metric alerts.
+"""
+
 from datetime import timedelta
 from unittest import mock
 from unittest.mock import MagicMock, call, patch