diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 4f38af76b..8e8acc2b3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3630,6 +3630,9 @@ definitions: delimiter: type: string default: "," + set_empty_cell_to_none: + type: boolean + default: false AsyncJobStatusMap: description: Matches the api job status to Async Job Status. type: object diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 2fc26c43a..60a1cdf3f 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -103,6 +103,7 @@ class CsvParser(Parser): # TODO: migrate implementation to re-use file-base classes encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," + set_empty_cell_to_none: Optional[bool] = False def _get_delimiter(self) -> Optional[str]: """ @@ -121,6 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") for row in reader: + if self.set_empty_cell_to_none: + row = {k: (None if v == "" else v) for k, v in row.items()} yield row diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 48260645f..3f676f88e 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1383,6 +1383,7 @@ class CsvDecoder(BaseModel): type: Literal["CsvDecoder"] encoding: Optional[str] = "utf-8" delimiter: Optional[str] = "," + set_empty_cell_to_none: Optional[bool] = False class AsyncJobStatusMap(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index ea720ae84..288151153 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2646,7 +2646,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser: elif isinstance(model, JsonlDecoderModel): return JsonLineParser() elif isinstance(model, CsvDecoderModel): - return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + return CsvParser( + encoding=model.encoding, + delimiter=model.delimiter, + set_empty_cell_to_none=model.set_empty_cell_to_none, + ) elif isinstance(model, GzipDecoderModel): return GzipParser( inner_parser=ModelToComponentFactory._get_parser(model.decoder, config) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index cd0a562ae..b1ba8f9d5 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -41,16 +41,24 @@ def compress_with_gzip(data: str, encoding: str = "utf-8"): def generate_csv( - encoding: str = "utf-8", delimiter: str = ",", should_compress: bool = False + encoding: str = "utf-8", + delimiter: str = ",", + should_compress: bool = False, + add_empty_strings: bool = False, ) -> bytes: data = [ {"id": "1", "name": "John", "age": "28"}, {"id": "2", "name": "Alice", "age": "34"}, {"id": "3", "name": "Bob", "age": "25"}, ] + fieldnames = ["id", "name", "age"] + if add_empty_strings: + for row in data: + row["gender"] = "" + fieldnames.append("gender") output = StringIO() - writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter=delimiter) + writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=delimiter) writer.writeheader() for row in data: writer.writerow(row) @@ -258,6 +266,30 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d assert parsed_records == expected_data +@pytest.mark.parametrize("set_empty_cell_to_none", [True, False]) +def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell_to_none: bool): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_csv(should_compress=False, add_empty_strings=True), + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + + expected_data = [ + {"id": "1", "name": "John", "age": "28"}, + {"id": "2", "name": "Alice", "age": "34"}, + {"id": "3", "name": "Bob", "age": "25"}, + ] + for expected_record in expected_data: + expected_record["gender"] = None if set_empty_cell_to_none else "" + + parsed_records = list(composite_raw_decoder.decode(response)) + assert parsed_records == expected_data + + class TestServer(BaseHTTPRequestHandler): __test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name