Commit 030f106

Author: Oleksandr Bazarnov
Commit message: updated after the review
Parent: 5167da5

5 files changed: 94 additions, 169 deletions

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (0 additions, 4 deletions)

```diff
@@ -1678,10 +1678,6 @@ definitions:
       type:
         type: string
         enum: [ResponseToFileExtractor]
-      file_type:
-        title: The file type in which the response data is storred. Supported types are [csv, jsonl].
-        type: string
-        default: csv
       $parameters:
         type: object
         additionalProperties: true
```

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (23 additions, 2 deletions)

```diff
@@ -35,15 +35,36 @@ def parse(
 class GzipParser(Parser):
     inner_parser: Parser
 
+    def _reset_reader_pointer(self, data: BufferedIOBase) -> None:
+        """
+        Reset the reader pointer to the beginning of the data.
+
+        Note:
+            - This is necessary because the gzip decompression will consume the data stream.
+        """
+        data.seek(0)
+
     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
         """
         Decompress gzipped bytes and pass decompressed data to the inner parser.
+
+        IMPORTANT:
+            - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.
+
+        Note:
+            - The data is not decoded by default.
         """
-        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
-            yield from self.inner_parser.parse(gzipobj)
+
+        try:
+            with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+                yield from self.inner_parser.parse(gzipobj)
+        except gzip.BadGzipFile:
+            logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.")
+            self._reset_reader_pointer(data)
+            yield from self.inner_parser.parse(data)
 
 
 @dataclass
```
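
The try/decompress/rewind-on-failure pattern this hunk adds can be exercised in isolation. Below is a minimal, self-contained sketch; it is not the CDK's actual decoder module, and the line-based inner parser is a hypothetical stand-in:

```python
import gzip
import io
from typing import Any, Generator, MutableMapping


def parse_lines(data: io.BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
    # Hypothetical inner parser: one record per line.
    for line in data:
        yield {"line": line.decode("utf-8").rstrip("\n")}


def parse_maybe_gzipped(data: io.BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
    try:
        # gzip.GzipFile raises gzip.BadGzipFile (Python 3.8+) on the first
        # read if the magic bytes are missing.
        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
            yield from parse_lines(gzipobj)
    except gzip.BadGzipFile:
        # The failed gzip read already consumed part of the stream, so
        # rewind before handing the raw bytes to the inner parser.
        data.seek(0)
        yield from parse_lines(data)


print(list(parse_maybe_gzipped(io.BytesIO(gzip.compress(b"a\nb\n")))))  # decompressed path
print(list(parse_maybe_gzipped(io.BytesIO(b"a\nb\n"))))                 # raw fallback path
```

The fallback is safe for the non-gzipped case precisely because a missing magic number fails on the very first read, before the inner parser can yield anything; a stream that turns out to be corrupt mid-way would be a different situation.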

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py (49 additions, 143 deletions)

```diff
@@ -7,7 +7,6 @@
 import zlib
 from contextlib import closing
 from dataclasses import InitVar, dataclass
-from enum import Enum
 from typing import Any, Dict, Iterable, Mapping, Optional, Tuple
 
 import pandas as pd
@@ -21,65 +20,28 @@
 DOWNLOAD_CHUNK_SIZE: int = 1024 * 10
 
 
-class FileTypes(Enum):
-    CSV = "csv"
-    JSONL = "jsonl"
-
-
 @dataclass
 class ResponseToFileExtractor(RecordExtractor):
     """
-    This class is used when having very big HTTP responses (usually streamed),
-    which would require too much memory so we use disk space as a tradeoff.
-
-    The extractor does the following:
-        1) Save the response to a temporary file
-        2) Read from the temporary file by chunks to avoid OOM
-        3) Remove the temporary file after reading
-        4) Return the records
-        5) If the response is not compressed, it will be filtered for null bytes
-        6) If the response is compressed, it will be decompressed
-        7) If the response is compressed and contains null bytes, it will be filtered for null bytes
+    This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
+    a tradeoff.
 
+    Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for
+    a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
     """
 
     parameters: InitVar[Mapping[str, Any]]
-    file_type: Optional[str] = "csv"
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self.logger = logging.getLogger("airbyte")
 
-    def extract_records(
-        self, response: Optional[requests.Response] = None
-    ) -> Iterable[Mapping[str, Any]]:
-        """
-        Extracts records from the given response by:
-        1) Saving the result to a tmp file
-        2) Reading from saved file by chunks to avoid OOM
-
-        Args:
-            response (Optional[requests.Response]): The response object containing the data. Defaults to None.
-
-        Yields:
-            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.
-
-        Returns:
-            None
-        """
-        if response:
-            file_path, encoding = self._save_to_file(response)
-            yield from self._read_with_chunks(file_path, encoding)
-        else:
-            yield from []
-
     def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
         """
         Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library
         implementation.
 
         Args:
             headers (Dict[str, Any]): The headers of the response.
-
         Returns:
             str: The encoding of the response.
         """
@@ -111,27 +73,10 @@ def _filter_null_bytes(self, b: bytes) -> bytes:
 
         res = b.replace(b"\x00", b"")
         if len(res) < len(b):
-            message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars"
-            self.logger.warning(message, len(b), len(res))
-        return res
-
-    def _get_file_path(self) -> str:
-        """
-        Get a temporary file path with a unique name.
-
-        Returns:
-            str: The path to the temporary file.
-
-        Raises:
-            ValueError: If the file type is not supported.
-        """
-
-        if self.file_type not in [file_type.value for file_type in FileTypes]:
-            raise ValueError(
-                f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.",
+            self.logger.warning(
+                "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)
             )
-
-        return str(uuid.uuid4()) + "." + self.file_type
+        return res
 
     def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
         """
@@ -150,9 +95,8 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
         decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
         needs_decompression = True  # we will assume at first that the response is compressed and change the flag if not
 
-        file_path = self._get_file_path()
-        # save binary data to tmp file
-        with closing(response) as response, open(file_path, "wb") as data_file:
+        tmp_file = str(uuid.uuid4())
+        with closing(response) as response, open(tmp_file, "wb") as data_file:
             response_encoding = self._get_response_encoding(dict(response.headers or {}))
             for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                 try:
@@ -166,76 +110,15 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
                     needs_decompression = False
 
         # check the file exists
-        if os.path.isfile(file_path):
-            return file_path, response_encoding
+        if os.path.isfile(tmp_file):
+            return tmp_file, response_encoding
         else:
-            message = "ResponseToFileExtractor._save_to_file(): The IO/Error occured while verifying binary data."
-            raise ValueError(f"{message} Tmp file {file_path} doesn't exist.")
-
-    def _read_csv(
-        self,
-        path: str,
-        file_encoding: str,
-        chunk_size: int = 100,
-    ) -> Iterable[Mapping[str, Any]]:
-        """
-        Reads a CSV file and yields each row as a dictionary.
-
-        Args:
-            path (str): The path to the CSV file to be read.
-            file_encoding (str): The encoding of the file.
-
-        Yields:
-            Mapping[str, Any]: A dictionary representing each row of data.
-        """
-
-        csv_read_params = {
-            "chunksize": chunk_size,
-            "iterator": True,
-            "dialect": "unix",
-            "dtype": object,
-            "encoding": file_encoding,
-        }
-
-        for chunk in pd.read_csv(path, **csv_read_params):  # type: ignore # ignoring how args are passed
-            # replace NaN with None
-            chunk = chunk.replace({nan: None}).to_dict(orient="records")
-            for record in chunk:
-                yield record
-
-    def _read_json_lines(
-        self,
-        path: str,
-        file_encoding: str,
-        chunk_size: int = 100,
-    ) -> Iterable[Mapping[str, Any]]:
-        """
-        Reads a JSON file and yields each row as a dictionary.
-
-        Args:
-            path (str): The path to the JSON file to be read.
-            file_encoding (str): The encoding of the file.
-
-        Yields:
-            Mapping[str, Any]: A dictionary representing each row of data.
-        """
-
-        json_read_params = {
-            "lines": True,
-            "chunksize": chunk_size,
-            "encoding": file_encoding,
-            "convert_dates": False,
-        }
-
-        for chunk in pd.read_json(path, **json_read_params):  # type: ignore # ignoring how args are passed
-            for record in chunk.to_dict(orient="records"):
-                yield record
+            raise ValueError(
+                f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist."
+            )
 
     def _read_with_chunks(
-        self,
-        path: str,
-        file_encoding: str,
-        chunk_size: int = 100,
+        self, path: str, file_encoding: str, chunk_size: int = 100
     ) -> Iterable[Mapping[str, Any]]:
         """
         Reads data from a file in chunks and yields each row as a dictionary.
@@ -249,23 +132,46 @@ def _read_with_chunks(
             Mapping[str, Any]: A dictionary representing each row of data.
 
         Raises:
-            ValueError: If an error occurs while reading the data from the file.
+            ValueError: If an IO/Error occurs while reading the temporary data.
         """
 
         try:
-            if self.file_type == FileTypes.CSV.value:
-                yield from self._read_csv(path, file_encoding, chunk_size)
-
-            if self.file_type == FileTypes.JSONL.value:
-                yield from self._read_json_lines(path, file_encoding, chunk_size)
-
+            with open(path, "r", encoding=file_encoding) as data:
+                chunks = pd.read_csv(
+                    data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
+                )
+                for chunk in chunks:
+                    chunk = chunk.replace({nan: None}).to_dict(orient="records")
+                    for row in chunk:
+                        yield row
         except pd.errors.EmptyDataError as e:
-            message = "ResponseToFileExtractor._read_with_chunks(): Empty data received."
-            self.logger.info(f"{message} {e}")
+            self.logger.info(f"Empty data received. {e}")
             yield from []
         except IOError as ioe:
-            message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file."
-            raise ValueError(f"{message} Called: {path}", ioe)
+            raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe)
         finally:
             # remove binary tmp file, after data is read
             os.remove(path)
+
+    def extract_records(
+        self, response: Optional[requests.Response] = None
+    ) -> Iterable[Mapping[str, Any]]:
+        """
+        Extracts records from the given response by:
+        1) Saving the result to a tmp file
+        2) Reading from saved file by chunks to avoid OOM
+
+        Args:
+            response (Optional[requests.Response]): The response object containing the data. Defaults to None.
+
+        Yields:
+            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.
+
+        Returns:
+            None
+        """
+        if response:
+            file_path, encoding = self._save_to_file(response)
+            yield from self._read_with_chunks(file_path, encoding)
+        else:
+            yield from []
```
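
For reference, the chunked-read path that survives this refactor reduces to a runnable sketch. The CSV content, chunk size, and file name below are illustrative, not taken from the commit:

```python
import os
import uuid

import pandas as pd
from numpy import nan

# Stream-to-disk stand-in: in the extractor this comes from response.iter_content().
tmp_file = str(uuid.uuid4())
with open(tmp_file, "wb") as data_file:
    data_file.write(b"id,name\n1,foo\n2,\n")

try:
    with open(tmp_file, "r", encoding="utf-8") as data:
        # chunksize bounds memory regardless of file size; dtype=object keeps
        # every cell as a string instead of letting pandas coerce types.
        for chunk in pd.read_csv(data, chunksize=1, iterator=True, dialect="unix", dtype=object):
            for row in chunk.replace({nan: None}).to_dict(orient="records"):
                print(row)  # {'id': '1', 'name': 'foo'}, then {'id': '2', 'name': None}
finally:
    # Mirror the extractor's finally block: the temp file never outlives the read.
    os.remove(tmp_file)
```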

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (0 additions, 4 deletions)

```diff
@@ -702,10 +702,6 @@ class DpathExtractor(BaseModel):
 
 class ResponseToFileExtractor(BaseModel):
     type: Literal["ResponseToFileExtractor"]
-    file_type: Optional[str] = Field(
-        "csv",
-        title="The file type in which the response data is storred. Supported types are [csv, jsonl].",
-    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
```
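
After this deletion the generated model keeps only the two surviving fields. As a sketch of the resulting data structure (import locations assumed; the CDK's generated module may import `Literal` and pydantic differently):

```python
from typing import Any, Dict, Literal, Optional

from pydantic import BaseModel, Field


class ResponseToFileExtractor(BaseModel):
    # The discriminator stays; the removed file_type option is gone, matching
    # the YAML schema change above.
    type: Literal["ResponseToFileExtractor"]
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
```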
