1+ from unittest import mock
12from unittest .mock import patch
23
34import pytest
5+ from confluent_kafka import TopicPartition
46
57from saluki .consume import consume
68
9+
710@patch ("saluki.consume.Consumer" )
811def test_go_forwards_with_no_offset_raises (_ ):
912 with pytest .raises (ValueError ):
1013 consume ("broker" , "topic" , go_forwards = True , offset = None )
1114
12- # test that tries going forwards that consumes from offset
1315
14- # test that checks start offset
16+ @patch ("saluki.consume.Consumer" )
17+ def test_go_forwards_with_offset_assigns_at_offset (mock_consumer ):
18+ expected_topic = "topic"
19+ expected_offset = 1234
20+ expected_partition = 1
21+ consume (
22+ "broker" ,
23+ expected_topic ,
24+ go_forwards = True ,
25+ offset = expected_offset ,
26+ partition = expected_partition ,
27+ )
28+ mock_assign = mock_consumer .return_value .assign
29+
30+ mock_assign .assert_called_with (
31+ [TopicPartition (expected_topic , expected_partition , expected_offset )]
32+ )
33+
34+
35+ @patch ("saluki.consume.Consumer" )
36+ def test_consume_with_offset_and_num_of_messages_goes_back_offset_minus_messages (
37+ mock_consumer ,
38+ ):
39+ expected_offset = 1234
40+ expected_topic = "sometopic"
41+ num_messages = 3
42+ expected_start_offset = expected_offset - num_messages + 1
43+
44+ consume ("broker" , expected_topic , offset = expected_offset , num_messages = num_messages )
45+
46+ mock_assign = mock_consumer .return_value .assign
47+ mock_assign .assert_called_once ()
48+
49+ mock_assign_call = mock_assign .call_args .args [0 ][0 ]
50+ assert mock_assign_call .topic == expected_topic
51+ assert mock_assign_call .offset == expected_start_offset
52+
53+
54+ @patch ("saluki.consume.Consumer" )
55+ def test_consume_with_no_offset_and_num_of_messages_goes_back_high_watermark_minus_messages (
56+ mock_consumer ,
57+ ):
58+ expected_topic = "sometopic"
59+ num_messages = 3
60+ high_watermark_offset = 2345
61+ expected_start_offset = high_watermark_offset - num_messages
62+
63+ mock_consumer .return_value .get_watermark_offsets .return_value = (
64+ None ,
65+ high_watermark_offset ,
66+ )
67+
68+ consume ("broker" , topic = expected_topic , num_messages = num_messages )
69+ mock_assign = mock_consumer .return_value .assign
70+ mock_assign .assert_called_once ()
71+
72+ mock_assign_call = mock_assign .call_args .args [0 ][0 ]
73+
74+ assert mock_assign_call .topic == expected_topic
75+ assert mock_assign_call .offset == expected_start_offset
76+
1577
16- # test that catches exception
78+ def test_consume_but_exception_thrown_consumer_is_closed ():
79+ with (
80+ mock .patch ("saluki.consume.Consumer" ) as c ,
81+ ):
82+ c .return_value .consume .side_effect = Exception
83+ consume ("somebroker" , "sometopic" , num_messages = 1 )
84+ c .return_value .close .assert_called_once ()
0 commit comments