Skip to content

Commit bcc2c36

Browse files
author
Oleksandr Bazarnov
committed
updated after the review + maximes proposal
1 parent 96f41da commit bcc2c36

File tree

6 files changed

+170
-104
lines changed

6 files changed

+170
-104
lines changed

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,44 +7,31 @@
77
import io
88
import json
99
import logging
10-
from abc import ABC, abstractmethod
1110
from dataclasses import dataclass
1211
from io import BufferedIOBase, TextIOWrapper
13-
from typing import Any, Generator, MutableMapping, Optional
12+
from typing import Any, Optional
1413

1514
import orjson
1615
import requests
1716

1817
from airbyte_cdk.models import FailureType
19-
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
18+
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE, Decoder
19+
from airbyte_cdk.sources.declarative.decoders.decoder_parser import (
20+
PARSER_OUTPUT_TYPE,
21+
PARSERS_BY_HEADER_TYPE,
22+
PARSERS_TYPE,
23+
Parser,
24+
)
2025
from airbyte_cdk.utils import AirbyteTracedException
2126

2227
logger = logging.getLogger("airbyte")
2328

2429

25-
@dataclass
26-
class Parser(ABC):
27-
@abstractmethod
28-
def parse(
29-
self,
30-
data: BufferedIOBase,
31-
compressed: Optional[bool] = False,
32-
) -> Generator[MutableMapping[str, Any], None, None]:
33-
"""
34-
Parse data and yield dictionaries.
35-
"""
36-
pass
37-
38-
3930
@dataclass
4031
class GzipParser(Parser):
4132
inner_parser: Parser
4233

43-
def parse(
44-
self,
45-
data: BufferedIOBase,
46-
compressed: Optional[bool] = False,
47-
) -> Generator[MutableMapping[str, Any], None, None]:
34+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
4835
"""
4936
Decompress gzipped bytes and pass decompressed data to the inner parser.
5037
@@ -55,22 +42,15 @@ def parse(
5542
- The data is not decoded by default.
5643
"""
5744

58-
if compressed:
59-
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
60-
yield from self.inner_parser.parse(gzipobj)
61-
else:
62-
yield from self.inner_parser.parse(data)
45+
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
46+
yield from self.inner_parser.parse(gzipobj)
6347

6448

6549
@dataclass
6650
class JsonParser(Parser):
6751
encoding: str = "utf-8"
6852

69-
def parse(
70-
self,
71-
data: BufferedIOBase,
72-
compressed: Optional[bool] = False,
73-
) -> Generator[MutableMapping[str, Any], None, None]:
53+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
7454
"""
7555
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
7656
"""
@@ -110,11 +90,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]:
11090
class JsonLineParser(Parser):
11191
encoding: Optional[str] = "utf-8"
11292

113-
def parse(
114-
self,
115-
data: BufferedIOBase,
116-
compressed: Optional[bool] = False,
117-
) -> Generator[MutableMapping[str, Any], None, None]:
93+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
11894
for line in data:
11995
try:
12096
yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
@@ -138,11 +114,7 @@ def _get_delimiter(self) -> Optional[str]:
138114

139115
return self.delimiter
140116

141-
def parse(
142-
self,
143-
data: BufferedIOBase,
144-
compressed: Optional[bool] = False,
145-
) -> Generator[MutableMapping[str, Any], None, None]:
117+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
146118
"""
147119
Parse CSV data from decompressed bytes.
148120
"""
@@ -152,10 +124,9 @@ def parse(
152124
yield row
153125

154126

155-
@dataclass
156127
class CompositeRawDecoder(Decoder):
157128
"""
158-
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
129+
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE
159130
passed response.raw to parser(s).
160131
161132
Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.
@@ -168,26 +139,80 @@ class CompositeRawDecoder(Decoder):
168139
)
169140
"""
170141

171-
parser: Parser
172-
stream_response: bool = True
142+
def __init__(
143+
self,
144+
parser: Parser,
145+
stream_response: bool = True,
146+
parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
147+
) -> None:
148+
# since we moved from using `dataclass` to `__init__` method,
149+
# we need to keep using the `parser` to be able to resolve the depenencies
150+
# between the parsers correctly.
151+
self.parser = parser
152+
153+
self._parsers_by_header = parsers_by_header if parsers_by_header else {}
154+
self._stream_response = stream_response
155+
156+
@classmethod
157+
def by_headers(
158+
cls,
159+
parsers: PARSERS_TYPE,
160+
stream_response: bool,
161+
fallback_parser: Parser,
162+
) -> "CompositeRawDecoder":
163+
"""
164+
Create a CompositeRawDecoder instance based on header values.
165+
166+
Args:
167+
parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
168+
stream_response (bool): A flag indicating whether the response should be streamed.
169+
fallback_parser (Parser): A parser to use if no matching header is found.
170+
171+
Returns:
172+
CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
173+
"""
174+
parsers_by_header = {}
175+
for headers, header_values, parser in parsers:
176+
for header in headers:
177+
parsers_by_header[header] = {header_value: parser for header_value in header_values}
178+
return cls(fallback_parser, stream_response, parsers_by_header)
173179

174180
def is_stream_response(self) -> bool:
175-
return self.stream_response
181+
return self._stream_response
176182

177-
def decode(
178-
self,
179-
response: requests.Response,
180-
) -> Generator[MutableMapping[str, Any], None, None]:
183+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
184+
parser = self._select_parser(response)
181185
if self.is_stream_response():
182186
# urllib mentions that some interfaces don't play nice with auto_close
183187
# More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
184188
# We have indeed observed some issues with CSV parsing.
185189
# Hence, we will manage the closing of the file ourselves until we find a better solution.
186190
response.raw.auto_close = False
187-
yield from self.parser.parse(
191+
yield from parser.parse(
188192
data=response.raw, # type: ignore[arg-type]
189-
compressed=self.is_compressed_response(response),
190193
)
191194
response.raw.close()
192195
else:
193-
yield from self.parser.parse(data=io.BytesIO(response.content))
196+
yield from parser.parse(data=io.BytesIO(response.content))
197+
198+
def _select_parser(self, response: requests.Response) -> Parser:
199+
"""
200+
Selects the appropriate parser based on the response headers.
201+
202+
This method iterates through the `_parsers_by_header` dictionary to find a matching parser
203+
based on the headers in the response. If a matching header and header value are found,
204+
the corresponding parser is returned. If no match is found, the default parser is returned.
205+
206+
Args:
207+
response (requests.Response): The HTTP response object containing headers to check.
208+
209+
Returns:
210+
Parser: The parser corresponding to the matched header value, or the default parser if no match is found.
211+
"""
212+
for header, parser_by_header_value in self._parsers_by_header.items():
213+
if (
214+
header in response.headers
215+
and response.headers[header] in parser_by_header_value.keys()
216+
):
217+
return parser_by_header_value[response.headers[header]]
218+
return self.parser

airbyte_cdk/sources/declarative/decoders/decoder.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,7 @@
88

99
import requests
1010

11-
COMPRESSSED_RESPONSE_TYPES = [
12-
"gzip",
13-
"x-gzip",
14-
"gzip, deflate",
15-
"x-gzip, deflate",
16-
"application/zip",
17-
"application/gzip",
18-
"application/x-gzip",
19-
"application/x-zip-compressed",
20-
]
11+
DECODER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]
2112

2213

2314
@dataclass
@@ -33,20 +24,9 @@ def is_stream_response(self) -> bool:
3324
"""
3425

3526
@abstractmethod
36-
def decode(
37-
self, response: requests.Response
38-
) -> Generator[MutableMapping[str, Any], None, None]:
27+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
3928
"""
4029
Decodes a requests.Response into a Mapping[str, Any] or an array
4130
:param response: the response to decode
4231
:return: Generator of Mapping describing the response
4332
"""
44-
45-
def is_compressed_response(self, response: requests.Response) -> bool:
46-
"""
47-
Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header.
48-
"""
49-
return (
50-
response.headers.get("Content-Encoding") in COMPRESSSED_RESPONSE_TYPES
51-
or response.headers.get("Content-Type") in COMPRESSSED_RESPONSE_TYPES
52-
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
6+
import logging
7+
from abc import ABC, abstractmethod
8+
from dataclasses import dataclass
9+
from io import BufferedIOBase
10+
from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple
11+
12+
logger = logging.getLogger("airbyte")
13+
14+
15+
PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]
16+
17+
18+
@dataclass
19+
class Parser(ABC):
20+
@abstractmethod
21+
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
22+
"""
23+
Parse data and yield dictionaries.
24+
"""
25+
pass
26+
27+
28+
# reusable parser types
29+
PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]]
30+
PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]]

airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
import zipfile
77
from dataclasses import dataclass
88
from io import BytesIO
9-
from typing import Any, Generator, MutableMapping
109

1110
import requests
1211

1312
from airbyte_cdk.models import FailureType
1413
from airbyte_cdk.sources.declarative.decoders import Decoder
1514
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser
15+
from airbyte_cdk.sources.declarative.decoders.decoder import DECODER_OUTPUT_TYPE
1616
from airbyte_cdk.utils import AirbyteTracedException
1717

1818
logger = logging.getLogger("airbyte")
@@ -25,9 +25,7 @@ class ZipfileDecoder(Decoder):
2525
def is_stream_response(self) -> bool:
2626
return False
2727

28-
def decode(
29-
self, response: requests.Response
30-
) -> Generator[MutableMapping[str, Any], None, None]:
28+
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
3129
try:
3230
with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
3331
for file_name in zip_file.namelist():
@@ -36,7 +34,6 @@ def decode(
3634
try:
3735
yield from self.parser.parse(
3836
buffered_content,
39-
compressed=self.is_compressed_response(response),
4037
)
4138
except Exception as e:
4239
logger.error(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2204,9 +2204,29 @@ def create_jsonl_decoder(
22042204
def create_gzip_decoder(
22052205
self, model: GzipDecoderModel, config: Config, **kwargs: Any
22062206
) -> Decoder:
2207-
return CompositeRawDecoder(
2208-
parser=ModelToComponentFactory._get_parser(model, config),
2209-
stream_response=False if self._emit_connector_builder_messages else True,
2207+
_compressed_response_types = {
2208+
"gzip",
2209+
"x-gzip",
2210+
"gzip, deflate",
2211+
"x-gzip, deflate",
2212+
"application/zip",
2213+
"application/gzip",
2214+
"application/x-gzip",
2215+
"application/x-zip-compressed",
2216+
}
2217+
2218+
gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser
2219+
2220+
if self._emit_connector_builder_messages:
2221+
# This is very surprising but if the response is not streamed,
2222+
# CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw,
2223+
# which uses urllib3 directly and does not uncompress the data.
2224+
return CompositeRawDecoder(gzip_parser.inner_parser, False)
2225+
2226+
return CompositeRawDecoder.by_headers(
2227+
[({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
2228+
stream_response=True,
2229+
fallback_parser=gzip_parser.inner_parser,
22102230
)
22112231

22122232
@staticmethod

0 commit comments

Comments
 (0)