Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def check_connection(
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
message = f"Stream {stream.name} is not available: {reason}"
logger.warning(message)
return False, message
except Exception as error:
error_message = (
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,56 +31,56 @@
400: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Bad request. Please check your request parameters.",
error_message="HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
),
401: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Unauthorized. Please ensure you are authenticated correctly.",
error_message="HTTP Status Code: 401. Error: Unauthorized. Please ensure you are authenticated correctly.",
),
403: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
404: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Not found. The requested resource was not found on the server.",
error_message="HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
),
405: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Method not allowed. Please check your request method.",
error_message="HTTP Status Code: 405. Error: Method not allowed. Please check your request method.",
),
408: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Request timeout.",
error_message="HTTP Status Code: 408. Error: Request timeout.",
),
429: ErrorResolution(
response_action=ResponseAction.RATE_LIMITED,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
error_message="HTTP Status Code: 429. Error: Too many requests.",
),
500: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Internal server error.",
error_message="HTTP Status Code: 500. Error: Internal server error.",
),
502: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Bad gateway.",
error_message="HTTP Status Code: 502. Error: Bad gateway.",
),
503: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Service unavailable.",
error_message="HTTP Status Code: 503. Error: Service unavailable.",
),
504: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Gateway timeout.",
error_message="HTTP Status Code: 504. Error: Gateway timeout.",
),
}
30 changes: 21 additions & 9 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import orjson
import requests
import requests_cache
from airbyte_protocol_dataclasses.models import FailureType
from requests.auth import AuthBase

from airbyte_cdk.models import (
Expand All @@ -35,6 +36,7 @@
ResponseAction,
)
from airbyte_cdk.sources.streams.http.exceptions import (
BaseBackoffException,
DefaultBackoffException,
RateLimitBackoffException,
RequestBodyException,
Expand Down Expand Up @@ -290,15 +292,25 @@ def _send_with_retry(
backoff_handler = http_client_default_backoff_handler(
max_tries=max_tries, max_time=max_time
)
# backoff handlers wrap _send, so it will always return a response
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response
# backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
try:
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise MessageRepresentationAirbyteTracedErrors(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
failure_type=FailureType.transient_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
)

def _send(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.RETRY,
FailureType.system_error,
"Bad request. Please check your request parameters.",
"HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
),
(
"_with_http_response_status_402_fail_with_default_failure_type",
Expand All @@ -118,7 +118,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.FAIL,
FailureType.config_error,
"Forbidden. You don't have permission to access this resource.",
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
(
"_with_http_response_status_200_fail_with_contained_error_message",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
ErrorResolution(
response_action=ResponseAction.IGNORE,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
id="test_http_code_matches_ignore_action",
),
Expand All @@ -59,7 +59,7 @@
ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
error_message="HTTP Status Code: 429. Error: Too many requests.",
),
id="test_http_code_matches_retry_action",
),
Expand Down Expand Up @@ -104,7 +104,7 @@
ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
id="test_predicate_matches_headers",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.streams.http.exceptions import (
RequestBodyException,
UserDefinedBackoffException,
)
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.sources.types import Config


Expand Down Expand Up @@ -880,7 +880,7 @@ def test_request_attempt_count_is_tracked_across_retries(http_requester_factory)
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand All @@ -906,7 +906,7 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand Down Expand Up @@ -937,7 +937,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ def test_given_ok_response_http_status_error_handler_returns_success_action(mock
403,
ResponseAction.FAIL,
FailureType.config_error,
"Forbidden. You don't have permission to access this resource.",
"Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
(
404,
ResponseAction.FAIL,
FailureType.system_error,
"Not found. The requested resource was not found on the server.",
"Status Code: 404. Error: Not found. The requested resource was not found on the server.",
),
],
)
Expand Down
49 changes: 39 additions & 10 deletions unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
import logging
from http import HTTPStatus
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from unittest.mock import ANY, MagicMock, patch
from unittest.mock import ANY, MagicMock, Mock, patch

import pytest
import requests
from requests.auth import AuthBase
from requests.exceptions import InvalidURL

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type
from airbyte_cdk.sources.message.repository import MessageRepository
from airbyte_cdk.sources.streams import CheckpointMixin
from airbyte_cdk.sources.streams.call_rate import APIBudget
from airbyte_cdk.sources.streams.checkpoint import ResumableFullRefreshCursor
from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import (
SubstreamResumableFullRefreshCursor,
Expand All @@ -30,7 +33,10 @@
RequestBodyException,
UserDefinedBackoffException,
)
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.sources.streams.http.http_client import (
HttpClient,
MessageRepresentationAirbyteTracedErrors,
)
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets

Expand All @@ -40,7 +46,22 @@ class StubBasicReadHttpStream(HttpStream):
primary_key = ""

def __init__(self, deduplicate_query_params: bool = False, **kwargs):
    """Initialise the stub stream with an explicitly constructed ``HttpClient``.

    ``disable_retries`` is an optional keyword consumed here: it is removed
    from ``kwargs`` before they are forwarded to the parent constructor and
    is handed to the ``HttpClient`` instead (defaults to ``False``).
    """
    # Strip the stub-only option so the parent constructor never sees it.
    retries_disabled = kwargs.pop("disable_retries", False)
    super().__init__(**kwargs)

    # Collect the client arguments in one mapping; spec'd mocks stand in for
    # the API budget and message repository when the caller supplied none.
    client_options = {
        "name": self.name,
        "logger": self.logger,
        "error_handler": self.get_error_handler(),
        "api_budget": kwargs.get("api_budget", Mock(spec=APIBudget)),
        "authenticator": kwargs.get("authenticator"),
        "use_cache": self.use_cache,
        "backoff_strategy": self.get_backoff_strategy(),
        "message_repository": kwargs.get(
            "message_repository", Mock(spec=MessageRepository)
        ),
        "disable_retries": retries_disabled,
    }
    self._http_client = HttpClient(**client_options)

    self.resp_counter = 1
    self._deduplicate_query_params = deduplicate_query_params

Expand Down Expand Up @@ -169,7 +190,7 @@ def test_stub_custom_backoff_http_stream(mocker):

send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1

Expand Down Expand Up @@ -286,7 +307,10 @@ def test_raise_on_http_errors_off_429(mocker):
req.status_code = 429

mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(DefaultBackoffException, match="Too many requests"):
with pytest.raises(
MessageRepresentationAirbyteTracedErrors,
match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.",
):
stream.exit_on_rate_limit = True
list(stream.read_records(SyncMode.full_refresh))

Expand All @@ -299,7 +323,7 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code):
req.status_code = status_code

send_mock = mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(DefaultBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1

Expand Down Expand Up @@ -330,7 +354,7 @@ def test_raise_on_http_errors(mocker, error):
stream = AutoFailFalseHttpStream()
send_mock = mocker.patch.object(requests.Session, "send", side_effect=error())

with pytest.raises(DefaultBackoffException):
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
list(stream.read_records(SyncMode.full_refresh))
assert send_mock.call_count == stream.max_retries + 1

Expand Down Expand Up @@ -548,6 +572,9 @@ def test_using_cache(mocker, requests_mock):
class AutoFailTrueHttpStream(StubBasicReadHttpStream):
raise_on_http_errors = True

def __init__(self, **kwargs):
super().__init__(disable_retries=True, **kwargs)

def should_retry(self, *args, **kwargs):
return True

Expand Down Expand Up @@ -580,14 +607,16 @@ def test_http_stream_adapter_http_status_error_handler_should_retry_false_raise_

@pytest.mark.parametrize("status_code", range(400, 600))
def test_send_raise_on_http_errors_logs(mocker, status_code):
mocker.patch("time.sleep", lambda x: None)
stream = AutoFailTrueHttpStream()
res = requests.Response()
res = Mock(spec=requests.Response)
res.status_code = status_code
res.headers = {}
mocker.patch.object(requests.Session, "send", return_value=res)
mocker.patch.object(stream._http_client, "_logger")
with pytest.raises(requests.exceptions.HTTPError):
response = stream._http_client.send_request("GET", "https://g", {}, exit_on_rate_limit=True)
with pytest.raises(MessageRepresentationAirbyteTracedErrors):
_, response = stream._http_client.send_request(
"GET", "https://g", {}, exit_on_rate_limit=True
)
stream._http_client.logger.error.assert_called_with(response.text)
assert response.status_code == status_code

Expand Down
Loading