diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 93775c1ec..b735d8e6e 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -1,3 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + import csv import gzip import io @@ -6,7 +10,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from io import BufferedIOBase, TextIOWrapper -from typing import Any, Generator, MutableMapping, Optional +from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple import orjson import requests @@ -41,7 +45,14 @@ def parse( ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. + + IMPORTANT: + - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. + + Note: + - The data is not decoded by default. """ + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) @@ -50,7 +61,10 @@ def parse( class JsonParser(Parser): encoding: str = "utf-8" - def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + def parse( + self, + data: BufferedIOBase, + ) -> Generator[MutableMapping[str, Any], None, None]: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ @@ -130,31 +144,69 @@ def parse( yield row -@dataclass +_HEADER = str +_HEADER_VALUE = str + + class CompositeRawDecoder(Decoder): """ Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] passed response.raw to parser(s). - Note: response.raw is not decoded/decompressed by default. - parsers should be instantiated recursively. 
+ + Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively. + Example: - composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + composite_raw_decoder = CompositeRawDecoder( + parser=GzipParser( + inner_parser=JsonLineParser(encoding="iso-8859-1") + ) + ) """ - parser: Parser - stream_response: bool = True + @classmethod + def by_headers( + cls, + parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]], + stream_response: bool, + fallback_parser: Parser, + ) -> "CompositeRawDecoder": + parsers_by_header = {} + for headers, header_values, parser in parsers: + for header in headers: + parsers_by_header[header] = {header_value: parser for header_value in header_values} + return cls(fallback_parser, stream_response, parsers_by_header) + + def __init__(self, parser: Parser, stream_response: bool = True, parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None) -> None: + self._parsers_by_header = parsers_by_header if parsers_by_header else {} + self._fallback_parser = parser + self._stream_response = stream_response def is_stream_response(self) -> bool: - return self.stream_response + return self._stream_response def decode( - self, response: requests.Response + self, + response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: + parser = self._select_parser(response) if self.is_stream_response(): - # urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) - # We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution. 
+ # urllib3 mentions that some interfaces don't play nice with auto_close + # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content + # We have indeed observed some issues with CSV parsing. + # Hence, we will manage the closing of the file ourselves until we find a better solution. response.raw.auto_close = False - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + yield from parser.parse( + data=response.raw, # type: ignore[arg-type] + ) response.raw.close() else: - yield from self.parser.parse(data=io.BytesIO(response.content)) + yield from parser.parse(data=io.BytesIO(response.content)) + + def _select_parser(self, response: requests.Response) -> Parser: + for header, parser_by_header_value in self._parsers_by_header.items(): + if ( + header in response.headers + and response.headers[header] in parser_by_header_value.keys() + ): + return parser_by_header_value[response.headers[header]] + return self._fallback_parser diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index 5fa9dc8f6..d195caeac 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,6 +8,17 @@ import requests +COMPRESSSED_RESPONSE_TYPES = [ + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", +] + @dataclass class Decoder: @@ -30,3 +41,12 @@ def decode( :param response: the response to decode :return: Generator of Mapping describing the response """ + + def is_compressed_response(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header. 
+ """ + return ( + response.headers.get("Content-Encoding") in COMPRESSSED_RESPONSE_TYPES + or response.headers.get("Content-Type") in COMPRESSSED_RESPONSE_TYPES + ) diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index a937a1e4d..d2642acbb 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -8,14 +8,11 @@ from io import BytesIO from typing import Any, Generator, MutableMapping -import orjson import requests from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( - Parser, -) +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -37,7 +34,9 @@ def decode( unzipped_content = zip_file.read(file_name) buffered_content = BytesIO(unzipped_content) try: - yield from self.parser.parse(buffered_content) + yield from self.parser.parse( + buffered_content, + ) except Exception as e: logger.error( f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." 
diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 76631ee6b..c7fd98c17 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -15,7 +15,6 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -EMPTY_STR: str = "" DEFAULT_ENCODING: str = "utf-8" DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 @@ -136,7 +135,6 @@ def _read_with_chunks( """ try: - # TODO: Add support for other file types, like `json`, with `pd.read_json()` with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv( data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 1c2289c17..31b8595d9 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -513,6 +513,17 @@ SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization, } +_COMPRESSED_RESPONSE_TYPES = { + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", +} + class ModelToComponentFactory: EPOCH_DATETIME_FORMAT = "%s" @@ -2193,18 +2204,29 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A stream_response=False if self._emit_connector_builder_messages else True, ) - @staticmethod - def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_jsonl_decoder( + self, model: JsonlDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - 
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: - return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), - stream_response=False if self._emit_connector_builder_messages else True, + gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + + if self._emit_connector_builder_messages: + # This is very surprising, but if the response is not streamed, CompositeRawDecoder calls response.content and the requests library actually uncompresses the data, as opposed to response.raw, which uses urllib3 directly and does not uncompress the data + return CompositeRawDecoder(gzip_parser.inner_parser, False) + + return CompositeRawDecoder.by_headers( + [ + ({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser) + ], + stream_response=True, + fallback_parser=gzip_parser.inner_parser, ) @staticmethod @@ -2753,7 +2775,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie ) paginator = ( self._create_component_from_model( - model=model.download_paginator, decoder=decoder, config=config, url_base="" + model=model.download_paginator, + decoder=decoder, + config=config, + url_base="", ) if model.download_paginator else NoPagination(parameters={}) @@ -2870,7 +2895,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie model=model.status_extractor, decoder=decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( - model=model.download_target_extractor, decoder=decoder, config=config, name=name + model=model.download_target_extractor, + decoder=decoder, + config=config, + 
name=name, ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 8a64fae60..45671fc59 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -16,7 +16,9 @@ ) from airbyte_cdk.sources.declarative.decoders import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -26,7 +28,10 @@ from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState -from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context +from airbyte_cdk.utils.mapping_helpers import ( + combine_mappings, + get_interpolation_context, +) @dataclass @@ -155,7 +160,9 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: return self._request_options_provider.get_request_params( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) def get_request_headers( @@ -166,7 +173,9 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._request_options_provider.get_request_headers( - stream_state=stream_state, stream_slice=stream_slice, 
next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) # fixing request options provider types has a lot of dependencies @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: return self._request_options_provider.get_request_body_json( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) @property @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str: path (str): The path to join with the base URL. Returns: - str: The concatenated URL with the trailing slash (if any) removed. + str: The resulting joined URL. + + Note: + Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869 + - If the path is an empty string or None, the method returns the base URL with any trailing slash removed. 
+ + Example: + 1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint' + 2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/endpoint' (a path with a leading slash replaces the base URL's path) + 3) _join_url("https://example.com/api/", "") >> 'https://example.com/api' + 4) _join_url("https://example.com/api", None) >> 'https://example.com/api' """ - return urljoin(url_base, path).rstrip("/") + + # with an empty or missing path there is nothing to join: return the base URL without its trailing slash + if path == EmptyString or path is None: + return url_base.rstrip("/") + + return urljoin(url_base, path) def send_request( self, diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 39e74d8e6..02c0993b6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -8,7 +8,8 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO from threading import Thread -from unittest.mock import patch +from typing import Iterable +from unittest.mock import Mock, patch import pytest import requests @@ -68,6 +69,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding, delimiter="\t", should_compress=True), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -81,7 +83,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): assert counter == 3 -def generate_jsonlines(): +def generate_jsonlines() -> Iterable[str]: """ Generator function to yield data in JSON Lines format. This is useful for streaming large datasets. 
@@ -107,12 +109,57 @@ def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): requests_mock.register_uri( - "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(encoding=encoding), ) response = requests.get("https://airbyte.io/", stream=True) parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_raw_decoder = CompositeRawDecoder(parser=parser) + composite_raw_decoder = CompositeRawDecoder(parser) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def test_given_header_match_when_decode_then_select_parser(requests_mock): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + headers={"Content-Encoding": "gzip"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser()) + unused_parser = Mock() + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, parser)], + stream_response=True, + fallback_parser=unused_parser, + ) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def test_given_header_does_not_match_when_decode_then_select_fallback_parser(requests_mock): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content="".join(generate_jsonlines()).encode("utf-8"), + headers={"Content-Encoding": "not gzip in order to expect fallback"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + + unused_parser = GzipParser(inner_parser=Mock()) + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, unused_parser)], + stream_response=True, + 
fallback_parser=JsonLineParser(), + ) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 @@ -266,7 +313,8 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ ) response = requests.get("https://airbyte.io/") composite_raw_decoder = CompositeRawDecoder( - parser=JsonParser(encoding="utf-8"), stream_response=False + parser=JsonParser(encoding="utf-8"), + stream_response=False, ) content = list(composite_raw_decoder.decode(response)) diff --git a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py index 731895e2e..f5c988d0f 100644 --- a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py @@ -43,7 +43,12 @@ def test_zipfile_decoder_with_single_file_response(requests_mock, json_data): zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser())) compressed_data = gzip.compress(json.dumps(json_data).encode()) zipped_data = create_zip_from_dict(compressed_data) - requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data) + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=zipped_data, + headers={"Content-Encoding": "application/zip"}, + ) response = requests.get("https://airbyte.io/") if isinstance(json_data, list): diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index a1229579f..dfe78011a 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token(): "test_trailing_slash_on_path", "https://airbyte.io", "/my_endpoint/", - "https://airbyte.io/my_endpoint", + "https://airbyte.io/my_endpoint/", ), ( "test_nested_path_no_leading_slash",