Skip to content

Commit 82d6732

Browse files
committed
[Persistent Connection Providers] Improve request info management:
- Instead of guessing what the request `id` will be and having to bump this cache key for internal requests, split persistent Connection provider calls into send and recv. Use the build request after sending to get the `id` and store the request information for when the response is received. - Add request caching support and request mocker support for these new methods. Use the `send_request` and `recv_for_request` methods on the `Module` class so we properly handle the request information for processing the response.
1 parent 9488fe7 commit 82d6732

File tree

13 files changed

+350
-126
lines changed

13 files changed

+350
-126
lines changed

tests/core/providers/test_async_ipc_provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,5 +364,5 @@ async def test_async_ipc_provider_write_messages_end_with_new_line_delimiter(
364364

365365
await w3.provider.make_request("method", [])
366366

367-
request_data = b'{"jsonrpc": "2.0", "method": "method", "params": [], "id": 0}'
367+
request_data = b'{"id": 0, "jsonrpc": "2.0", "method": "method", "params": []}'
368368
w3.provider._writer.write.assert_called_with(request_data + b"\n")

web3/_utils/caching/caching_utils.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,12 @@
6464
from web3.providers import ( # noqa: F401
6565
AsyncBaseProvider,
6666
BaseProvider,
67+
PersistentConnectionProvider,
6768
)
6869
from web3.types import ( # noqa: F401
6970
AsyncMakeRequestFn,
7071
MakeRequestFn,
72+
RPCRequest,
7173
RPCResponse,
7274
)
7375

@@ -367,3 +369,65 @@ async def wrapper(
367369
# save a reference to the decorator on the wrapped function
368370
wrapper._decorator = async_handle_request_caching # type: ignore
369371
return wrapper
372+
373+
374+
def async_handle_send_caching(
375+
func: Callable[
376+
[ASYNC_PROVIDER_TYPE, RPCEndpoint, Any],
377+
Coroutine[Any, Any, "RPCRequest"],
378+
],
379+
) -> Callable[..., Coroutine[Any, Any, "RPCRequest"]]:
380+
async def wrapper(
381+
provider: ASYNC_PROVIDER_TYPE, method: RPCEndpoint, params: Any
382+
) -> "RPCRequest":
383+
if is_cacheable_request(provider, method, params):
384+
request_cache = provider._request_cache
385+
cache_key = generate_cache_key(
386+
f"{threading.get_ident()}:{(method, params)}"
387+
)
388+
cached_response = request_cache.get_cache_entry(cache_key)
389+
if cached_response is not None:
390+
# The request data isn't used, this just prevents a cached request from
391+
# being sent - return an empty request object
392+
return {"id": -1, "method": RPCEndpoint(""), "params": []}
393+
return await func(provider, method, params)
394+
395+
# save a reference to the decorator on the wrapped function
396+
wrapper._decorator = async_handle_send_caching # type: ignore
397+
return wrapper
398+
399+
400+
def async_handle_recv_caching(
401+
func: Callable[
402+
["PersistentConnectionProvider", "RPCRequest"],
403+
Coroutine[Any, Any, "RPCResponse"],
404+
]
405+
) -> Callable[..., Coroutine[Any, Any, "RPCResponse"]]:
406+
async def wrapper(
407+
provider: "PersistentConnectionProvider",
408+
rpc_request: "RPCRequest",
409+
) -> "RPCResponse":
410+
method = rpc_request["method"]
411+
params = rpc_request["params"]
412+
if is_cacheable_request(provider, method, params):
413+
request_cache = provider._request_cache
414+
cache_key = generate_cache_key(
415+
f"{threading.get_ident()}:{(method, params)}"
416+
)
417+
cache_result = request_cache.get_cache_entry(cache_key)
418+
if cache_result is not None:
419+
return cache_result
420+
else:
421+
response = await func(provider, rpc_request)
422+
if await _async_should_cache_response(
423+
provider, method, params, response
424+
):
425+
async with provider._request_cache_lock:
426+
request_cache.cache(cache_key, response)
427+
return response
428+
else:
429+
return await func(provider, rpc_request)
430+
431+
# save a reference to the decorator on the wrapped function
432+
wrapper._decorator = async_handle_recv_caching # type: ignore
433+
return wrapper

web3/_utils/caching/request_caching_validation.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from web3.providers import ( # noqa: F401
2020
AsyncBaseProvider,
2121
BaseProvider,
22+
PersistentConnectionProvider,
2223
)
2324

2425
UNCACHEABLE_BLOCK_IDS = {"finalized", "safe", "latest", "pending"}

web3/_utils/module_testing/utils.py

Lines changed: 99 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@
1010
cast,
1111
)
1212

13-
from toolz import (
13+
from eth_utils.toolz import (
1414
merge,
1515
)
1616

17+
from web3.providers.persistent import (
18+
PersistentConnectionProvider,
19+
)
20+
1721
if TYPE_CHECKING:
1822
from web3 import ( # noqa: F401
1923
AsyncWeb3,
@@ -26,6 +30,7 @@
2630
AsyncMakeRequestFn,
2731
MakeRequestFn,
2832
RPCEndpoint,
33+
RPCRequest,
2934
RPCResponse,
3035
)
3136

@@ -101,9 +106,21 @@ def __init__(
101106
self.mock_results = mock_results or {}
102107
self.mock_errors = mock_errors or {}
103108
self.mock_responses = mock_responses or {}
104-
self._make_request: Union[
105-
"AsyncMakeRequestFn", "MakeRequestFn"
106-
] = w3.provider.make_request
109+
if isinstance(w3.provider, PersistentConnectionProvider):
110+
self._send_request = w3.provider.send_request
111+
self._recv_for_request = w3.provider.recv_for_request
112+
else:
113+
self._make_request: Union[
114+
"AsyncMakeRequestFn", "MakeRequestFn"
115+
] = w3.provider.make_request
116+
117+
def _build_request_id(self) -> int:
118+
request_id = (
119+
next(copy.deepcopy(self.w3.provider.request_counter))
120+
if hasattr(self.w3.provider, "request_counter")
121+
else 1
122+
)
123+
return request_id
107124

108125
def __enter__(self) -> "Self":
109126
# mypy error: Cannot assign to a method
@@ -131,11 +148,7 @@ def _mock_request_handler(
131148
):
132149
return self._make_request(method, params)
133150

134-
request_id = (
135-
next(copy.deepcopy(self.w3.provider.request_counter))
136-
if hasattr(self.w3.provider, "request_counter")
137-
else 1
138-
)
151+
request_id = self._build_request_id()
139152
response_dict = {"jsonrpc": "2.0", "id": request_id}
140153

141154
if method in self.mock_responses:
@@ -176,35 +189,34 @@ def _mock_request_handler(
176189
# -- async -- #
177190

178191
async def __aenter__(self) -> "Self":
179-
# mypy error: Cannot assign to a method
180-
self.w3.provider.make_request = self._async_mock_request_handler # type: ignore[method-assign] # noqa: E501
181-
# reset request func cache to re-build request_func with mocked make_request
182-
self.w3.provider._request_func_cache = (None, None)
192+
if not isinstance(self.w3.provider, PersistentConnectionProvider):
193+
# mypy error: Cannot assign to a method
194+
self.w3.provider.make_request = self._async_mock_request_handler # type: ignore[method-assign] # noqa: E501
195+
# reset request func cache to re-build request_func w/ mocked make_request
196+
self.w3.provider._request_func_cache = (None, None)
197+
else:
198+
self.w3.provider.send_request = self._async_mock_send_handler # type: ignore[method-assign] # noqa: E501
199+
self.w3.provider.recv_for_request = self._async_mock_recv_handler # type: ignore[method-assign] # noqa: E501
200+
self.w3.provider._send_func_cache = (None, None)
201+
self.w3.provider._recv_func_cache = (None, None)
183202
return self
184203

185204
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
186-
# mypy error: Cannot assign to a method
187-
self.w3.provider.make_request = self._make_request # type: ignore[assignment]
188-
# reset request func cache to re-build request_func with original make_request
189-
self.w3.provider._request_func_cache = (None, None)
205+
if not isinstance(self.w3.provider, PersistentConnectionProvider):
206+
# mypy error: Cannot assign to a method
207+
self.w3.provider.make_request = self._make_request # type: ignore[assignment] # noqa: E501
208+
# reset request func cache to re-build request_func w/ original make_request
209+
self.w3.provider._request_func_cache = (None, None)
210+
else:
211+
self.w3.provider.send_request = self._send_request # type: ignore[method-assign] # noqa: E501
212+
self.w3.provider.recv_for_request = self._recv_for_request # type: ignore[method-assign] # noqa: E501
213+
self.w3.provider._send_func_cache = (None, None)
214+
self.w3.provider._recv_func_cache = (None, None)
190215

191-
async def _async_mock_request_handler(
192-
self, method: "RPCEndpoint", params: Any
216+
async def _async_build_mock_result(
217+
self, method: "RPCEndpoint", params: Any, request_id: int = None
193218
) -> "RPCResponse":
194-
self.w3 = cast("AsyncWeb3", self.w3)
195-
self._make_request = cast("AsyncMakeRequestFn", self._make_request)
196-
197-
if all(
198-
method not in mock_dict
199-
for mock_dict in (self.mock_errors, self.mock_results, self.mock_responses)
200-
):
201-
return await self._make_request(method, params)
202-
203-
request_id = (
204-
next(copy.deepcopy(self.w3.provider.request_counter))
205-
if hasattr(self.w3.provider, "request_counter")
206-
else 1
207-
)
219+
request_id = request_id if request_id else self._build_request_id()
208220
response_dict = {"jsonrpc": "2.0", "id": request_id}
209221

210222
if method in self.mock_responses:
@@ -244,6 +256,19 @@ async def _async_mock_request_handler(
244256
else:
245257
raise Exception("Invariant: unreachable code path")
246258

259+
return mocked_result
260+
261+
async def _async_mock_request_handler(
262+
self, method: "RPCEndpoint", params: Any
263+
) -> "RPCResponse":
264+
self.w3 = cast("AsyncWeb3", self.w3)
265+
self._make_request = cast("AsyncMakeRequestFn", self._make_request)
266+
if all(
267+
method not in mock_dict
268+
for mock_dict in (self.mock_errors, self.mock_results, self.mock_responses)
269+
):
270+
return await self._make_request(method, params)
271+
mocked_result = await self._async_build_mock_result(method, params)
247272
decorator = getattr(self._make_request, "_decorator", None)
248273
if decorator is not None:
249274
# If the original make_request was decorated, we need to re-apply
@@ -259,6 +284,47 @@ async def _coro(
259284
else:
260285
return mocked_result
261286

287+
async def _async_mock_send_handler(
288+
self, method: "RPCEndpoint", params: Any
289+
) -> "RPCRequest":
290+
if all(
291+
method not in mock_dict
292+
for mock_dict in (self.mock_errors, self.mock_results, self.mock_responses)
293+
):
294+
return await self._send_request(method, params)
295+
else:
296+
request_id = self._build_request_id()
297+
return {"id": request_id, "method": method, "params": params}
298+
299+
async def _async_mock_recv_handler(
300+
self, rpc_request: "RPCRequest"
301+
) -> "RPCResponse":
302+
self.w3 = cast("AsyncWeb3", self.w3)
303+
method = rpc_request["method"]
304+
request_id = rpc_request["id"]
305+
if all(
306+
method not in mock_dict
307+
for mock_dict in (self.mock_errors, self.mock_results, self.mock_responses)
308+
):
309+
return await self._recv_for_request(request_id)
310+
mocked_result = await self._async_build_mock_result(
311+
method, rpc_request["params"], request_id=int(request_id)
312+
)
313+
decorator = getattr(self._recv_for_request, "_decorator", None)
314+
if decorator is not None:
315+
# If the original recv_for_request was decorated, we need to re-apply
316+
# the decorator to the mocked recv_for_request. This is necessary for
317+
# the request caching decorator to work properly.
318+
319+
async def _coro(
320+
_provider: Any, _rpc_request: "RPCRequest"
321+
) -> "RPCResponse":
322+
return mocked_result
323+
324+
return await decorator(_coro)(self.w3.provider, rpc_request)
325+
else:
326+
return mocked_result
327+
262328
@staticmethod
263329
def _create_error_object(error: Dict[str, Any]) -> Dict[str, Any]:
264330
code = error.get("code", -32000)

web3/manager.py

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
AsyncGenerator,
77
Callable,
88
Coroutine,
9+
Dict,
910
List,
1011
Optional,
1112
Sequence,
@@ -71,6 +72,7 @@
7172
from web3.types import (
7273
FormattedEthSubscriptionResponse,
7374
RPCEndpoint,
75+
RPCRequest,
7476
RPCResponse,
7577
)
7678

@@ -502,31 +504,62 @@ def _format_batched_response(
502504
# -- persistent connection -- #
503505

504506
async def socket_request(
505-
self, method: RPCEndpoint, params: Any
506-
) -> Union[RPCResponse, FormattedEthSubscriptionResponse]:
507+
self,
508+
method: RPCEndpoint,
509+
params: Any,
510+
response_formatters: Optional[
511+
Tuple[Dict[str, Callable[..., Any]], Callable[..., Any], Callable[..., Any]]
512+
] = None,
513+
) -> RPCResponse:
507514
provider = cast(PersistentConnectionProvider, self._provider)
508-
request_func = await provider.request_func(
509-
cast("AsyncWeb3", self.w3), cast("MiddlewareOnion", self.middleware_onion)
510-
)
511515
self.logger.debug(
512516
"Making request to open socket connection and waiting for response: "
513-
f"{provider.get_endpoint_uri_or_ipc_path()}, method: {method}"
517+
f"{provider.get_endpoint_uri_or_ipc_path()},\n method: {method},\n"
518+
f" params: {params}"
514519
)
515-
response = await request_func(method, params)
516-
return await self._process_response(response)
520+
rpc_request = await self.send(method, params)
521+
provider._request_processor.cache_request_information(
522+
rpc_request["id"],
523+
rpc_request["method"],
524+
rpc_request["params"],
525+
response_formatters=response_formatters or ((), (), ()),
526+
)
527+
return await self.recv_for_request(rpc_request)
517528

518-
async def send(self, method: RPCEndpoint, params: Any) -> None:
529+
async def send(self, method: RPCEndpoint, params: Any) -> RPCRequest:
519530
provider = cast(PersistentConnectionProvider, self._provider)
520-
# run through the request processors of the middleware
521-
for mw_class in self.middleware_onion.as_tuple_of_middleware():
522-
mw = mw_class(self.w3)
523-
method, params = mw.request_processor(method, params)
524-
531+
async_w3 = cast("AsyncWeb3", self.w3)
532+
middleware_onion = cast("MiddlewareOnion", self.middleware_onion)
533+
send_func = await provider.send_func(
534+
async_w3,
535+
middleware_onion,
536+
)
525537
self.logger.debug(
526538
"Sending request to open socket connection: "
527-
f"{provider.get_endpoint_uri_or_ipc_path()}, method: {method}"
539+
f"{provider.get_endpoint_uri_or_ipc_path()},\n method: {method},\n"
540+
f" params: {params}"
541+
)
542+
return await send_func(method, params)
543+
544+
async def recv_for_request(self, rpc_request: RPCRequest) -> RPCResponse:
545+
provider = cast(PersistentConnectionProvider, self._provider)
546+
async_w3 = cast("AsyncWeb3", self.w3)
547+
middleware_onion = cast("MiddlewareOnion", self.middleware_onion)
548+
recv_func = await provider.recv_func(
549+
async_w3,
550+
middleware_onion,
551+
)
552+
self.logger.debug(
553+
"Getting response for request from open socket connection:\n"
554+
f" request: {rpc_request}"
528555
)
529-
await provider.socket_send(provider.encode_rpc_request(method, params))
556+
response = await recv_func(rpc_request)
557+
try:
558+
return cast(RPCResponse, await self._process_response(response))
559+
except Exception:
560+
response_id_key = generate_cache_key(response["id"])
561+
provider._request_processor._request_information_cache.pop(response_id_key)
562+
raise
530563

531564
async def recv(self) -> Union[RPCResponse, FormattedEthSubscriptionResponse]:
532565
provider = cast(PersistentConnectionProvider, self._provider)

web3/method.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def input_munger(self, module: "Module", args: Any, kwargs: Any) -> List[Any]:
203203
def process_params(
204204
self, module: "Module", *args: Any, **kwargs: Any
205205
) -> Tuple[
206-
Tuple[Union[RPCEndpoint, Callable[..., RPCEndpoint]], Tuple[Any, ...]],
206+
Tuple[Union[RPCEndpoint, Callable[..., RPCEndpoint]], Tuple[RPCEndpoint, ...]],
207207
Tuple[
208208
Union[TReturn, Dict[str, Callable[..., Any]]],
209209
Callable[..., Any],

0 commit comments

Comments
 (0)