
Commit 3273e16

feat(workflow_engine): Add process_data_packets method (#82002)
## Description

Adding a wrapper method, `process_data_packets`, to merge `process_data_sources` and `process_detectors` into a single call. Still thinking through higher-level abstractions to simplify using the workflow_engine, but this should simplify integrations a little further for now.
1 parent d1e9d20 commit 3273e16
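For illustration, a minimal sketch of the call pattern this wrapper replaces. It assumes `data_packets` is a list of `DataPacket` objects already built by the caller; the `"snuba_query_subscription"` query type is taken from the new test, and the "before" shape mirrors what `process_data_packets` now does internally.

```python
from sentry.workflow_engine.processors import process_data_sources, process_detectors
from sentry.workflow_engine.processors.data_packet import process_data_packets

# Before: integrations chained the two pre-processing steps themselves.
results = []
for data_packet, detectors in process_data_sources(data_packets, "snuba_query_subscription"):
    results.extend(process_detectors(data_packet, detectors))

# After: the wrapper performs the same fan-out in one call.
results = process_data_packets(data_packets, "snuba_query_subscription")
```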

8 files changed: +109, -32 lines

src/sentry/workflow_engine/models/data_source.py

Lines changed: 4 additions & 0 deletions

@@ -29,7 +29,11 @@ class DataSource(DefaultFieldsModel):
     __relocation_scope__ = RelocationScope.Organization

     organization = FlexibleForeignKey("sentry.Organization")
+
+    # Should this be a string so we can support UUID / ints?
     query_id = BoundedBigIntegerField()
+
+    # TODO - Add a type here
     type = models.TextField()

     detectors = models.ManyToManyField("workflow_engine.Detector", through=DataSourceDetector)

src/sentry/workflow_engine/processors/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -1,6 +1,8 @@
 __all__ = [
     "process_data_sources",
     "process_detectors",
+    "process_workflows",
+    "process_data_packet",
 ]

 from .data_source import process_data_sources

src/sentry/workflow_engine/processors/data_packet.py

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+from sentry.workflow_engine.handlers.detector import DetectorEvaluationResult
+from sentry.workflow_engine.models import DataPacket, Detector
+from sentry.workflow_engine.processors.data_source import process_data_sources
+from sentry.workflow_engine.processors.detector import process_detectors
+from sentry.workflow_engine.types import DetectorGroupKey
+
+
+def process_data_packets(
+    data_packets: list[DataPacket], query_type: str
+) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
+    """
+    This method ties the two main pre-processing methods together to process
+    the incoming data and create issue occurrences.
+    """
+    processed_sources = process_data_sources(data_packets, query_type)
+
+    results: list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]] = []
+    for data_packet, detectors in processed_sources:
+        detector_results = process_detectors(data_packet, detectors)
+
+        for detector, detector_state in detector_results:
+            results.append((detector, detector_state))
+
+    return results
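The return value pairs each `Detector` with a mapping of `DetectorGroupKey` to `DetectorEvaluationResult`. A minimal consumption sketch follows; it assumes `data_packets` is a list of `DataPacket` objects from the caller, and only the `priority` attribute and the `"snuba_query_subscription"` query type are taken from this commit's tests — the rest is illustrative.

```python
from sentry.workflow_engine.processors.data_packet import process_data_packets

for detector, results_by_group in process_data_packets(data_packets, "snuba_query_subscription"):
    for group_key, evaluation in results_by_group.items():
        # group_key is a DetectorGroupKey (None in the single-packet test below);
        # evaluation is a DetectorEvaluationResult, e.g. evaluation.priority.
        print(detector.id, group_key, evaluation.priority)
```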

src/sentry/workflow_engine/processors/data_source.py

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@ def process_data_sources(
 ) -> list[tuple[DataPacket, list[Detector]]]:
     metrics.incr("sentry.workflow_engine.process_data_sources", tags={"query_type": query_type})

+    # TODO - change data_source.query_id to be a string to support UUIDs
     data_packet_ids = {int(packet.query_id) for packet in data_packets}

     # Fetch all data sources and associated detectors for the given data packets
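One note on the TODO above: `process_data_sources` casts `query_id` with `int()`, which is why the test helper uses a numeric `"123"` subscription id; a UUID-style id would fail that cast until `query_id` becomes a string field. A quick plain-Python illustration (not code from this commit):

```python
# The numeric string used by the test helper survives the int() cast...
assert int("123") == 123

# ...but a UUID-style query_id would not, until query_id supports strings.
try:
    int("8a9c1f4e-77f0-4a1c-9d1e-2b2f6d1c9a77")
except ValueError:
    print("UUID query ids are not supported yet")
```
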
tests/sentry/workflow_engine/processors/test_data_packet.py

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+from sentry.workflow_engine.processors.data_packet import process_data_packets
+from sentry.workflow_engine.types import DetectorPriorityLevel
+from tests.sentry.workflow_engine.test_base import BaseWorkflowTest
+
+
+class TestProcessDataPacket(BaseWorkflowTest):
+    def setUp(self):
+        self.snuba_query = self.create_snuba_query()
+
+        (self.workflow, self.detector, self.detector_workflow, self.workflow_triggers) = (
+            self.create_detector_and_workflow()
+        )
+
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)
+
+    def test_single_data_packet(self):
+        results = process_data_packets([self.data_packet], "snuba_query_subscription")
+        assert len(results) == 1
+
+        detector, detector_evaluation_result = results[0]
+        assert detector_evaluation_result[None].priority == DetectorPriorityLevel.HIGH

tests/sentry/workflow_engine/processors/test_workflow.py

Lines changed: 3 additions & 2 deletions

@@ -1,6 +1,6 @@
 from unittest import mock

-from sentry.incidents.grouptype import MetricAlertFire
+from sentry.issues.grouptype import ErrorGroupType
 from sentry.workflow_engine.models import DataConditionGroup
 from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.processors.workflow import evaluate_workflow_triggers, process_workflows

@@ -14,12 +14,13 @@ def setUp(self):
             self.detector,
             self.detector_workflow,
             self.workflow_triggers,
-        ) = self.create_detector_and_workflow(detector_type=MetricAlertFire.slug)
+        ) = self.create_detector_and_workflow()

         self.error_workflow, self.error_detector, self.detector_workflow_error, _ = (
             self.create_detector_and_workflow(
                 name_prefix="error",
                 workflow_triggers=self.create_data_condition_group(),
+                detector_type=ErrorGroupType.slug,
             )
         )

tests/sentry/workflow_engine/test_base.py

Lines changed: 51 additions & 3 deletions

@@ -1,20 +1,24 @@
-from datetime import datetime
+from datetime import UTC, datetime
 from uuid import uuid4

 from sentry.eventstore.models import Event, GroupEvent
-from sentry.issues.grouptype import ErrorGroupType
+from sentry.incidents.grouptype import MetricAlertFire
+from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.models.group import Group
 from sentry.snuba.models import SnubaQuery
 from sentry.testutils.cases import TestCase
 from sentry.testutils.factories import EventType
 from sentry.workflow_engine.models import (
     Action,
     DataConditionGroup,
+    DataPacket,
+    DataSource,
     Detector,
     DetectorWorkflow,
     Workflow,
 )
 from sentry.workflow_engine.models.data_condition import Condition
+from sentry.workflow_engine.types import DetectorPriorityLevel
 from tests.sentry.issues.test_utils import OccurrenceTestMixin


@@ -66,9 +70,13 @@ def create_detector_and_workflow(
         self,
         name_prefix="test",
         workflow_triggers: DataConditionGroup | None = None,
-        detector_type: str = ErrorGroupType.slug,
+        detector_type: str = MetricAlertFire.slug,
         **kwargs,
     ) -> tuple[Workflow, Detector, DetectorWorkflow, DataConditionGroup]:
+        """
+        Create a Workflow, Detector, DetectorWorkflow, and DataConditionGroup for testing.
+        These models are configured to work together to test the workflow engine.
+        """
         workflow_triggers = workflow_triggers or self.create_data_condition_group()

         if not workflow_triggers.conditions.exists():

@@ -100,6 +108,46 @@ def create_detector_and_workflow(

         return workflow, detector, detector_workflow, workflow_triggers

+    def create_test_query_data_source(self, detector) -> tuple[DataSource, DataPacket]:
+        """
+        Create a DataSource and DataPacket for testing; this will create a fake QuerySubscriptionUpdate and link it to a data_source.
+
+        A detector is required to create this test data, so we can ensure that the detector
+        has a condition to evaluate for the data_packet that evaluates to true.
+        """
+        subscription_update: QuerySubscriptionUpdate = {
+            "subscription_id": "123",
+            "values": {"foo": 1},
+            "timestamp": datetime.now(UTC),
+            "entity": "test-entity",
+        }
+
+        data_source = self.create_data_source(
+            query_id=subscription_update["subscription_id"],
+            organization=self.organization,
+        )
+
+        data_source.detectors.add(detector)
+
+        if detector.workflow_condition_group is None:
+            detector.workflow_condition_group = self.create_data_condition_group(logic_type="any")
+            detector.save()
+
+        self.create_data_condition(
+            condition_group=detector.workflow_condition_group,
+            type=Condition.EQUAL,
+            condition_result=DetectorPriorityLevel.HIGH,
+            comparison=1,
+        )
+
+        # Create a data_packet from the update for testing
+        data_packet = DataPacket[QuerySubscriptionUpdate](
+            query_id=subscription_update["subscription_id"],
+            packet=subscription_update,
+        )
+
+        return data_source, data_packet
+
     def create_workflow_action(
         self,
         workflow: Workflow,

tests/sentry/workflow_engine/test_integration.py

Lines changed: 3 additions & 27 deletions

@@ -3,13 +3,11 @@

 from sentry.eventstream.types import EventStreamEventType
 from sentry.incidents.grouptype import MetricAlertFire
-from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.issues.grouptype import ErrorGroupType
 from sentry.issues.ingest import save_issue_occurrence
 from sentry.models.group import Group
 from sentry.tasks.post_process import post_process_group
 from sentry.testutils.helpers.features import with_feature
-from sentry.workflow_engine.models import DataPacket, DataSource
 from sentry.workflow_engine.models.data_condition import Condition
 from sentry.workflow_engine.processors import process_data_sources, process_detectors
 from sentry.workflow_engine.types import DetectorPriorityLevel

@@ -77,36 +75,14 @@ def call_post_process_group(

         return cache_key

-    def create_test_data_source(self) -> DataSource:
-        self.subscription_update: QuerySubscriptionUpdate = {
-            "subscription_id": "123",
-            "values": {"foo": 1},
-            "timestamp": datetime.utcnow(),
-            "entity": "test-entity",
-        }
-
-        self.data_source = self.create_data_source(
-            query_id=self.subscription_update["subscription_id"],
-            organization=self.organization,
-        )
-        self.data_source.detectors.add(self.detector)
-
-        # Create a data_packet from the update for testing
-        self.data_packet = DataPacket[QuerySubscriptionUpdate](
-            query_id=self.subscription_update["subscription_id"],
-            packet=self.subscription_update,
-        )
-
-        return self.data_source
-

 class TestWorkflowEngineIntegrationToIssuePlatform(BaseWorkflowIntegrationTest):
     @with_feature("organizations:workflow-engine-metric-alert-processing")
     def test_workflow_engine__data_source__to_metric_issue_workflow(self):
         """
         This test ensures that a data_source can create the correct event in Issue Platform
         """
-        self.create_test_data_source()
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

         with mock.patch(
             "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"

@@ -121,7 +97,7 @@ def test_workflow_engine__data_source__to_metric_issue_workflow(self):

     @with_feature("organizations:workflow-engine-metric-alert-processing")
     def test_workflow_engine__data_source__different_type(self):
-        self.create_test_data_source()
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)

         with mock.patch(
             "sentry.workflow_engine.processors.detector.produce_occurrence_to_kafka"

@@ -134,7 +110,7 @@ def test_workflow_engine__data_source__different_type(self):

     @with_feature("organizations:workflow-engine-metric-alert-processing")
     def test_workflow_engine__data_source__no_detectors(self):
-        self.create_test_data_source()
+        self.data_source, self.data_packet = self.create_test_query_data_source(self.detector)
         self.detector.delete()

         with mock.patch(
