Skip to content

Commit 4adcc52

Browse files
authored
[Core] Support timeout error in requests+aiohttp transports (#43201)
* Support timeout error in requests transport * Requests transport error updates * Test updates * Fix imports * Fix black * aiohttp fixes * Fix tests * Even blacker * Update CHANGELOG.md
1 parent 51176df commit 4adcc52

File tree

9 files changed

+167
-61
lines changed

9 files changed

+167
-61
lines changed

sdk/core/azure-core/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
### Bugs Fixed
2424

2525
- Fixed repeated import attempts of cchardet and chardet when charset_normalizer is used #43092
26+
- Fixed leaked requests and aiohttp exceptions for streamed responses #43200
27+
- Improved granularity of ServiceRequestError and ServiceResponseError exceptions raised in timeout scenarios from the requests and aiohttp transports #43200
2628

2729
### Other Changes
2830

sdk/core/azure-core/azure/core/pipeline/_tools.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,5 @@ def handle_non_stream_rest_response(response: HttpResponse) -> None:
8080
"""
8181
try:
8282
response.read()
83+
finally:
8384
response.close()
84-
except Exception as exc:
85-
response.close()
86-
raise exc

sdk/core/azure-core/azure/core/pipeline/_tools_async.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,5 @@ async def handle_no_stream_rest_response(response: "RestAsyncHttpResponse") -> N
6767
"""
6868
try:
6969
await response.read()
70+
finally:
7071
await response.close()
71-
except Exception as exc:
72-
await response.close()
73-
raise exc

sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ async def __anext__(self):
474474
except aiohttp.client_exceptions.ClientResponseError as err:
475475
raise ServiceResponseError(err, error=err) from err
476476
except asyncio.TimeoutError as err:
477-
raise ServiceResponseError(err, error=err) from err
477+
raise ServiceResponseTimeoutError(err, error=err) from err
478478
except aiohttp.client_exceptions.ClientError as err:
479479
raise ServiceRequestError(err, error=err) from err
480480
except Exception as err:
@@ -571,7 +571,7 @@ async def load_body(self) -> None:
571571
except aiohttp.client_exceptions.ClientResponseError as err:
572572
raise ServiceResponseError(err, error=err) from err
573573
except asyncio.TimeoutError as err:
574-
raise ServiceResponseError(err, error=err) from err
574+
raise ServiceResponseTimeoutError(err, error=err) from err
575575
except aiohttp.client_exceptions.ClientError as err:
576576
raise ServiceRequestError(err, error=err) from err
577577

sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
from azure.core.configuration import ConnectionConfiguration
4747
from azure.core.exceptions import (
4848
ServiceRequestError,
49+
ServiceRequestTimeoutError,
4950
ServiceResponseError,
51+
ServiceResponseTimeoutError,
5052
IncompleteReadError,
5153
HttpResponseError,
5254
DecodeError,
@@ -85,7 +87,7 @@ def _read_raw_stream(response, chunk_size=1):
8587
except CoreDecodeError as e:
8688
raise DecodeError(e, error=e) from e
8789
except ReadTimeoutError as e:
88-
raise ServiceRequestError(e, error=e) from e
90+
raise ServiceResponseTimeoutError(e, error=e) from e
8991
else:
9092
# Standard file-like object.
9193
while True:
@@ -202,6 +204,14 @@ def __next__(self):
202204
_LOGGER.warning("Unable to stream download.")
203205
internal_response.close()
204206
raise HttpResponseError(err, error=err) from err
207+
except requests.ConnectionError as err:
208+
internal_response.close()
209+
if err.args and isinstance(err.args[0], ReadTimeoutError):
210+
raise ServiceResponseTimeoutError(err, error=err) from err
211+
raise ServiceResponseError(err, error=err) from err
212+
except requests.RequestException as err:
213+
internal_response.close()
214+
raise ServiceResponseError(err, error=err) from err
205215
except Exception as err:
206216
_LOGGER.warning("Unable to stream download.")
207217
internal_response.close()
@@ -384,13 +394,14 @@ def send( # pylint: disable=too-many-statements
384394
"Please report this issue to https://github.com/Azure/azure-sdk-for-python/issues."
385395
) from err
386396
raise
387-
except (
388-
NewConnectionError,
389-
ConnectTimeoutError,
390-
) as err:
397+
except NewConnectionError as err:
391398
error = ServiceRequestError(err, error=err)
399+
except ConnectTimeoutError as err:
400+
error = ServiceRequestTimeoutError(err, error=err)
401+
except requests.exceptions.ConnectTimeout as err:
402+
error = ServiceRequestTimeoutError(err, error=err)
392403
except requests.exceptions.ReadTimeout as err:
393-
error = ServiceResponseError(err, error=err)
404+
error = ServiceResponseTimeoutError(err, error=err)
394405
except requests.exceptions.ConnectionError as err:
395406
if err.args and isinstance(err.args[0], ProtocolError):
396407
error = ServiceResponseError(err, error=err)
@@ -405,7 +416,7 @@ def send( # pylint: disable=too-many-statements
405416
_LOGGER.warning("Unable to stream download.")
406417
error = HttpResponseError(err, error=err)
407418
except requests.RequestException as err:
408-
error = ServiceRequestError(err, error=err)
419+
error = ServiceResponseError(err, error=err)
409420

410421
if error:
411422
raise error

sdk/core/azure-core/azure/core/rest/_aiohttp.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,23 @@
2727
import asyncio # pylint: disable=do-not-import-asyncio
2828
from itertools import groupby
2929
from typing import Iterator, cast
30+
31+
import aiohttp
3032
from multidict import CIMultiDict
33+
3134
from ._http_response_impl_async import (
3235
AsyncHttpResponseImpl,
3336
AsyncHttpResponseBackcompatMixin,
3437
)
3538
from ..pipeline.transport._aiohttp import AioHttpStreamDownloadGenerator
3639
from ..utils._pipeline_transport_rest_shared import _pad_attr_name, _aiohttp_body_helper
37-
from ..exceptions import ResponseNotReadError
40+
from ..exceptions import (
41+
ResponseNotReadError,
42+
IncompleteReadError,
43+
ServiceResponseError,
44+
ServiceResponseTimeoutError,
45+
ServiceRequestError,
46+
)
3847

3948

4049
class _ItemsView(collections.abc.ItemsView):
@@ -212,7 +221,18 @@ async def read(self) -> bytes:
212221
"""
213222
if not self._content:
214223
self._stream_download_check()
215-
self._content = await self._internal_response.read()
224+
try:
225+
self._content = await self._internal_response.read()
226+
except aiohttp.client_exceptions.ClientPayloadError as err:
227+
# This is the case that server closes connection before we finish the reading. aiohttp library
228+
# raises ClientPayloadError.
229+
raise IncompleteReadError(err, error=err) from err
230+
except aiohttp.client_exceptions.ClientResponseError as err:
231+
raise ServiceResponseError(err, error=err) from err
232+
except asyncio.TimeoutError as err:
233+
raise ServiceResponseTimeoutError(err, error=err) from err
234+
except aiohttp.client_exceptions.ClientError as err:
235+
raise ServiceRequestError(err, error=err) from err
216236
await self._set_read_checks()
217237
return _aiohttp_body_helper(self)
218238

sdk/core/azure-core/tests/async_tests/test_basic_transport_async.py

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@
33
# Licensed under the MIT License. See LICENSE.txt in the project root for
44
# license information.
55
# -------------------------------------------------------------------------
6+
7+
import pytest
8+
import sys
9+
import asyncio
10+
from packaging.version import Version
11+
from unittest import mock
12+
13+
import aiohttp
14+
615
from azure.core.pipeline.transport import (
716
AsyncHttpResponse as PipelineTransportAsyncHttpResponse,
817
AsyncHttpTransport,
@@ -21,13 +30,8 @@
2130
ServiceRequestTimeoutError,
2231
ServiceResponseTimeoutError,
2332
)
33+
2434
from utils import HTTP_REQUESTS, request_and_responses_product
25-
import pytest
26-
import sys
27-
import asyncio
28-
from unittest.mock import Mock
29-
from packaging.version import Version
30-
import aiohttp
3135

3236

3337
# transport = mock.MagicMock(spec=AsyncHttpTransport)
@@ -1049,47 +1053,66 @@ async def test_close_too_soon_works_fine(caplog, port, http_request):
10491053
assert result # No exception is good enough here
10501054

10511055

1052-
@pytest.mark.skipif(
1053-
Version(aiohttp.__version__) >= Version("3.10"),
1054-
reason="aiohttp 3.10 introduced separate connection timeout",
1055-
)
10561056
@pytest.mark.parametrize("http_request", HTTP_REQUESTS)
10571057
@pytest.mark.asyncio
1058-
async def test_aiohttp_timeout_response(http_request):
1058+
async def test_aiohttp_timeout_response(port, http_request):
10591059
async with AioHttpTransport() as transport:
1060-
transport.session._connector.connect = Mock(side_effect=asyncio.TimeoutError("Too slow!"))
10611060

1062-
request = http_request("GET", f"http://localhost:12345/basic/string")
1061+
request = http_request("GET", f"http://localhost:{port}/basic/string")
10631062

1064-
with pytest.raises(ServiceResponseTimeoutError) as err:
1065-
await transport.send(request)
1063+
with mock.patch.object(
1064+
aiohttp.ClientResponse, "start", side_effect=asyncio.TimeoutError("Too slow!")
1065+
) as mock_method:
1066+
with pytest.raises(ServiceResponseTimeoutError) as err:
1067+
await transport.send(request)
10661068

1067-
with pytest.raises(ServiceResponseError) as err:
1068-
await transport.send(request)
1069+
with pytest.raises(ServiceResponseError) as err:
1070+
await transport.send(request)
10691071

1070-
stream_request = http_request("GET", f"http://localhost:12345/streams/basic")
1071-
with pytest.raises(ServiceResponseTimeoutError) as err:
1072-
await transport.send(stream_request, stream=True)
1072+
stream_resp = http_request("GET", f"http://localhost:{port}/streams/basic")
1073+
with pytest.raises(ServiceResponseTimeoutError) as err:
1074+
await transport.send(stream_resp, stream=True)
1075+
1076+
stream_resp = await transport.send(stream_resp, stream=True)
1077+
with mock.patch.object(
1078+
aiohttp.streams.StreamReader, "read", side_effect=asyncio.TimeoutError("Too slow!")
1079+
) as mock_method:
1080+
with pytest.raises(ServiceResponseTimeoutError) as err:
1081+
try:
1082+
# current HttpResponse
1083+
await stream_resp.read()
1084+
except AttributeError:
1085+
# legacy HttpResponse
1086+
b"".join([b async for b in stream_resp.stream_download(None)])
10731087

10741088

1075-
@pytest.mark.skipif(
1076-
Version(aiohttp.__version__) < Version("3.10"),
1077-
reason="aiohttp 3.10 introduced separate connection timeout",
1078-
)
10791089
@pytest.mark.parametrize("http_request", HTTP_REQUESTS)
10801090
@pytest.mark.asyncio
10811091
async def test_aiohttp_timeout_request(http_request):
10821092
async with AioHttpTransport() as transport:
1083-
transport.session._connector.connect = Mock(side_effect=asyncio.TimeoutError("Too slow!"))
1093+
transport.session._connector.connect = mock.Mock(side_effect=asyncio.TimeoutError("Too slow!"))
10841094

10851095
request = http_request("GET", f"http://localhost:12345/basic/string")
10861096

1087-
with pytest.raises(ServiceRequestTimeoutError) as err:
1088-
await transport.send(request)
1097+
# aiohttp 3.10 introduced separate connection timeout
1098+
if Version(aiohttp.__version__) >= Version("3.10"):
1099+
with pytest.raises(ServiceRequestTimeoutError) as err:
1100+
await transport.send(request)
1101+
1102+
with pytest.raises(ServiceRequestError) as err:
1103+
await transport.send(request)
1104+
1105+
stream_request = http_request("GET", f"http://localhost:12345/streams/basic")
1106+
with pytest.raises(ServiceRequestTimeoutError) as err:
1107+
await transport.send(stream_request, stream=True)
1108+
1109+
else:
1110+
with pytest.raises(ServiceResponseTimeoutError) as err:
1111+
await transport.send(request)
10891112

1090-
with pytest.raises(ServiceRequestError) as err:
1091-
await transport.send(request)
1113+
with pytest.raises(ServiceResponseError) as err:
1114+
await transport.send(request)
10921115

1093-
stream_request = http_request("GET", f"http://localhost:12345/streams/basic")
1094-
with pytest.raises(ServiceRequestTimeoutError) as err:
1095-
await transport.send(stream_request, stream=True)
1116+
stream_request = http_request("GET", f"http://localhost:12345/streams/basic")
1117+
with pytest.raises(ServiceResponseTimeoutError) as err:
1118+
await transport.send(stream_request, stream=True)

sdk/core/azure-core/tests/test_basic_transport.py

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,35 @@
55
# -------------------------------------------------------------------------
66
from http.client import HTTPConnection
77
from collections import OrderedDict
8-
import sys
8+
import logging
9+
import pytest
10+
from unittest import mock
11+
from socket import timeout as SocketTimeout
912

10-
try:
11-
from unittest import mock
12-
except ImportError:
13-
import mock
13+
from urllib3.util import connection as urllib_connection
14+
from urllib3.response import HTTPResponse as UrllibResponse
15+
from urllib3.connection import HTTPConnection as UrllibConnection
1416

17+
from azure.core.rest._http_response_impl import HttpResponseImpl as RestHttpResponseImpl
18+
from azure.core.pipeline._tools import is_rest
1519
from azure.core.pipeline.transport import HttpResponse as PipelineTransportHttpResponse, RequestsTransport
1620
from azure.core.pipeline.transport._base import HttpTransport, _deserialize_response, _urljoin
1721
from azure.core.pipeline.policies import HeadersPolicy
1822
from azure.core.pipeline import Pipeline
19-
from azure.core.exceptions import HttpResponseError
20-
import logging
21-
import pytest
23+
from azure.core.exceptions import (
24+
HttpResponseError,
25+
ServiceRequestError,
26+
ServiceResponseError,
27+
ServiceRequestTimeoutError,
28+
ServiceResponseTimeoutError,
29+
)
30+
2231
from utils import (
2332
HTTP_REQUESTS,
2433
request_and_responses_product,
2534
HTTP_CLIENT_TRANSPORT_RESPONSES,
2635
create_transport_response,
2736
)
28-
from azure.core.rest._http_response_impl import HttpResponseImpl as RestHttpResponseImpl
29-
from azure.core.pipeline._tools import is_rest
3037

3138

3239
class PipelineTransportMockResponse(PipelineTransportHttpResponse):
@@ -1322,3 +1329,49 @@ def test_close_too_soon_works_fine(caplog, port, http_request):
13221329
result = transport.send(request)
13231330

13241331
assert result # No exception is good enough here
1332+
1333+
1334+
@pytest.mark.parametrize("http_request", HTTP_REQUESTS)
1335+
def test_requests_timeout_response(caplog, port, http_request):
1336+
transport = RequestsTransport()
1337+
1338+
request = http_request("GET", f"http://localhost:{port}/basic/string")
1339+
1340+
with mock.patch.object(UrllibConnection, "getresponse", side_effect=SocketTimeout) as mock_method:
1341+
with pytest.raises(ServiceResponseTimeoutError) as err:
1342+
transport.send(request, read_timeout=0.0001)
1343+
1344+
with pytest.raises(ServiceResponseError) as err:
1345+
transport.send(request, read_timeout=0.0001)
1346+
1347+
stream_request = http_request("GET", f"http://localhost:{port}/streams/basic")
1348+
with pytest.raises(ServiceResponseTimeoutError) as err:
1349+
transport.send(stream_request, stream=True, read_timeout=0.0001)
1350+
1351+
stream_resp = transport.send(stream_request, stream=True)
1352+
with mock.patch.object(UrllibResponse, "_handle_chunk", side_effect=SocketTimeout) as mock_method:
1353+
with pytest.raises(ServiceResponseTimeoutError) as err:
1354+
try:
1355+
# current HttpResponse
1356+
stream_resp.read()
1357+
except AttributeError:
1358+
# legacy HttpResponse
1359+
b"".join(stream_resp.stream_download(None))
1360+
1361+
1362+
@pytest.mark.parametrize("http_request", HTTP_REQUESTS)
1363+
def test_requests_timeout_request(caplog, port, http_request):
1364+
transport = RequestsTransport()
1365+
1366+
request = http_request("GET", f"http://localhost:{port}/basic/string")
1367+
1368+
with mock.patch.object(urllib_connection, "create_connection", side_effect=SocketTimeout) as mock_method:
1369+
with pytest.raises(ServiceRequestTimeoutError) as err:
1370+
transport.send(request, connection_timeout=0.0001)
1371+
1372+
with pytest.raises(ServiceRequestTimeoutError) as err:
1373+
transport.send(request, connection_timeout=0.0001)
1374+
1375+
stream_request = http_request("GET", f"http://localhost:{port}/streams/basic")
1376+
with pytest.raises(ServiceRequestTimeoutError) as err:
1377+
transport.send(stream_request, stream=True, connection_timeout=0.0001)

sdk/core/azure-core/tests/test_stream_generator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from azure.core.pipeline import Pipeline, PipelineResponse
1111
from azure.core.pipeline.transport._requests_basic import StreamDownloadGenerator
12+
from azure.core.exceptions import ServiceResponseError
1213

1314
try:
1415
from unittest import mock
@@ -73,7 +74,7 @@ def close(self):
7374
http_response.internal_response = MockInternalResponse()
7475
stream = StreamDownloadGenerator(pipeline, http_response, decompress=False)
7576
with mock.patch("time.sleep", return_value=None):
76-
with pytest.raises(requests.exceptions.ConnectionError):
77+
with pytest.raises(ServiceResponseError):
7778
stream.__next__()
7879

7980

@@ -133,5 +134,5 @@ def mock_run(self, *args, **kwargs):
133134
pipeline = Pipeline(transport)
134135
pipeline.run = mock_run
135136
downloader = response.stream_download(pipeline, decompress=False)
136-
with pytest.raises(requests.exceptions.ConnectionError):
137+
with pytest.raises(ServiceResponseError):
137138
full_response = b"".join(downloader)

0 commit comments

Comments
 (0)