|
| 1 | +from unittest.mock import Mock, patch |
| 2 | + |
| 3 | +from confluent_kafka import Message, TopicPartition |
| 4 | + |
| 5 | +from saluki.play import play |
| 6 | + |
| 7 | + |
| 8 | +def test_play_with_offsets(): |
| 9 | + src_broker = "broker1" |
| 10 | + src_topic = "topic1" |
| 11 | + dest_broker = "broker2" |
| 12 | + dest_topic = "topic2" |
| 13 | + offsets = [1, 2] |
| 14 | + |
| 15 | + message_1 = Mock(spec=Message) |
| 16 | + message_1_key = "msg1key" |
| 17 | + message_1.key.return_value = message_1_key |
| 18 | + message_1_val = "msg1" |
| 19 | + message_1.value.return_value = message_1_val |
| 20 | + |
| 21 | + message_2 = Mock(spec=Message) |
| 22 | + message_2_key = "msg2key" |
| 23 | + message_2.key.return_value = message_2_key |
| 24 | + message_2_val = "msg2" |
| 25 | + message_2.value.return_value = message_2_val |
| 26 | + |
| 27 | + with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: |
| 28 | + consumer_obj = c() |
| 29 | + consumer_obj.consume.return_value = [message_1, message_2] |
| 30 | + |
| 31 | + play(src_broker, src_topic, dest_broker, dest_topic, offsets, None) |
| 32 | + |
| 33 | + assert consumer_obj.assign.call_args.args[0][0].topic == src_topic |
| 34 | + assert consumer_obj.assign.call_args.args[0][0].offset == offsets[0] |
| 35 | + |
| 36 | + consumer_obj.consume.assert_called_with(2) # stop - start + 1 |
| 37 | + |
| 38 | + p_obj = p() |
| 39 | + call_1 = p_obj.produce.call_args_list[0] |
| 40 | + assert call_1.args == (dest_topic, message_1_val, message_1_key) |
| 41 | + call_2 = p_obj.produce.call_args_list[1] |
| 42 | + assert call_2.args == (dest_topic, message_2_val, message_2_key) |
| 43 | + |
| 44 | + |
| 45 | +def test_play_with_timestamps(): |
| 46 | + src_broker = "broker1" |
| 47 | + src_topic = "topic1" |
| 48 | + dest_broker = "broker2" |
| 49 | + dest_topic = "topic2" |
| 50 | + timestamps = [1762444369, 1762444375] |
| 51 | + |
| 52 | + message_1 = Mock(spec=Message) |
| 53 | + message_1_key = "msg1key" |
| 54 | + message_1.key.return_value = message_1_key |
| 55 | + message_1_val = "msg1" |
| 56 | + message_1.value.return_value = message_1_val |
| 57 | + |
| 58 | + message_2 = Mock(spec=Message) |
| 59 | + message_2_key = "msg2key" |
| 60 | + message_2.key.return_value = message_2_key |
| 61 | + message_2_val = "msg2" |
| 62 | + message_2.value.return_value = message_2_val |
| 63 | + |
| 64 | + with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: |
| 65 | + consumer_obj = c() |
| 66 | + consumer_obj.offsets_for_times.return_value = [ |
| 67 | + TopicPartition(src_topic, partition=0, offset=2), |
| 68 | + TopicPartition(src_topic, partition=0, offset=3), |
| 69 | + ] |
| 70 | + consumer_obj.consume.return_value = [message_1, message_2] |
| 71 | + |
| 72 | + play(src_broker, src_topic, dest_broker, dest_topic, None, timestamps) |
| 73 | + |
| 74 | + assert consumer_obj.assign.call_args.args[0][0].topic == src_topic |
| 75 | + assert consumer_obj.assign.call_args.args[0][0].offset == 2 |
| 76 | + |
| 77 | + consumer_obj.consume.assert_called_with(2) # stop - start + 1 |
| 78 | + |
| 79 | + p_obj = p() |
| 80 | + call_1 = p_obj.produce.call_args_list[0] |
| 81 | + assert call_1.args == (dest_topic, message_1_val, message_1_key) |
| 82 | + call_2 = p_obj.produce.call_args_list[1] |
| 83 | + assert call_2.args == (dest_topic, message_2_val, message_2_key) |
| 84 | + |
| 85 | + |
| 86 | +def test_play_with_exception_when_consuming_consumer_still_closed(): |
| 87 | + with ( |
| 88 | + patch("saluki.play.Consumer") as mock_consumer, |
| 89 | + patch("saluki.play.Producer"), |
| 90 | + patch("saluki.play.logger") as mock_logger, |
| 91 | + ): |
| 92 | + mock_consumer().consume.side_effect = Exception("blah") |
| 93 | + play("", "", "", "", [1, 2], None) |
| 94 | + |
| 95 | + mock_logger.exception.assert_called_once() |
| 96 | + |
| 97 | + mock_consumer().close.assert_called_once() |
0 commit comments