Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3630,9 +3630,10 @@ definitions:
delimiter:
type: string
default: ","
set_empty_cell_to_none:
type: boolean
default: false
set_values_to_none:
type: array
items:
type: string
AsyncJobStatusMap:
description: Matches the api job status to Async Job Status.
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging
from dataclasses import dataclass
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Optional
from typing import Any, List, Optional

import orjson
import requests
Expand Down Expand Up @@ -103,7 +103,7 @@ class CsvParser(Parser):
# TODO: migrate implementation to re-use file-base classes
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","
set_empty_cell_to_none: Optional[bool] = False
set_values_to_none: Optional[List[str]] = None

def _get_delimiter(self) -> Optional[str]:
"""
Expand All @@ -122,8 +122,8 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore
reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
for row in reader:
if self.set_empty_cell_to_none:
row = {k: (None if v == "" else v) for k, v in row.items()}
if self.set_values_to_none:
row = {k: (None if v in self.set_values_to_none else v) for k, v in row.items()}
yield row


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ class CsvDecoder(BaseModel):
type: Literal["CsvDecoder"]
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","
set_empty_cell_to_none: Optional[bool] = False
set_values_to_none: Optional[List[str]] = None


class AsyncJobStatusMap(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2651,7 +2651,7 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
return CsvParser(
encoding=model.encoding,
delimiter=model.delimiter,
set_empty_cell_to_none=model.set_empty_cell_to_none,
set_values_to_none=model.set_values_to_none,
)
elif isinstance(model, GzipDecoderModel):
return GzipParser(
Expand Down
25 changes: 16 additions & 9 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from typing import ClassVar, Iterable
from typing import ClassVar, Iterable, List
from unittest.mock import Mock, patch

import pytest
Expand Down Expand Up @@ -44,17 +44,18 @@ def generate_csv(
encoding: str = "utf-8",
delimiter: str = ",",
should_compress: bool = False,
add_empty_strings: bool = False,
add_extra_column: bool = False,
extra_column_value: str = "",
) -> bytes:
data = [
{"id": "1", "name": "John", "age": "28"},
{"id": "2", "name": "Alice", "age": "34"},
{"id": "3", "name": "Bob", "age": "25"},
]
fieldnames = ["id", "name", "age"]
if add_empty_strings:
if add_extra_column:
for row in data:
row["gender"] = ""
row["gender"] = extra_column_value
fieldnames.append("gender")

output = StringIO()
Expand Down Expand Up @@ -266,16 +267,22 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


@pytest.mark.parametrize("set_empty_cell_to_none", [True, False])
def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell_to_none: bool):
@pytest.mark.parametrize("set_values_to_none", [None, [""], ["--"]])
def test_composite_raw_decoder_parse_empty_strings(
requests_mock, set_values_to_none: List[str] | None
):
requests_mock.register_uri(
"GET",
"https://airbyte.io/",
content=generate_csv(should_compress=False, add_empty_strings=True),
content=generate_csv(
should_compress=False,
add_extra_column=True,
extra_column_value=set_values_to_none[0] if set_values_to_none else "random_value",
),
)
response = requests.get("https://airbyte.io/", stream=True)

parser = CsvParser(set_empty_cell_to_none=set_empty_cell_to_none)
parser = CsvParser(set_values_to_none=set_values_to_none)
composite_raw_decoder = CompositeRawDecoder(parser=parser)

expected_data = [
Expand All @@ -284,7 +291,7 @@ def test_composite_raw_decoder_parse_empty_strings(requests_mock, set_empty_cell
{"id": "3", "name": "Bob", "age": "25"},
]
for expected_record in expected_data:
expected_record["gender"] = None if set_empty_cell_to_none else ""
expected_record["gender"] = None if set_values_to_none else "random_value"

parsed_records = list(composite_raw_decoder.decode(response))
assert parsed_records == expected_data
Expand Down
Loading