Skip to content
Merged
70 changes: 69 additions & 1 deletion src/sentry/replays/usecases/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime, timezone
from typing import Any, TypedDict

import msgspec
import sentry_sdk
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

Expand Down Expand Up @@ -38,6 +39,58 @@
logger.addFilter(SamplingFilter(LOG_SAMPLE_RATE))


# Msgspec allows us to define a schema which we can deserialize the typed JSON into. We can also
# leverage this fact to opportunistically avoid deserialization. This is especially important
# because we have huge JSON types that we don't really care about.


class DomContentLoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=0):
pass


class LoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=1):
pass


class FullSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=2):
pass


class IncrementalSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=3):
pass


class MetaEvent(msgspec.Struct, gc=False, tag_field="type", tag=4):
pass


class PluginEvent(msgspec.Struct, gc=False, tag_field="type", tag=6):
pass


# These are the schema definitions we care about.


class CustomEventData(msgspec.Struct, gc=False):
tag: str
payload: Any


class CustomEvent(msgspec.Struct, gc=False, tag_field="type", tag=5):
data: CustomEventData


RRWebEvent = (
DomContentLoadedEvent
| LoadedEvent
| FullSnapshotEvent
| IncrementalSnapshotEvent
| MetaEvent
| CustomEvent
| PluginEvent
)


class DropEvent(Exception):
pass

Expand Down Expand Up @@ -112,6 +165,21 @@ def process_recording_event(message: Event) -> ProcessedEvent:

def parse_replay_events(message: Event):
try:
try:
# We're parsing with msgspec (if we can) and then transforming to the type that
# JSON.loads returns.
events = [
{"data": {"tag": e.data.tag, "payload": e.data.payload}}
for e in msgspec.json.decode(message["payload"], type=list[RRWebEvent])
if isinstance(e, CustomEvent)
]
except Exception:
# We're emitting a metric instead of logging in case this thing really fails hard in
# prod. We don't want a huge volume of logs slowing throughput. If there's a
# significant volume of this metric we'll test against a broader cohort of data.
metrics.incr("replays.recording_consumer.msgspec_decode_error")
events = json.loads(message["payload"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Replay Parsing Inconsistency

The replay event parsing has inconsistent behavior between the msgspec path and its json.loads fallback. The msgspec path only includes CustomEvent instances, transforming their structure, while the fallback includes all events in their original format. This leads to unpredictable downstream processing depending on which path is taken.

Fix in Cursor Fix in Web

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true. It is unpredictable (more specifically it is deterministic but dependent on user input which we may consider to be random) whether certain paths will be taken. However the affected paths are minor logging statements.


return parse_events(
{
"organization_id": message["context"]["org_id"],
Expand All @@ -122,7 +190,7 @@ def parse_replay_events(message: Event):
"segment_id": message["context"]["segment_id"],
"trace_id": extract_trace_id(message["replay_event"]),
},
json.loads(message["payload"]),
events,
)
except Exception:
logger.exception(
Expand Down
4 changes: 2 additions & 2 deletions tests/sentry/replays/unit/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
@django_db_all
def test_process_recording_event_without_video() -> None:
"""Test process_recording_event without replay video data"""
payload = b'[{"type": "test"}]'
payload = b'[{"type": 1}]'
payload_compressed = zlib.compress(payload)

message: Event = {
Expand Down Expand Up @@ -52,7 +52,7 @@ def test_process_recording_event_without_video() -> None:
@django_db_all
def test_process_recording_event_with_video() -> None:
"""Test process_recording_event with replay video data"""
payload = b'[{"type": "test"}]'
payload = b'[{"type": 1}]'
payload_compressed = zlib.compress(payload)
video_data = b"video"

Expand Down
Loading