Skip to content

Commit 724375e

Browse files
committed
updated doc-string and reverted mplex-changes
1 parent d7cdae8 commit 724375e

File tree

6 files changed

+56
-187
lines changed

6 files changed

+56
-187
lines changed

libp2p/stream_muxer/mplex/mplex_stream.py

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ class MplexStream(IMuxedStream):
4646
read_deadline: int | None
4747
write_deadline: int | None
4848

49+
# TODO: Add lock for read/write to avoid interleaving receiving messages?
4950
close_lock: trio.Lock
50-
read_lock: trio.Lock
51-
write_lock: trio.Lock
5251

5352
# NOTE: `dataIn` is size of 8 in Go implementation.
5453
incoming_data_channel: "trio.MemoryReceiveChannel[bytes]"
@@ -81,8 +80,6 @@ def __init__(
8180
self.event_remote_closed = trio.Event()
8281
self.event_reset = trio.Event()
8382
self.close_lock = trio.Lock()
84-
self.read_lock = trio.Lock()
85-
self.write_lock = trio.Lock()
8683
self.incoming_data_channel = incoming_data_channel
8784
self._buf = bytearray()
8885

@@ -116,65 +113,63 @@ async def read(self, n: int | None = None) -> bytes:
116113
:param n: number of bytes to read
117114
:return: bytes actually read
118115
"""
119-
async with self.read_lock:
120-
if n is not None and n < 0:
121-
raise ValueError(
122-
"the number of bytes to read `n` must be non-negative or "
123-
f"`None` to indicate read until EOF, got n={n}"
124-
)
125-
if self.event_reset.is_set():
126-
raise MplexStreamReset
127-
if n is None:
128-
return await self._read_until_eof()
129-
if len(self._buf) == 0:
130-
data: bytes
131-
# Peek whether there is data available. If yes, we just read until
132-
# there is no data, then return.
116+
if n is not None and n < 0:
117+
raise ValueError(
118+
"the number of bytes to read `n` must be non-negative or "
119+
f"`None` to indicate read until EOF, got n={n}"
120+
)
121+
if self.event_reset.is_set():
122+
raise MplexStreamReset
123+
if n is None:
124+
return await self._read_until_eof()
125+
if len(self._buf) == 0:
126+
data: bytes
127+
# Peek whether there is data available. If yes, we just read until there is
128+
# no data, then return.
129+
try:
130+
data = self.incoming_data_channel.receive_nowait()
131+
self._buf.extend(data)
132+
except trio.EndOfChannel:
133+
raise MplexStreamEOF
134+
except trio.WouldBlock:
135+
# We know `receive` will be blocked here. Wait for data here with
136+
# `receive` and catch all kinds of errors here.
133137
try:
134-
data = self.incoming_data_channel.receive_nowait()
138+
data = await self.incoming_data_channel.receive()
135139
self._buf.extend(data)
136140
except trio.EndOfChannel:
137-
raise MplexStreamEOF
138-
except trio.WouldBlock:
139-
# We know `receive` will be blocked here. Wait for data here with
140-
# `receive` and catch all kinds of errors here.
141-
try:
142-
data = await self.incoming_data_channel.receive()
143-
self._buf.extend(data)
144-
except trio.EndOfChannel:
145-
if self.event_reset.is_set():
146-
raise MplexStreamReset
147-
if self.event_remote_closed.is_set():
148-
raise MplexStreamEOF
149-
except trio.ClosedResourceError as error:
150-
# Probably `incoming_data_channel` is closed in `reset` when
151-
# we are waiting for `receive`.
152-
if self.event_reset.is_set():
153-
raise MplexStreamReset
154-
raise Exception(
155-
"`incoming_data_channel` is closed but stream is not reset."
156-
"This should never happen."
157-
) from error
158-
self._buf.extend(self._read_return_when_blocked())
159-
payload = self._buf[:n]
160-
self._buf = self._buf[len(payload) :]
161-
return bytes(payload)
141+
if self.event_reset.is_set():
142+
raise MplexStreamReset
143+
if self.event_remote_closed.is_set():
144+
raise MplexStreamEOF
145+
except trio.ClosedResourceError as error:
146+
# Probably `incoming_data_channel` is closed in `reset` when we are
147+
# waiting for `receive`.
148+
if self.event_reset.is_set():
149+
raise MplexStreamReset
150+
raise Exception(
151+
"`incoming_data_channel` is closed but stream is not reset. "
152+
"This should never happen."
153+
) from error
154+
self._buf.extend(self._read_return_when_blocked())
155+
payload = self._buf[:n]
156+
self._buf = self._buf[len(payload) :]
157+
return bytes(payload)
162158

163159
async def write(self, data: bytes) -> None:
164160
"""
165161
Write to stream.
166162
167163
:return: number of bytes written
168164
"""
169-
async with self.write_lock:
170-
if self.event_local_closed.is_set():
171-
raise MplexStreamClosed(f"cannot write to closed stream: data={data!r}")
172-
flag = (
173-
HeaderTags.MessageInitiator
174-
if self.is_initiator
175-
else HeaderTags.MessageReceiver
176-
)
177-
await self.muxed_conn.send_message(flag, data, self.stream_id)
165+
if self.event_local_closed.is_set():
166+
raise MplexStreamClosed(f"cannot write to closed stream: data={data!r}")
167+
flag = (
168+
HeaderTags.MessageInitiator
169+
if self.is_initiator
170+
else HeaderTags.MessageReceiver
171+
)
172+
await self.muxed_conn.send_message(flag, data, self.stream_id)
178173

179174
async def close(self) -> None:
180175
"""

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ def __init__(self, stream_id: int, conn: "Yamux", is_initiator: bool) -> None:
7777
self.send_window = DEFAULT_WINDOW_SIZE
7878
self.recv_window = DEFAULT_WINDOW_SIZE
7979
self.window_lock = trio.Lock()
80-
self.read_lock = trio.Lock()
81-
self.write_lock = trio.Lock()
8280

8381
async def __aenter__(self) -> "YamuxStream":
8482
"""Enter the async context manager."""

newsfragments/639.feature.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
1-
Added separate read and write locks to the `MplexStream` & `YamuxStream` class.This ensures thread-safe access and data integrity when multiple coroutines interact with the same MplexStream instance.
1+
Fixed several flow-control and concurrency issues in the `YamuxStream` class. Previously, stress-testing revealed that transferring data over `DEFAULT_WINDOW_SIZE` would break the stream due to inconsistent window update handling and lock management. The fixes include:
2+
3+
- Removed sending of window updates during writes to maintain correct flow-control.
4+
- Added proper timeout handling when releasing and acquiring locks to prevent concurrency errors.
5+
- Corrected the `read` function to properly handle window updates for both `read_until_EOF` and `read_n_bytes`.
6+
- Added event logging at `send_window_updates` and `waiting_for_window_updates` for better observability.

tests/core/stream_muxer/test_mplex_read_write_lock.py

Lines changed: 0 additions & 127 deletions
This file was deleted.

tests/core/stream_muxer/test_yamux.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,16 +224,14 @@ async def test_yamux_stream_reset(yamux_pair):
224224
await client_stream.reset()
225225
# After reset, reading should raise MuxedStreamReset or MuxedStreamEOF
226226
try:
227-
while True:
228-
await server_stream.read()
227+
await server_stream.read()
229228
except (MuxedStreamEOF, MuxedStreamError):
230229
pass
231230
else:
232231
pytest.fail("Expected MuxedStreamEOF or MuxedStreamError")
233232
# Verify subsequent operations fail with StreamReset or EOF
234233
with pytest.raises(MuxedStreamError):
235-
while True:
236-
await server_stream.read()
234+
await server_stream.read()
237235
with pytest.raises(MuxedStreamError):
238236
await server_stream.write(b"test")
239237
logging.debug("test_yamux_stream_reset complete")

0 commit comments

Comments
 (0)