@@ -4,9 +4,9 @@ use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateA
4
4
5
5
use crate :: {
6
6
yamux:: p2p_network_yamux_state:: { YamuxFrame , YamuxFrameInner } ,
7
- Data , Limit , P2pLimits , P2pNetworkAuthState , P2pNetworkConnectionError ,
8
- P2pNetworkConnectionMuxState , P2pNetworkNoiseAction , P2pNetworkSchedulerAction ,
9
- P2pNetworkSchedulerState , P2pNetworkSelectAction , P2pNetworkStreamState , SelectKind ,
7
+ Data , P2pLimits , P2pNetworkAuthState , P2pNetworkConnectionError , P2pNetworkConnectionMuxState ,
8
+ P2pNetworkNoiseAction , P2pNetworkSchedulerAction , P2pNetworkSchedulerState ,
9
+ P2pNetworkSelectAction , P2pNetworkStreamState , SelectKind ,
10
10
} ;
11
11
12
12
use super :: {
@@ -173,26 +173,19 @@ impl P2pNetworkYamuxState {
173
173
174
174
let ( dispatcher, state) = state_context. into_dispatcher_and_state ( ) ;
175
175
let limits: & P2pLimits = state. substate ( ) ?;
176
- let max_streams = limits. max_streams ( ) ;
177
176
let connection_state =
178
177
<State as SubstateAccess < P2pNetworkSchedulerState > >:: substate ( state) ?
179
178
. connection_state ( & addr)
180
179
. ok_or_else ( || format ! ( "Connection not found {}" , addr) ) ?;
181
180
182
- let stream = connection_state
183
- . yamux_state ( )
184
- . and_then ( |yamux_state| yamux_state. streams . get ( & frame. stream_id ) )
181
+ let stream = connection_state. get_yamux_stream ( frame. stream_id )
185
182
. ok_or_else ( || format ! ( "Stream with id {} not found for `P2pNetworkYamuxAction::IncomingFrame`" , frame. stream_id) ) ?;
186
183
187
- let peer_id = match connection_state
188
- . auth
189
- . as_ref ( )
190
- . and_then ( |P2pNetworkAuthState :: Noise ( noise) | noise. peer_id ( ) )
191
- {
192
- Some ( peer_id) => * peer_id,
193
- None => return Ok ( ( ) ) ,
184
+ let Some ( peer_id) = connection_state. peer_id ( ) . cloned ( ) else {
185
+ return Ok ( ( ) ) ;
194
186
} ;
195
187
188
+ // connection was reset by the peer
196
189
if frame. flags . contains ( YamuxFlags :: RST ) {
197
190
dispatcher. push ( P2pNetworkSchedulerAction :: Error {
198
191
addr,
@@ -201,34 +194,26 @@ impl P2pNetworkYamuxState {
201
194
return Ok ( ( ) ) ;
202
195
}
203
196
197
+ // if the peer tries to open more streams than allowed, close the stream
204
198
if frame. flags . contains ( YamuxFlags :: SYN ) && frame. stream_id != 0 {
205
- // count incoming streams
206
- let incoming_streams_number = connection_state
207
- . streams
208
- . values ( )
209
- . filter ( |s| s. select . is_incoming ( ) )
210
- . count ( ) ;
211
-
212
- match ( max_streams, incoming_streams_number) {
213
- ( Limit :: Some ( limit) , actual) if actual > limit => {
214
- dispatcher. push ( P2pNetworkYamuxAction :: OutgoingFrame {
215
- addr,
216
- frame : YamuxFrame {
217
- flags : YamuxFlags :: FIN ,
218
- stream_id : frame. stream_id ,
219
- inner : YamuxFrameInner :: Data ( vec ! [ ] . into ( ) ) ,
220
- } ,
221
- } ) ;
222
- }
223
- _ => {
224
- dispatcher. push ( P2pNetworkSelectAction :: Init {
225
- addr,
226
- kind : SelectKind :: Stream ( peer_id, frame. stream_id ) ,
227
- incoming : true ,
228
- } ) ;
229
- }
199
+ if limits. max_streams ( ) >= connection_state. incoming_streams_count ( ) {
200
+ dispatcher. push ( P2pNetworkSelectAction :: Init {
201
+ addr,
202
+ kind : SelectKind :: Stream ( peer_id, frame. stream_id ) ,
203
+ incoming : true ,
204
+ } ) ;
205
+ } else {
206
+ dispatcher. push ( P2pNetworkYamuxAction :: OutgoingFrame {
207
+ addr,
208
+ frame : YamuxFrame {
209
+ flags : YamuxFlags :: RST ,
210
+ stream_id : frame. stream_id ,
211
+ inner : YamuxFrameInner :: Data ( vec ! [ ] . into ( ) ) ,
212
+ } ,
213
+ } ) ;
230
214
}
231
215
}
216
+
232
217
match & frame. inner {
233
218
YamuxFrameInner :: Data ( data) => {
234
219
// when our window size is less than half of the max window size send window update
0 commit comments