70 changes: 69 additions & 1 deletion src/sentry/replays/usecases/ingest/__init__.py
@@ -5,6 +5,7 @@
from datetime import datetime, timezone
from typing import Any, TypedDict

import msgspec
import sentry_sdk
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

@@ -38,6 +39,58 @@
logger.addFilter(SamplingFilter(LOG_SAMPLE_RATE))


# Msgspec lets us define a schema to deserialize the typed JSON into. Since msgspec
# skips fields the schema doesn't declare, we can also use it to opportunistically
# avoid deserialization. This is especially important because the payload contains
# huge JSON types that we don't really care about.


class DomContentLoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=0):
    pass


class LoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=1):
    pass


class FullSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=2):
    pass


class IncrementalSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=3):
    pass


class MetaEvent(msgspec.Struct, gc=False, tag_field="type", tag=4):
    pass


class PluginEvent(msgspec.Struct, gc=False, tag_field="type", tag=6):
    pass


# These are the schema definitions we care about.


class CustomEventData(msgspec.Struct, gc=False):
    tag: str
    payload: Any


class CustomEvent(msgspec.Struct, gc=False, tag_field="type", tag=5):
    data: CustomEventData


RRWebEvent = (
    DomContentLoadedEvent
    | LoadedEvent
    | FullSnapshotEvent
    | IncrementalSnapshotEvent
    | MetaEvent
    | CustomEvent
    | PluginEvent
)


class DropEvent(Exception):
    pass
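
A minimal, standalone sketch of how the tagged union above behaves (trimmed to two event types; the sample payload is invented for illustration). msgspec skips keys a struct doesn't declare, so a type-2 full snapshot decodes into an empty struct without its potentially huge body ever being materialized:

from typing import Any

import msgspec


class Snapshot(msgspec.Struct, gc=False, tag_field="type", tag=2):
    pass


class CustomData(msgspec.Struct, gc=False):
    tag: str
    payload: Any


class Custom(msgspec.Struct, gc=False, tag_field="type", tag=5):
    data: CustomData


# The snapshot's "data" key is undeclared on Snapshot, so msgspec never builds it.
raw = b'[{"type": 2, "data": {"node": {}}}, {"type": 5, "data": {"tag": "breadcrumb", "payload": {}}}]'
snapshot, custom = msgspec.json.decode(raw, type=list[Snapshot | Custom])
assert isinstance(custom, Custom) and custom.data.tag == "breadcrumb"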

@@ -112,6 +165,21 @@ def process_recording_event(message: Event) -> ProcessedEvent:

def parse_replay_events(message: Event):
    try:
        try:
            # Parse with msgspec when we can, then transform the result into the
            # shape that json.loads returns.
            events = [
                {"data": {"tag": e.data.tag, "payload": e.data.payload}}
                for e in msgspec.json.decode(message["payload"], type=list[RRWebEvent])
                if isinstance(e, CustomEvent)
            ]
        except Exception:
            # We emit a metric instead of logging in case this fails hard in prod: we
            # don't want a huge volume of logs slowing throughput. If the metric shows
            # significant volume we'll test against a broader cohort of data.
            metrics.incr("replays.recording_consumer.msgspec_decode_error")
            events = json.loads(message["payload"])

        return parse_events(
            {
                "organization_id": message["context"]["org_id"],
@@ -122,7 +190,7 @@ def parse_replay_events(message: Event):
"segment_id": message["context"]["segment_id"],
"trace_id": extract_trace_id(message["replay_event"]),
},
json.loads(message["payload"]),
events,
)
except Exception:
logger.exception(
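
Putting the two pieces together, the decode path degrades gracefully. A stripped-down, runnable sketch of the fallback behavior added above (assumptions: the incr function below is a stand-in for sentry's metrics.incr, and the union is trimmed to two event types):

import json

import msgspec


class Loaded(msgspec.Struct, gc=False, tag_field="type", tag=1):
    pass


class Meta(msgspec.Struct, gc=False, tag_field="type", tag=4):
    pass


def incr(key: str) -> None:
    # Stand-in for sentry's metrics.incr.
    print("metric:", key)


def decode(payload: bytes) -> list:
    try:
        return msgspec.json.decode(payload, type=list[Loaded | Meta])
    except Exception:
        # Unknown tags or malformed JSON land here, mirroring the consumer code.
        incr("replays.recording_consumer.msgspec_decode_error")
        return json.loads(payload)


decode(b'[{"type": 1}]')       # fast path: returns [Loaded()]
decode(b'[{"type": "test"}]')  # string tag fails validation, falls back to json.loads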
4 changes: 2 additions & 2 deletions tests/sentry/replays/unit/test_ingest.py
@@ -17,7 +17,7 @@
@django_db_all
def test_process_recording_event_without_video() -> None:
    """Test process_recording_event without replay video data"""
-    payload = b'[{"type": "test"}]'
+    payload = b'[{"type": 1}]'
    payload_compressed = zlib.compress(payload)

    message: Event = {
@@ -52,7 +52,7 @@ def test_process_recording_event_without_video() -> None:
@django_db_all
def test_process_recording_event_with_video() -> None:
    """Test process_recording_event with replay video data"""
-    payload = b'[{"type": "test"}]'
+    payload = b'[{"type": 1}]'
    payload_compressed = zlib.compress(payload)
    video_data = b"video"

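The fixture change falls out of the new schema: {"type": "test"} carries a string tag, which fails validation against the integer-tagged union (forcing the json.loads fallback), while {"type": 1} decodes on the msgspec fast path as a LoadedEvent. A quick check, assuming the structs are importable from sentry.replays.usecases.ingest as defined in this diff:

import msgspec

from sentry.replays.usecases.ingest import LoadedEvent, RRWebEvent

# New fixture: decodes on the fast path.
(event,) = msgspec.json.decode(b'[{"type": 1}]', type=list[RRWebEvent])
assert isinstance(event, LoadedEvent)

# Old fixture: the string tag fails validation before any event is built.
try:
    msgspec.json.decode(b'[{"type": "test"}]', type=list[RRWebEvent])
except msgspec.ValidationError:
    pass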