src/sentry/uptime/consumers/results_consumer.py (151 additions, 117 deletions)
@@ -174,6 +174,155 @@ def record_check_metrics(
)


def create_backfill_misses(
detector: Detector,
subscription: UptimeSubscription,
result: CheckResult,
last_update_ms: int,
metric_tags: dict[str, str],
cluster,
) -> None:
"""
Create synthetic missed check results for gaps in the check timeline.

When a result arrives with a gap (2+ intervals since last update), this
function creates backfill misses for the intervening time periods, unless
the subscription interval was recently changed.
"""
subscription_interval_ms = 1000 * subscription.interval_seconds
num_intervals = (result["scheduled_check_time_ms"] - last_update_ms) / subscription_interval_ms

# If the scheduled check is two or more intervals since the last seen check, we can declare the
# intervening checks missed...
if last_update_ms > 0 and num_intervals > 1:
# ... but it might be the case that the user changed the frequency of the detector. So, first
# verify that the interval in postgres is the same as the last-seen interval (in redis).
# We only store in redis when we encounter a difference like this, which means we won't be able
# to tell if a missed check is real with the very first missed check. This is probably okay,
# and preferable to just writing the interval to redis on every check consumed.
last_interval_key = build_last_seen_interval_key(detector)

# If we've never set an interval before, just set this to zero, which will never compare
# true with any valid interval.
last_interval_seen: str = cluster.get(last_interval_key) or "0"

if int(last_interval_seen) == subscription_interval_ms:
# Bound the number of missed checks we generate--just in case.
num_missed_checks = min(MAX_SYNTHETIC_MISSED_CHECKS, int(num_intervals - 1))

metrics.distribution(
"uptime.result_processer.num_missing_check",
num_missed_checks,
tags=metric_tags,
)
logger.info(
"uptime.result_processor.num_missing_check",
extra={"num_missed_checks": num_missed_checks, **result},
)
            if num_intervals != int(num_intervals):
                logger.info(
                    "uptime.result_processor.invalid_check_interval",
                    extra={
                        "last_update_ms": last_update_ms,
                        "current_update_ms": result["scheduled_check_time_ms"],
                        "interval_ms": subscription_interval_ms,
                        **result,
                    },
                )

synthetic_metric_tags = metric_tags.copy()
synthetic_metric_tags["status"] = CHECKSTATUS_MISSED_WINDOW
for i in range(0, num_missed_checks):
missed_result: CheckResult = {
"guid": uuid.uuid4().hex,
"subscription_id": result["subscription_id"],
"status": CHECKSTATUS_MISSED_WINDOW,
"status_reason": {
"type": "miss_backfill",
"description": "Miss was never reported for this scheduled check_time",
},
"trace_id": uuid.uuid4().hex,
"span_id": uuid.uuid4().hex,
"region": result["region"],
"scheduled_check_time_ms": last_update_ms
+ ((i + 1) * subscription_interval_ms),
"actual_check_time_ms": result["actual_check_time_ms"],
"duration_ms": 0,
"request_info": None,
}
produce_eap_uptime_result(
detector,
missed_result,
synthetic_metric_tags.copy(),
)
else:
logger.info(
"uptime.result_processor.false_num_missing_check",
extra={**result},
)
cluster.set(last_interval_key, subscription_interval_ms, ex=LAST_UPDATE_REDIS_TTL)
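

# Illustrative sketch (not part of this diff): how the gap arithmetic in
# create_backfill_misses plays out for a hypothetical 60-second monitor. The
# timestamps and the cap of 10 are invented for the example; only the formulas
# mirror the function above.
def _example_backfill_timestamps() -> list[int]:
    interval_ms = 60 * 1000  # subscription.interval_seconds * 1000
    last_update_ms = 1_700_000_000_000  # last processed check (made up)
    scheduled_check_time_ms = last_update_ms + 4 * interval_ms  # result arrives 4 intervals late
    num_intervals = (scheduled_check_time_ms - last_update_ms) / interval_ms  # 4.0
    # Four intervals elapsed, so three intervening checks were never reported;
    # the current result accounts for the fourth slot itself.
    num_missed_checks = min(10, int(num_intervals - 1))  # MAX_SYNTHETIC_MISSED_CHECKS assumed to be 10
    return [last_update_ms + (i + 1) * interval_ms for i in range(num_missed_checks)]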


def process_result_internal(
detector: Detector,
uptime_subscription: UptimeSubscription,
result: CheckResult,
metric_tags: dict[str, str],
cluster,
) -> None:
"""
Core result processing logic shared by main consumer and retry task.

Does NOT include: dedup check, backfill detection, queue check.
Contains: metrics, mode handling, EAP production, state updates.
"""
mode_name = UptimeMonitorMode(detector.config["mode"]).name.lower()

# We log the result stats here after the duplicate check so that we
# know the "true" duration and delay of each check. Since during
# deploys we might have checks run from both the old/new checker
# deployments, there will be overlap of when things run. The new
# deployment will have artificially inflated delay stats, since it may
# duplicate checks that already ran on time on the old deployment, but
# will have run them later.
#
# Since we process all results for a given uptime monitor in order, we
# can guarantee that we get the earliest delay stat for each scheduled
# check for the monitor here, and so this stat will be a more accurate
# measurement of delay/duration.
record_check_metrics(result, detector, {"mode": mode_name, **metric_tags})

Mode = UptimeMonitorMode
try:
match detector.config["mode"]:
case Mode.AUTO_DETECTED_ONBOARDING:
handle_onboarding_result(detector, uptime_subscription, result, metric_tags.copy())
case Mode.AUTO_DETECTED_ACTIVE | Mode.MANUAL:
handle_active_result(detector, uptime_subscription, result, metric_tags.copy())
case _:
logger.error(
"Unknown subscription mode",
extra={"mode": detector.config["mode"]},
)
except Exception:
logger.exception("Failed to process result for uptime project subscription")

# EAP production _must_ happen after handling the result, since we
# may mutate the UptimeSubscription when we determine we're in an incident
produce_eap_uptime_result(detector, result, metric_tags.copy())

# Track the last update date to allow deduplication
last_update_key = build_last_update_key(detector)
cluster.set(
last_update_key,
int(result["scheduled_check_time_ms"]),
ex=LAST_UPDATE_REDIS_TTL,
)

record_check_completion_metrics(result, metric_tags)
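

# Illustrative sketch (not part of this diff): the docstring above notes this
# core is shared with a retry task. A hypothetical replay caller could look
# like the function below; its name, the tag set, and the decision to skip the
# dedup/backfill/queue checks are assumptions made for the example.
def _replay_check_result(
    detector: Detector,
    uptime_subscription: UptimeSubscription,
    result: CheckResult,
    cluster,
) -> None:
    # All of the shared work (metrics, mode dispatch, EAP production, and the
    # redis last-update write) happens inside process_result_internal.
    metric_tags = {"uptime_region": result["region"]}  # assumed tag set
    process_result_internal(detector, uptime_subscription, result, metric_tags, cluster)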


def handle_active_result(
detector: Detector,
uptime_subscription: UptimeSubscription,
@@ -305,123 +454,8 @@ def handle_result(self, subscription: UptimeSubscription | None, result: CheckRe
)
return

subscription_interval_ms = 1000 * subscription.interval_seconds
num_intervals = (
result["scheduled_check_time_ms"] - last_update_ms
) / subscription_interval_ms

# If the scheduled check is two or more intervals since the last seen check, we can declare the
# intervening checks missed...
if last_update_raw is not None and num_intervals > 1:
# ... but it might be the case that the user changed the frequency of the detector. So, first
# verify that the interval in postgres is the same as the last-seen interval (in redis).
# We only store in redis when we encounter a difference like this, which means we won't be able
# to tell if a missed check is real with the very first missed check. This is probably okay,
# and preferable to just writing the interval to redis on every check consumed.
last_interval_key = build_last_seen_interval_key(detector)

# If we've never set an interval before, just set this to zero, which will never compare
# true with any valid interval.
last_interval_seen: str = cluster.get(last_interval_key) or "0"

if int(last_interval_seen) == subscription_interval_ms:
# Bound the number of missed checks we generate--just in case.
num_missed_checks = min(MAX_SYNTHETIC_MISSED_CHECKS, int(num_intervals - 1))

metrics.distribution(
"uptime.result_processer.num_missing_check",
num_missed_checks,
tags=metric_tags,
)
logger.info(
"uptime.result_processor.num_missing_check",
extra={"num_missed_checks": num_missed_checks, **result},
)
if num_intervals != int(num_intervals):
logger.info(
"uptime.result_processor.invalid_check_interval",
0,
extra={
"last_update_ms": last_update_ms,
"current_update_ms": result["scheduled_check_time_ms"],
"interval_ms": subscription_interval_ms,
**result,
},
)

synthetic_metric_tags = metric_tags.copy()
synthetic_metric_tags["status"] = CHECKSTATUS_MISSED_WINDOW
for i in range(0, num_missed_checks):
missed_result: CheckResult = {
"guid": uuid.uuid4().hex,
"subscription_id": result["subscription_id"],
"status": CHECKSTATUS_MISSED_WINDOW,
"status_reason": {
"type": "miss_backfill",
"description": "Miss was never reported for this scheduled check_time",
},
"trace_id": uuid.uuid4().hex,
"span_id": uuid.uuid4().hex,
"region": result["region"],
"scheduled_check_time_ms": last_update_ms
+ ((i + 1) * subscription_interval_ms),
"actual_check_time_ms": result["actual_check_time_ms"],
"duration_ms": 0,
"request_info": None,
}
produce_eap_uptime_result(
detector,
missed_result,
synthetic_metric_tags.copy(),
)
else:
logger.info(
"uptime.result_processor.false_num_missing_check",
extra={**result},
)
cluster.set(last_interval_key, subscription_interval_ms, ex=LAST_UPDATE_REDIS_TTL)

# We log the result stats here after the duplicate check so that we
# know the "true" duration and delay of each check. Since during
# deploys we might have checks run from both the old/new checker
# deployments, there will be overlap of when things run. The new
# deployment will have artificially inflated delay stats, since it may
# duplicate checks that already ran on time on the old deployment, but
# will have run them later.
#
# Since we process all results for a given uptime monitor in order, we
# can guarantee that we get the earliest delay stat for each scheduled
# check for the monitor here, and so this stat will be a more accurate
# measurement of delay/duration.
record_check_metrics(result, detector, {"mode": mode_name, **metric_tags})

Mode = UptimeMonitorMode
try:
match detector.config["mode"]:
case Mode.AUTO_DETECTED_ONBOARDING:
handle_onboarding_result(detector, subscription, result, metric_tags.copy())
case Mode.AUTO_DETECTED_ACTIVE | Mode.MANUAL:
handle_active_result(detector, subscription, result, metric_tags.copy())
case _:
logger.error(
"Unknown subscription mode",
extra={"mode": detector.config["mode"]},
)
except Exception:
logger.exception("Failed to process result for uptime project subscription")

# EAP production _must_ happen after handling the result, since we
# may mutate the UptimeSubscription when we determine we're in an incident
produce_eap_uptime_result(detector, result, metric_tags.copy())

# Track the last update date to allow deduplication
cluster.set(
last_update_key,
int(result["scheduled_check_time_ms"]),
ex=LAST_UPDATE_REDIS_TTL,
)

record_check_completion_metrics(result, metric_tags)
create_backfill_misses(detector, subscription, result, last_update_ms, metric_tags, cluster)
process_result_internal(detector, subscription, result, metric_tags, cluster)
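

# Illustrative sketch (not part of this diff): the dedup guard handle_result
# applies before the two calls above, reconstructed from the last_update key
# that process_result_internal writes. The exact comparison is an assumption;
# the key lookup mirrors build_last_update_key usage in the surrounding code.
def _should_skip_as_duplicate(detector: Detector, result: CheckResult, cluster) -> bool:
    last_update_raw: str | None = cluster.get(build_last_update_key(detector))
    last_update_ms = 0 if last_update_raw is None else int(last_update_raw)
    # A result scheduled at or before the last processed check was already handled,
    # e.g. the same check delivered by both an old and a new checker deployment.
    return result["scheduled_check_time_ms"] <= last_update_ms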


class UptimeResultsStrategyFactory(ResultsStrategyFactory[CheckResult, UptimeSubscription]):