Skip to content

Commit 5fc9f67

Browse files
feat(crons): Add empty incident_occurrences_consumer (#80527)
Part of GH-79328
1 parent 49ce622 commit 5fc9f67

File tree

10 files changed

+107
-3
lines changed

10 files changed

+107
-3
lines changed

requirements-base.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ rfc3339-validator>=0.1.2
6666
rfc3986-validator>=0.1.1
6767
# [end] jsonschema format validators
6868
sentry-arroyo>=2.16.5
69-
sentry-kafka-schemas>=0.1.116
69+
sentry-kafka-schemas>=0.1.118
7070
sentry-ophio==1.0.0
7171
sentry-protos>=0.1.34
7272
sentry-redis-tools>=0.1.7

requirements-dev-frozen.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ sentry-cli==2.16.0
184184
sentry-devenv==1.13.0
185185
sentry-forked-django-stubs==5.1.1.post1
186186
sentry-forked-djangorestframework-stubs==3.15.1.post2
187-
sentry-kafka-schemas==0.1.116
187+
sentry-kafka-schemas==0.1.118
188188
sentry-ophio==1.0.0
189189
sentry-protos==0.1.34
190190
sentry-redis-tools==0.1.7

requirements-frozen.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ rpds-py==0.20.0
125125
rsa==4.8
126126
s3transfer==0.10.0
127127
sentry-arroyo==2.16.5
128-
sentry-kafka-schemas==0.1.116
128+
sentry-kafka-schemas==0.1.118
129129
sentry-ophio==1.0.0
130130
sentry-protos==0.1.34
131131
sentry-redis-tools==0.1.7

src/sentry/conf/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2893,6 +2893,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
28932893
"ingest-monitors": "default",
28942894
"monitors-clock-tick": "default",
28952895
"monitors-clock-tasks": "default",
2896+
"monitors-incident-occurrences": "default",
28962897
"uptime-configs": "default",
28972898
"uptime-results": "default",
28982899
"uptime-configs": "default",

src/sentry/conf/types/kafka_definition.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class Topic(Enum):
4747
INGEST_MONITORS = "ingest-monitors"
4848
MONITORS_CLOCK_TICK = "monitors-clock-tick"
4949
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
50+
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"
5051
UPTIME_CONFIG = "uptime-configs"
5152
UPTIME_RESULTS = "uptime-results"
5253
UPTIME_CONFIGS = "uptime-configs"

src/sentry/consumers/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ def ingest_transactions_options() -> list[click.Option]:
256256
"topic": Topic.MONITORS_CLOCK_TASKS,
257257
"strategy_factory": "sentry.monitors.consumers.clock_tasks_consumer.MonitorClockTasksStrategyFactory",
258258
},
259+
"monitors-incident-occurrences": {
260+
"topic": Topic.MONITORS_INCIDENT_OCCURRENCES,
261+
"strategy_factory": "sentry.monitors.consumers.incident_occurrences_consumer.MonitorIncidentOccurenceStrategyFactory",
262+
},
259263
"uptime-results": {
260264
"topic": Topic.UPTIME_RESULTS,
261265
"strategy_factory": "sentry.uptime.consumers.results_consumer.UptimeResultsStrategyFactory",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Mapping
5+
6+
from arroyo.backends.kafka.consumer import KafkaPayload
7+
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
8+
from arroyo.processing.strategies.commit import CommitOffsets
9+
from arroyo.processing.strategies.run_task import RunTask
10+
from arroyo.types import BrokerValue, Commit, FilteredPayload, Message, Partition
11+
from sentry_kafka_schemas.codecs import Codec
12+
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence
13+
14+
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
15+
16+
logger = logging.getLogger(__name__)
17+
18+
MONITORS_INCIDENT_OCCURRENCES: Codec[IncidentOccurrence] = get_topic_codec(
19+
Topic.MONITORS_INCIDENT_OCCURRENCES
20+
)
21+
22+
23+
def process_incident_occurrence(message: Message[KafkaPayload | FilteredPayload]):
24+
assert not isinstance(message.payload, FilteredPayload)
25+
assert isinstance(message.value, BrokerValue)
26+
27+
# wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value)
28+
# TODO(epurkhiser): Do something with issue occurrence
29+
30+
31+
class MonitorIncidentOccurenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
32+
def __init__(self) -> None:
33+
pass
34+
35+
def create_with_partitions(
36+
self,
37+
commit: Commit,
38+
partitions: Mapping[Partition, int],
39+
) -> ProcessingStrategy[KafkaPayload]:
40+
return RunTask(
41+
function=process_incident_occurrence,
42+
next_step=CommitOffsets(commit),
43+
)

src/sentry/runner/commands/devserver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ def devserver(
338338

339339
kafka_consumers.add("monitors-clock-tick")
340340
kafka_consumers.add("monitors-clock-tasks")
341+
kafka_consumers.add("monitors-incident-occurrences")
341342

342343
if settings.SENTRY_USE_PROFILING:
343344
kafka_consumers.add("ingest-profiles")

tests/sentry/consumers/test_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def test_dlq(consumer_def) -> None:
3838
"ingest-monitors",
3939
"monitors-clock-tick",
4040
"monitors-clock-tasks",
41+
"monitors-incident-occurrences",
4142
"uptime-results",
4243
"metrics-last-seen-updater",
4344
"generic-metrics-last-seen-updater",
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from datetime import datetime
2+
from unittest import mock
3+
4+
from arroyo.backends.kafka import KafkaPayload
5+
from arroyo.processing.strategies import ProcessingStrategy
6+
from arroyo.types import BrokerValue, Message, Partition, Topic
7+
from django.utils import timezone
8+
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence
9+
10+
from sentry.monitors.consumers.incident_occurrences_consumer import (
11+
MONITORS_INCIDENT_OCCURRENCES,
12+
MonitorIncidentOccurenceStrategyFactory,
13+
)
14+
15+
partition = Partition(Topic("test"), 0)
16+
17+
18+
def create_consumer() -> ProcessingStrategy[KafkaPayload]:
19+
factory = MonitorIncidentOccurenceStrategyFactory()
20+
commit = mock.Mock()
21+
return factory.create_with_partitions(commit, {partition: 0})
22+
23+
24+
def sned_incident_occurrence(
25+
consumer: ProcessingStrategy[KafkaPayload],
26+
ts: datetime,
27+
incident_occurrence: IncidentOccurrence,
28+
):
29+
value = BrokerValue(
30+
KafkaPayload(b"fake-key", MONITORS_INCIDENT_OCCURRENCES.encode(incident_occurrence), []),
31+
partition,
32+
1,
33+
ts,
34+
)
35+
consumer.submit(Message(value))
36+
37+
38+
def test_simple():
39+
# XXX(epurkhiser): Doesn't really test anything yet
40+
ts = timezone.now().replace(second=0, microsecond=0)
41+
42+
consumer = create_consumer()
43+
sned_incident_occurrence(
44+
consumer,
45+
ts,
46+
{
47+
"clock_tick_ts": 1617895645,
48+
"received_ts": 1617895650,
49+
"failed_checkin_id": 123456,
50+
"incident_id": 987654,
51+
"previous_checkin_ids": [111222, 333444, 55666],
52+
},
53+
)

0 commit comments

Comments
 (0)