Skip to content

Commit 2bde838

Browse files
author
maxi297
committed
Proposal for generic composite raw decoder
1 parent 96f41da commit 2bde838

File tree

4 files changed

+75
-41
lines changed

4 files changed

+75
-41
lines changed

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from abc import ABC, abstractmethod
1111
from dataclasses import dataclass
1212
from io import BufferedIOBase, TextIOWrapper
13-
from typing import Any, Generator, MutableMapping, Optional
13+
from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple
1414

1515
import orjson
1616
import requests
@@ -28,7 +28,6 @@ class Parser(ABC):
2828
def parse(
2929
self,
3030
data: BufferedIOBase,
31-
compressed: Optional[bool] = False,
3231
) -> Generator[MutableMapping[str, Any], None, None]:
3332
"""
3433
Parse data and yield dictionaries.
@@ -43,7 +42,6 @@ class GzipParser(Parser):
4342
def parse(
4443
self,
4544
data: BufferedIOBase,
46-
compressed: Optional[bool] = False,
4745
) -> Generator[MutableMapping[str, Any], None, None]:
4846
"""
4947
Decompress gzipped bytes and pass decompressed data to the inner parser.
@@ -55,11 +53,8 @@ def parse(
5553
- The data is not decoded by default.
5654
"""
5755

58-
if compressed:
59-
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
60-
yield from self.inner_parser.parse(gzipobj)
61-
else:
62-
yield from self.inner_parser.parse(data)
56+
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
57+
yield from self.inner_parser.parse(gzipobj)
6358

6459

6560
@dataclass
@@ -69,7 +64,6 @@ class JsonParser(Parser):
6964
def parse(
7065
self,
7166
data: BufferedIOBase,
72-
compressed: Optional[bool] = False,
7367
) -> Generator[MutableMapping[str, Any], None, None]:
7468
"""
7569
Attempts to deserialize data using the orjson library. As an extra layer of safety, we fall back on the json library to deserialize the data.
@@ -113,7 +107,6 @@ class JsonLineParser(Parser):
113107
def parse(
114108
self,
115109
data: BufferedIOBase,
116-
compressed: Optional[bool] = False,
117110
) -> Generator[MutableMapping[str, Any], None, None]:
118111
for line in data:
119112
try:
@@ -141,7 +134,6 @@ def _get_delimiter(self) -> Optional[str]:
141134
def parse(
142135
self,
143136
data: BufferedIOBase,
144-
compressed: Optional[bool] = False,
145137
) -> Generator[MutableMapping[str, Any], None, None]:
146138
"""
147139
Parse CSV data from decompressed bytes.
@@ -152,7 +144,9 @@ def parse(
152144
yield row
153145

154146

155-
@dataclass
147+
_HEADER = str
148+
_HEADER_VALUE = str
149+
156150
class CompositeRawDecoder(Decoder):
157151
"""
158152
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
@@ -168,26 +162,46 @@ class CompositeRawDecoder(Decoder):
168162
)
169163
"""
170164

171-
parser: Parser
172-
stream_response: bool = True
165+
@classmethod
def by_headers(
    cls,
    parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]],
    stream_response: bool,
    fallback_parser: Parser,
) -> "CompositeRawDecoder":
    """
    Build a decoder that picks its parser from the response headers.

    Args:
        parsers: Routing rules as (header names, header values, parser) tuples. The parser
            is registered for every combination of a listed header name and a listed value.
            If two rules register the same header name and value, the later rule wins.
        stream_response: Forwarded to the constructor as `stream_response`.
        fallback_parser: Forwarded to the constructor as its `parser` argument.

    Returns:
        The result of `cls(fallback_parser, stream_response, parsers_by_header)`.
    """
    parsers_by_header: Dict[_HEADER, Dict[_HEADER_VALUE, Parser]] = {}
    for headers, header_values, parser in parsers:
        for header in headers:
            # Merge into whatever is already registered for this header. Assigning a fresh
            # dict here would silently drop the rules of earlier tuples sharing the header.
            parsers_by_header.setdefault(header, {}).update(dict.fromkeys(header_values, parser))
    return cls(fallback_parser, stream_response, parsers_by_header)
172+
173+
@classmethod
def from_parser(cls, parser: Parser, stream_response: bool) -> "CompositeRawDecoder":
    """
    Build a decoder without any header-based routing.

    The constructor receives `parser`, `stream_response` and an empty header mapping.
    """
    no_header_routing: Dict[_HEADER, Dict[_HEADER_VALUE, Parser]] = {}
    return cls(parser, stream_response, no_header_routing)
176+
177+
def __init__(
    self,
    parser: Parser,
    stream_response: bool = True,
    parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None,
) -> None:
    """
    Args:
        parser: Stored as the fallback parser.
        stream_response: Stored as the flag reported by `is_stream_response`.
        parsers_by_header: Header name -> header value -> parser. `None` or an empty
            mapping is replaced by a new empty dict.
    """
    self._fallback_parser = parser
    self._stream_response = stream_response
    self._parsers_by_header = parsers_by_header or {}
173181

174182
def is_stream_response(self) -> bool:
    """Return the value held in `self._stream_response`."""
    return self._stream_response
176184

177185
def decode(
    self,
    response: requests.Response,
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Parse the body of `response` with the parser selected from its headers.

    When the decoder streams, the parser reads directly from `response.raw`, which is
    closed once parsing ends for any reason. Otherwise the parser reads from an in-memory
    buffer of `response.content`.

    Yields:
        Each record produced by the selected parser.
    """
    parser = self._select_parser(response)
    if not self.is_stream_response():
        yield from parser.parse(data=io.BytesIO(response.content))
        return

    # urllib3 mentions that some interfaces don't play nice with auto_close
    # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
    # We have indeed observed some issues with CSV parsing.
    # Hence, we will manage the closing of the file ourselves until we find a better solution.
    response.raw.auto_close = False
    try:
        yield from parser.parse(
            data=response.raw,  # type: ignore[arg-type]
        )
    finally:
        # Since auto_close is disabled, closing is our job: do it even when the parser
        # raises or the consumer stops iterating early, otherwise close() is never reached.
        response.raw.close()
202+
203+
def _select_parser(self, response: requests.Response) -> Parser:
    """
    Return the parser registered for one of the response's headers, else the fallback.

    Registered headers are checked in the mapping's iteration order; the first one whose
    value on the response has a registered parser wins. Header values are compared exactly
    as registered.
    """
    for header, parser_by_header_value in self._parsers_by_header.items():
        # Read the header once instead of probing `response.headers` up to three times.
        # A missing header yields None, which is never a registered value.
        header_value = response.headers.get(header)
        if header_value in parser_by_header_value:
            return parser_by_header_value[header_value]
    return self._fallback_parser

airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def decode(
3636
try:
3737
yield from self.parser.parse(
3838
buffered_content,
39-
compressed=self.is_compressed_response(response),
4039
)
4140
except Exception as e:
4241
logger.error(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,17 @@
513513
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
514514
}
515515

516+
_COMPRESSED_RESPONSE_TYPES = {
517+
"gzip",
518+
"x-gzip",
519+
"gzip, deflate",
520+
"x-gzip, deflate",
521+
"application/zip",
522+
"application/gzip",
523+
"application/x-gzip",
524+
"application/x-zip-compressed",
525+
}
526+
516527

517528
class ModelToComponentFactory:
518529
EPOCH_DATETIME_FORMAT = "%s"
@@ -2204,9 +2215,13 @@ def create_jsonl_decoder(
22042215
def create_gzip_decoder(
    self, model: GzipDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """
    Build the decoder for a gzip decoder model.

    Responses whose routed headers carry one of `_COMPRESSED_RESPONSE_TYPES` are handed to
    the gzip parser; every other response goes straight to the gzip parser's inner parser.
    Responses are streamed unless connector builder messages are being emitted.
    """
    gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config)  # type: ignore  # based on the model, we know this will be a GzipParser
    stream_response = not self._emit_connector_builder_messages
    # When the response is not streamed the decoder reads `response.content`, for which
    # `requests` has already undone a gzip `Content-Encoding`. Routing on that header would
    # feed already-decompressed bytes to the GzipParser, so in that mode only `Content-Type`
    # (the payload itself being a gzip file) still selects the gzip parser.
    routed_headers = (
        {"Content-Encoding", "Content-Type"} if stream_response else {"Content-Type"}
    )
    return CompositeRawDecoder.by_headers(
        [(routed_headers, _COMPRESSED_RESPONSE_TYPES, gzip_parser)],
        stream_response=stream_response,
        fallback_parser=gzip_parser.inner_parser,
    )
22112226

22122227
@staticmethod

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
from http.server import BaseHTTPRequestHandler, HTTPServer
99
from io import BytesIO, StringIO
1010
from threading import Thread
11-
from unittest.mock import patch
11+
from typing import Iterable
12+
from unittest.mock import Mock, patch
1213

1314
import pytest
1415
import requests
@@ -82,7 +83,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str):
8283
assert counter == 3
8384

8485

85-
def generate_jsonlines():
86+
def generate_jsonlines() -> Iterable[str]:
8687
"""
8788
Generator function to yield data in JSON Lines format.
8889
This is useful for streaming large datasets.
@@ -111,41 +112,46 @@ def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str
111112
"GET",
112113
"https://airbyte.io/",
113114
content=generate_compressed_jsonlines(encoding=encoding),
114-
headers={"Content-Encoding": "gzip"},
115115
)
116116
response = requests.get("https://airbyte.io/", stream=True)
117117

118118
parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding))
119-
composite_raw_decoder = CompositeRawDecoder(parser=parser)
119+
composite_raw_decoder = CompositeRawDecoder(parser)
120120
counter = 0
121121
for _ in composite_raw_decoder.decode(response):
122122
counter += 1
123123
assert counter == 3
124124

125125

126-
@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"])
127-
def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_response(
128-
requests_mock, encoding: str
129-
) -> None:
130-
"""
131-
Test the GzipParser with a non-compressed response.
132-
"""
126+
def test_given_header_match_when_decode_then_select_parser(requests_mock):
127+
requests_mock.register_uri(
128+
"GET",
129+
"https://airbyte.io/",
130+
content=generate_compressed_jsonlines(),
131+
headers={"Content-Encoding": "gzip"},
132+
)
133+
response = requests.get("https://airbyte.io/", stream=True)
133134

135+
parser = GzipParser(inner_parser=JsonLineParser())
136+
unused_parser = Mock()
137+
composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, parser)], stream_response=True, fallback_parser=unused_parser)
138+
counter = 0
139+
for _ in composite_raw_decoder.decode(response):
140+
counter += 1
141+
assert counter == 3
142+
143+
144+
def test_given_header_does_not_match_when_decode_then_select_fallback_parser(requests_mock):
134145
requests_mock.register_uri(
135146
"GET",
136147
"https://airbyte.io/",
137-
# we encode the jsonl content as bytes here
138-
content="".join(generate_jsonlines()).encode(encoding),
139-
# we don't specify the `Content-Encoding` header here
140-
# to simulate a non-compressed response
141-
# but we still use the GzipParser to decode it
142-
# to test the GzipParser's behavior with non-compressed data
143-
# and to ensure it doesn't raise an error.
148+
content="".join(generate_jsonlines()).encode("utf-8"),
149+
headers={"Content-Encoding": "not gzip in order to expect fallback"},
144150
)
145151
response = requests.get("https://airbyte.io/", stream=True)
146152

147-
parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding))
148-
composite_raw_decoder = CompositeRawDecoder(parser=parser)
153+
unused_parser = GzipParser(inner_parser=Mock())
154+
composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, unused_parser)], stream_response=True, fallback_parser=JsonLineParser())
149155
counter = 0
150156
for _ in composite_raw_decoder.decode(response):
151157
counter += 1

0 commit comments

Comments
 (0)