From 7f07fa4103704bc9c99c6e364c6f356c9add1c18 Mon Sep 17 00:00:00 2001 From: Colton Allen Date: Fri, 3 Oct 2025 08:58:33 -0500 Subject: [PATCH 1/2] Add msgspec integration --- .../replays/usecases/ingest/__init__.py | 67 ++++++++++++++++++- tests/sentry/replays/unit/test_ingest.py | 4 +- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/sentry/replays/usecases/ingest/__init__.py b/src/sentry/replays/usecases/ingest/__init__.py index dc83a7e33578d2..512ab884874140 100644 --- a/src/sentry/replays/usecases/ingest/__init__.py +++ b/src/sentry/replays/usecases/ingest/__init__.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone from typing import Any, TypedDict +import msgspec import sentry_sdk from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem @@ -38,6 +39,58 @@ logger.addFilter(SamplingFilter(LOG_SAMPLE_RATE)) +# Msgspec allows us to define a schema which we can deserialize the typed JSON into. We can also +# leverage this fact to opportunistically avoid deserialization. This is especially important +# because we have huge JSON types that we don't really care about. + + +class DomContentLoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=0): + pass + + +class LoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=1): + pass + + +class FullSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=2): + pass + + +class IncrementalSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=3): + pass + + +class MetaEvent(msgspec.Struct, gc=False, tag_field="type", tag=4): + pass + + +class PluginEvent(msgspec.Struct, gc=False, tag_field="type", tag=6): + pass + + +# These are the schema definitions we care about. + + +class CustomEventData(msgspec.Struct, gc=False): + tag: str + payload: Any + + +class CustomEvent(msgspec.Struct, gc=False, tag_field="type", tag=5): + data: CustomEventData + + +RRWebEvent = ( + DomContentLoadedEvent + | LoadedEvent + | FullSnapshotEvent + | IncrementalSnapshotEvent + | MetaEvent + | CustomEvent + | PluginEvent +) + + class DropEvent(Exception): pass @@ -112,6 +165,18 @@ def process_recording_event(message: Event) -> ProcessedEvent: def parse_replay_events(message: Event): try: + try: + # We're parsing with msgspec (if we can) and then transforming to the type that + # JSON.loads returns. + events = [ + {"data": {"tag": e.data.tag, "payload": e.data.payload}} + for e in msgspec.json.decode(message["payload"], type=list[RRWebEvent]) + if isinstance(e, CustomEvent) + ] + except Exception: + logger.exception("msgspec deserialization failed.") + events = json.loads(message["payload"]) + return parse_events( { "organization_id": message["context"]["org_id"], @@ -122,7 +187,7 @@ def parse_replay_events(message: Event): "segment_id": message["context"]["segment_id"], "trace_id": extract_trace_id(message["replay_event"]), }, - json.loads(message["payload"]), + events, ) except Exception: logger.exception( diff --git a/tests/sentry/replays/unit/test_ingest.py b/tests/sentry/replays/unit/test_ingest.py index 4b9da64adb3858..93c9ba48cb9671 100644 --- a/tests/sentry/replays/unit/test_ingest.py +++ b/tests/sentry/replays/unit/test_ingest.py @@ -17,7 +17,7 @@ @django_db_all def test_process_recording_event_without_video() -> None: """Test process_recording_event without replay video data""" - payload = b'[{"type": "test"}]' + payload = b'[{"type": 1}]' payload_compressed = zlib.compress(payload) message: Event = { @@ -52,7 +52,7 @@ def test_process_recording_event_without_video() -> None: @django_db_all def test_process_recording_event_with_video() -> None: """Test process_recording_event with replay video data""" - payload = b'[{"type": "test"}]' + payload = b'[{"type": 1}]' payload_compressed = zlib.compress(payload) video_data = b"video" From 86a999fd689fa537ea33c6d3e10e7d9d69fde33d Mon Sep 17 00:00:00 2001 From: Colton Allen Date: Fri, 3 Oct 2025 10:47:30 -0500 Subject: [PATCH 2/2] Emit metric instead of log --- src/sentry/replays/usecases/ingest/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sentry/replays/usecases/ingest/__init__.py b/src/sentry/replays/usecases/ingest/__init__.py index 512ab884874140..c1125a2ea98e34 100644 --- a/src/sentry/replays/usecases/ingest/__init__.py +++ b/src/sentry/replays/usecases/ingest/__init__.py @@ -174,7 +174,10 @@ def parse_replay_events(message: Event): if isinstance(e, CustomEvent) ] except Exception: - logger.exception("msgspec deserialization failed.") + # We're emitting a metric instead of logging in case this thing really fails hard in + # prod. We don't want a huge volume of logs slowing throughput. If there's a + # significant volume of this metric we'll test against a broader cohort of data. + metrics.incr("replays.recording_consumer.msgspec_decode_error") events = json.loads(message["payload"]) return parse_events(