Merged
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
@@ -513,6 +513,13 @@
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
# Enable new msgspec-based recording parser.
register(
"replay.consumer.msgspec_recording_parser",
type=Bool,
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
# Trace sampling rates for replay summary endpoint.
register(
"replay.endpoints.project_replay_summary.trace_sample_rate_post",
6 changes: 5 additions & 1 deletion src/sentry/replays/consumers/recording.py
@@ -14,6 +14,7 @@
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
from sentry_sdk import set_tag

from sentry import options
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.replays.usecases.ingest import (
DropEvent,
@@ -97,7 +98,10 @@ def process_message(message: bytes) -> ProcessedEvent | None:
recording_event = parse_recording_event(message)
set_tag("org_id", recording_event["context"]["org_id"])
set_tag("project_id", recording_event["context"]["project_id"])
return process_recording_event(recording_event)
return process_recording_event(
recording_event,
use_new_recording_parser=options.get("replay.consumer.msgspec_recording_parser"),
)
except DropSilently:
return None
except Exception:
85 changes: 81 additions & 4 deletions src/sentry/replays/usecases/ingest/__init__.py
@@ -5,6 +5,7 @@
from datetime import datetime, timezone
from typing import Any, TypedDict

import msgspec
import sentry_sdk
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

@@ -38,6 +39,75 @@
logger.addFilter(SamplingFilter(LOG_SAMPLE_RATE))


# Msgspec lets us define a schema into which we deserialize the typed JSON. We can also
# leverage this to opportunistically skip deserialization, which matters because the
# payloads contain huge JSON types that we don't really care about.


class DomContentLoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=0):
pass


class LoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=1):
pass


class FullSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=2):
pass


class IncrementalSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=3):
pass


class MetaEvent(msgspec.Struct, gc=False, tag_field="type", tag=4):
pass


class PluginEvent(msgspec.Struct, gc=False, tag_field="type", tag=6):
pass


# These are the schema definitions we care about.


class CustomEventData(msgspec.Struct, gc=False):
tag: str
payload: Any


class CustomEvent(msgspec.Struct, gc=False, tag_field="type", tag=5):
data: CustomEventData | None = None


RRWebEvent = (
DomContentLoadedEvent
| LoadedEvent
| FullSnapshotEvent
| IncrementalSnapshotEvent
| MetaEvent
| CustomEvent
| PluginEvent
)


def parse_recording_data(payload: bytes) -> list[dict]:
try:
# We parse with msgspec (if we can) and then transform the result into the shape
# that json.loads returns.
return [
{"type": 5, "data": {"tag": e.data.tag, "payload": e.data.payload}}
for e in msgspec.json.decode(payload, type=list[RRWebEvent])
if isinstance(e, CustomEvent) and e.data is not None
]
except Exception:
# We're emitting a metric instead of logging in case this thing really fails hard in
# prod. We don't want a huge volume of logs slowing throughput. If there's a
# significant volume of this metric we'll test against a broader cohort of data.
metrics.incr("replays.recording_consumer.msgspec_decode_error")
return json.loads(payload)
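
As a standalone illustration (not part of the change) of the opportunistic-skipping idea, the sketch below assumes msgspec's default behavior of ignoring unknown fields; the payload contents are made up:

# A type-2 full snapshot carries a potentially enormous "data" blob, but the
# FullSnapshotEvent struct declares no fields, so that blob is never materialized
# into Python objects. Only the type-5 custom event survives the filter.
events = parse_recording_data(
    b'[{"type": 2, "data": {"node": "...huge serialized DOM..."}},'
    b' {"type": 5, "data": {"tag": "breadcrumb", "payload": {"category": "ui.click"}}}]'
)
assert events == [
    {"type": 5, "data": {"tag": "breadcrumb", "payload": {"category": "ui.click"}}}
]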


class DropEvent(Exception):
pass

@@ -75,8 +145,10 @@ class ProcessedEvent:


@sentry_sdk.trace
def process_recording_event(message: Event) -> ProcessedEvent:
parsed_output = parse_replay_events(message)
def process_recording_event(
message: Event, use_new_recording_parser: bool = False
) -> ProcessedEvent:
parsed_output = parse_replay_events(message, use_new_recording_parser)
if parsed_output:
replay_events, trace_items = parsed_output
else:
@@ -110,8 +182,13 @@ def process_recording_event(message: Event) -> ProcessedEvent:
)


def parse_replay_events(message: Event):
def parse_replay_events(message: Event, use_new_recording_parser: bool):
try:
if use_new_recording_parser:
events = parse_recording_data(message["payload"])
else:
events = json.loads(message["payload"])
Contributor comment:

Bug: Replay Parsing Inconsistency

The replay event parsing has inconsistent behavior between the msgspec path and its json.loads fallback. The msgspec path only includes CustomEvent instances, transforming their structure, while the fallback includes all events in their original format. This leads to unpredictable downstream processing depending on which path is taken.

Member author reply:

This is true. It is unpredictable (more specifically, it is deterministic but dependent on user input, which we may consider to be random) whether certain paths will be taken. However, the affected paths are minor logging statements.
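
For illustration only (not part of the change), a minimal sketch of the difference the comment describes, reusing parse_recording_data and the structs defined earlier in this diff:

payload = b'[{"type": 4, "data": {"href": "https://sentry.io"}}, {"type": 5, "data": {"tag": "options", "payload": {}}}]'

# New parser: non-custom events are dropped and custom events are re-shaped.
parse_recording_data(payload)
# -> [{"type": 5, "data": {"tag": "options", "payload": {}}}]

# Fallback (or use_new_recording_parser=False): every event passes through unchanged.
json.loads(payload)
# -> [{"type": 4, "data": {"href": "https://sentry.io"}},
#     {"type": 5, "data": {"tag": "options", "payload": {}}}]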


return parse_events(
{
"organization_id": message["context"]["org_id"],
@@ -122,7 +199,7 @@ def parse_replay_events(message: Event):
"segment_id": message["context"]["segment_id"],
"trace_id": extract_trace_id(message["replay_event"]),
},
json.loads(message["payload"]),
events,
)
except Exception:
logger.exception(
178 changes: 176 additions & 2 deletions tests/sentry/replays/integration/consumers/test_recording.py
@@ -43,9 +43,12 @@ def submit(consumer: ProcessingStrategy[KafkaPayload], message: dict[str, Any])
consumer.terminate()


def test_recording_consumer(consumer: ProcessingStrategy[KafkaPayload]) -> None:
@mock.patch("sentry.replays.usecases.ingest.event_parser.options.get")
def test_recording_consumer(options_get, consumer: ProcessingStrategy[KafkaPayload]) -> None:
options_get.return_value = True

headers = json.dumps({"segment_id": 42}).encode()
recording_payload = headers + b"\n" + zlib.compress(b"")
recording_payload = headers + b"\n" + zlib.compress(json.dumps(MOCK_EVENTS).encode())

message = {
"type": "replay_recording_not_chunked",
@@ -66,10 +69,181 @@ def test_recording_consumer(consumer: ProcessingStrategy[KafkaPayload]) -> None:
# Message was successfully processed and the result was committed.
assert commit.called

# Assert parsing yields the measured output.
actions = commit.call_args[0][0].actions_event
assert actions is not None
assert actions.canvas_sizes == []
assert len(actions.click_events) == 3
assert actions.click_events[0].is_dead == 0
assert actions.click_events[0].is_rage == 0
assert actions.click_events[1].is_dead == 1
assert actions.click_events[1].is_rage == 0
assert actions.click_events[2].is_dead == 1
assert actions.click_events[2].is_rage == 1
assert actions.multiclick_events == []
assert len(actions.hydration_errors) == 1
assert actions.hydration_errors[0].timestamp == 1.0
assert actions.hydration_errors[0].url == "https://sentry.io"
assert actions.request_response_sizes == [(1002, 8001)]

# Probabilistic fields are ignored... Needs to be refactored so that the integration
# tests can pass the state directly.
#
# assert actions.mutation_events == ...
# assert actions.options_events == ...


def test_recording_consumer_invalid_message(consumer: ProcessingStrategy[KafkaPayload]) -> None:
with mock.patch("sentry.replays.consumers.recording.commit_recording_message") as commit:
submit(consumer, {})

# Message was not successfully processed and the result was dropped.
assert not commit.called


MOCK_EVENTS = [
# Every event other than type 5.
{"type": 0, "data": {"anything": "goes"}},
{"type": 1, "data": {"anything": "goes"}},
{"type": 2, "data": {"anything": "goes"}},
{"type": 3, "data": {"anything": "goes"}},
{"type": 4, "data": {"anything": "goes"}},
{"type": 6, "data": {"anything": "goes"}},
# Invalid type 5 events (missing or null data).
{"type": 5, "data": None},
{"type": 5},
# Canvas Events
{"type": 3, "data": {"source": 9, "id": 2440, "type": 0, "commands": [{"a": "b"}]}},
# Mutation Events
{
"type": 5,
"data": {
"tag": "breadcrumb",
"payload": {"category": "replay.mutations", "data": {"count": 1738}},
},
},
# SDK Option Events
{
"data": {
"payload": {
"blockAllMedia": True,
"errorSampleRate": 0,
"maskAllInputs": True,
"maskAllText": True,
"networkCaptureBodies": True,
"networkDetailHasUrls": False,
"networkRequestHasHeaders": True,
"networkResponseHasHeaders": True,
"sessionSampleRate": 1,
"useCompression": False,
"useCompressionOption": True,
},
"tag": "options",
},
"timestamp": 1680009712.507,
"type": 5,
},
# Hydration Error Events
{
"type": 5,
"data": {
"tag": "breadcrumb",
"payload": {
"category": "replay.hydrate-error",
"timestamp": 1.0,
"data": {"url": "https://sentry.io"},
},
},
},
# Request Response Size Event
{
"type": 5,
"data": {
"tag": "performanceSpan",
"payload": {
"op": "resource.xhr",
"data": {"requestBodySize": 1002, "responseBodySize": 8001},
},
},
},
# Click Event
{
"type": 5,
"timestamp": 1674298825,
"data": {
"tag": "breadcrumb",
"payload": {
"timestamp": 1674298825.403,
"type": "default",
"category": "ui.click",
"message": "div#hello.hello.world",
"data": {
"nodeId": 1,
"node": {
"id": 1,
"tagName": "div",
"attributes": {
"id": "hello",
"class": "hello world",
"aria-label": "test",
"role": "button",
"alt": "1",
"data-testid": "2",
"title": "3",
"data-sentry-component": "SignUpForm",
},
"textContent": "Hello, world!",
},
},
},
},
},
# Test Dead Click Event
{
"type": 5,
"data": {
"tag": "breadcrumb",
"payload": {
"type": "default",
"category": "ui.slowClickDetected",
"timestamp": 1674298825.403,
"message": "button.slow",
"data": {
"endReason": "timeout",
"timeAfterClickMs": 7000,
"clickCount": 3,
"node": {
"id": 456,
"tagName": "a",
"textContent": "Slow button",
"attributes": {},
},
},
},
},
},
# Test Rage Click Event
{
"type": 5,
"data": {
"tag": "breadcrumb",
"payload": {
"type": "default",
"category": "ui.slowClickDetected",
"timestamp": 1674298825.403,
"message": "button.slow",
"data": {
"endReason": "timeout",
"timeAfterClickMs": 7000,
"clickCount": 5,
"node": {
"id": 456,
"tagName": "a",
"textContent": "Slow button",
"attributes": {},
},
},
},
},
},
]
10 changes: 6 additions & 4 deletions tests/sentry/replays/unit/test_ingest.py
@@ -17,7 +17,7 @@
@django_db_all
def test_process_recording_event_without_video() -> None:
"""Test process_recording_event without replay video data"""
payload = b'[{"type": "test"}]'
payload = b'[{"type": 1}]'
payload_compressed = zlib.compress(payload)

message: Event = {
@@ -52,7 +52,7 @@ def test_process_recording_event_without_video() -> None:
@django_db_all
def test_process_recording_event_with_video() -> None:
"""Test process_recording_event with replay video data"""
payload = b'[{"type": "test"}]'
payload = b'[{"type": 1}]'
payload_compressed = zlib.compress(payload)
video_data = b"video"

@@ -107,7 +107,8 @@ def test_parse_replay_events_empty() -> None:
"payload_compressed": b"",
"replay_event": None,
"replay_video": None,
}
},
True,
)
assert result == ParsedEventMeta([], [], [], [], [], [], [])
assert trace_items == []
@@ -130,7 +131,8 @@ def test_parse_replay_events_invalid_json() -> None:
"payload_compressed": b"",
"replay_event": None,
"replay_video": None,
}
},
True,
)
assert result is None
