|
11 | 11 | import orjson |
12 | 12 | import requests |
13 | 13 | import requests_cache |
| 14 | +from airbyte_protocol_dataclasses.models import FailureType |
14 | 15 | from requests.auth import AuthBase |
15 | 16 |
|
16 | 17 | from airbyte_cdk.models import ( |
|
35 | 36 | ResponseAction, |
36 | 37 | ) |
37 | 38 | from airbyte_cdk.sources.streams.http.exceptions import ( |
| 39 | + BaseBackoffException, |
38 | 40 | DefaultBackoffException, |
39 | 41 | RateLimitBackoffException, |
40 | 42 | RequestBodyException, |
@@ -290,15 +292,34 @@ def _send_with_retry( |
290 | 292 | backoff_handler = http_client_default_backoff_handler( |
291 | 293 | max_tries=max_tries, max_time=max_time |
292 | 294 | ) |
293 | | - # backoff handlers wrap _send, so it will always return a response |
294 | | - response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))( |
295 | | - request, |
296 | | - request_kwargs, |
297 | | - log_formatter=log_formatter, |
298 | | - exit_on_rate_limit=exit_on_rate_limit, |
299 | | - ) # type: ignore # mypy can't infer that backoff_handler wraps _send |
300 | | - |
301 | | - return response |
| 295 | + # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted |
| 296 | + try: |
| 297 | + response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))( |
| 298 | + request, |
| 299 | + request_kwargs, |
| 300 | + log_formatter=log_formatter, |
| 301 | + exit_on_rate_limit=exit_on_rate_limit, |
| 302 | + ) # type: ignore # mypy can't infer that backoff_handler wraps _send |
| 303 | + |
| 304 | + return response |
| 305 | + except BaseBackoffException as e: |
| 306 | + self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True) |
| 307 | + raise AirbyteTracedException( |
| 308 | + internal_message=f"Exhausted available request attempts. No more requests will be attempted.", |
| 309 | + message="Exhausted available request attempts. No more requests will be attempted. Please see logs for more details.", |
| 310 | + failure_type=FailureType.transient_error, |
| 311 | + exception=e, |
| 312 | + stream_descriptor=StreamDescriptor(name=self._name), |
| 313 | + ) |
| 314 | + except Exception as e: |
| 315 | + self._logger.error(f"Retries exhausted with unexpected exception.", exc_info=True) |
| 316 | + raise AirbyteTracedException( |
| 317 | + internal_message=f"Exhausted available request attempts. No more requests will be attempted.", |
| 318 | + message="Exhausted available request attempts. No more requests will be attempted. Please see logs for more details.", |
| 319 | + failure_type=FailureType.system_error, |
| 320 | + exception=e, |
| 321 | + stream_descriptor=StreamDescriptor(name=self._name), |
| 322 | + ) |
302 | 323 |
|
303 | 324 | def _send( |
304 | 325 | self, |
|
0 commit comments