Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
if self.is_stream_response():
response.raw.auto_close = False
yield from self.parser.parse(data=response.raw) # type: ignore[arg-type]
response.raw.close()
else:
yield from self.parser.parse(data=io.BytesIO(response.content))
24 changes: 24 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import csv
import gzip
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from io import BytesIO, StringIO
from threading import Thread
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -202,6 +204,28 @@ def test_composite_raw_decoder_csv_parser_values(requests_mock, encoding: str, d
assert parsed_records == expected_data


class TestServer(BaseHTTPRequestHandler):

def do_GET(self) -> None:
self.send_response(200)
self.end_headers()
self.wfile.write(bytes("col1,col2\nval1,val2", 'utf-8'))


def test_composite_raw_decoder_csv_parser():
# start server
httpd = HTTPServer(('localhost', 8080), TestServer)
thread = Thread(target=httpd.serve_forever, args = ())
thread.start()

response = requests.get("http://localhost:8080", stream=True)
result = list(CompositeRawDecoder(parser=CsvParser()).decode(response))

assert len(result) == 1
httpd.shutdown() # release port
thread.join()


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down
Loading