
Commit 524e464

srest2021 authored and andrewshie-sentry committed
fixes(REPLAY-592): Add continuous profiling to replay recording consumer (#97496)
Fixes REPLAY-592. Adds continuous profiling to `process_message` and `commit_message` in the replay recording consumer. Adds an option `replay.consumer.recording.profiling.enabled` to defaults.py, disabling profiling by default. Adds new tests for profiling.
1 parent 7ed7fab commit 524e464

File tree: 4 files changed (+201, −6 lines)
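At a high level, the change threads a single boolean through both consumer steps: thin `*_with_options` wrappers read the new option on every message and hand it to the real handlers, which start the SDK's continuous profiler before doing work and stop it in a `finally` block. A condensed sketch of that pattern follows; the `handle`, `handle_with_options`, and `do_work` names are illustrative only, not functions from this commit, while `options.get` and `sentry_sdk.profiler.start_profiler`/`stop_profiler` are the calls used in the diffs below.

import sentry_sdk

from sentry import options


def do_work(payload):
    # Illustrative stand-in for the real parse/commit work.
    return payload


def handle_with_options(payload):
    # Thin wrapper: read the flag once per message (registered default is False).
    profiling_enabled = options.get("replay.consumer.recording.profiling.enabled")
    return handle(payload, profiling_enabled=profiling_enabled)


def handle(payload, profiling_enabled: bool = False):
    if profiling_enabled:
        sentry_sdk.profiler.start_profiler()
    try:
        return do_work(payload)
    finally:
        # Stop on success and on error so profiling never outlives the message.
        if profiling_enabled:
            sentry_sdk.profiler.stop_profiler()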

src/sentry/options/defaults.py

Lines changed: 7 additions & 0 deletions

@@ -524,6 +524,13 @@
     default=0.0,
     flags=FLAG_ALLOW_EMPTY | FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
 )
+# Enables profiling for replay recording ingestion.
+register(
+    "replay.consumer.recording.profiling.enabled",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)

 # User Feedback Options
 register(
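Because the option defaults to False and carries FLAG_AUTOMATOR_MODIFIABLE, profiling stays off until the option is changed at runtime. A small sketch of reading and flipping the flag through Sentry's options store; the `options.set` call here is an assumption about how the flag would be toggled outside the options automator and is not part of this commit.

from sentry import options

# Falls back to the registered default (False), so profiling is off out of the box.
enabled = options.get("replay.consumer.recording.profiling.enabled")

# Assumption: the flag can be flipped at runtime via the options store (or the
# options automator) without a deploy; subsequent messages then get profiled.
if not enabled:
    options.set("replay.consumer.recording.profiling.enabled", True)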

src/sentry/replays/consumers/recording.py

Lines changed: 35 additions & 4 deletions

@@ -15,6 +15,7 @@
 from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
 from sentry_sdk import set_tag

+from sentry import options
 from sentry.conf.types.kafka_definition import Topic, get_topic_codec
 from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS
 from sentry.replays.usecases.ingest import (

@@ -65,9 +66,9 @@ def create_with_partitions(
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
         return RunTask(
-            function=process_message,
+            function=process_message_with_options,
             next_step=RunTaskInThreads(
-                processing_function=commit_message,
+                processing_function=commit_message_with_options,
                 concurrency=self.num_threads,
                 max_pending_futures=self.max_pending_futures,
                 next_step=CommitOffsets(commit),

@@ -78,14 +79,28 @@ def create_with_partitions(
 # Processing Task


-def process_message(message: Message[KafkaPayload]) -> ProcessedEvent | FilteredPayload:
+def process_message_with_options(
+    message: Message[KafkaPayload],
+) -> ProcessedEvent | FilteredPayload:
+    profiling_enabled = options.get(
+        "replay.consumer.recording.profiling.enabled",
+    )
+    return process_message(message, profiling_enabled=profiling_enabled)
+
+
+def process_message(
+    message: Message[KafkaPayload], profiling_enabled: bool = False
+) -> ProcessedEvent | FilteredPayload:
     with sentry_sdk.start_transaction(
         name="replays.consumer.recording_buffered.process_message",
         op="replays.consumer.recording_buffered.process_message",
         custom_sampling_context={
             "sample_rate": getattr(settings, "SENTRY_REPLAY_RECORDINGS_CONSUMER_APM_SAMPLING", 0)
         },
     ):
+        if profiling_enabled:
+            sentry_sdk.profiler.start_profiler()
+
         try:
             recording_event = parse_recording_event(message.payload.value)
             set_tag("org_id", recording_event["context"]["org_id"])

@@ -96,6 +111,9 @@ def process_message(message: Message[KafkaPayload]) -> ProcessedEvent | FilteredPayload:
         except Exception:
             logger.exception("Failed to process replay recording message.")
             return FilteredPayload()
+        finally:
+            if profiling_enabled:
+                sentry_sdk.profiler.stop_profiler()


 @sentry_sdk.trace

@@ -170,7 +188,14 @@ def parse_headers(recording: bytes, replay_id: str) -> tuple[int, bytes]:
 # I/O Task


-def commit_message(message: Message[ProcessedEvent]) -> None:
+def commit_message_with_options(message: Message[ProcessedEvent]) -> None:
+    profiling_enabled = options.get(
+        "replay.consumer.recording.profiling.enabled",
+    )
+    return commit_message(message, profiling_enabled=profiling_enabled)
+
+
+def commit_message(message: Message[ProcessedEvent], profiling_enabled: bool = False) -> None:
     isolation_scope = sentry_sdk.get_isolation_scope().fork()
     with sentry_sdk.scope.use_isolation_scope(isolation_scope):
         with sentry_sdk.start_transaction(

@@ -182,6 +207,9 @@ def commit_message(message: Message[ProcessedEvent]) -> None:
                 )
             },
         ):
+            if profiling_enabled:
+                sentry_sdk.profiler.start_profiler()
+
             try:
                 commit_recording_message(message.payload)
                 track_recording_metadata(message.payload)

@@ -193,3 +221,6 @@ def commit_message(message: Message[ProcessedEvent]) -> None:
             except Exception:
                 logger.exception("Failed to commit replay recording message.")
                 return None
+            finally:
+                if profiling_enabled:
+                    sentry_sdk.profiler.stop_profiler()
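The `finally` placement is the important detail here: `parse_recording_event` and `commit_recording_message` can raise, and the profiler must be stopped on that path too, otherwise it would keep sampling after the message is dropped. Below is a standalone sketch of the same control flow, runnable without Sentry; the `FakeProfiler` and `process` names are made up for illustration.

class FakeProfiler:
    """Stand-in for sentry_sdk.profiler, used only to show the control flow."""

    def __init__(self):
        self.running = False

    def start_profiler(self):
        self.running = True

    def stop_profiler(self):
        self.running = False


def process(payload, profiler, profiling_enabled=False):
    if profiling_enabled:
        profiler.start_profiler()
    try:
        if payload is None:
            raise ValueError("bad payload")  # simulates a parse failure
        return payload.upper()
    except Exception:
        return None  # mirrors returning FilteredPayload() on the error path
    finally:
        # Runs on both the success and the error path.
        if profiling_enabled:
            profiler.stop_profiler()


profiler = FakeProfiler()
process(None, profiler, profiling_enabled=True)
assert profiler.running is False  # stopped even though processing failed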

tests/sentry/replays/integration/consumers/test_recording.py

Lines changed: 10 additions & 2 deletions

@@ -41,7 +41,11 @@ def submit(consumer, message):
     consumer.terminate()


-def test_recording_consumer(consumer) -> None:
+@mock.patch("sentry.replays.consumers.recording.options.get")
+def test_recording_consumer(mock_options_get, consumer) -> None:
+    # disable profiling
+    mock_options_get.return_value = False
+
     headers = json.dumps({"segment_id": 42}).encode()
     recording_payload = headers + b"\n" + zlib.compress(b"")

@@ -66,7 +70,11 @@ def test_recording_consumer(consumer) -> None:
     assert commit.called


-def test_recording_consumer_invalid_message(consumer) -> None:
+@mock.patch("sentry.replays.consumers.recording.options.get")
+def test_recording_consumer_invalid_message(mock_options_get, consumer) -> None:
+    # disable profiling
+    mock_options_get.return_value = False
+
     with mock.patch("sentry.replays.consumers.recording.commit_recording_message") as commit:
         submit(consumer, {})
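The integration tests drive the full `*_with_options` path, so they patch the option lookup itself and pin it to False. The same patching approach could cover the enabled branch at this level as well; a hypothetical sketch (test name and body are not part of this commit):

@mock.patch("sentry.replays.consumers.recording.options.get")
def test_recording_consumer_profiling_enabled(mock_options_get, consumer) -> None:
    # Hypothetical: force the option lookup to return True so the consumer
    # exercises the start_profiler/stop_profiler branch end to end.
    mock_options_get.return_value = True
    ...  # submit a valid recording and assert on commit, as in the tests above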

tests/sentry/replays/unit/consumers/test_recording.py

Lines changed: 149 additions & 0 deletions

@@ -1,4 +1,5 @@
 import zlib
+from unittest.mock import patch

 import msgpack
 import pytest

@@ -7,11 +8,14 @@

 from sentry.replays.consumers.recording import (
     DropSilently,
+    commit_message,
+    commit_message_with_options,
     decompress_segment,
     parse_headers,
     parse_recording_event,
     parse_request_message,
     process_message,
+    process_message_with_options,
 )
 from sentry.replays.usecases.ingest import ProcessedEvent
 from sentry.replays.usecases.ingest.event_parser import ParsedEventMeta

@@ -520,3 +524,148 @@ def test_process_message_no_headers() -> None:

 def make_kafka_message(message) -> Message[KafkaPayload]:
     return Message(Value(KafkaPayload(key=None, value=msgpack.packb(message), headers=[]), {}))
+
+
+def make_processed_event_message(processed_event: ProcessedEvent) -> Message[ProcessedEvent]:
+    return Message(Value(processed_event, {}))
+
+
+def make_valid_message() -> dict:
+    original_payload = b'[{"type": "test", "data": "some event data"}]'
+    compressed_payload = zlib.compress(original_payload)
+    segment_id = 42
+    headers = json.dumps({"segment_id": segment_id}).encode()
+    recording_payload = headers + b"\n" + compressed_payload
+
+    return {
+        "type": "replay_recording_not_chunked",
+        "org_id": 3,
+        "project_id": 4,
+        "replay_id": "1",
+        "received": 2,
+        "retention_days": 30,
+        "payload": recording_payload,
+        "key_id": 1,
+        "replay_event": b"{}",
+        "replay_video": b"",
+        "version": 0,
+    }
+
+
+def make_valid_processed_event() -> ProcessedEvent:
+    original_payload = b'[{"type": "test", "data": "some event data"}]'
+    compressed_payload = zlib.compress(original_payload)
+
+    return ProcessedEvent(
+        actions_event=ParsedEventMeta([], [], [], [], [], []),
+        context={
+            "key_id": 1,
+            "org_id": 3,
+            "project_id": 4,
+            "received": 2,
+            "replay_id": "1",
+            "retention_days": 30,
+            "segment_id": 42,
+        },
+        filedata=compressed_payload,
+        filename="30/4/1/42",
+        recording_size_uncompressed=len(original_payload),
+        recording_size=len(compressed_payload),
+        replay_event={},
+        trace_items=[],
+        video_size=None,
+    )
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_profiling(mock_profiler, profiling_enabled) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled."""
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    result = process_message(kafka_message, profiling_enabled=profiling_enabled)
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+    assert isinstance(result, ProcessedEvent)
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_profiling(mock_profiler, profiling_enabled) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled."""
+    processed_event = make_valid_processed_event()
+    commit_message(
+        make_processed_event_message(processed_event), profiling_enabled=profiling_enabled
+    )
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@patch("sentry.replays.consumers.recording.parse_recording_event")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_profiling_on_error(
+    mock_parse_recording_event, mock_profiler, profiling_enabled
+) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled, even on error."""
+    mock_parse_recording_event.side_effect = Exception("test error")
+
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    result = process_message(kafka_message, profiling_enabled=profiling_enabled)
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+    assert result == FilteredPayload()
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@patch("sentry.replays.consumers.recording.commit_recording_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_profiling_on_error(
+    mock_commit_recording_message, mock_profiler, profiling_enabled
+) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled, even on error."""
+    mock_commit_recording_message.side_effect = Exception("test error")
+
+    processed_event = make_valid_processed_event()
+    commit_message(
+        make_processed_event_message(processed_event), profiling_enabled=profiling_enabled
+    )
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+
+@patch("sentry.replays.consumers.recording.options.get")
+@patch("sentry.replays.consumers.recording.commit_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_with_options(mock_commit_message, mock_options, profiling_enabled) -> None:
+    """Test that commit_message_with_options calls commit_message with the correct profiling_enabled value."""
+    mock_options.return_value = profiling_enabled
+
+    processed_event = make_valid_processed_event()
+    commit_message_with_options(make_processed_event_message(processed_event))
+
+    assert mock_commit_message.call_count == 1
+    assert mock_commit_message.call_args[1]["profiling_enabled"] == profiling_enabled
+
+
+@patch("sentry.replays.consumers.recording.options.get")
+@patch("sentry.replays.consumers.recording.process_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_with_options(
+    mock_process_message, mock_options, profiling_enabled
+) -> None:
+    """Test that process_message_with_options calls process_message with the correct profiling_enabled value."""
+    mock_options.return_value = profiling_enabled
+
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    process_message_with_options(kafka_message)
+
+    assert mock_process_message.call_count == 1
+    assert mock_process_message.call_args[1]["profiling_enabled"] == profiling_enabled
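One detail worth calling out in these tests is argument ordering: stacked `@patch` decorators inject their mocks bottom-up (the decorator closest to the function supplies the first mock parameter), and parametrized or fixture arguments come after the mocks. A minimal standalone illustration of that convention; the patched targets and test name are made up for the example.

from unittest.mock import patch

import pytest


@patch("json.dumps")  # outer patch -> second mock argument
@patch("json.loads")  # inner patch -> first mock argument
@pytest.mark.parametrize("flag", [True, False])
def test_patch_ordering(mock_loads, mock_dumps, flag) -> None:
    # Mocks fill the leading parameters; the parametrized value arrives last.
    assert mock_loads is not mock_dumps
    assert isinstance(flag, bool)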
