diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index b8e8e3315..93775c1ec 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -151,6 +151,10 @@ def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): + # urllib3 mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) + # We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution. + response.raw.auto_close = False yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + response.raw.close() else: yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 745113925..39e74d8e6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -4,7 +4,10 @@ import csv import gzip import json +import socket +from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO +from threading import Thread from unittest.mock import patch import pytest @@ -20,6 +23,12 @@ from airbyte_cdk.utils import AirbyteTracedException +def find_available_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 0)) + return s.getsockname()[1] # type: ignore # this should return an int + + def compress_with_gzip(data: str, encoding: str = "utf-8"): """ Compress the data using Gzip. 
@@ -202,6 +211,39 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d assert parsed_records == expected_data +class TestServer(BaseHTTPRequestHandler): + def do_GET(self) -> None: + self.send_response(200) + self.end_headers() + self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8")) + + +def test_composite_raw_decoder_csv_parser_without_mocked_response(): + """ + This test reproduces a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests. + + We first identified this issue while working with the sample defined at https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv. + In principle, this should be reproducible by having the test server return the payload from the `self.wfile.write` statement left as a comment below, but in practice it was not. + + Currently we use `self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))` to reproduce it, which we know is not a valid CSV as it does not end with a newline character. However, this is the only way we were able to reproduce the error locally. 
+ """ + # self.wfile.write(bytes('John,Doe,120 jefferson st.,Riverside, NJ, 08075\nJack,McGinnis,220 hobo Av.,Phila, PA,09119\n"John ""Da Man""",Repici,120 Jefferson St.,Riverside, NJ,08075\nStephen,Tyler,"7452 Terrace ""At the Plaza"" road",SomeTown,SD, 91234\n,Blankman,,SomeTown, SD, 00298\n"Joan ""the bone"", Anne",Jet,"9th, at Terrace plc",Desert City,CO,00123\n', "utf-8")) + + # start server + port = find_available_port() + httpd = HTTPServer(("localhost", port), TestServer) + thread = Thread(target=httpd.serve_forever, args=()) + thread.start() + try: + response = requests.get(f"http://localhost:{port}", stream=True) + result = list(CompositeRawDecoder(parser=CsvParser()).decode(response)) + + assert len(result) == 1 + finally: + httpd.shutdown() # release port and kill the thread + thread.join(timeout=5) # ensure thread is cleaned up + + def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()