@@ -141,9 +141,7 @@ async def write(self, data: bytes) -> None:
141
141
await self .conn .secured_conn .write (header + chunk )
142
142
sent += to_send
143
143
144
- async def send_window_update (
145
- self , increment : int | None , skip_lock : bool = False
146
- ) -> None :
144
+ async def send_window_update (self , increment : int , skip_lock : bool = False ) -> None :
147
145
"""
148
146
Send a window update to peer.
149
147
@@ -154,12 +152,7 @@ async def send_window_update(
154
152
This should only be used when calling from a context
155
153
that already holds the lock.
156
154
"""
157
- increment_value = 0
158
- if increment is None :
159
- increment_value = DEFAULT_WINDOW_SIZE - self .recv_window
160
- else :
161
- increment_value = increment
162
- if increment_value <= 0 :
155
+ if increment <= 0 :
163
156
# If increment is zero or negative, skip sending update
164
157
logging .debug (
165
158
f"Stream { self .stream_id } : Skipping window update"
@@ -171,14 +164,13 @@ async def send_window_update(
171
164
)
172
165
173
166
async def _do_window_update () -> None :
174
- self .recv_window += increment_value
175
167
header = struct .pack (
176
168
YAMUX_HEADER_FORMAT ,
177
169
0 ,
178
170
TYPE_WINDOW_UPDATE ,
179
171
0 ,
180
172
self .stream_id ,
181
- increment_value ,
173
+ increment ,
182
174
)
183
175
await self .conn .secured_conn .write (header )
184
176
@@ -188,6 +180,22 @@ async def _do_window_update() -> None:
188
180
async with self .window_lock :
189
181
await _do_window_update ()
190
182
183
+ async def read_EOF (self ) -> bytes :
184
+ """
185
+ To read data from stream until it is closed.
186
+ """
187
+ data = b""
188
+ try :
189
+ while True :
190
+ recv = await self .read ()
191
+ if recv :
192
+ data += recv
193
+ except MuxedStreamEOF :
194
+ logging .debug (
195
+ f"Stream { self .stream_id } :EOF reached,total data read:{ len (data )} bytes"
196
+ )
197
+ return data
198
+
191
199
async def read (self , n : int | None = - 1 ) -> bytes :
192
200
# Handle None value for n by converting it to -1
193
201
if n is None :
@@ -202,61 +210,34 @@ async def read(self, n: int | None = -1) -> bytes:
202
210
203
211
# If reading until EOF (n == -1), block until stream is closed
204
212
if n == - 1 :
205
- while not self .recv_closed and not self .conn .event_shutting_down .is_set ():
206
- # Check if there's data in the buffer
207
- buffer = self .conn .stream_buffers .get (self .stream_id )
208
- if buffer and len (buffer ) > 0 :
209
- # Wait for closure even if data is available
210
- logging .debug (
211
- f"Stream { self .stream_id } :Waiting for FIN before returning data"
212
- )
213
- await self .conn .stream_events [self .stream_id ].wait ()
214
- self .conn .stream_events [self .stream_id ] = trio .Event ()
215
- else :
216
- # No data, wait for data or closure
217
- logging .debug (f"Stream { self .stream_id } : Waiting for data or FIN" )
218
- await self .conn .stream_events [self .stream_id ].wait ()
219
- self .conn .stream_events [self .stream_id ] = trio .Event ()
220
-
221
- # After loop, check if stream is closed or shutting down
222
- async with self .conn .streams_lock :
223
- if self .conn .event_shutting_down .is_set ():
224
- logging .debug (f"Stream { self .stream_id } : Connection shutting down" )
225
- raise MuxedStreamEOF ("Connection shut down" )
226
- if self .closed :
227
- if self .reset_received :
228
- logging .debug (f"Stream { self .stream_id } : Stream was reset" )
229
- raise MuxedStreamReset ("Stream was reset" )
230
- else :
231
- logging .debug (
232
- f"Stream { self .stream_id } : Stream closed cleanly (EOF)"
233
- )
234
- raise MuxedStreamEOF ("Stream closed cleanly (EOF)" )
235
- buffer = self .conn .stream_buffers .get (self .stream_id )
236
- if buffer is None :
237
- logging .debug (
238
- f"Stream { self .stream_id } : Buffer gone, assuming closed"
239
- )
240
- raise MuxedStreamEOF ("Stream buffer closed" )
241
- if self .recv_closed and len (buffer ) == 0 :
242
- logging .debug (f"Stream { self .stream_id } : EOF reached" )
243
- raise MuxedStreamEOF ("Stream is closed for receiving" )
244
- # Return all buffered data
213
+ # Check if there's data in the buffer
214
+ buffer = self .conn .stream_buffers .get (self .stream_id )
215
+ size = len (buffer ) if buffer else 0
216
+ if size > 0 :
217
+ # If any data is available,return it immediately
218
+ assert buffer is not None
245
219
data = bytes (buffer )
246
220
buffer .clear ()
247
- return data
248
-
249
- data = await self .conn .read_stream (self .stream_id , n )
250
- async with self .window_lock :
251
- self .recv_window -= len (data )
252
- # Automatically send a window update if recv_window is low
253
- if self .recv_window <= DEFAULT_WINDOW_SIZE // 2 :
221
+ async with self .window_lock :
222
+ self .recv_window += len (data )
223
+ await self .send_window_update (len (data ), skip_lock = True )
224
+ return data
225
+ # Otherwise,wait for data or FIN
226
+ if self .recv_closed :
227
+ raise MuxedStreamEOF ("Stream is closed for receiving" )
228
+ await self .conn .stream_events [self .stream_id ].wait ()
229
+ self .conn .stream_events [self .stream_id ] = trio .Event ()
230
+ return b""
231
+ else :
232
+ data = await self .conn .read_stream (self .stream_id , n )
233
+ async with self .window_lock :
234
+ self .recv_window += len (data )
254
235
logging .debug (
255
- f"Stream { self .stream_id } : "
256
- f"Low recv_window ( { self . recv_window } ), sending update "
236
+ f"Stream { self .stream_id } : Sending window update after read, "
237
+ f"increment= { len ( data ) } "
257
238
)
258
- await self .send_window_update (None , skip_lock = True )
259
- return data
239
+ await self .send_window_update (len ( data ) , skip_lock = True )
240
+ return data
260
241
261
242
async def close (self ) -> None :
262
243
if not self .send_closed :
0 commit comments