Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
if self.is_stream_response():
response.raw.auto_close = False
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
response.raw.close()
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
26 changes: 26 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import csv
import gzip
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -202,6 +204,30 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):
def do_GET(self) -> None:
self.send_response(200)
self.end_headers()
self.wfile.write(bytes("col1,col2\nval1,val2", "utf-8"))


def test_composite_raw_decoder_csv_parser_without_mocked_response():
"""
This test reproduce a `ValueError: I/O operation on closed file` error we had with CSV parsing. We could not catch this with other tests because the closing of the mocked response from requests_mock was not the same as the one in requests.
"""
# start server
httpd = HTTPServer(("localhost", 8080), TestServer)
thread = Thread(target=httpd.serve_forever, args=())
thread.start()

response = requests.get("http://localhost:8080", stream=True)
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))

assert len(result) == 1
httpd.shutdown() # release port
thread.join()


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down
Loading