|
1 | | -from unittest.mock import Mock |
| 1 | +from unittest.mock import Mock, patch |
2 | 2 |
|
3 | 3 | import pytest |
4 | 4 |
|
5 | | -from saluki.utils import parse_kafka_uri, _parse_timestamp, _deserialise_and_print_messages |
| 5 | +from saluki.utils import ( |
| 6 | + parse_kafka_uri, |
| 7 | + _parse_timestamp, |
| 8 | + _deserialise_and_print_messages, |
| 9 | + __try_to_deserialise_message, |
| 10 | +) |
6 | 11 | from confluent_kafka import Message |
7 | 12 |
|
| 13 | + |
8 | 14 | @pytest.fixture |
9 | 15 | def mock_message(): |
10 | 16 | return Mock(spec=Message) |
11 | 17 |
|
12 | | -def test_deserialising_message_with_no_message_continues(mock_message): |
13 | | - _deserialise_and_print_messages([mock_message]) |
| 18 | + |
| 19 | +def test_deserialising_message_with_no_message_continues(): |
| 20 | + with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message: |
| 21 | + _deserialise_and_print_messages([None], None) |
| 22 | + mock_deserialise_message.assert_not_called() |
| 23 | + |
14 | 24 |
|
15 | 25 | def test_deserialising_message_with_error_continues(mock_message): |
16 | | - pass |
| 26 | + mock_message.error.return_value = "Some error" |
| 27 | + with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message: |
| 28 | + _deserialise_and_print_messages([mock_message], None) |
| 29 | + mock_deserialise_message.assert_not_called() |
| 30 | + |
17 | 31 |
|
18 | 32 | def test_deserialising_message_with_wrong_partition_continues(mock_message): |
| 33 | + noninteresting_partition = 123 |
| 34 | + mock_message.error.return_value = False |
| 35 | + mock_message.partition.return_value = noninteresting_partition |
| 36 | + with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message: |
| 37 | + _deserialise_and_print_messages([mock_message], 234) |
| 38 | + mock_deserialise_message.assert_not_called() |
| 39 | + |
| 40 | + |
| 41 | +def test_deserialising_message_with_correct_partition_calls_deserialise(mock_message): |
| 42 | + partition = 123 |
| 43 | + mock_message.error.return_value = False |
| 44 | + mock_message.partition.return_value = partition |
| 45 | + with patch("saluki.utils.__try_to_deserialise_message") as mock_deserialise_message: |
| 46 | + _deserialise_and_print_messages([mock_message], partition) |
| 47 | + mock_deserialise_message.assert_called_once() |
| 48 | + |
| 49 | + |
| 50 | +def test_deserialising_empty_message(mock_message): |
| 51 | + assert (None, "") == __try_to_deserialise_message(b"") |
| 52 | + |
| 53 | + |
| 54 | +def test_deserialising_message_with_invalid_schema_falls_back_to_raw_bytes_decode( |
| 55 | + mock_message, |
| 56 | +): |
19 | 57 | pass |
20 | 58 |
|
21 | 59 |
|
| 60 | +def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): |
| 61 | + pass |
| 62 | + |
| 63 | + |
| 64 | +def test_schema_that_isnt_in_deserialiser_list(mock_message): |
| 65 | + pass |
22 | 66 |
|
23 | | -# test with normal payload |
24 | 67 |
|
25 | | -# test with fallback serialiser when schema not found (empty payload) |
| 68 | +def test_message_that_has_valid_schema_but_empty_payload(mock_message): |
| 69 | + pass |
26 | 70 |
|
27 | | -# test with schema that looks ok, but not in list of deserialisers |
28 | 71 |
|
29 | | -# test exception while deserialising |
| 72 | +def test_message_that_has_valid_schema_but_invalid_payload(mock_message): |
| 73 | + pass |
| 74 | + |
| 75 | + |
| 76 | +def test_message_that_has_valid_schema_and_valid_payload(mock_message): |
| 77 | + pass |
30 | 78 |
|
31 | 79 |
|
32 | 80 | def test_parse_timestamp_with_valid_timestamp(mock_message): |
33 | 81 | mock_message.timestamp.return_value = (1, 1753434939336) |
34 | | - assert _parse_timestamp(mock_message) == '2025-07-25 10:15:39.336000' |
| 82 | + assert _parse_timestamp(mock_message) == "2025-07-25 10:15:39.336000" |
| 83 | + |
35 | 84 |
|
36 | 85 | def test_parse_timestamp_with_timestamp_not_available(mock_message): |
37 | 86 | mock_message.timestamp.return_value = (2, "blah") |
38 | 87 | assert _parse_timestamp(mock_message) == "Unknown" |
39 | 88 |
|
| 89 | + |
40 | 90 | def test_uri_with_broker_name_and_topic_successfully_split(): |
41 | 91 | test_broker = "localhost" |
42 | 92 | test_topic = "some_topic" |
|
0 commit comments