Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3630,6 +3630,9 @@ definitions:
delimiter:
type: string
default: ","
set_empty_cell_to_none:
type: boolean
default: false
AsyncJobStatusMap:
description: Matches the api job status to Async Job Status.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","
set_empty_cell_to_none: Optional[bool] = False

def _get_delimiter(self) -> Optional[str]:
"""
Expand All @@ -121,6 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
for row in reader:
if self.set_empty_cell_to_none:
row = {k: (None if v == "" else v) for k, v in row.items()}
yield row


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,7 @@ class CsvDecoder(BaseModel):
type: Literal["CsvDecoder"]
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","
set_empty_cell_to_none: Optional[bool] = False


class AsyncJobStatusMap(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2646,7 +2646,11 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
elif isinstance(model, JsonlDecoderModel):
return JsonLineParser()
elif isinstance(model, CsvDecoderModel):
return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
return CsvParser(
encoding=model.encoding,
delimiter=model.delimiter,
set_empty_cell_to_none=model.set_empty_cell_to_none,
)
elif isinstance(model, GzipDecoderModel):
return GzipParser(
inner_parser=ModelToComponentFactory._get_parser(model.decoder, config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,19 @@ def compress_with_gzip(data: str, encoding: str = "utf-8"):


def generate_csv(
encoding: str = "utf-8", delimiter: str = ",", should_compress: bool = False
encoding: str = "utf-8",
delimiter: str = ",",
should_compress: bool = False,
add_empty_strings: bool = False,
) -> bytes:
data = [
{"id": "1", "name": "John", "age": "28"},
{"id": "2", "name": "Alice", "age": "34"},
{"id": "3", "name": "Bob", "age": "25"},
]
if add_empty_strings:
for row in data:
row["gender"] = ""

output = StringIO()
writer = csv.DictWriter(output, fieldnames=["id", "name", "age"], delimiter=delimiter)
Expand Down Expand Up @@ -258,6 +264,30 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


@pytest.mark.parametrize("set_empty_cell_to_none", [True, False])
def test_composite_raw_decoder_parse_empty_strings(
    requests_mock, set_empty_cell_to_none: bool
) -> None:
    """Check how empty CSV cells are decoded with and without ``set_empty_cell_to_none``.

    The mocked endpoint serves the CSV built by
    ``generate_csv(add_empty_strings=True)``, which gives every row an empty
    ``gender`` value. With the flag enabled the parser is expected to yield
    ``None`` for that cell; with it disabled the empty string is expected to
    be passed through unchanged.
    """
    requests_mock.register_uri(
        "GET",
        "https://airbyte.io/",
        content=generate_csv(should_compress=False, add_empty_strings=True),
    )
    response = requests.get("https://airbyte.io/", stream=True)

    parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none)
    composite_raw_decoder = CompositeRawDecoder(parser=parser)

    expected_data = [
        {"id": "1", "name": "John", "age": "28"},
        {"id": "2", "name": "Alice", "age": "34"},
        {"id": "3", "name": "Bob", "age": "25"},
    ]
    # The expected value of the empty cell depends only on the flag, so
    # compute it once instead of once per record.
    expected_gender = None if set_empty_cell_to_none else ""
    for expected_record in expected_data:
        # Assign through the loop variable; the previous code referenced an
        # undefined name here and raised NameError before any assertion ran.
        expected_record["gender"] = expected_gender

    parsed_records = list(composite_raw_decoder.decode(response))
    assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name

Expand Down
Loading