Commit 2120a18

Author: Oleksandr Bazarnov
Commit message: add
1 parent 0edebbe commit 2120a18

File tree: 4 files changed, +154 −51 lines

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 0 deletions
@@ -1678,6 +1678,10 @@ definitions:
       type:
         type: string
        enum: [ResponseToFileExtractor]
+      file_type:
+        title: The file type in which the response data is stored. Supported types are [csv, jsonl].
+        type: string
+        default: csv
       $parameters:
         type: object
         additionalProperties: true
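
For illustration, a minimal sketch of the component definition this schema change enables, written here as a Python dict rather than manifest YAML; only the "type" and "file_type" keys come from the schema above, and the surrounding retriever configuration is omitted:

    # Hypothetical extractor definition using the new field.
    # "file_type" is optional; per the schema above it defaults to "csv".
    extractor_definition = {
        "type": "ResponseToFileExtractor",
        "file_type": "jsonl",
    }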

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 143 additions & 50 deletions
@@ -7,6 +7,7 @@
 import zlib
 from contextlib import closing
 from dataclasses import InitVar, dataclass
+from enum import Enum
 from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

 import pandas as pd
@@ -20,28 +21,65 @@
 DOWNLOAD_CHUNK_SIZE: int = 1024 * 10


+class FileTypes(Enum):
+    CSV = "csv"
+    JSONL = "jsonl"
+
+
 @dataclass
 class ResponseToFileExtractor(RecordExtractor):
     """
-    This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
-    a tradeoff.
+    This class is used when having very big HTTP responses (usually streamed),
+    which would require too much memory so we use disk space as a tradeoff.
+
+    The extractor does the following:
+        1) Save the response to a temporary file
+        2) Read from the temporary file by chunks to avoid OOM
+        3) Remove the temporary file after reading
+        4) Return the records
+        5) If the response is not compressed, it will be filtered for null bytes
+        6) If the response is compressed, it will be decompressed
+        7) If the response is compressed and contains null bytes, it will be filtered for null bytes

-    Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for
-    a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
     """

     parameters: InitVar[Mapping[str, Any]]
+    file_type: Optional[str] = "csv"

     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self.logger = logging.getLogger("airbyte")

+    def extract_records(
+        self, response: Optional[requests.Response] = None
+    ) -> Iterable[Mapping[str, Any]]:
+        """
+        Extracts records from the given response by:
+            1) Saving the result to a tmp file
+            2) Reading from saved file by chunks to avoid OOM
+
+        Args:
+            response (Optional[requests.Response]): The response object containing the data. Defaults to None.
+
+        Yields:
+            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.
+
+        Returns:
+            None
+        """
+        if response:
+            file_path, encoding = self._save_to_file(response)
+            yield from self._read_with_chunks(file_path, encoding)
+        else:
+            yield from []
+
     def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
         """
         Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library
         implementation.

         Args:
             headers (Dict[str, Any]): The headers of the response.
+
         Returns:
             str: The encoding of the response.
         """
@@ -73,11 +111,28 @@ def _filter_null_bytes(self, b: bytes) -> bytes:

         res = b.replace(b"\x00", b"")
         if len(res) < len(b):
-            self.logger.warning(
-                "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)
-            )
+            message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars"
+            self.logger.warning(message, len(b), len(res))
         return res

+    def _get_file_path(self) -> str:
+        """
+        Get a temporary file path with a unique name.
+
+        Returns:
+            str: The path to the temporary file.
+
+        Raises:
+            ValueError: If the file type is not supported.
+        """
+
+        if self.file_type not in [file_type.value for file_type in FileTypes]:
+            raise ValueError(
+                f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.",
+            )
+
+        return str(uuid.uuid4()) + "." + self.file_type
+
     def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
         """
         Saves the binary data from the given response to a temporary file and returns the filepath and response encoding.
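
The new validation in _get_file_path can be exercised with a quick sketch ("parquet" is just an example of an unsupported value):

    # Unsupported file types now fail fast with a descriptive ValueError.
    extractor = ResponseToFileExtractor(parameters={}, file_type="parquet")
    try:
        extractor._get_file_path()
    except ValueError as error:
        print(error)  # ...File type parquet is not supported.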
@@ -95,8 +150,9 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
         decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
         needs_decompression = True  # we will assume at first that the response is compressed and change the flag if not

-        tmp_file = str(uuid.uuid4())
-        with closing(response) as response, open(tmp_file, "wb") as data_file:
+        file_path = self._get_file_path()
+        # save binary data to tmp file
+        with closing(response) as response, open(file_path, "wb") as data_file:
             response_encoding = self._get_response_encoding(dict(response.headers or {}))
             for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                 try:
@@ -110,15 +166,76 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
                     needs_decompression = False

         # check the file exists
-        if os.path.isfile(tmp_file):
-            return tmp_file, response_encoding
+        if os.path.isfile(file_path):
+            return file_path, response_encoding
         else:
-            raise ValueError(
-                f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist."
-            )
+            message = "ResponseToFileExtractor._save_to_file(): The IO/Error occurred while verifying binary data."
+            raise ValueError(f"{message} Tmp file {file_path} doesn't exist.")
+
+    def _read_csv(
+        self,
+        path: str,
+        file_encoding: str,
+        chunk_size: int = 100,
+    ) -> Iterable[Mapping[str, Any]]:
+        """
+        Reads a CSV file and yields each row as a dictionary.
+
+        Args:
+            path (str): The path to the CSV file to be read.
+            file_encoding (str): The encoding of the file.
+
+        Yields:
+            Mapping[str, Any]: A dictionary representing each row of data.
+        """
+
+        csv_read_params = {
+            "chunksize": chunk_size,
+            "iterator": True,
+            "dialect": "unix",
+            "dtype": object,
+            "encoding": file_encoding,
+        }
+
+        for chunk in pd.read_csv(path, **csv_read_params):
+            # replace NaN with None
+            chunk = chunk.replace({nan: None}).to_dict(orient="records")
+            for record in chunk:
+                yield record
+
+    def _read_json_lines(
+        self,
+        path: str,
+        file_encoding: str,
+        chunk_size: int = 100,
+    ) -> Iterable[Mapping[str, Any]]:
+        """
+        Reads a JSON Lines file and yields each row as a dictionary.
+
+        Args:
+            path (str): The path to the JSON Lines file to be read.
+            file_encoding (str): The encoding of the file.
+
+        Yields:
+            Mapping[str, Any]: A dictionary representing each row of data.
+        """
+
+        json_read_params = {
+            "lines": True,
+            "chunksize": chunk_size,
+            "encoding": file_encoding,
+            "convert_dates": False,
+        }
+
+        for chunk in pd.read_json(path, **json_read_params):
+            for record in chunk.to_dict(orient="records"):
+                yield record

     def _read_with_chunks(
-        self, path: str, file_encoding: str, chunk_size: int = 100
+        self,
+        path: str,
+        file_encoding: str,
+        chunk_size: int = 100,
     ) -> Iterable[Mapping[str, Any]]:
         """
         Reads data from a file in chunks and yields each row as a dictionary.
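
For reference, a standalone sketch of the chunked JSON Lines read that _read_json_lines delegates to pandas (the sample file name is made up): pd.read_json with lines=True and a chunksize returns an iterator of DataFrames rather than a single frame.

    import pandas as pd

    # Write a tiny JSON Lines sample, then stream it back in chunks.
    with open("sample.jsonl", "w", encoding="utf-8") as f:
        f.write('{"id": 1, "name": "foo"}\n{"id": 2, "name": "bar"}\n')

    for chunk in pd.read_json("sample.jsonl", lines=True, chunksize=2, convert_dates=False):
        print(chunk.to_dict(orient="records"))  # [{'id': 1, 'name': 'foo'}, {'id': 2, 'name': 'bar'}]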
@@ -132,47 +249,23 @@ def _read_with_chunks(
             Mapping[str, Any]: A dictionary representing each row of data.

         Raises:
-            ValueError: If an IO/Error occurs while reading the temporary data.
+            ValueError: If an error occurs while reading the data from the file.
         """

         try:
-            # TODO: Add support for other file types, like `json`, with `pd.read_json()`
-            with open(path, "r", encoding=file_encoding) as data:
-                chunks = pd.read_csv(
-                    data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
-                )
-                for chunk in chunks:
-                    chunk = chunk.replace({nan: None}).to_dict(orient="records")
-                    for row in chunk:
-                        yield row
+            if self.file_type == FileTypes.CSV.value:
+                yield from self._read_csv(path, file_encoding, chunk_size)
+
+            if self.file_type == FileTypes.JSONL.value:
+                yield from self._read_json_lines(path, file_encoding, chunk_size)
+
         except pd.errors.EmptyDataError as e:
-            self.logger.info(f"Empty data received. {e}")
+            message = "ResponseToFileExtractor._read_with_chunks(): Empty data received."
+            self.logger.info(f"{message} {e}")
             yield from []
         except IOError as ioe:
-            raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe)
+            message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occurred while reading the data from file."
+            raise ValueError(f"{message} Called: {path}", ioe)
         finally:
             # remove binary tmp file, after data is read
             os.remove(path)
-
-    def extract_records(
-        self, response: Optional[requests.Response] = None
-    ) -> Iterable[Mapping[str, Any]]:
-        """
-        Extracts records from the given response by:
-            1) Saving the result to a tmp file
-            2) Reading from saved file by chunks to avoid OOM
-
-        Args:
-            response (Optional[requests.Response]): The response object containing the data. Defaults to None.
-
-        Yields:
-            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.
-
-        Returns:
-            None
-        """
-        if response:
-            file_path, encoding = self._save_to_file(response)
-            yield from self._read_with_chunks(file_path, encoding)
-        else:
-            yield from []
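
Putting the pieces together, a hedged end-to-end sketch of the new JSONL path, again with a MagicMock standing in for the response object (the exact encoding and decompression handling in unshown parts of the class are assumed to pass uncompressed bytes through):

    # With file_type="jsonl", _get_file_path() produces a ".jsonl" tmp file and
    # _read_with_chunks() dispatches to _read_json_lines().
    fake_response = MagicMock()
    fake_response.headers = {"Content-Type": "application/jsonl"}
    fake_response.iter_content.return_value = [b'{"id": 1}\n{"id": 2}\n']

    extractor = ResponseToFileExtractor(parameters={}, file_type="jsonl")
    print(list(extractor.extract_records(fake_response)))  # roughly [{'id': 1}, {'id': 2}]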

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 4 additions & 0 deletions
@@ -702,6 +702,10 @@ class DpathExtractor(BaseModel):

 class ResponseToFileExtractor(BaseModel):
     type: Literal["ResponseToFileExtractor"]
+    file_type: Optional[str] = Field(
+        "csv",
+        title="The file type in which the response data is stored. Supported types are [csv, jsonl].",
+    )
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 1 deletion
@@ -1992,7 +1992,9 @@ def create_response_to_file_extractor(
         model: ResponseToFileExtractorModel,
         **kwargs: Any,
     ) -> ResponseToFileExtractor:
-        return ResponseToFileExtractor(parameters=model.parameters or {})
+        return ResponseToFileExtractor(
+            parameters=model.parameters or {}, file_type=model.file_type or "csv"
+        )

     @staticmethod
     def create_exponential_backoff_strategy(
