Skip to content

feat(crons): Start dual writing DataSource and Detector rows for crons #97542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from sentry.monitors.system_incidents import update_check_in_volume
from sentry.monitors.types import CheckinItem
from sentry.monitors.utils import (
ensure_cron_detector,
get_new_timeout_at,
get_timeout_at,
signal_first_checkin,
Expand Down Expand Up @@ -165,6 +166,7 @@ def _ensure_monitor_with_config(
"is_upserting": True,
},
)
ensure_cron_detector(monitor)
if created:
signal_monitor_created(project, None, True, monitor, None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@
MonitorSerializer,
MonitorSerializerResponse,
)
from sentry.monitors.utils import create_issue_alert_rule, signal_monitor_created
from sentry.monitors.utils import (
create_issue_alert_rule,
ensure_cron_detector,
signal_monitor_created,
)
from sentry.monitors.validators import MonitorBulkEditValidator, MonitorValidator
from sentry.search.utils import tokenize_query
from sentry.types.actor import Actor
Expand Down Expand Up @@ -291,6 +295,8 @@ def post(self, request: AuthenticatedHttpRequest, organization) -> Response:
except MonitorLimitsExceeded as e:
return self.respond({type(e).__name__: str(e)}, status=403)

ensure_cron_detector(monitor)

# Attempt to assign a seat for this monitor
seat_outcome = quotas.backend.assign_monitor_seat(monitor)
if seat_outcome != Outcome.ACCEPTED:
Expand Down
29 changes: 29 additions & 0 deletions src/sentry/monitors/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from collections import defaultdict
from datetime import datetime, timedelta

Expand All @@ -13,6 +14,7 @@
from sentry.models.rule import Rule, RuleActivity, RuleActivityType, RuleSource
from sentry.monitors.constants import DEFAULT_CHECKIN_MARGIN, MAX_TIMEOUT, TIMEOUT
from sentry.monitors.models import CheckInStatus, Monitor, MonitorCheckIn
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.projects.project_rules.creator import ProjectRuleCreator
from sentry.projects.project_rules.updater import ProjectRuleUpdater
from sentry.signals import (
Expand All @@ -23,7 +25,11 @@
from sentry.users.models.user import User
from sentry.utils.audit import create_audit_entry, create_system_audit_entry
from sentry.utils.auth import AuthenticatedHttpRequest
from sentry.utils.db import atomic_transaction
from sentry.utils.projectflags import set_project_flag_and_signal
from sentry.workflow_engine.models import DataSource, DataSourceDetector, Detector

logger = logging.getLogger(__name__)


def signal_first_checkin(project: Project, monitor: Monitor):
Expand Down Expand Up @@ -383,3 +389,26 @@ def update_issue_alert_rule(
)

return issue_alert_rule.id


def ensure_cron_detector(monitor: Monitor):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel like much can go wrong here, but if we want to be more cautious we could put this creation behind a flag.

from sentry.issues.grouptype import MonitorCheckInFailure

try:
with atomic_transaction(using=router.db_for_write(DataSource)):
data_source, created = DataSource.objects.get_or_create(
type=DATA_SOURCE_CRON_MONITOR,
organization_id=monitor.organization_id,
source_id=str(monitor.id),
)
if created:
detector = Detector.objects.create(
type=MonitorCheckInFailure.slug,
project_id=monitor.project_id,
name=monitor.name,
owner_user_id=monitor.owner_user_id,
owner_team_id=monitor.owner_team_id,
)
DataSourceDetector.objects.create(data_source=data_source, detector=detector)
except Exception:
logger.exception("Error creating cron detector")
7 changes: 6 additions & 1 deletion tests/sentry/monitors/consumers/test_monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
ScheduleType,
)
from sentry.monitors.processing_errors.errors import ProcessingErrorsException, ProcessingErrorType
from sentry.monitors.types import CheckinItem
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR, CheckinItem
from sentry.testutils.asserts import assert_org_audit_log_exists
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.options import override_options
from sentry.testutils.outbox import outbox_runner
from sentry.utils import json
from sentry.utils.outcomes import Outcome
from sentry.workflow_engine.models import Detector


class ExpectNoProcessingError:
Expand Down Expand Up @@ -572,6 +573,10 @@ def test_monitor_create(self) -> None:
monitor_environment.next_checkin_latest
== monitor_environment.monitor.get_next_expected_checkin_latest(checkin.date_added)
)
assert Detector.objects.filter(
datasource__type=DATA_SOURCE_CRON_MONITOR,
datasource__source_id=str(monitor_environment.monitor_id),
).exists()

def test_monitor_create_owner(self) -> None:
self.send_checkin(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from sentry.constants import ObjectStatus
from sentry.models.rule import Rule, RuleSource
from sentry.monitors.models import Monitor, MonitorStatus, ScheduleType
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.quotas.base import SeatAssignmentResult
from sentry.slug.errors import DEFAULT_SLUG_ERROR_MESSAGE
from sentry.testutils.asserts import assert_org_audit_log_exists
from sentry.testutils.cases import MonitorTestCase
from sentry.testutils.outbox import outbox_runner
from sentry.utils.outcomes import Outcome
from sentry.workflow_engine.models import Detector


class ListOrganizationMonitorsTest(MonitorTestCase):
Expand Down Expand Up @@ -402,6 +404,11 @@ def test_simple(self, mock_record: MagicMock) -> None:
data={"upsert": False, **monitor.get_audit_log_data()},
)

assert Detector.objects.filter(
datasource__type=DATA_SOURCE_CRON_MONITOR,
datasource__source_id=str(monitor.id),
).exists()

self.project.refresh_from_db()
assert self.project.flags.has_cron_monitors

Expand Down
120 changes: 120 additions & 0 deletions tests/sentry/monitors/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from unittest.mock import patch

from django.db import IntegrityError

from sentry.issues.grouptype import MonitorIncidentType
from sentry.monitors.types import DATA_SOURCE_CRON_MONITOR
from sentry.monitors.utils import ensure_cron_detector
from sentry.testutils.cases import TestCase
from sentry.workflow_engine.models import DataSource, DataSourceDetector, Detector


class EnsureCronDetectorTest(TestCase):
def setUp(self):
super().setUp()
self.monitor = self.create_monitor(owner_user_id=None)

def test_creates_data_source_and_detector_for_new_monitor(self):
assert not DataSource.objects.filter(
type=DATA_SOURCE_CRON_MONITOR,
organization_id=self.monitor.organization_id,
source_id=str(self.monitor.id),
).exists()

ensure_cron_detector(self.monitor)
data_source = DataSource.objects.get(
type=DATA_SOURCE_CRON_MONITOR,
organization_id=self.monitor.organization_id,
source_id=str(self.monitor.id),
)
assert data_source is not None
detector = Detector.objects.get(
type=MonitorIncidentType.slug,
project_id=self.monitor.project_id,
name=self.monitor.name,
)
assert detector is not None
assert detector.owner_user_id == self.monitor.owner_user_id
assert detector.owner_team_id == self.monitor.owner_team_id
assert DataSourceDetector.objects.filter(
data_source=data_source,
detector=detector,
).exists()

def test_idempotent_for_existing_data_source(self):
ensure_cron_detector(self.monitor)
data_source = DataSource.objects.get(
type=DATA_SOURCE_CRON_MONITOR,
organization_id=self.monitor.organization_id,
source_id=str(self.monitor.id),
)
detector = Detector.objects.get(
type=MonitorIncidentType.slug,
project_id=self.monitor.project_id,
name=self.monitor.name,
)
link = DataSourceDetector.objects.get(
data_source=data_source,
detector=detector,
)
ensure_cron_detector(self.monitor)
data_source_after = DataSource.objects.get(
type=DATA_SOURCE_CRON_MONITOR,
organization_id=self.monitor.organization_id,
source_id=str(self.monitor.id),
)
detector_after = Detector.objects.get(
type=MonitorIncidentType.slug,
project_id=self.monitor.project_id,
name=self.monitor.name,
)
link_after = DataSourceDetector.objects.get(
data_source=data_source,
detector=detector,
)
assert data_source.id == data_source_after.id
assert detector.id == detector_after.id
assert link.id == link_after.id

def test_with_owner_user(self):
self.monitor.owner_user_id = self.user.id
self.monitor.save()
ensure_cron_detector(self.monitor)
detector = Detector.objects.get(
type=MonitorIncidentType.slug,
project_id=self.monitor.project_id,
)
assert detector.owner_user_id == self.user.id
assert detector.owner_team_id is None

def test_with_no_owner(self):
ensure_cron_detector(self.monitor)

detector = Detector.objects.get(
type=MonitorIncidentType.slug,
project_id=self.monitor.project_id,
)
assert detector.owner_user_id is None
assert detector.owner_team_id is None

def test_handles_database_errors_gracefully(self):
with (
patch("sentry.monitors.utils.logger") as mock_logger,
patch("sentry.monitors.utils.DataSource.objects.get_or_create") as mock_get_or_create,
):
mock_get_or_create.side_effect = IntegrityError("Database error")

ensure_cron_detector(self.monitor)
mock_logger.exception.assert_called_once_with("Error creating cron detector")
assert not DataSource.objects.filter(
type=DATA_SOURCE_CRON_MONITOR, source_id=str(self.monitor.id)
).exists()

def test_atomic_transaction_rollback(self):
with patch("sentry.monitors.utils.Detector.objects.create") as mock_create:
mock_create.side_effect = IntegrityError("Cannot create detector")

ensure_cron_detector(self.monitor)
assert not DataSource.objects.filter(
type=DATA_SOURCE_CRON_MONITOR, source_id=str(self.monitor.id)
).exists()
Loading