Skip to content

Commit 3750f6d

Browse files
author
Oleksandr Bazarnov
committed
updated compression types checks
1 parent f1a71a6 commit 3750f6d

File tree

3 files changed

+26
-28
lines changed

3 files changed

+26
-28
lines changed

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,6 @@
2222
logger = logging.getLogger("airbyte")
2323

2424

25-
COMPRESSION_TYPES = [
26-
"gzip",
27-
"x-gzip",
28-
"gzip, deflate",
29-
"x-gzip, deflate",
30-
"application/zip",
31-
"application/gzip",
32-
"application/x-gzip",
33-
"application/x-zip-compressed",
34-
]
35-
36-
3725
@dataclass
3826
class Parser(ABC):
3927
@abstractmethod
@@ -186,12 +174,6 @@ class CompositeRawDecoder(Decoder):
186174
def is_stream_response(self) -> bool:
187175
return self.stream_response
188176

189-
def is_compressed(self, response: requests.Response) -> bool:
190-
"""
191-
Check if the response is compressed based on the Content-Encoding header.
192-
"""
193-
return response.headers.get("Content-Encoding") in COMPRESSION_TYPES
194-
195177
def decode(
196178
self,
197179
response: requests.Response,
@@ -202,7 +184,10 @@ def decode(
202184
# We have indeed observed some issues with CSV parsing.
203185
# Hence, we will manage the closing of the file ourselves until we find a better solution.
204186
response.raw.auto_close = False
205-
yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type]
187+
yield from self.parser.parse(
188+
data=response.raw, # type: ignore[arg-type]
189+
compressed=self.is_compressed_response(response),
190+
)
206191
response.raw.close()
207192
else:
208193
yield from self.parser.parse(data=io.BytesIO(response.content))

airbyte_cdk/sources/declarative/decoders/decoder.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@
88

99
import requests
1010

11+
COMPRESSION_RESPONSE_TYPES = [
12+
"gzip",
13+
"x-gzip",
14+
"gzip, deflate",
15+
"x-gzip, deflate",
16+
"application/zip",
17+
"application/gzip",
18+
"application/x-gzip",
19+
"application/x-zip-compressed",
20+
]
21+
1122

1223
@dataclass
1324
class Decoder:
@@ -30,3 +41,12 @@ def decode(
3041
:param response: the response to decode
3142
:return: Generator of Mapping describing the response
3243
"""
44+
45+
def is_compressed_response(self, response: requests.Response) -> bool:
46+
"""
47+
Check if the response is compressed based on the Content-Encoding or Content-Type header.
48+
"""
49+
return (
50+
response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES
51+
or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES
52+
)

airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@
88
from io import BytesIO
99
from typing import Any, Generator, MutableMapping
1010

11-
import orjson
1211
import requests
1312

1413
from airbyte_cdk.models import FailureType
1514
from airbyte_cdk.sources.declarative.decoders import Decoder
16-
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser
15+
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser
1716
from airbyte_cdk.utils import AirbyteTracedException
1817

1918
logger = logging.getLogger("airbyte")
@@ -26,12 +25,6 @@ class ZipfileDecoder(Decoder):
2625
def is_stream_response(self) -> bool:
2726
return False
2827

29-
def is_compressed(self, response: requests.Response) -> bool:
30-
"""
31-
Check if the response is compressed based on the Content-Encoding header.
32-
"""
33-
return response.headers.get("Content-Encoding") in COMPRESSION_TYPES
34-
3528
def decode(
3629
self, response: requests.Response
3730
) -> Generator[MutableMapping[str, Any], None, None]:
@@ -43,7 +36,7 @@ def decode(
4336
try:
4437
yield from self.parser.parse(
4538
buffered_content,
46-
compressed=self.is_compressed(response),
39+
compressed=self.is_compressed_response(response),
4740
)
4841
except Exception as e:
4942
logger.error(

0 commit comments

Comments
 (0)