Skip to content

Commit 9460eed

Browse files
committed
Implement pulsar-relay retry handling and improvde message resume
1 parent e2099bd commit 9460eed

File tree

2 files changed

+450
-47
lines changed

2 files changed

+450
-47
lines changed
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
"""
2+
Tests for the relay transport implementation.
3+
4+
Tests retry logic and message ID tracking functionality.
5+
"""
6+
from unittest.mock import Mock, patch
7+
import pytest
8+
import requests
9+
10+
from pulsar.client.transport.relay import RelayTransport, RelayTransportError
11+
12+
13+
class TestRetryLogic:
14+
"""Test retry logic with exponential backoff."""
15+
16+
@patch('pulsar.client.transport.relay.time.sleep')
17+
def test_post_message_retries_on_connection_error(self, mock_sleep):
18+
"""Test that post_message retries indefinitely on connection errors."""
19+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
20+
21+
# Mock the auth manager to return a token
22+
transport.auth_manager.get_token = Mock(return_value='test-token')
23+
24+
# Mock session.post to fail twice with ConnectionError, then succeed
25+
mock_response = Mock()
26+
mock_response.status_code = 200
27+
mock_response.json.return_value = {
28+
'message_id': 'msg_123',
29+
'topic': 'test-topic',
30+
'timestamp': '2025-10-27T10:00:00Z'
31+
}
32+
33+
transport.session.post = Mock(
34+
side_effect=[
35+
requests.ConnectionError("Connection refused"),
36+
requests.ConnectionError("Connection refused"),
37+
mock_response
38+
]
39+
)
40+
41+
result = transport.post_message('test-topic', {'data': 'test'})
42+
43+
# Verify it succeeded after retries
44+
assert result['message_id'] == 'msg_123'
45+
assert transport.session.post.call_count == 3
46+
# Verify exponential backoff was used
47+
assert mock_sleep.call_count == 2
48+
# First delay should be 1.0, second should be 2.0
49+
assert mock_sleep.call_args_list[0][0][0] == 1.0
50+
assert mock_sleep.call_args_list[1][0][0] == 2.0
51+
52+
@patch('pulsar.client.transport.relay.time.sleep')
53+
def test_post_message_retries_on_500_error(self, mock_sleep):
54+
"""Test that post_message retries on 5xx server errors."""
55+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
56+
transport.auth_manager.get_token = Mock(return_value='test-token')
57+
58+
# Mock responses: 500, 503, then 200
59+
mock_500 = Mock()
60+
mock_500.status_code = 500
61+
62+
mock_503 = Mock()
63+
mock_503.status_code = 503
64+
65+
mock_200 = Mock()
66+
mock_200.status_code = 200
67+
mock_200.json.return_value = {
68+
'message_id': 'msg_456',
69+
'topic': 'test-topic',
70+
'timestamp': '2025-10-27T10:00:00Z'
71+
}
72+
73+
transport.session.post = Mock(side_effect=[mock_500, mock_503, mock_200])
74+
75+
result = transport.post_message('test-topic', {'data': 'test'})
76+
77+
assert result['message_id'] == 'msg_456'
78+
assert transport.session.post.call_count == 3
79+
assert mock_sleep.call_count == 2
80+
81+
def test_post_message_does_not_retry_on_400_error(self):
82+
"""Test that post_message does NOT retry on 4xx client errors."""
83+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
84+
transport.auth_manager.get_token = Mock(return_value='test-token')
85+
86+
# Mock response with 400 error
87+
mock_400 = Mock()
88+
mock_400.status_code = 400
89+
90+
# Create HTTPError with response attached
91+
error = requests.HTTPError("400 Bad Request")
92+
error.response = mock_400
93+
mock_400.raise_for_status.side_effect = error
94+
95+
transport.session.post = Mock(return_value=mock_400)
96+
97+
with pytest.raises(RelayTransportError):
98+
transport.post_message('test-topic', {'data': 'test'})
99+
100+
# Should only be called once (no retries for 4xx)
101+
assert transport.session.post.call_count == 1
102+
103+
@patch('pulsar.client.transport.relay.time.sleep')
104+
def test_post_message_retries_on_timeout(self, mock_sleep):
105+
"""Test that post_message retries on timeout."""
106+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
107+
transport.auth_manager.get_token = Mock(return_value='test-token')
108+
109+
mock_response = Mock()
110+
mock_response.status_code = 200
111+
mock_response.json.return_value = {
112+
'message_id': 'msg_789',
113+
'topic': 'test-topic',
114+
'timestamp': '2025-10-27T10:00:00Z'
115+
}
116+
117+
transport.session.post = Mock(
118+
side_effect=[
119+
requests.Timeout("Request timed out"),
120+
mock_response
121+
]
122+
)
123+
124+
result = transport.post_message('test-topic', {'data': 'test'})
125+
126+
assert result['message_id'] == 'msg_789'
127+
assert transport.session.post.call_count == 2
128+
assert mock_sleep.call_count == 1
129+
130+
@patch('pulsar.client.transport.relay.time.sleep')
131+
def test_retry_backoff_caps_at_max_delay(self, mock_sleep):
132+
"""Test that exponential backoff caps at max_delay."""
133+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
134+
transport.auth_manager.get_token = Mock(return_value='test-token')
135+
136+
# Create many connection errors to test max delay
137+
errors = [requests.ConnectionError("Connection refused")] * 10
138+
139+
mock_response = Mock()
140+
mock_response.status_code = 200
141+
mock_response.json.return_value = {
142+
'message_id': 'msg_999',
143+
'topic': 'test-topic',
144+
'timestamp': '2025-10-27T10:00:00Z'
145+
}
146+
147+
transport.session.post = Mock(side_effect=errors + [mock_response])
148+
149+
result = transport.post_message('test-topic', {'data': 'test'})
150+
151+
assert result['message_id'] == 'msg_999'
152+
assert mock_sleep.call_count == 10
153+
154+
# Check that delay caps at 60 seconds
155+
delays = [call[0][0] for call in mock_sleep.call_args_list]
156+
# Expected: 1, 2, 4, 8, 16, 32, 60, 60, 60, 60
157+
assert delays[0] == 1.0
158+
assert delays[1] == 2.0
159+
assert delays[2] == 4.0
160+
assert delays[3] == 8.0
161+
assert delays[4] == 16.0
162+
assert delays[5] == 32.0
163+
# After this, should cap at 60
164+
assert all(d == 60.0 for d in delays[6:])
165+
166+
167+
class TestMessageIDTracking:
168+
"""Test message ID tracking functionality."""
169+
170+
def test_long_poll_tracks_message_ids(self):
171+
"""Test that long_poll tracks message IDs per topic."""
172+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
173+
transport.auth_manager.get_token = Mock(return_value='test-token')
174+
175+
# Mock response with messages from different topics
176+
mock_response = Mock()
177+
mock_response.status_code = 200
178+
mock_response.json.return_value = {
179+
'messages': [
180+
{'topic': 'topic1', 'message_id': 'msg_001', 'payload': {'data': 'a'}},
181+
{'topic': 'topic2', 'message_id': 'msg_002', 'payload': {'data': 'b'}},
182+
{'topic': 'topic1', 'message_id': 'msg_003', 'payload': {'data': 'c'}},
183+
],
184+
'has_more': False
185+
}
186+
187+
transport.session.post = Mock(return_value=mock_response)
188+
189+
messages = transport.long_poll(['topic1', 'topic2'])
190+
191+
# Verify message IDs are tracked (last message ID per topic)
192+
assert transport.get_last_message_id('topic1') == 'msg_003'
193+
assert transport.get_last_message_id('topic2') == 'msg_002'
194+
assert len(messages) == 3
195+
196+
def test_long_poll_uses_tracked_message_ids_in_since(self):
197+
"""Test that long_poll includes tracked message IDs in the since parameter."""
198+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
199+
transport.auth_manager.get_token = Mock(return_value='test-token')
200+
201+
# Set some tracked message IDs
202+
transport.set_last_message_id('topic1', 'msg_100')
203+
transport.set_last_message_id('topic2', 'msg_200')
204+
205+
mock_response = Mock()
206+
mock_response.status_code = 200
207+
mock_response.json.return_value = {
208+
'messages': [],
209+
'has_more': False
210+
}
211+
212+
transport.session.post = Mock(return_value=mock_response)
213+
214+
# Call long_poll
215+
transport.long_poll(['topic1', 'topic2'])
216+
217+
# Verify the 'since' parameter was included in the request
218+
call_args = transport.session.post.call_args
219+
request_json = call_args[1]['json']
220+
221+
assert 'since' in request_json
222+
assert request_json['since']['topic1'] == 'msg_100'
223+
assert request_json['since']['topic2'] == 'msg_200'
224+
225+
def test_long_poll_only_includes_since_for_requested_topics(self):
226+
"""Test that since only includes tracked IDs for topics in the request."""
227+
transport = RelayTransport('http://localhost:8000', 'user', 'pass')
228+
transport.auth_manager.get_token = Mock(return_value='test-token')
229+
230+
# Set tracked message IDs for multiple topics
231+
transport.set_last_message_id('topic1', 'msg_100')
232+
transport.set_last_message_id('topic2', 'msg_200')
233+
transport.set_last_message_id('topic3', 'msg_300')
234+
235+
mock_response = Mock()
236+
mock_response.status_code = 200
237+
mock_response.json.return_value = {'messages': [], 'has_more': False}
238+
239+
transport.session.post = Mock(return_value=mock_response)
240+
241+
# Only poll for topic1 and topic2
242+
transport.long_poll(['topic1', 'topic2'])
243+
244+
call_args = transport.session.post.call_args
245+
request_json = call_args[1]['json']
246+
247+
# Should only include topic1 and topic2 in since
248+
assert 'since' in request_json
249+
assert 'topic1' in request_json['since']
250+
assert 'topic2' in request_json['since']
251+
assert 'topic3' not in request_json['since']

0 commit comments

Comments
 (0)