Skip to content

Commit 4a5d568

Browse files
Uxio0kclowes
andauthored
Allow timeout for chunked responses (#3428)
* Allow timeout for chunked responses - Related to #3418 * Only read iter_content if stream is True * Add newsfragment --------- Co-authored-by: kclowes <[email protected]>
1 parent 86a4350 commit 4a5d568

File tree

3 files changed

+99
-9
lines changed

3 files changed

+99
-9
lines changed

newsfragments/3428.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add explicit stream kwarg to HTTPProvider so that timeout can be more finely tuned.

tests/core/utilities/test_http_session_manager.py

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
from web3._utils.http_session_manager import (
3030
HTTPSessionManager,
3131
)
32+
from web3.exceptions import (
33+
TimeExhausted,
34+
)
3235
from web3.utils.caching import (
3336
SimpleCache,
3437
)
@@ -44,6 +47,15 @@ def __init__(self, text="", status_code=200):
4447
self.reason = None
4548
self.content = "content"
4649

50+
def __enter__(self):
51+
return self
52+
53+
def __exit__(self, exc_type, exc_value, traceback):
54+
pass
55+
56+
def iter_content(self):
57+
return [b"iter content"]
58+
4759
@staticmethod
4860
def json():
4961
return json.dumps({"data": "content"})
@@ -110,7 +122,59 @@ def test_session_manager_make_post_request_no_args(mocker, http_session_manager)
110122
assert len(http_session_manager.session_cache) == 1
111123
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
112124
session = http_session_manager.session_cache.get_cache_entry(cache_key)
113-
session.post.assert_called_once_with(TEST_URI, data=b"request", timeout=30)
125+
session.post.assert_called_once_with(
126+
TEST_URI, data=b"request", timeout=30, stream=False
127+
)
128+
129+
# Ensure the adapter was created with default values
130+
check_adapters_mounted(session)
131+
adapter = session.get_adapter(TEST_URI)
132+
assert isinstance(adapter, HTTPAdapter)
133+
assert adapter._pool_connections == DEFAULT_POOLSIZE
134+
assert adapter._pool_maxsize == DEFAULT_POOLSIZE
135+
136+
137+
def test_session_manager_make_post_request_streaming(mocker, http_session_manager):
138+
mocker.patch("requests.Session.post", return_value=MockedResponse())
139+
140+
# Submit a first request to create a session
141+
assert len(http_session_manager.session_cache) == 0
142+
response = http_session_manager.make_post_request(
143+
TEST_URI, data=b"request", stream=True
144+
)
145+
assert response == b"iter content"
146+
assert len(http_session_manager.session_cache) == 1
147+
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
148+
session = http_session_manager.session_cache.get_cache_entry(cache_key)
149+
session.post.assert_called_once_with(
150+
TEST_URI, data=b"request", timeout=30, stream=True
151+
)
152+
153+
# Ensure the adapter was created with passed in values
154+
check_adapters_mounted(session)
155+
adapter = session.get_adapter(TEST_URI)
156+
assert isinstance(adapter, HTTPAdapter)
157+
assert adapter._pool_connections == DEFAULT_POOLSIZE
158+
assert adapter._pool_maxsize == DEFAULT_POOLSIZE
159+
160+
161+
def test_session_manager_make_post_request_times_out_while_streaming(
162+
mocker, http_session_manager
163+
):
164+
mocker.patch("requests.Session.post", return_value=MockedResponse())
165+
166+
# Submit a first request to create a session
167+
assert len(http_session_manager.session_cache) == 0
168+
with pytest.raises(TimeExhausted):
169+
http_session_manager.make_post_request(
170+
TEST_URI, data=b"request", stream=True, timeout=0.000001
171+
)
172+
assert len(http_session_manager.session_cache) == 1
173+
cache_key = generate_cache_key(f"{threading.get_ident()}:{TEST_URI}")
174+
session = http_session_manager.session_cache.get_cache_entry(cache_key)
175+
session.post.assert_called_once_with(
176+
TEST_URI, data=b"request", timeout=0.000001, stream=True
177+
)
114178

115179
# Ensure the adapter was created with default values
116180
check_adapters_mounted(session)
@@ -140,7 +204,9 @@ def test_session_manager_precached_session(mocker, http_session_manager):
140204

141205
# Ensure the timeout was passed to the request
142206
session = http_session_manager.cache_and_return_session(TEST_URI)
143-
session.post.assert_called_once_with(TEST_URI, data=b"request", timeout=60)
207+
session.post.assert_called_once_with(
208+
TEST_URI, data=b"request", timeout=60, stream=False
209+
)
144210

145211
# Ensure the adapter parameters match those we specified
146212
check_adapters_mounted(session)

web3/_utils/http_session_manager.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import threading
8+
import time
89
from typing import (
910
Any,
1011
Dict,
@@ -32,6 +33,9 @@
3233
from web3._utils.http import (
3334
DEFAULT_HTTP_TIMEOUT,
3435
)
36+
from web3.exceptions import (
37+
TimeExhausted,
38+
)
3539
from web3.utils.caching import (
3640
SimpleCache,
3741
)
@@ -115,21 +119,40 @@ def json_make_get_request(
115119
def get_response_from_post_request(
116120
self, endpoint_uri: URI, *args: Any, **kwargs: Any
117121
) -> requests.Response:
118-
kwargs.setdefault("timeout", DEFAULT_HTTP_TIMEOUT)
119122
session = self.cache_and_return_session(
120123
endpoint_uri, request_timeout=kwargs["timeout"]
121124
)
122-
response = session.post(endpoint_uri, *args, **kwargs)
123-
return response
125+
return session.post(endpoint_uri, *args, **kwargs)
124126

125127
def make_post_request(
126128
self, endpoint_uri: URI, data: Union[bytes, Dict[str, Any]], **kwargs: Any
127129
) -> bytes:
128-
response = self.get_response_from_post_request(
130+
kwargs.setdefault("timeout", DEFAULT_HTTP_TIMEOUT)
131+
kwargs.setdefault("stream", False)
132+
133+
start = time.time()
134+
timeout = kwargs["timeout"]
135+
136+
with self.get_response_from_post_request(
129137
endpoint_uri, data=data, **kwargs
130-
)
131-
response.raise_for_status()
132-
return response.content
138+
) as response:
139+
response.raise_for_status()
140+
if kwargs.get("stream"):
141+
return self._handle_streaming_response(response, start, timeout)
142+
else:
143+
return response.content
144+
145+
def _handle_streaming_response(
146+
self, response: requests.Response, start: float, timeout: float
147+
) -> bytes:
148+
response_body = b""
149+
for data in response.iter_content():
150+
response_body += data
151+
# Manually manage timeout so streaming responses time out
152+
# rather than resetting the timeout each time a response comes back
153+
if (time.time() - start) > timeout:
154+
raise TimeExhausted
155+
return response_body
133156

134157
def _close_evicted_sessions(self, evicted_sessions: List[requests.Session]) -> None:
135158
for evicted_session in evicted_sessions:

0 commit comments

Comments
 (0)