Skip to content

Commit 51a6ab8

Browse files
authored
feat: Improve retry and exception handling in the SDK clients (#171)
# Overview The retry mechanism was incorrectly configured, resulting in capturing and retrying on errors it shouldn't retry on, e.g. `asyncio.CancelledError` or `asyncio.TimeoutError`. Instead of a negative filter when using `tenacity`'s `retry` decorator, i.e. `retry_if_not_exception_type((CompassClientError,))`, which captured a lot more exceptions than we originally intended, we now specifically list the exceptions that we know we want to retry on, or otherwise let the error bubble. This PR also improves the exception hierarchy by introducing a few new exceptions to help us better communicate errors to the clients without having to bubble or wrap httpx exceptions. Specifically, we are introducing: - CompassNetworkError - CompassServerError - CompassTimeoutError As part of this PR, the `utils.py` module were refactored into multiple files to avoid having a pile of utility functions dumped in a single file.
1 parent 9b3490b commit 51a6ab8

File tree

16 files changed

+683
-539
lines changed

16 files changed

+683
-539
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,4 @@ cython_debug/
172172

173173
# PyPI configuration file
174174
.pypirc
175+
typings/

cohere_compass/clients/compass.py

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424
import httpx
2525
from joblib import Parallel, delayed # type: ignore
2626
from pydantic import BaseModel
27-
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_fixed
27+
from tenacity import (
28+
retry,
29+
retry_if_exception,
30+
stop_after_attempt,
31+
wait_fixed,
32+
)
2833

2934
# Local imports
3035
from cohere_compass import GroupAuthorizationInput
@@ -37,11 +42,10 @@
3742
URL_SAFE_STRING_PATTERN,
3843
)
3944
from cohere_compass.exceptions import (
40-
CompassAuthError,
41-
CompassClientError,
4245
CompassError,
4346
CompassInsertionError,
4447
CompassMaxErrorRateExceeded,
48+
handle_httpx_exceptions,
4549
)
4650
from cohere_compass.models import (
4751
CompassDocument,
@@ -82,7 +86,8 @@
8286
RetrievedDocument,
8387
SortBy,
8488
)
85-
from cohere_compass.utils import partition_documents
89+
from cohere_compass.utils.documents import partition_documents
90+
from cohere_compass.utils.retry import is_retryable_httpx_exception
8691

8792

8893
@dataclass
@@ -1474,42 +1479,16 @@ def _send_request(
14741479
@retry(
14751480
stop=stop_after_attempt(max_retries),
14761481
wait=wait_fixed(retry_wait),
1477-
reraise=True, # re-raise last exception instead of wrapping in RetryError
1478-
# todo find alternative to InvalidSchema
1479-
retry=retry_if_not_exception_type((CompassClientError,)),
1482+
reraise=True,
1483+
retry=retry_if_exception(is_retryable_httpx_exception),
14801484
)
14811485
def _send_request_with_retry() -> _SendRequestResult:
1482-
try:
1483-
return self._send_http_request(
1484-
http_method=http_method,
1485-
target_path=target_path,
1486-
data=data,
1487-
timeout=timeout,
1488-
)
1489-
except httpx.HTTPStatusError as e:
1490-
if e.response.status_code == 401:
1491-
error = "Unauthorized. Please check your bearer token."
1492-
raise CompassAuthError(message=str(e))
1493-
elif 400 <= e.response.status_code < 500:
1494-
error = f"Client error occurred: {e.response.text}"
1495-
raise CompassClientError(message=error, code=e.response.status_code)
1496-
else:
1497-
error = str(e) + " " + e.response.text
1498-
logger.warning(
1499-
f"Failed to send request to {api_name} {target_path}: "
1500-
f"{type(e)} {error}. Going to sleep for "
1501-
f"{retry_wait} seconds and retrying."
1502-
)
1503-
raise e
1504-
except Exception as e:
1505-
error = str(e)
1506-
logger.warning(
1507-
f"Failed to send request to {api_name} {target_path}: {type(e)} "
1508-
f"{error}. Sleeping {retry_wait} seconds and retrying..."
1509-
)
1510-
raise e
1486+
return self._send_http_request(
1487+
http_method=http_method,
1488+
target_path=target_path,
1489+
data=data,
1490+
timeout=timeout,
1491+
)
15111492

1512-
try:
1493+
with handle_httpx_exceptions():
15131494
return _send_request_with_retry()
1514-
except Exception as e:
1515-
raise CompassError(f"Failed to send request for {api_name} API") from e

cohere_compass/clients/compass_async.py

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
# 3rd party imports
2020
import httpx
2121
from pydantic import BaseModel
22-
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_fixed
22+
from tenacity import (
23+
retry,
24+
retry_if_exception,
25+
stop_after_attempt,
26+
wait_fixed,
27+
)
2328

2429
# Local imports
2530
from cohere_compass import GroupAuthorizationInput
@@ -36,11 +41,10 @@
3641
DEFAULT_RETRY_WAIT,
3742
)
3843
from cohere_compass.exceptions import (
39-
CompassAuthError,
40-
CompassClientError,
4144
CompassError,
4245
CompassInsertionError,
4346
CompassMaxErrorRateExceeded,
47+
handle_httpx_exceptions,
4448
)
4549
from cohere_compass.models import (
4650
CompassDocument,
@@ -77,11 +81,11 @@
7781
)
7882
from cohere_compass.models.indexes import IndexDetails, ListIndexesResponse
7983
from cohere_compass.models.search import GetDocumentResponse, RetrievedDocument, SortBy
80-
from cohere_compass.utils import (
81-
async_apply,
82-
async_enumerate,
84+
from cohere_compass.utils.asyn import async_apply, async_enumerate
85+
from cohere_compass.utils.documents import (
8386
partition_documents_async,
8487
)
88+
from cohere_compass.utils.retry import is_retryable_httpx_exception
8589

8690

8791
class CompassAsyncClient:
@@ -1349,41 +1353,15 @@ async def _send_request(
13491353
@retry(
13501354
stop=stop_after_attempt(max_retries),
13511355
wait=wait_fixed(retry_wait),
1352-
reraise=True, # re-raise last exception instead of wrapping in RetryError
1353-
# todo find alternative to InvalidSchema
1354-
retry=retry_if_not_exception_type((CompassClientError,)),
1356+
reraise=True,
1357+
retry=retry_if_exception(is_retryable_httpx_exception),
13551358
)
13561359
async def _send_request_with_retry() -> _SendRequestResult:
1357-
try:
1358-
return await self._send_http_request(
1359-
http_method=http_method,
1360-
target_path=target_path,
1361-
data=data,
1362-
)
1363-
except httpx.HTTPStatusError as e:
1364-
if e.response.status_code == 401:
1365-
error = "Unauthorized. Please check your bearer token."
1366-
raise CompassAuthError(message=str(e))
1367-
elif 400 <= e.response.status_code < 500:
1368-
error = f"Client error occurred: {e.response.text}"
1369-
raise CompassClientError(message=error, code=e.response.status_code)
1370-
else:
1371-
error = str(e) + " " + e.response.text
1372-
logger.warning(
1373-
f"Failed to send request to {api_name} {target_path}: "
1374-
f"{type(e)} {error}. Going to sleep for "
1375-
f"{self.retry_wait} seconds and retrying."
1376-
)
1377-
raise e
1378-
except Exception as e:
1379-
error = str(e)
1380-
logger.warning(
1381-
f"Failed to send request to {api_name} {target_path}: {type(e)} "
1382-
f"{error}. Sleeping {self.retry_wait} seconds and retrying..."
1383-
)
1384-
raise e
1360+
return await self._send_http_request(
1361+
http_method=http_method,
1362+
target_path=target_path,
1363+
data=data,
1364+
)
13851365

1386-
try:
1366+
with handle_httpx_exceptions():
13871367
return await _send_request_with_retry()
1388-
except Exception as e:
1389-
raise CompassError(f"Failed to send request for {api_name} API") from e

cohere_compass/clients/parser.py

Lines changed: 22 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
import httpx
1919

2020
# 3rd party imports
21-
from pydantic import ValidationError
2221
from tenacity import (
2322
retry,
24-
retry_if_not_exception_type,
23+
retry_if_exception,
2524
stop_after_attempt,
2625
wait_fixed,
2726
)
@@ -35,13 +34,17 @@
3534
DEFAULT_MAX_RETRIES,
3635
DEFAULT_RETRY_WAIT,
3736
)
38-
from cohere_compass.exceptions import CompassClientError, CompassError
37+
from cohere_compass.exceptions import handle_httpx_exceptions
3938
from cohere_compass.models import (
4039
CompassDocument,
4140
MetadataConfig,
4241
ParserConfig,
4342
)
44-
from cohere_compass.utils import imap_parallel, open_document, scan_folder
43+
from cohere_compass.utils.fs import open_document, scan_folder
44+
from cohere_compass.utils.iter import imap_parallel
45+
from cohere_compass.utils.retry import (
46+
is_retryable_compass_exception,
47+
)
4548

4649
Fn_or_Dict = dict[str, Any] | Callable[[CompassDocument], dict[str, Any]]
4750

@@ -244,12 +247,6 @@ def _get_metadata(
244247
else:
245248
return custom_context
246249

247-
@retry(
248-
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
249-
wait=wait_fixed(DEFAULT_RETRY_WAIT),
250-
retry=retry_if_not_exception_type((CompassClientError, ValidationError)),
251-
reraise=True,
252-
)
253250
def process_file(
254251
self,
255252
*,
@@ -304,12 +301,6 @@ def process_file(
304301
timeout=timeout,
305302
)
306303

307-
@retry(
308-
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
309-
wait=wait_fixed(DEFAULT_RETRY_WAIT),
310-
retry=retry_if_not_exception_type((CompassClientError, ValidationError)),
311-
reraise=True,
312-
)
313304
def process_file_bytes(
314305
self,
315306
*,
@@ -379,6 +370,12 @@ def _get_file_params(
379370
content_type=content_type,
380371
)
381372

373+
@retry(
374+
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
375+
wait=wait_fixed(DEFAULT_RETRY_WAIT),
376+
retry=retry_if_exception(is_retryable_compass_exception),
377+
reraise=True,
378+
)
382379
def _process_file_bytes(
383380
self,
384381
*,
@@ -392,23 +389,15 @@ def _process_file_bytes(
392389
if self.bearer_token:
393390
headers = {"Authorization": f"Bearer {self.bearer_token}"}
394391

395-
res = self.httpx_client.post(
396-
url=f"{self.parser_url}/v1/process_file",
397-
data={"data": json.dumps(params.model_dump())},
398-
files={"file": (filename, file_bytes)},
399-
headers=headers,
400-
timeout=(timeout or self.timeout).total_seconds(),
401-
)
402-
403-
if res.is_error:
404-
if res.status_code >= 400 and res.status_code < 500:
405-
raise CompassClientError(
406-
f"Error processing file: {res.status_code} {res.text}"
407-
)
408-
else:
409-
raise CompassError(
410-
f"Error processing file: {res.status_code} {res.text}"
411-
)
392+
with handle_httpx_exceptions():
393+
res = self.httpx_client.post(
394+
url=f"{self.parser_url}/v1/process_file",
395+
data={"data": json.dumps(params.model_dump())},
396+
files={"file": (filename, file_bytes)},
397+
headers=headers,
398+
timeout=(timeout or self.timeout).total_seconds(),
399+
)
400+
res.raise_for_status()
412401

413402
docs: list[CompassDocument] = []
414403
for doc in res.json()["docs"]:

cohere_compass/clients/parser_async.py

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import httpx
1818
from tenacity import (
1919
retry,
20-
retry_if_not_exception_type,
20+
retry_if_exception,
2121
stop_after_attempt,
2222
wait_fixed,
2323
)
@@ -31,13 +31,17 @@
3131
DEFAULT_MAX_RETRIES,
3232
DEFAULT_RETRY_WAIT,
3333
)
34-
from cohere_compass.exceptions import CompassClientError, CompassError
34+
from cohere_compass.exceptions import handle_httpx_exceptions
3535
from cohere_compass.models import (
3636
CompassDocument,
3737
MetadataConfig,
3838
ParserConfig,
3939
)
40-
from cohere_compass.utils import async_map, open_document, scan_folder
40+
from cohere_compass.utils.asyn import async_map
41+
from cohere_compass.utils.fs import open_document, scan_folder
42+
from cohere_compass.utils.retry import (
43+
is_retryable_compass_exception,
44+
)
4145

4246
Fn_or_Dict = dict[str, Any] | Callable[[CompassDocument], dict[str, Any]]
4347

@@ -240,13 +244,6 @@ def _get_metadata(
240244
else:
241245
return custom_context
242246

243-
@retry(
244-
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
245-
wait=wait_fixed(DEFAULT_RETRY_WAIT),
246-
# todo find alternative to InvalidSchema
247-
retry=retry_if_not_exception_type((CompassClientError,)),
248-
reraise=True,
249-
)
250247
async def process_file(
251248
self,
252249
*,
@@ -297,13 +294,6 @@ async def process_file(
297294
custom_context=custom_context,
298295
)
299296

300-
@retry(
301-
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
302-
wait=wait_fixed(DEFAULT_RETRY_WAIT),
303-
# todo find alternative to InvalidSchema
304-
retry=retry_if_not_exception_type((CompassClientError,)),
305-
reraise=True,
306-
)
307297
async def process_file_bytes(
308298
self,
309299
*,
@@ -373,6 +363,12 @@ def _get_file_params(
373363
content_type=content_type,
374364
)
375365

366+
@retry(
367+
stop=stop_after_attempt(DEFAULT_MAX_RETRIES),
368+
wait=wait_fixed(DEFAULT_RETRY_WAIT),
369+
retry=retry_if_exception(is_retryable_compass_exception),
370+
reraise=True,
371+
)
376372
async def _process_file_bytes(
377373
self,
378374
*,
@@ -386,23 +382,15 @@ async def _process_file_bytes(
386382
if self.bearer_token:
387383
headers = {"Authorization": f"Bearer {self.bearer_token}"}
388384

389-
res = await self.httpx.post(
390-
url=f"{self.parser_url}/v1/process_file",
391-
data={"data": json.dumps(params.model_dump())},
392-
files={"file": (filename, file_bytes)},
393-
headers=headers,
394-
timeout=(timeout or self.timeout).total_seconds(),
395-
)
396-
397-
if res.is_error:
398-
if res.status_code >= 400 and res.status_code < 500:
399-
raise CompassClientError(
400-
f"Error processing file: {res.status_code} {res.text}"
401-
)
402-
else:
403-
raise CompassError(
404-
f"Error processing file: {res.status_code} {res.text}"
405-
)
385+
with handle_httpx_exceptions():
386+
res = await self.httpx.post(
387+
url=f"{self.parser_url}/v1/process_file",
388+
data={"data": json.dumps(params.model_dump())},
389+
files={"file": (filename, file_bytes)},
390+
headers=headers,
391+
timeout=(timeout or self.timeout).total_seconds(),
392+
)
393+
res.raise_for_status()
406394

407395
docs: list[CompassDocument] = []
408396
for doc in res.json()["docs"]:

0 commit comments

Comments
 (0)