Skip to content

Commit 2a5924f

Browse files
feat(csv-parser): update option to provide an array of values that should be set to None in the csv file (#581)
1 parent ad1184f commit 2a5924f

File tree

5 files changed

+26
-18
lines changed

5 files changed

+26
-18
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3630,9 +3630,10 @@ definitions:
36303630
delimiter:
36313631
type: string
36323632
default: ","
3633-
set_empty_cell_to_none:
3634-
type: boolean
3635-
default: false
3633+
set_values_to_none:
3634+
type: array
3635+
items:
3636+
type: string
36363637
AsyncJobStatusMap:
36373638
description: Matches the api job status to Async Job Status.
36383639
type: object

airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99
import logging
1010
from dataclasses import dataclass
1111
from io import BufferedIOBase, TextIOWrapper
12-
from typing import Any, Optional
12+
from typing import Any, List, Optional
1313

1414
import orjson
1515
import requests
@@ -103,7 +103,7 @@ class CsvParser(Parser):
103103
# TODO: migrate implementation to re-use file-base classes
104104
encoding: Optional[str] = "utf-8"
105105
delimiter: Optional[str] = ","
106-
set_empty_cell_to_none: Optional[bool] = False
106+
set_values_to_none: Optional[List[str]] = None
107107

108108
def _get_delimiter(self) -> Optional[str]:
109109
"""
@@ -122,8 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
122122
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
123123
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
124124
for row in reader:
125-
if self.set_empty_cell_to_none:
126-
row = {k: (None if v == "" else v) for k, v in row.items()}
125+
if self.set_values_to_none:
126+
row = {k: (None if v in self.set_values_to_none else v) for k, v in row.items()}
127127
yield row
128128

129129

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1383,7 +1383,7 @@ class CsvDecoder(BaseModel):
13831383
type: Literal["CsvDecoder"]
13841384
encoding: Optional[str] = "utf-8"
13851385
delimiter: Optional[str] = ","
1386-
set_empty_cell_to_none: Optional[bool] = False
1386+
set_values_to_none: Optional[List[str]] = None
13871387

13881388

13891389
class AsyncJobStatusMap(BaseModel):

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2651,7 +2651,7 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
26512651
return CsvParser(
26522652
encoding=model.encoding,
26532653
delimiter=model.delimiter,
2654-
set_empty_cell_to_none=model.set_empty_cell_to_none,
2654+
set_values_to_none=model.set_values_to_none,
26552655
)
26562656
elif isinstance(model, GzipDecoderModel):
26572657
return GzipParser(

unit_tests/sources/declarative/decoders/test_composite_decoder.py

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88
from http.server import BaseHTTPRequestHandler, HTTPServer
99
from io import BytesIO, StringIO
1010
from threading import Thread
11-
from typing import ClassVar, Iterable
11+
from typing import ClassVar, Iterable, List
1212
from unittest.mock import Mock, patch
1313

1414
import pytest
@@ -44,17 +44,18 @@ def generate_csv(
4444
encoding: str = "utf-8",
4545
delimiter: str = ",",
4646
should_compress: bool = False,
47-
add_empty_strings: bool = False,
47+
add_extra_column: bool = False,
48+
extra_column_value: str = "",
4849
) -> bytes:
4950
data = [
5051
{"id": "1", "name": "John", "age": "28"},
5152
{"id": "2", "name": "Alice", "age": "34"},
5253
{"id": "3", "name": "Bob", "age": "25"},
5354
]
5455
fieldnames = ["id", "name", "age"]
55-
if add_empty_strings:
56+
if add_extra_column:
5657
for row in data:
57-
row["gender"] = ""
58+
row["gender"] = extra_column_value
5859
fieldnames.append("gender")
5960

6061
output = StringIO()
@@ -266,16 +267,22 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
266267
assert parsed_records == expected_data
267268

268269

269-
@pytest.mark.parametrize("set_empty_cell_to_none", [True, False])
270-
def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell_to_none: bool):
270+
@pytest.mark.parametrize("set_values_to_none", [None, [""], ["--"]])
271+
def test_composite_raw_decoder_parse_empty_strings(
272+
requests_mock, set_values_to_none: List[str] | None
273+
):
271274
requests_mock.register_uri(
272275
"GET",
273276
"https://airbyte.io/",
274-
content=generate_csv(should_compress=False, add_empty_strings=True),
277+
content=generate_csv(
278+
should_compress=False,
279+
add_extra_column=True,
280+
extra_column_value=set_values_to_none[0] if set_values_to_none else "random_value",
281+
),
275282
)
276283
response = requests.get("https://airbyte.io/", stream=True)
277284

278-
parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none)
285+
parser = CsvParser(set_values_to_none=set_values_to_none)
279286
composite_raw_decoder = CompositeRawDecoder(parser=parser)
280287

281288
expected_data = [
@@ -284,7 +291,7 @@ def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell
284291
{"id": "3", "name": "Bob", "age": "25"},
285292
]
286293
for expected_record in expected_data:
287-
expected_record["gender"] = None if set_empty_cell_to_none else ""
294+
expected_record["gender"] = None if set_values_to_none else "random_value"
288295

289296
parsed_records = list(composite_raw_decoder.decode(response))
290297
assert parsed_records == expected_data

0 commit comments

Comments
 (0)