Skip to content

Commit 517399c

Browse files
committed
Fix buffer issues with async IPC; add tests:
- Move to a readline() approach on the stream reader for IPC. The older implementation, carried over from the original IPC, would read in chunks, but this isn't ideal. It turns out that opting for readline() while allowing control of the buffer limit lends itself quite nicely to plucking well-formed responses from the socket.
1 parent e8eb20e commit 517399c

File tree

4 files changed

+112
-56
lines changed

4 files changed

+112
-56
lines changed

newsfragments/3492.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a ``read_buffer_limit`` configuration option to ``AsyncIPCProvider`` in order to control the expected message size limit. The default value is 20MB.

newsfragments/3492.performance.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve the logic for reading from the async IPC socket so that large messages are handled properly and their handling can be adjusted. This also improves reading speeds in general.

tests/core/providers/test_async_ipc_provider.py

Lines changed: 95 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
},
3939
}
4040

41+
TWENTY_MB = 20 * 1024 * 1024
42+
SIZED_MSG_START = b'{"id": 0, "jsonrpc": "2.0", "result": "'
43+
SIZED_MSG_END = b'"}\n' b""
44+
4145

4246
@pytest.fixture
4347
def jsonrpc_ipc_pipe_path():
@@ -61,53 +65,80 @@ def simple_ipc_server(jsonrpc_ipc_pipe_path):
6165
serv.close()
6266

6367

64-
@pytest.fixture
65-
def serve_empty_result(simple_ipc_server):
68+
def ipc_server_reply(simple_ipc_server, response_fn):
6669
def reply():
6770
connection, client_address = simple_ipc_server.accept()
6871
try:
6972
connection.recv(1024)
70-
connection.sendall(b'{"id": 0, "result": {}')
71-
time.sleep(0.1)
72-
connection.sendall(b"}")
73+
response_fn(connection)
7374
except BrokenPipeError:
7475
pass
7576
finally:
76-
# Clean up the connection
7777
connection.close()
7878
simple_ipc_server.close()
7979

8080
thd = Thread(target=reply, daemon=True)
8181
thd.start()
82-
83-
try:
84-
yield
85-
finally:
86-
thd.join()
82+
return thd
8783

8884

8985
@pytest.fixture
90-
def serve_subscription_result(simple_ipc_server):
91-
def reply():
92-
connection, client_address = simple_ipc_server.accept()
86+
def ipc_server_fixture(simple_ipc_server):
87+
def server_fixture(response_fn):
88+
thread = ipc_server_reply(simple_ipc_server, response_fn)
9389
try:
94-
connection.recv(1024)
95-
connection.sendall(
96-
b'{"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"}' # noqa: E501
97-
)
98-
connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8"))
90+
yield
9991
finally:
100-
# Clean up the connection
101-
connection.close()
102-
simple_ipc_server.close()
92+
thread.join()
10393

104-
thd = Thread(target=reply, daemon=True)
105-
thd.start()
94+
return server_fixture
10695

107-
try:
108-
yield
109-
finally:
110-
thd.join()
96+
97+
@pytest.fixture
98+
def serve_empty_result(ipc_server_fixture):
99+
def response_fn(connection):
100+
connection.sendall(b'{"id": 0, "result": {}')
101+
time.sleep(0.1)
102+
connection.sendall(b"}\n")
103+
104+
yield from ipc_server_fixture(response_fn)
105+
106+
107+
@pytest.fixture
108+
def serve_20mb_response(ipc_server_fixture):
109+
def response_fn(connection):
110+
connection.sendall(
111+
SIZED_MSG_START
112+
+ (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END)))
113+
+ SIZED_MSG_END
114+
)
115+
116+
yield from ipc_server_fixture(response_fn)
117+
118+
119+
@pytest.fixture
120+
def serve_larger_than_20mb_response(ipc_server_fixture):
121+
def response_fn(connection):
122+
connection.sendall(
123+
SIZED_MSG_START
124+
+ (b"a" * (TWENTY_MB - len(SIZED_MSG_START) - len(SIZED_MSG_END) + 1024))
125+
+ SIZED_MSG_END
126+
)
127+
128+
yield from ipc_server_fixture(response_fn)
129+
130+
131+
@pytest.fixture
132+
def serve_subscription_result(ipc_server_fixture):
133+
def response_fn(connection):
134+
connection.sendall(
135+
b"{"
136+
b'"jsonrpc": "2.0", "id": 0, "result": "0xf13f7073ddef66a8c1b0c9c9f0e543c3"'
137+
b"}\n"
138+
)
139+
connection.sendall(json.dumps(ETH_SUBSCRIBE_RESPONSE).encode("utf-8"))
140+
141+
yield from ipc_server_fixture(response_fn)
111142

112143

113144
def test_ipc_tilde_in_path():
@@ -226,7 +257,7 @@ async def test_async_iterator_pattern_exception_handling_for_requests(
226257
exception_caught = False
227258
async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))):
228259
# patch the listener to raise `ConnectionClosed` on read
229-
w3.provider._reader.read = _raise_connection_closed
260+
w3.provider._reader.readline = _raise_connection_closed
230261
try:
231262
await w3.eth.block_number
232263
except ConnectionClosed:
@@ -249,7 +280,7 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions(
249280
exception_caught = False
250281
async for w3 in AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))):
251282
# patch the listener to raise `ConnectionClosed` on read
252-
w3.provider._reader.read = _raise_connection_closed
283+
w3.provider._reader.readline = _raise_connection_closed
253284
try:
254285
async for _ in w3.socket.process_subscriptions():
255286
# raises exception
@@ -264,3 +295,37 @@ async def test_async_iterator_pattern_exception_handling_for_subscriptions(
264295
pytest.fail("Expected `ConnectionClosed` exception.")
265296

266297
assert exception_caught
298+
299+
300+
@pytest.mark.asyncio
301+
async def test_async_ipc_reader_can_read_20mb_message(
302+
jsonrpc_ipc_pipe_path, serve_20mb_response
303+
):
304+
async with AsyncWeb3(AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))) as w3:
305+
response = await w3.provider.make_request("method", [])
306+
assert len(response["result"]) == TWENTY_MB - len(SIZED_MSG_START) - len(
307+
SIZED_MSG_END
308+
)
309+
310+
311+
@pytest.mark.asyncio
312+
async def test_async_ipc_reader_raises_on_msg_over_20mb(
313+
jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response
314+
):
315+
with pytest.raises(ValueError):
316+
async with AsyncWeb3(
317+
AsyncIPCProvider(pathlib.Path(jsonrpc_ipc_pipe_path))
318+
) as w3:
319+
await w3.provider.make_request("method", [])
320+
321+
322+
@pytest.mark.asyncio
323+
async def test_async_ipc_read_buffer_limit_is_configurable(
324+
jsonrpc_ipc_pipe_path, serve_larger_than_20mb_response
325+
):
326+
async with AsyncWeb3(
327+
AsyncIPCProvider(
328+
pathlib.Path(jsonrpc_ipc_pipe_path), read_buffer_limit=TWENTY_MB + 1024
329+
)
330+
) as w3:
331+
await w3.provider.make_request("method", [])

web3/providers/persistent/async_ipc.py

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
import asyncio
22
import errno
33
import json
4-
from json import (
5-
JSONDecodeError,
6-
)
74
import logging
85
from pathlib import (
96
Path,
@@ -16,10 +13,6 @@
1613
Union,
1714
)
1815

19-
from eth_utils import (
20-
to_text,
21-
)
22-
2316
from web3.types import (
2417
RPCResponse,
2518
)
@@ -28,6 +21,7 @@
2821
PersistentConnectionProvider,
2922
)
3023
from ...exceptions import (
24+
PersistentConnectionClosedOK,
3125
ProviderConnectionError,
3226
Web3TypeError,
3327
)
@@ -37,7 +31,7 @@
3731

3832

3933
async def async_get_ipc_socket(
40-
ipc_path: str,
34+
ipc_path: str, read_buffer_limit: int
4135
) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
4236
if sys.platform == "win32":
4337
# On Windows named pipe is used. Simulate socket with it.
@@ -47,7 +41,7 @@ async def async_get_ipc_socket(
4741

4842
return NamedPipe(ipc_path)
4943
else:
50-
return await asyncio.open_unix_connection(ipc_path)
44+
return await asyncio.open_unix_connection(ipc_path, limit=read_buffer_limit)
5145

5246

5347
class AsyncIPCProvider(PersistentConnectionProvider):
@@ -56,11 +50,11 @@ class AsyncIPCProvider(PersistentConnectionProvider):
5650
_reader: Optional[asyncio.StreamReader] = None
5751
_writer: Optional[asyncio.StreamWriter] = None
5852
_decoder: json.JSONDecoder = json.JSONDecoder()
59-
_raw_message: str = ""
6053

6154
def __init__(
6255
self,
6356
ipc_path: Optional[Union[str, Path]] = None,
57+
read_buffer_limit: int = 20 * 1024 * 1024, # 20 MB
6458
# `PersistentConnectionProvider` kwargs can be passed through
6559
**kwargs: Any,
6660
) -> None:
@@ -71,6 +65,7 @@ def __init__(
7165
else:
7266
raise Web3TypeError("ipc_path must be of type string or pathlib.Path")
7367

68+
self.read_buffer_limit = read_buffer_limit
7469
super().__init__(**kwargs)
7570

7671
def __str__(self) -> str:
@@ -101,18 +96,10 @@ async def socket_send(self, request_data: bytes) -> None:
10196
)
10297

10398
async def socket_recv(self) -> RPCResponse:
104-
while True:
105-
# yield to the event loop to allow other tasks to run
106-
await asyncio.sleep(0)
107-
108-
try:
109-
response, pos = self._decoder.raw_decode(self._raw_message)
110-
self._raw_message = self._raw_message[pos:].lstrip()
111-
return response
112-
except JSONDecodeError:
113-
# read more data from the socket if the current raw message is
114-
# incomplete
115-
self._raw_message += to_text(await self._reader.read(4096)).lstrip()
99+
data = await self._reader.readline()
100+
if not data:
101+
raise PersistentConnectionClosedOK("Socket reader received end of stream.")
102+
return self.decode_rpc_response(data)
116103

117104
# -- private methods -- #
118105

@@ -131,10 +118,14 @@ async def _socket_send(self, request_data: bytes) -> None:
131118
async def _reset_socket(self) -> None:
132119
self._writer.close()
133120
await self._writer.wait_closed()
134-
self._reader, self._writer = await async_get_ipc_socket(self.ipc_path)
121+
self._reader, self._writer = await async_get_ipc_socket(
122+
self.ipc_path, self.read_buffer_limit
123+
)
135124

136125
async def _provider_specific_connect(self) -> None:
137-
self._reader, self._writer = await async_get_ipc_socket(self.ipc_path)
126+
self._reader, self._writer = await async_get_ipc_socket(
127+
self.ipc_path, self.read_buffer_limit
128+
)
138129

139130
async def _provider_specific_disconnect(self) -> None:
140131
if self._writer and not self._writer.is_closing():
@@ -149,5 +140,3 @@ async def _provider_specific_socket_reader(self) -> RPCResponse:
149140

150141
def _error_log_listener_task_exception(self, e: Exception) -> None:
151142
super()._error_log_listener_task_exception(e)
152-
# reset the raw message buffer on exception when error logging
153-
self._raw_message = ""

0 commit comments

Comments
 (0)