Commit b7f489c

perf(replays): Optimize memory usage and deserialization performance (#101195)
`msgspec` allows us to partially deserialize a JSON payload. By removing the requirement to deserialize full and incremental snapshots, we significantly increase deserialization performance and significantly reduce memory consumption. This is particularly important because our consumer suffers from memory usage issues and our single-threaded processing logic is a bottleneck to throughput improvements. Experiments demonstrated a 10x performance uplift, and memory usage declined to essentially nil.

There is a downside: we can no longer read data from incremental or full snapshots. Doing so would require allocating memory and would defeat the purpose of this PR. Presently we don't do anything with those events except log some canvas events which, I think, are unimportant enough that we can remove them.

Full plan: https://www.notion.so/sentry/Q4-25-Recording-Consumer-Improvements-28c8b10e4b5d803cb212e19aebefee81

Related: getsentry/pypi#1695

Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
1 parent f2ad982 commit b7f489c
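For context on the technique: msgspec tagged unions let a Struct declare no fields at all; such a variant still matches on its tag_field during decoding, but the rest of the JSON object is skipped rather than materialized. A minimal sketch of the idea, with illustrative names rather than the PR's actual classes, assuming msgspec>=0.19:

import msgspec
from typing import Any


# Tag-only variant: objects with "type": 2 match here, and their (potentially
# enormous) "data" field is never allocated.
class Snapshot(msgspec.Struct, gc=False, tag_field="type", tag=2):
    pass


# Variant whose body we do want to keep.
class Custom(msgspec.Struct, gc=False, tag_field="type", tag=5):
    data: Any = None


payload = b'[{"type": 2, "data": {"huge": "tree"}}, {"type": 5, "data": {"tag": "breadcrumb"}}]'
events = msgspec.json.decode(payload, type=list[Snapshot | Custom])
kept = [e.data for e in events if isinstance(e, Custom)]  # [{"tag": "breadcrumb"}]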

File tree

7 files changed: +289 −11 lines


pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ dependencies = [
     "maxminddb>=2.3.0",
     "mistune>=2.0.4",
     "mmh3>=4.0.0",
+    "msgspec>=0.19.0",
     "msgpack>=1.1.0",
     "openai>=1.3.5",
     "orjson>=3.10.10",

src/sentry/options/defaults.py

Lines changed: 7 additions & 0 deletions
@@ -513,6 +513,13 @@
     default=False,
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+# Enable new msgspec-based recording parser.
+register(
+    "replay.consumer.msgspec_recording_parser",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 # Trace sampling rates for replay summary endpoint.
 register(
     "replay.endpoints.project_replay_summary.trace_sample_rate_post",

src/sentry/replays/consumers/recording.py

Lines changed: 5 additions & 1 deletion
@@ -14,6 +14,7 @@
 from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 from sentry_sdk import set_tag
 
+from sentry import options
 from sentry.conf.types.kafka_definition import Topic, get_topic_codec
 from sentry.replays.usecases.ingest import (
     DropEvent,
@@ -97,7 +98,10 @@ def process_message(message: bytes) -> ProcessedEvent | None:
         recording_event = parse_recording_event(message)
         set_tag("org_id", recording_event["context"]["org_id"])
         set_tag("project_id", recording_event["context"]["project_id"])
-        return process_recording_event(recording_event)
+        return process_recording_event(
+            recording_event,
+            use_new_recording_parser=options.get("replay.consumer.msgspec_recording_parser"),
+        )
     except DropSilently:
         return None
     except Exception:

src/sentry/replays/usecases/ingest/__init__.py

Lines changed: 81 additions & 4 deletions
@@ -5,6 +5,7 @@
 from datetime import datetime, timezone
 from typing import Any, TypedDict
 
+import msgspec
 import sentry_sdk
 from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem
 
@@ -39,6 +40,75 @@
 logger.addFilter(SamplingFilter(LOG_SAMPLE_RATE))
 
 
+# Msgspec allows us to define a schema which we can deserialize the typed JSON into. We can also
+# leverage this fact to opportunistically avoid deserialization. This is especially important
+# because we have huge JSON types that we don't really care about.
+
+
+class DomContentLoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=0):
+    pass
+
+
+class LoadedEvent(msgspec.Struct, gc=False, tag_field="type", tag=1):
+    pass
+
+
+class FullSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=2):
+    pass
+
+
+class IncrementalSnapshotEvent(msgspec.Struct, gc=False, tag_field="type", tag=3):
+    pass
+
+
+class MetaEvent(msgspec.Struct, gc=False, tag_field="type", tag=4):
+    pass
+
+
+class PluginEvent(msgspec.Struct, gc=False, tag_field="type", tag=6):
+    pass
+
+
+# These are the schema definitions we care about.
+
+
+class CustomEventData(msgspec.Struct, gc=False):
+    tag: str
+    payload: Any
+
+
+class CustomEvent(msgspec.Struct, gc=False, tag_field="type", tag=5):
+    data: CustomEventData | None = None
+
+
+RRWebEvent = (
+    DomContentLoadedEvent
+    | LoadedEvent
+    | FullSnapshotEvent
+    | IncrementalSnapshotEvent
+    | MetaEvent
+    | CustomEvent
+    | PluginEvent
+)
+
+
+def parse_recording_data(payload: bytes) -> list[dict]:
+    try:
+        # We're parsing with msgspec (if we can) and then transforming to the type that
+        # json.loads returns.
+        return [
+            {"type": 5, "data": {"tag": e.data.tag, "payload": e.data.payload}}
+            for e in msgspec.json.decode(payload, type=list[RRWebEvent])
+            if isinstance(e, CustomEvent) and e.data is not None
+        ]
+    except Exception:
+        # We're emitting a metric instead of logging in case this thing really fails hard in
+        # prod. We don't want a huge volume of logs slowing throughput. If there's a
+        # significant volume of this metric we'll test against a broader cohort of data.
+        metrics.incr("replays.recording_consumer.msgspec_decode_error")
+        return json.loads(payload)
+
+
 class DropEvent(Exception):
     pass
 
@@ -76,8 +146,10 @@ class ProcessedEvent:
 
 
 @sentry_sdk.trace
-def process_recording_event(message: Event) -> ProcessedEvent:
-    parsed_output = parse_replay_events(message)
+def process_recording_event(
+    message: Event, use_new_recording_parser: bool = False
+) -> ProcessedEvent:
+    parsed_output = parse_replay_events(message, use_new_recording_parser)
     if parsed_output:
         replay_events, trace_items = parsed_output
     else:
@@ -111,8 +183,13 @@ def process_recording_event(message: Event) -> ProcessedEvent:
     )
 
 
-def parse_replay_events(message: Event):
+def parse_replay_events(message: Event, use_new_recording_parser: bool):
     try:
+        if use_new_recording_parser:
+            events = parse_recording_data(message["payload"])
+        else:
+            events = json.loads(message["payload"])
+
         return parse_events(
             {
                 "organization_id": message["context"]["org_id"],
@@ -123,7 +200,7 @@ def parse_replay_events(message: Event):
                 "segment_id": message["context"]["segment_id"],
                 "trace_id": extract_trace_id(message["replay_event"]),
             },
-            json.loads(message["payload"]),
+            events,
         )
     except Exception:
         logger.exception(
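
A hedged usage sketch (not part of the diff) of how the new parser behaves: only type-5 custom events survive, and any msgspec decode failure falls back to plain json.loads.

# Illustrative only, assuming parse_recording_data from the diff above.
payload = b'[{"type": 2, "data": {"node": {}}}, {"type": 5, "data": {"tag": "breadcrumb", "payload": {}}}]'
parse_recording_data(payload)
# -> [{"type": 5, "data": {"tag": "breadcrumb", "payload": {}}}]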

tests/sentry/replays/integration/consumers/test_recording.py

Lines changed: 176 additions & 2 deletions
@@ -43,9 +43,12 @@ def submit(consumer: ProcessingStrategy[KafkaPayload], message: dict[str, Any])
     consumer.terminate()
 
 
-def test_recording_consumer(consumer: ProcessingStrategy[KafkaPayload]) -> None:
+@mock.patch("sentry.options.get")
+def test_recording_consumer(options_get, consumer: ProcessingStrategy[KafkaPayload]) -> None:  # type: ignore[no-untyped-def]
+    options_get.return_value = True
+
     headers = json.dumps({"segment_id": 42}).encode()
-    recording_payload = headers + b"\n" + zlib.compress(b"")
+    recording_payload = headers + b"\n" + zlib.compress(json.dumps(MOCK_EVENTS).encode())
 
     message = {
         "type": "replay_recording_not_chunked",
@@ -66,10 +69,181 @@ def test_recording_consumer(consumer: ProcessingStrategy[KafkaPayload]) -> None:
     # Message was successfully processed and the result was committed.
     assert commit.called
 
+    # Assert parsing yielded the measured output.
+    actions = commit.call_args[0][0].actions_event
+    assert actions is not None
+    assert actions.canvas_sizes == []
+    assert len(actions.click_events) == 3
+    assert actions.click_events[0].is_dead == 0
+    assert actions.click_events[0].is_rage == 0
+    assert actions.click_events[1].is_dead == 1
+    assert actions.click_events[1].is_rage == 0
+    assert actions.click_events[2].is_dead == 1
+    assert actions.click_events[2].is_rage == 1
+    assert actions.multiclick_events == []
+    assert len(actions.hydration_errors) == 1
+    assert actions.hydration_errors[0].timestamp == 1.0
+    assert actions.hydration_errors[0].url == "https://sentry.io"
+    assert actions.request_response_sizes == [(1002, 8001)]
+
+    # Probabilistic fields are ignored... Needs to be refactored such that the integration
+    # tests can pass the state directly.
+    #
+    # assert actions.mutation_events == ...
+    # assert actions.options_events == ...
+
 
 def test_recording_consumer_invalid_message(consumer: ProcessingStrategy[KafkaPayload]) -> None:
     with mock.patch("sentry.replays.consumers.recording.commit_recording_message") as commit:
         submit(consumer, {})
 
     # Message was not successfully processed and the result was dropped.
     assert not commit.called
+
+
+MOCK_EVENTS = [
+    # Every event other than type 5.
+    {"type": 0, "data": {"anything": "goes"}},
+    {"type": 1, "data": {"anything": "goes"}},
+    {"type": 2, "data": {"anything": "goes"}},
+    {"type": 3, "data": {"anything": "goes"}},
+    {"type": 4, "data": {"anything": "goes"}},
+    {"type": 6, "data": {"anything": "goes"}},
+    # Invalid event types.
+    {"type": 5, "data": None},
+    {"type": 5},
+    # Canvas Events
+    {"type": 3, "data": {"source": 9, "id": 2440, "type": 0, "commands": [{"a": "b"}]}},
+    # Mutation Events
+    {
+        "type": 5,
+        "data": {
+            "tag": "breadcrumb",
+            "payload": {"category": "replay.mutations", "data": {"count": 1738}},
+        },
+    },
+    # SDK Option Events
+    {
+        "data": {
+            "payload": {
+                "blockAllMedia": True,
+                "errorSampleRate": 0,
+                "maskAllInputs": True,
+                "maskAllText": True,
+                "networkCaptureBodies": True,
+                "networkDetailHasUrls": False,
+                "networkRequestHasHeaders": True,
+                "networkResponseHasHeaders": True,
+                "sessionSampleRate": 1,
+                "useCompression": False,
+                "useCompressionOption": True,
+            },
+            "tag": "options",
+        },
+        "timestamp": 1680009712.507,
+        "type": 5,
+    },
+    # Hydration Error Events
+    {
+        "type": 5,
+        "data": {
+            "tag": "breadcrumb",
+            "payload": {
+                "category": "replay.hydrate-error",
+                "timestamp": 1.0,
+                "data": {"url": "https://sentry.io"},
+            },
+        },
+    },
+    # Request Response Size Event
+    {
+        "type": 5,
+        "data": {
+            "tag": "performanceSpan",
+            "payload": {
+                "op": "resource.xhr",
+                "data": {"requestBodySize": 1002, "responseBodySize": 8001},
+            },
+        },
+    },
+    # Click Event
+    {
+        "type": 5,
+        "timestamp": 1674298825,
+        "data": {
+            "tag": "breadcrumb",
+            "payload": {
+                "timestamp": 1674298825.403,
+                "type": "default",
+                "category": "ui.click",
+                "message": "div#hello.hello.world",
+                "data": {
+                    "nodeId": 1,
+                    "node": {
+                        "id": 1,
+                        "tagName": "div",
+                        "attributes": {
+                            "id": "hello",
+                            "class": "hello world",
+                            "aria-label": "test",
+                            "role": "button",
+                            "alt": "1",
+                            "data-testid": "2",
+                            "title": "3",
+                            "data-sentry-component": "SignUpForm",
+                        },
+                        "textContent": "Hello, world!",
+                    },
+                },
+            },
+        },
+    },
+    # Dead Click Event
+    {
+        "type": 5,
+        "data": {
+            "tag": "breadcrumb",
+            "payload": {
+                "type": "default",
+                "category": "ui.slowClickDetected",
+                "timestamp": 1674298825.403,
+                "message": "button.slow",
+                "data": {
+                    "endReason": "timeout",
+                    "timeAfterClickMs": 7000,
+                    "clickCount": 3,
+                    "node": {
+                        "id": 456,
+                        "tagName": "a",
+                        "textContent": "Slow button",
+                        "attributes": {},
+                    },
+                },
+            },
+        },
+    },
+    # Rage Click Event
+    {
+        "type": 5,
+        "data": {
+            "tag": "breadcrumb",
+            "payload": {
+                "type": "default",
+                "category": "ui.slowClickDetected",
+                "timestamp": 1674298825.403,
+                "message": "button.slow",
+                "data": {
+                    "endReason": "timeout",
+                    "timeAfterClickMs": 7000,
+                    "clickCount": 5,
+                    "node": {
+                        "id": 456,
+                        "tagName": "a",
+                        "textContent": "Slow button",
+                        "attributes": {},
+                    },
+                },
+            },
+        },
+    },
+]

tests/sentry/replays/unit/test_ingest.py

Lines changed: 6 additions & 4 deletions
@@ -17,7 +17,7 @@
 @django_db_all
 def test_process_recording_event_without_video() -> None:
     """Test process_recording_event without replay video data"""
-    payload = b'[{"type": "test"}]'
+    payload = b'[{"type": 1}]'
     payload_compressed = zlib.compress(payload)
 
     message: Event = {
@@ -52,7 +52,7 @@ def test_process_recording_event_without_video() -> None:
 @django_db_all
 def test_process_recording_event_with_video() -> None:
     """Test process_recording_event with replay video data"""
-    payload = b'[{"type": "test"}]'
+    payload = b'[{"type": 1}]'
     payload_compressed = zlib.compress(payload)
     video_data = b"video"
 
@@ -107,7 +107,8 @@ def test_parse_replay_events_empty() -> None:
             "payload_compressed": b"",
             "replay_event": None,
             "replay_video": None,
-        }
+        },
+        True,
     )
     assert result == ParsedEventMeta([], [], [], [], [], [], [], [])
     assert trace_items == []
@@ -130,7 +131,8 @@ def test_parse_replay_events_invalid_json() -> None:
             "payload_compressed": b"",
             "replay_event": None,
             "replay_video": None,
-        }
+        },
+        True,
     )
     assert result is None