Skip to content

Commit e6f96d3

Browse files
authored
Merge pull request #640 from kaneki003/main
Identifying & resolving race-around conditions in YamuxStream
2 parents 8d2b889 + 51313a5 commit e6f96d3

File tree

4 files changed

+507
-58
lines changed

4 files changed

+507
-58
lines changed

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 107 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,32 @@ async def write(self, data: bytes) -> None:
9898
# Flow control: Check if we have enough send window
9999
total_len = len(data)
100100
sent = 0
101-
101+
logging.debug(f"Stream {self.stream_id}: Starts writing {total_len} bytes ")
102102
while sent < total_len:
103+
# Wait for available window with timeout
104+
timeout = False
103105
async with self.window_lock:
104-
# Wait for available window
105-
while self.send_window == 0 and not self.closed:
106-
# Release lock while waiting
106+
if self.send_window == 0:
107+
logging.debug(
108+
f"Stream {self.stream_id}: Window is zero, waiting for update"
109+
)
110+
# Release lock and wait with timeout
107111
self.window_lock.release()
108-
await trio.sleep(0.01)
112+
# Poll for a window update without holding the lock, giving up after 5 seconds
113+
with trio.move_on_after(5.0) as cancel_scope:
114+
while self.send_window == 0 and not self.closed:
115+
await trio.sleep(0.01)
116+
# Record whether the wait ended because the 5-second deadline expired
117+
timeout = cancel_scope.cancelled_caught
118+
# Re-acquire lock
109119
await self.window_lock.acquire()
110120

121+
# If we timed out waiting for window update, raise an error
122+
if timeout:
123+
raise MuxedStreamError(
124+
"Timed out waiting for window update after 5 seconds."
125+
)
126+
111127
if self.closed:
112128
raise MuxedStreamError("Stream is closed")
113129

@@ -123,25 +139,45 @@ async def write(self, data: bytes) -> None:
123139
await self.conn.secured_conn.write(header + chunk)
124140
sent += to_send
125141

126-
# If window is getting low, consider updating
127-
if self.send_window < DEFAULT_WINDOW_SIZE // 2:
128-
await self.send_window_update()
129-
130-
async def send_window_update(self, increment: int | None = None) -> None:
131-
"""Send a window update to peer."""
132-
if increment is None:
133-
increment = DEFAULT_WINDOW_SIZE - self.recv_window
134-
142+
async def send_window_update(self, increment: int, skip_lock: bool = False) -> None:
143+
"""
144+
Send a window update to peer.
145+
146+
param:increment: The amount to increment the window size by.
147+
If zero or negative, the update is skipped and only
148+
a debug message is logged.
149+
param:skip_lock (bool): If True, skips acquiring window_lock.
150+
This should only be used when calling from a context
151+
that already holds the lock.
152+
"""
135153
if increment <= 0:
154+
# If increment is zero or negative, skip sending update
155+
logging.debug(
156+
f"Stream {self.stream_id}: Skipping window update"
157+
f"(increment={increment})"
158+
)
136159
return
160+
logging.debug(
161+
f"Stream {self.stream_id}: Sending window update with increment={increment}"
162+
)
137163

138-
async with self.window_lock:
139-
self.recv_window += increment
164+
async def _do_window_update() -> None:
140165
header = struct.pack(
141-
YAMUX_HEADER_FORMAT, 0, TYPE_WINDOW_UPDATE, 0, self.stream_id, increment
166+
YAMUX_HEADER_FORMAT,
167+
0,
168+
TYPE_WINDOW_UPDATE,
169+
0,
170+
self.stream_id,
171+
increment,
142172
)
143173
await self.conn.secured_conn.write(header)
144174

175+
if skip_lock:
176+
await _do_window_update()
177+
else:
178+
async with self.window_lock:
179+
await _do_window_update()
180+
145181
async def read(self, n: int | None = -1) -> bytes:
146182
# Handle None value for n by converting it to -1
147183
if n is None:
@@ -154,55 +190,68 @@ async def read(self, n: int | None = -1) -> bytes:
154190
)
155191
raise MuxedStreamEOF("Stream is closed for receiving")
156192

157-
# If reading until EOF (n == -1), block until stream is closed
158193
if n == -1:
159-
while not self.recv_closed and not self.conn.event_shutting_down.is_set():
194+
data = b""
195+
while not self.conn.event_shutting_down.is_set():
160196
# Check if there's data in the buffer
161197
buffer = self.conn.stream_buffers.get(self.stream_id)
162-
if buffer and len(buffer) > 0:
163-
# Wait for closure even if data is available
164-
logging.debug(
165-
f"Stream {self.stream_id}:Waiting for FIN before returning data"
166-
)
167-
await self.conn.stream_events[self.stream_id].wait()
168-
self.conn.stream_events[self.stream_id] = trio.Event()
169-
else:
170-
# No data, wait for data or closure
171-
logging.debug(f"Stream {self.stream_id}: Waiting for data or FIN")
172-
await self.conn.stream_events[self.stream_id].wait()
173-
self.conn.stream_events[self.stream_id] = trio.Event()
174-
175-
# After loop, check if stream is closed or shutting down
176-
async with self.conn.streams_lock:
177-
if self.conn.event_shutting_down.is_set():
178-
logging.debug(f"Stream {self.stream_id}: Connection shutting down")
179-
raise MuxedStreamEOF("Connection shut down")
180-
if self.closed:
181-
if self.reset_received:
182-
logging.debug(f"Stream {self.stream_id}: Stream was reset")
183-
raise MuxedStreamReset("Stream was reset")
184-
else:
185-
logging.debug(
186-
f"Stream {self.stream_id}: Stream closed cleanly (EOF)"
187-
)
188-
raise MuxedStreamEOF("Stream closed cleanly (EOF)")
189-
buffer = self.conn.stream_buffers.get(self.stream_id)
198+
199+
# If buffer is not available, check if stream is closed
190200
if buffer is None:
191-
logging.debug(
192-
f"Stream {self.stream_id}: Buffer gone, assuming closed"
193-
)
201+
logging.debug(f"Stream {self.stream_id}: No buffer available")
194202
raise MuxedStreamEOF("Stream buffer closed")
203+
204+
# If we have data in buffer, process it
205+
if len(buffer) > 0:
206+
chunk = bytes(buffer)
207+
buffer.clear()
208+
data += chunk
209+
210+
# Send window update for the chunk we just read
211+
async with self.window_lock:
212+
self.recv_window += len(chunk)
213+
logging.debug(f"Stream {self.stream_id}: Update {len(chunk)}")
214+
await self.send_window_update(len(chunk), skip_lock=True)
215+
216+
# If stream is closed (FIN received) and buffer is empty, break
195217
if self.recv_closed and len(buffer) == 0:
196-
logging.debug(f"Stream {self.stream_id}: EOF reached")
197-
raise MuxedStreamEOF("Stream is closed for receiving")
198-
# Return all buffered data
199-
data = bytes(buffer)
200-
buffer.clear()
201-
logging.debug(f"Stream {self.stream_id}: Returning {len(data)} bytes")
218+
logging.debug(f"Stream {self.stream_id}: Closed with empty buffer")
219+
break
220+
221+
# If stream was reset, raise reset error
222+
if self.reset_received:
223+
logging.debug(f"Stream {self.stream_id}: Stream was reset")
224+
raise MuxedStreamReset("Stream was reset")
225+
226+
# Wait for more data or stream closure
227+
logging.debug(f"Stream {self.stream_id}: Waiting for data or FIN")
228+
await self.conn.stream_events[self.stream_id].wait()
229+
self.conn.stream_events[self.stream_id] = trio.Event()
230+
231+
# After loop exit, first check if we have data to return
232+
if data:
233+
logging.debug(
234+
f"Stream {self.stream_id}: Returning {len(data)} bytes after loop"
235+
)
202236
return data
203237

204-
# For specific size read (n > 0), return available data immediately
205-
return await self.conn.read_stream(self.stream_id, n)
238+
# No data accumulated, now check why we exited the loop
239+
if self.conn.event_shutting_down.is_set():
240+
logging.debug(f"Stream {self.stream_id}: Connection shutting down")
241+
raise MuxedStreamEOF("Connection shut down")
242+
243+
# Return empty data
244+
return b""
245+
else:
246+
data = await self.conn.read_stream(self.stream_id, n)
247+
async with self.window_lock:
248+
self.recv_window += len(data)
249+
logging.debug(
250+
f"Stream {self.stream_id}: Sending window update after read, "
251+
f"increment={len(data)}"
252+
)
253+
await self.send_window_update(len(data), skip_lock=True)
254+
return data
206255

207256
async def close(self) -> None:
208257
if not self.send_closed:

newsfragments/639.feature.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Fixed several flow-control and concurrency issues in the `YamuxStream` class. Previously, stress-testing revealed that transferring more data than `DEFAULT_WINDOW_SIZE` would break the stream due to inconsistent window update handling and lock management. The fixes include:
2+
3+
- Removed sending of window updates during writes to maintain correct flow-control.
4+
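- Added a 5-second timeout while waiting for a send-window update (the window lock is released during the wait and re-acquired afterwards), so a write raises an error instead of waiting indefinitely.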
5+
- Corrected the `read` function to send window updates properly both when reading until EOF (`n == -1`) and when reading a specific number of bytes.
6+
- Added debug logging in `send_window_update` and while waiting for window updates, for better observability.

0 commit comments

Comments
 (0)