@@ -273,32 +273,36 @@ fn test_multiple_frames_parsing() {
273
273
/// - Window update mechanism
274
274
#[ test]
275
275
fn test_flow_control ( ) {
276
- // TODO: This test is incomplete:
277
- // - Doesn't verify pending queue behavior when window is full
278
- // - Should test frame redelivery when window becomes available
279
- // - Needs to verify window update triggers for pending frames
280
- // - Should test behavior when pending queue reaches limit
281
- let mut state = P2pNetworkYamuxState :: default ( ) ;
282
- let stream_id = 1 ;
283
-
284
276
let mut stream = YamuxStreamState :: default ( ) ;
285
277
stream. window_theirs = 10 ; // Small window for testing
286
- state. streams . insert ( stream_id, stream) ;
287
278
288
279
// Create frame larger than window
289
- let large_data = vec ! [ 1 ; 20 ] ;
290
- let mut frame = YamuxFrame {
280
+ let large_frame = YamuxFrame {
291
281
flags : YamuxFlags :: empty ( ) ,
292
- stream_id,
293
- inner : YamuxFrameInner :: Data ( Data :: from ( large_data ) ) ,
282
+ stream_id : 1 ,
283
+ inner : YamuxFrameInner :: Data ( Data :: from ( vec ! [ 1 ; 20 ] ) ) ,
294
284
} ;
295
285
296
- // Check if frame gets split according to window size
297
- let split_frame = frame. split_at ( 10 ) ;
298
- assert ! ( split_frame. is_some( ) ) ;
286
+ // Test frame splitting and queueing
287
+ let ( accepted, _) = stream. queue_frame ( large_frame, Limit :: Some ( 1000 ) ) ;
288
+ assert ! ( accepted. is_some( ) ) ;
289
+ assert_eq ! ( stream. pending. len( ) , 1 ) ;
290
+ assert_eq ! ( stream. window_theirs, 0 ) ;
299
291
300
- let stream = state. streams . get ( & stream_id) . unwrap ( ) ;
301
- assert_eq ! ( stream. window_theirs, 10 ) ;
292
+ // Test window update triggering incoming data
293
+ let data_frame = YamuxFrame {
294
+ flags : YamuxFlags :: empty ( ) ,
295
+ stream_id : 1 ,
296
+ inner : YamuxFrameInner :: Data ( Data :: from ( vec ! [ 1 ; ( INITIAL_WINDOW_SIZE / 2 + 1 ) as usize ] ) ) ,
297
+ } ;
298
+
299
+ let update = stream. process_incoming_data ( & data_frame) ;
300
+ assert ! ( update. is_some( ) ) ; // Should trigger update since window_ours < max_window_size/2
301
+
302
+ // Test pending frames get sent when window is updated
303
+ let sendable = stream. update_remote_window ( 15 ) ;
304
+ assert_eq ! ( sendable. len( ) , 1 ) ;
305
+ assert ! ( stream. pending. is_empty( ) ) ;
302
306
}
303
307
304
308
/// Tests invalid flag combinations
0 commit comments