@@ -107,69 +107,21 @@ impl P2pNetworkYamuxState {
107
107
Ok ( ( ) )
108
108
}
109
109
P2pNetworkYamuxAction :: IncomingFrame { addr } => {
110
- let mut pending_outgoing = VecDeque :: default ( ) ;
111
110
let Some ( frame) = yamux_state. incoming . pop_front ( ) else {
112
111
bug_condition ! (
113
112
"Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
114
113
) ;
115
114
return Ok ( ( ) ) ;
116
115
} ;
117
116
118
- if frame. flags . contains ( YamuxFlags :: SYN ) {
119
- yamux_state
120
- . streams
121
- . insert ( frame. stream_id , YamuxStreamState :: incoming ( ) ) ;
122
-
123
- if frame. stream_id != 0 {
124
- connection_state. streams . insert (
125
- frame. stream_id ,
126
- P2pNetworkStreamState :: new_incoming ( meta. time ( ) ) ,
127
- ) ;
128
- }
129
- }
130
- if frame. flags . contains ( YamuxFlags :: ACK ) {
131
- yamux_state
132
- . streams
133
- . entry ( frame. stream_id )
134
- . or_default ( )
135
- . established = true ;
136
- }
117
+ YamuxStreamState :: handle_frame_syn_ack_flags (
118
+ & mut yamux_state. streams ,
119
+ & mut connection_state. streams ,
120
+ & frame,
121
+ meta. time ( ) ,
122
+ ) ;
137
123
138
- match & frame. inner {
139
- YamuxFrameInner :: Data ( _) => {
140
- if let Some ( stream) = yamux_state. streams . get_mut ( & frame. stream_id ) {
141
- // must not underflow
142
- // TODO: check it and disconnect peer that violates flow rules
143
- stream. window_ours =
144
- stream. window_ours . saturating_sub ( frame. len_as_u32 ( ) ) ;
145
- }
146
- }
147
- YamuxFrameInner :: WindowUpdate { difference } => {
148
- let stream = yamux_state
149
- . streams
150
- . entry ( frame. stream_id )
151
- . or_insert_with ( YamuxStreamState :: incoming) ;
152
-
153
- stream. window_theirs = stream. window_theirs . saturating_add ( * difference) ;
154
-
155
- if * difference > 0 {
156
- // have some fresh space in the window
157
- // try send as many frames as can
158
- let mut window = stream. window_theirs ;
159
- while let Some ( frame) = stream. pending . pop_front ( ) {
160
- let len = frame. len_as_u32 ( ) ;
161
- pending_outgoing. push_back ( frame) ;
162
- if let Some ( new_window) = window. checked_sub ( len) {
163
- window = new_window;
164
- } else {
165
- break ;
166
- }
167
- }
168
- }
169
- }
170
- YamuxFrameInner :: Ping { .. } => { }
171
- YamuxFrameInner :: GoAway ( res) => yamux_state. set_res ( * res) ,
172
- }
124
+ let mut pending_outgoing = yamux_state. handle_frame_message ( & frame) ;
173
125
174
126
let ( dispatcher, state) = state_context. into_dispatcher_and_state ( ) ;
175
127
let limits: & P2pLimits = state. substate ( ) ?;
@@ -368,4 +320,45 @@ impl P2pNetworkYamuxState {
368
320
}
369
321
}
370
322
}
323
+
324
+ fn handle_frame_message ( & mut self , frame : & YamuxFrame ) -> VecDeque < YamuxFrame > {
325
+ let mut pending_outgoing = VecDeque :: default ( ) ;
326
+
327
+ match & frame. inner {
328
+ YamuxFrameInner :: Data ( _) => {
329
+ if let Some ( stream) = self . streams . get_mut ( & frame. stream_id ) {
330
+ // must not underflow
331
+ // TODO: check it and disconnect peer that violates flow rules
332
+ stream. window_ours = stream. window_ours . saturating_sub ( frame. len_as_u32 ( ) ) ;
333
+ }
334
+ }
335
+ YamuxFrameInner :: WindowUpdate { difference } => {
336
+ let stream = self
337
+ . streams
338
+ . entry ( frame. stream_id )
339
+ . or_insert_with ( YamuxStreamState :: incoming) ;
340
+
341
+ stream. window_theirs = stream. window_theirs . saturating_add ( * difference) ;
342
+
343
+ if * difference > 0 {
344
+ // have some fresh space in the window
345
+ // try send as many frames as can
346
+ let mut window = stream. window_theirs ;
347
+ while let Some ( frame) = stream. pending . pop_front ( ) {
348
+ let len = frame. len_as_u32 ( ) ;
349
+ pending_outgoing. push_back ( frame) ;
350
+ if let Some ( new_window) = window. checked_sub ( len) {
351
+ window = new_window;
352
+ } else {
353
+ break ;
354
+ }
355
+ }
356
+ }
357
+ }
358
+ YamuxFrameInner :: Ping { .. } => { }
359
+ YamuxFrameInner :: GoAway ( res) => self . set_res ( * res) ,
360
+ }
361
+
362
+ pending_outgoing
363
+ }
371
364
}
0 commit comments