Skip to content

Commit f2ff9fa

Browse files
committed
add option to set empty cells to None
1 parent 5f98bd2 commit f2ff9fa

File tree

5 files changed

+43
-2
lines changed

5 files changed

+43
-2
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3630,6 +3630,9 @@ definitions:
36303630
delimiter:
36313631
type: string
36323632
default: ","
3633+
set_empty_cell_to_none:
3634+
type: boolean
3635+
default: false
36333636
AsyncJobStatusMap:
36343637
description: Matches the api job status to Async Job Status.
36353638
type: object

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class CsvParser(Parser):
103103
# TODO: migrate implementation to re-use file-base classes
104104
encoding: Optional[str] = "utf-8"
105105
delimiter: Optional[str] = ","
106+
set_empty_cell_to_none: Optional[bool] = False
106107

107108
def _get_delimiter(self) -> Optional[str]:
108109
"""
@@ -121,6 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
121122
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
122123
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
123124
for row in reader:
125+
if self.set_empty_cell_to_none:
126+
row = {k: (None if v == "" else v) for k, v in row.items()}
124127
yield row
125128

126129

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,6 +1383,7 @@ class CsvDecoder(BaseModel):
13831383
type: Literal["CsvDecoder"]
13841384
encoding: Optional[str] = "utf-8"
13851385
delimiter: Optional[str] = ","
1386+
set_empty_cell_to_none: Optional[bool] = False
13861387

13871388

13881389
class AsyncJobStatusMap(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2646,7 +2646,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
26462646
elif isinstance(model, JsonlDecoderModel):
26472647
return JsonLineParser()
26482648
elif isinstance(model, CsvDecoderModel):
2649-
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
2649+
return CsvParser(
2650+
encoding=model.encoding,
2651+
delimiter=model.delimiter,
2652+
set_empty_cell_to_none=model.set_empty_cell_to_none,
2653+
)
26502654
elif isinstance(model, GzipDecoderModel):
26512655
return GzipParser(
26522656
inner_parser=ModelToComponentFactory._get_parser(model.decoder, config)

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,19 @@ def compress_with_gzip(data: str, encoding: str = "utf-8"):
4141

4242

4343
def generate_csv(
44-
encoding: str = "utf-8", delimiter: str = ",", should_compress: bool = False
44+
encoding: str = "utf-8",
45+
delimiter: str = ",",
46+
should_compress: bool = False,
47+
add_empty_strings: bool = False,
4548
) -> bytes:
4649
data = [
4750
{"id": "1", "name": "John", "age": "28"},
4851
{"id": "2", "name": "Alice", "age": "34"},
4952
{"id": "3", "name": "Bob", "age": "25"},
5053
]
54+
if add_empty_strings:
55+
for row in data:
56+
row["gender"] = ""
5157

5258
output = StringIO()
5359
writer = csv.DictWriter(output, fieldnames=list(data[0]), delimiter=delimiter)
@@ -258,6 +264,30 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
258264
assert parsed_records == expected_data
259265

260266

267+
@pytest.mark.parametrize("set_empty_cell_to_none", [True, False])
268+
def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell_to_none: bool):
269+
requests_mock.register_uri(
270+
"GET",
271+
"https://airbyte.io/",
272+
content=generate_csv(should_compress=False, add_empty_strings=True),
273+
)
274+
response = requests.get("https://airbyte.io/", stream=True)
275+
276+
parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none)
277+
composite_raw_decoder = CompositeRawDecoder(parser=parser)
278+
279+
expected_data = [
280+
{"id": "1", "name": "John", "age": "28"},
281+
{"id": "2", "name": "Alice", "age": "34"},
282+
{"id": "3", "name": "Bob", "age": "25"},
283+
]
284+
for expected_record in expected_data:
285+
expected_record["gender"] = None if set_empty_cell_to_none else ""
286+
287+
parsed_records = list(composite_raw_decoder.decode(response))
288+
assert parsed_records == expected_data
289+
290+
261291
class TestServer(BaseHTTPRequestHandler):
262292
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
263293

0 commit comments

Comments
 (0)