Skip to content

Commit de33efe

Browse files
authored
Merge pull request #80 from dClimate/timeouts
fix: timeouts with delays
2 parents 45f276a + 8cca5fc commit de33efe

File tree

3 files changed

+302
-23
lines changed

3 files changed

+302
-23
lines changed

py_hamt/store_httpx.py

Lines changed: 85 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import random
23
import re
34
from abc import ABC, abstractmethod
45
from typing import Any, Literal, Tuple, cast
@@ -146,6 +147,9 @@ def __init__(
146147
headers: dict[str, str] | None = None,
147148
auth: Tuple[str, str] | None = None,
148149
chunker: str = "size-1048576",
150+
max_retries: int = 3,
151+
initial_delay: float = 1.0,
152+
backoff_factor: float = 2.0,
149153
):
150154
"""
151155
If None is passed into the rpc or gateway base url, then the default for kubo local daemons will be used. The default local values will also be used if nothing is passed in at all.
@@ -219,14 +223,24 @@ def __init__(
219223
self._owns_client = True
220224
self._client_per_loop = {}
221225

222-
# The instance is never closed on initialization.
223-
self._closed = False
224-
225226
# store for later use by _loop_client()
226227
self._default_headers = headers
227228
self._default_auth = auth
228229

229230
self._sem: asyncio.Semaphore = asyncio.Semaphore(concurrency)
231+
self._closed: bool = False
232+
233+
# Validate retry parameters
234+
if max_retries < 0:
235+
raise ValueError("max_retries must be non-negative")
236+
if initial_delay <= 0:
237+
raise ValueError("initial_delay must be positive")
238+
if backoff_factor < 1.0:
239+
raise ValueError("backoff_factor must be >= 1.0 for exponential backoff")
240+
241+
self.max_retries = max_retries
242+
self.initial_delay = initial_delay
243+
self.backoff_factor = backoff_factor
230244

231245
# --------------------------------------------------------------------- #
232246
# helper: get or create the client bound to the current running loop #
@@ -338,28 +352,78 @@ def __del__(self) -> None:
338352
# save() – now uses the per-loop client #
339353
# --------------------------------------------------------------------- #
340354
async def save(self, data: bytes, codec: ContentAddressedStore.CodecInput) -> CID:
    """Upload ``data`` to the Kubo RPC ``/add`` endpoint and return its CID.

    Transport-level failures (timeouts, connection errors) are retried up
    to ``self.max_retries`` times with exponential backoff plus jitter.
    HTTP status errors (4xx/5xx) are *not* retried and propagate
    immediately from ``raise_for_status()``.

    Parameters:
        data: Raw bytes to store.
        codec: Codec to stamp onto the returned CID when the daemon did
            not already return a dag-pb CID.

    Raises:
        httpx.TimeoutException: when every retry attempt is exhausted.
        httpx.HTTPStatusError: on a non-2xx response from the RPC server.
    """
    async with self._sem:  # throttle concurrent RPC calls
        files = {"file": data}
        client = self._loop_client()

        # max_retries + 1 total attempts: the initial try plus the retries.
        for attempt in range(self.max_retries + 1):
            try:
                response = await client.post(
                    self.rpc_url, files=files, timeout=60.0
                )
                response.raise_for_status()
                cid_str: str = response.json()["Hash"]
                cid: CID = CID.decode(cid_str)
                if cid.codec.code != self.DAG_PB_MARKER:
                    cid = cid.set(codec=codec)
                return cid
            # httpx.TimeoutException is a subclass of httpx.RequestError,
            # so one clause covers both; httpx.HTTPStatusError is NOT a
            # RequestError and therefore propagates without retrying.
            except httpx.RequestError as e:
                if attempt >= self.max_retries:
                    raise httpx.TimeoutException(
                        f"Failed to save data after {self.max_retries} retries: {str(e)}",
                        request=e.request,
                    ) from e
                # Exponential backoff with +/-5% jitter to avoid a
                # thundering herd of synchronized retries.
                delay = self.initial_delay * (self.backoff_factor**attempt)
                jitter = delay * 0.1 * (random.random() - 0.5)
                await asyncio.sleep(delay + jitter)
    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover
355394

356395
async def load(self, id: IPLDKind) -> bytes:
    """Fetch the raw block bytes for ``id`` (a CID) from the HTTP gateway.

    Uses the same retry policy as ``save``: transport-level failures are
    retried up to ``self.max_retries`` times with exponential backoff and
    jitter, while HTTP status errors propagate immediately.

    Parameters:
        id: The CID of the block to fetch (typed ``IPLDKind`` for the
            store interface, but always a ``CID`` here).

    Raises:
        httpx.TimeoutException: when every retry attempt is exhausted.
        httpx.HTTPStatusError: on a non-2xx response from the gateway.
    """
    cid = cast(CID, id)  # CID is definitely in the IPLDKind type
    url: str = self.gateway_base_url + str(cid)
    async with self._sem:  # throttle concurrent gateway requests
        client = self._loop_client()

        # max_retries + 1 total attempts: the initial try plus the retries.
        for attempt in range(self.max_retries + 1):
            try:
                response = await client.get(url, timeout=60.0)
                response.raise_for_status()
                return response.content
            # httpx.TimeoutException is a subclass of httpx.RequestError,
            # so one clause covers both; httpx.HTTPStatusError is NOT a
            # RequestError and therefore propagates without retrying.
            except httpx.RequestError as e:
                if attempt >= self.max_retries:
                    raise httpx.TimeoutException(
                        f"Failed to load data after {self.max_retries} retries: {str(e)}",
                        request=e.request,
                    ) from e
                # Exponential backoff with +/-5% jitter to avoid a
                # thundering herd of synchronized retries.
                delay = self.initial_delay * (self.backoff_factor**attempt)
                jitter = delay * 0.1 * (random.random() - 0.5)
                await asyncio.sleep(delay + jitter)
    raise RuntimeError("Exited the retry loop unexpectedly.")  # pragma: no cover

tests/test_kubo_cas.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
from typing import Literal, cast
3+
from unittest.mock import AsyncMock, patch
24

35
import dag_cbor
46
import httpx
@@ -164,3 +166,208 @@ async def test_chunker_invalid_patterns(invalid):
164166
with pytest.raises(ValueError, match="Invalid chunker specification"):
165167
async with KuboCAS(chunker=invalid):
166168
pass
169+
170+
171+
@pytest.mark.asyncio
async def test_kubo_timeout_retries():
    """
    KuboCAS should retry timed-out save/load calls and eventually succeed,
    then raise httpx.TimeoutException once the retry budget is exhausted.
    """
    failures_seen = 0
    successful_after = 2  # first two attempts time out, third succeeds
    test_cid = "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
    test_data = dag_cbor.encode("test")

    async def flaky_post(url, **kwargs):
        nonlocal failures_seen
        # Build a dummy request so the exception/response carries one.
        request = httpx.Request("POST", url, files=kwargs.get("files"))
        if failures_seen < successful_after:
            failures_seen += 1
            raise httpx.TimeoutException("Simulated timeout", request=request)
        return httpx.Response(200, json={"Hash": test_cid}, request=request)

    async def flaky_get(url, **kwargs):
        nonlocal failures_seen
        request = httpx.Request("GET", url)
        if failures_seen < successful_after:
            failures_seen += 1
            raise httpx.TimeoutException("Simulated timeout", request=request)
        return httpx.Response(200, content=test_data, request=request)

    with patch.object(
        httpx.AsyncClient, "post", new=AsyncMock(side_effect=flaky_post)
    ), patch.object(httpx.AsyncClient, "get", new=AsyncMock(side_effect=flaky_get)):
        async with httpx.AsyncClient() as client:
            async with KuboCAS(
                rpc_base_url="http://127.0.0.1:5001",
                gateway_base_url="http://127.0.0.1:8080",
                client=client,
                max_retries=3,
                initial_delay=0.1,
                backoff_factor=2.0,
            ) as kubo_cas:
                # save: two timeouts, then success.
                failures_seen = 0
                cid = await kubo_cas.save(test_data, codec="dag-cbor")
                assert failures_seen == successful_after, (
                    "Should have retried twice before success"
                )
                assert str(cid) == test_cid

                # load: two timeouts, then success.
                failures_seen = 0
                result = await kubo_cas.load(cid)
                assert failures_seen == successful_after, (
                    "Should have retried twice before success"
                )
                assert result == test_data

                # Every attempt times out -> retry budget exhausted.
                async def always_timeout(url, **kwargs):
                    request = httpx.Request("POST", url)
                    raise httpx.TimeoutException(
                        "Simulated timeout", request=request
                    )

                with patch.object(
                    httpx.AsyncClient,
                    "post",
                    new=AsyncMock(side_effect=always_timeout),
                ), patch.object(
                    httpx.AsyncClient,
                    "get",
                    new=AsyncMock(side_effect=always_timeout),
                ):
                    with pytest.raises(
                        httpx.TimeoutException,
                        match="Failed to save data after 3 retries",
                    ):
                        await kubo_cas.save(test_data, codec="dag-cbor")

                    with pytest.raises(
                        httpx.TimeoutException,
                        match="Failed to load data after 3 retries",
                    ):
                        await kubo_cas.load(cid)
260+
261+
262+
@pytest.mark.asyncio
async def test_kubo_backoff_timing():
    """
    Each retry delay should roughly double (exponential backoff with
    factor 2.0) with at most +/-5% jitter around the nominal value.
    """

    async def always_timeout(url, **kwargs):
        # Dummy request so the exception carries one.
        request = httpx.Request("POST", url)
        raise httpx.TimeoutException("Simulated timeout", request=request)

    # Capture the delays passed to asyncio.sleep instead of waiting.
    real_sleep = asyncio.sleep
    recorded_delays = []

    async def record_sleep(delay):
        recorded_delays.append(delay)
        # Delegate to the real sleep (with 0) to avoid infinite recursion.
        await real_sleep(0)

    with patch.object(
        httpx.AsyncClient, "post", new=AsyncMock(side_effect=always_timeout)
    ):
        async with httpx.AsyncClient() as client:
            async with KuboCAS(
                rpc_base_url="http://127.0.0.1:5001",
                gateway_base_url="http://127.0.0.1:8080",
                client=client,
                max_retries=3,
                initial_delay=0.1,
                backoff_factor=2.0,
            ) as kubo_cas:
                with patch("asyncio.sleep", side_effect=record_sleep):
                    with pytest.raises(httpx.TimeoutException):
                        await kubo_cas.save(b"test", codec="dag-cbor")

                # One sleep per retry; delays within the jitter window.
                assert len(recorded_delays) == 3, "Should have attempted 3 retries"
                assert 0.09 <= recorded_delays[0] <= 0.11, (
                    "First retry should be ~0.1s"
                )
                assert 0.18 <= recorded_delays[1] <= 0.22, (
                    "Second retry should be ~0.2s"
                )
                assert 0.36 <= recorded_delays[2] <= 0.44, (
                    "Third retry should be ~0.4s"
                )
305+
306+
307+
@pytest.mark.asyncio
async def test_kubo_http_status_error_no_retry():
    """
    A 5xx response must surface as HTTPStatusError right away: the retry
    loop only covers transport errors, so asyncio.sleep is never invoked.
    """

    # Simulate a server-side failure with a 500 response.
    async def respond_500(url, **kwargs):
        request = httpx.Request("POST", url)
        return httpx.Response(
            500, request=request, content=b"Internal Server Error"
        )

    with patch.object(
        httpx.AsyncClient, "post", new=AsyncMock(side_effect=respond_500)
    ):
        # Spy on asyncio.sleep to prove no backoff/retry happened.
        with patch("asyncio.sleep", new=AsyncMock()) as sleep_spy:
            async with httpx.AsyncClient() as client:
                async with KuboCAS(client=client) as kubo_cas:
                    with pytest.raises(httpx.HTTPStatusError) as exc_info:
                        await kubo_cas.save(b"some data", codec="raw")

                    # The original 500 is preserved on the exception...
                    assert exc_info.value.response.status_code == 500
                    # ...and no retry sleep ever ran.
                    sleep_spy.assert_not_called()
336+
337+
338+
@pytest.mark.asyncio
async def test_kubo_cas_retry_validation():
    """The KuboCAS constructor must reject nonsensical retry parameters."""

    # Negative retry counts are rejected.
    for bad_retries in (-1, -5):
        with pytest.raises(ValueError, match="max_retries must be non-negative"):
            KuboCAS(max_retries=bad_retries)

    # Zero or negative initial delays are rejected.
    for bad_delay in (0, -1.0):
        with pytest.raises(ValueError, match="initial_delay must be positive"):
            KuboCAS(initial_delay=bad_delay)

    # Backoff factors below 1.0 would shrink the delay each retry.
    for bad_factor in (0.5, 0.9):
        with pytest.raises(
            ValueError, match="backoff_factor must be >= 1.0 for exponential backoff"
        ):
            KuboCAS(backoff_factor=bad_factor)

    # Boundary values that are still legal must be accepted and stored.
    async with KuboCAS(
        max_retries=0, initial_delay=0.001, backoff_factor=1.0
    ) as kubo_cas:
        assert kubo_cas.max_retries == 0
        assert kubo_cas.initial_delay == 0.001
        assert kubo_cas.backoff_factor == 1.0

tests/test_public_gateway.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ async def test_kubocas_public_gateway():
152152
cas_save = KuboCAS(
153153
rpc_base_url="http://127.0.0.1:5001",
154154
gateway_base_url="http://127.0.0.1:8080",
155+
max_retries=0,
155156
)
156157

157158
try:
@@ -218,6 +219,7 @@ async def test_trailing_slash_gateway():
218219
cas = KuboCAS(
219220
rpc_base_url="http://127.0.0.1:5001",
220221
gateway_base_url="http://127.0.0.1:8080/", # Note the trailing slash
222+
max_retries=0,
221223
)
222224

223225
try:
@@ -278,15 +280,21 @@ async def test_fix_kubocas_load():
278280
]
279281

280282
for input_url, expected_base in test_cases:
281-
cas = KuboCAS(rpc_base_url="http://127.0.0.1:5001", gateway_base_url=input_url)
283+
cas = KuboCAS(
284+
rpc_base_url="http://127.0.0.1:5001",
285+
gateway_base_url=input_url,
286+
max_retries=0,
287+
)
282288
assert cas.gateway_base_url == expected_base, (
283289
f"URL construction failed for {input_url}"
284290
)
285291
await cas.aclose()
286292

287293
# Test actual loading with local gateway
288294
cas = KuboCAS(
289-
rpc_base_url="http://127.0.0.1:5001", gateway_base_url="http://127.0.0.1:8080"
295+
rpc_base_url="http://127.0.0.1:5001",
296+
gateway_base_url="http://127.0.0.1:8080",
297+
max_retries=0,
290298
)
291299

292300
try:

0 commit comments

Comments
 (0)