Skip to content
Original file line number Diff line number Diff line change
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Parse the body of `response` with `self.parser` and yield each record it produces.

    When `self.is_stream_response()` is true, the raw stream (`response.raw`) is handed to the
    parser directly; otherwise `response.content` is wrapped in an in-memory `io.BytesIO` buffer.

    :param response: the HTTP response whose body should be decoded
    :return: a generator over the records yielded by the parser
    """
    if self.is_stream_response():
        # urllib3 mentions that some interfaces don't play nice with auto_close, see
        # https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
        # We have indeed observed some issues with CSV parsing. Hence, we manage the closing of
        # the stream ourselves until we find a better solution.
        response.raw.auto_close = False
        try:
            yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
        finally:
            # Since auto_close is disabled, nobody else will close the stream: do it even if the
            # parser raises or the consumer stops iterating before the stream is exhausted.
            response.raw.close()
    else:
        yield from self.parser.parse(data=io.BytesIO(response.content))
42 changes: 42 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import csv
import gzip
import json
import socket
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from unittest.mock import patch

import pytest
Expand All @@ -20,6 +23,12 @@
from airbyte_cdk.utils import AirbyteTracedException


def find_available_port() -> int:
    """Bind a throwaway TCP socket to port 0 on localhost and return the port the OS picked."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("localhost", 0))
        address = probe.getsockname()
        return address[1]  # type: ignore # this should return a int
    finally:
        # The socket is only needed to discover a port number; release it right away.
        probe.close()


def compress_with_gzip(data: str, encoding: str = "utf-8"):
"""
Compress the data using Gzip.
Expand Down Expand Up @@ -202,6 +211,39 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):
    """
    Minimal HTTP request handler that answers every GET with a 200 and a tiny CSV payload.

    The payload intentionally does not end with a newline character.
    """

    # The class name matches pytest's default `Test*` collection pattern; opt out explicitly so
    # pytest does not try to collect this helper (and warn about its `__init__` constructor).
    __test__ = False

    def do_GET(self) -> None:
        """Send a 200 status line, finish the headers and write the CSV body."""
        self.send_response(200)
        self.end_headers()
        self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))


def test_composite_raw_decoder_csv_parser_without_mocked_response():
    """
    This test reproduces a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests.

    We first identified this issue while working with the sample defined https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv.
    We expected to reproduce it by having the test server return the `self.wfile.write` statement kept as a comment below, but that payload did not trigger the error.

    Currently we use `self.wfile.write(bytes("col1,col2\\nval1,val2", "utf-8"))` to reproduce, which we know is not a valid csv as it does not end with a newline character. However, this is the only way we were able to reproduce the error locally.
    """
    # self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8"))

    # start server
    # Binding to port 0 lets the OS pick a free port for the server socket itself, which avoids
    # the race of probing for a free port, releasing it and binding it again afterwards.
    httpd = HTTPServer(("localhost", 0), TestServer)
    # daemon=True so that a stuck server thread can never keep the test process alive
    thread = Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    try:
        # A timeout keeps the test from hanging forever if the server never answers.
        response = requests.get(f"http://localhost:{httpd.server_port}", stream=True, timeout=5)
        result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))

        assert len(result) == 1
    finally:
        httpd.shutdown()  # stop the serve_forever loop so the thread can exit
        httpd.server_close()  # close the listening socket and release the port
        thread.join(timeout=5)  # ensure thread is cleaned up


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down
Loading