 import zlib
+from unittest.mock import patch

 import msgpack
 import pytest

 from sentry.replays.consumers.recording import (
     DropSilently,
+    commit_message,
+    commit_message_with_options,
     decompress_segment,
     parse_headers,
     parse_recording_event,
     parse_request_message,
     process_message,
+    process_message_with_options,
 )
 from sentry.replays.usecases.ingest import ProcessedEvent
 from sentry.replays.usecases.ingest.event_parser import ParsedEventMeta
@@ -520,3 +524,148 @@ def test_process_message_no_headers() -> None:

 def make_kafka_message(message) -> Message[KafkaPayload]:
     return Message(Value(KafkaPayload(key=None, value=msgpack.packb(message), headers=[]), {}))
+
+
+def make_processed_event_message(processed_event: ProcessedEvent) -> Message[ProcessedEvent]:
+    return Message(Value(processed_event, {}))
+
+
+def make_valid_message() -> dict:
+    original_payload = b'[{"type": "test", "data": "some event data"}]'
+    compressed_payload = zlib.compress(original_payload)
+    segment_id = 42
+    headers = json.dumps({"segment_id": segment_id}).encode()
+    recording_payload = headers + b"\n" + compressed_payload
+
+    return {
+        "type": "replay_recording_not_chunked",
+        "org_id": 3,
+        "project_id": 4,
+        "replay_id": "1",
+        "received": 2,
+        "retention_days": 30,
+        "payload": recording_payload,
+        "key_id": 1,
+        "replay_event": b"{}",
+        "replay_video": b"",
+        "version": 0,
+    }
+
+
+def make_valid_processed_event() -> ProcessedEvent:
+    original_payload = b'[{"type": "test", "data": "some event data"}]'
+    compressed_payload = zlib.compress(original_payload)
+
+    return ProcessedEvent(
+        actions_event=ParsedEventMeta([], [], [], [], [], []),
+        context={
+            "key_id": 1,
+            "org_id": 3,
+            "project_id": 4,
+            "received": 2,
+            "replay_id": "1",
+            "retention_days": 30,
+            "segment_id": 42,
+        },
+        filedata=compressed_payload,
+        filename="30/4/1/42",
+        recording_size_uncompressed=len(original_payload),
+        recording_size=len(compressed_payload),
+        replay_event={},
+        trace_items=[],
+        video_size=None,
+    )
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_profiling(mock_profiler, profiling_enabled) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled."""
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    result = process_message(kafka_message, profiling_enabled=profiling_enabled)
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+    assert isinstance(result, ProcessedEvent)
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_profiling(mock_profiler, profiling_enabled) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled."""
+    processed_event = make_valid_processed_event()
+    commit_message(
+        make_processed_event_message(processed_event), profiling_enabled=profiling_enabled
+    )
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@patch("sentry.replays.consumers.recording.parse_recording_event")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_profiling_on_error(
+    mock_parse_recording_event, mock_profiler, profiling_enabled
+) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled, even on error."""
+    mock_parse_recording_event.side_effect = Exception("test error")
+
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    result = process_message(kafka_message, profiling_enabled=profiling_enabled)
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+    assert result == FilteredPayload()
+
+
+@patch("sentry.replays.consumers.recording.sentry_sdk.profiler")
+@patch("sentry.replays.consumers.recording.commit_recording_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_profiling_on_error(
+    mock_commit_recording_message, mock_profiler, profiling_enabled
+) -> None:
+    """Test that profiling is started and stopped when enabled, and not when disabled, even on error."""
+    mock_commit_recording_message.side_effect = Exception("test error")
+
+    processed_event = make_valid_processed_event()
+    commit_message(
+        make_processed_event_message(processed_event), profiling_enabled=profiling_enabled
+    )
+
+    assert mock_profiler.start_profiler.call_count == (1 if profiling_enabled else 0)
+    assert mock_profiler.stop_profiler.call_count == (1 if profiling_enabled else 0)
+
+
+@patch("sentry.replays.consumers.recording.options.get")
+@patch("sentry.replays.consumers.recording.commit_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_commit_message_with_options(mock_commit_message, mock_options, profiling_enabled) -> None:
+    """Test that commit_message_with_options calls commit_message with the correct profiling_enabled value."""
+    mock_options.return_value = profiling_enabled
+
+    processed_event = make_valid_processed_event()
+    commit_message_with_options(make_processed_event_message(processed_event))
+
+    assert mock_commit_message.call_count == 1
+    assert mock_commit_message.call_args[1]["profiling_enabled"] == profiling_enabled
+
+
+@patch("sentry.replays.consumers.recording.options.get")
+@patch("sentry.replays.consumers.recording.process_message")
+@pytest.mark.parametrize("profiling_enabled", [True, False])
+def test_process_message_with_options(
+    mock_process_message, mock_options, profiling_enabled
+) -> None:
+    """Test that process_message_with_options calls process_message with the correct profiling_enabled value."""
+    mock_options.return_value = profiling_enabled
+
+    message = make_valid_message()
+    kafka_message = make_kafka_message(message)
+    process_message_with_options(kafka_message)
+
+    assert mock_process_message.call_count == 1
+    assert mock_process_message.call_args[1]["profiling_enabled"] == profiling_enabled
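
For context, the wrappers these new tests exercise follow a narrow pattern: the `*_with_options` functions read a runtime option and forward it as the `profiling_enabled` keyword argument, while `process_message` and `commit_message` start the SDK profiler up front and stop it in a `finally` block so a failing message still stops profiling. The sketch below is an illustrative reconstruction under those assumptions, not the actual Sentry implementation; `_handle`, `_get_bool_option`, and the option key are hypothetical stand-ins.

import sentry_sdk


def _get_bool_option(key: str) -> bool:
    # Hypothetical stand-in for the options lookup the real consumer performs.
    return True


def _handle(message):
    # Hypothetical inner step: parse, decompress, and build the processed event.
    return message


def process_message(message, profiling_enabled: bool = False):
    # Start the profiler only when the option is on; always stop it, even on error.
    if profiling_enabled:
        sentry_sdk.profiler.start_profiler()
    try:
        return _handle(message)
    except Exception:
        # The real consumer filters failed messages (the tests assert FilteredPayload());
        # returning None here keeps the sketch dependency-free.
        return None
    finally:
        if profiling_enabled:
            sentry_sdk.profiler.stop_profiler()


def process_message_with_options(message):
    # Thin wrapper: resolve the option once per message and forward it,
    # which is exactly what test_process_message_with_options asserts.
    return process_message(
        message,
        profiling_enabled=_get_bool_option("replays.consumer.recording.profiling-enabled"),
    )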