Skip to content

Commit 6bd36e8

Browse files
committed
tweak(yamux): Simplify reducer a bit
1 parent 30774b9 commit 6bd36e8

File tree

3 files changed

+39
-41
lines changed

3 files changed

+39
-41
lines changed

p2p/src/network/scheduler/p2p_network_scheduler_state.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use malloc_size_of_derive::MallocSizeOf;
88
use redux::Timestamp;
99
use serde::{Deserialize, Serialize};
1010

11-
use crate::{disconnection::P2pDisconnectionReason, identity::PublicKey, PeerId};
11+
use crate::{
12+
disconnection::P2pDisconnectionReason, identity::PublicKey, yamux::YamuxStreamState, PeerId,
13+
};
1214

1315
use super::super::*;
1416

@@ -183,6 +185,17 @@ impl P2pNetworkConnectionState {
183185
SelectKind::Stream(_, stream_id) => Some(&self.streams.get(stream_id)?.select),
184186
}
185187
}
188+
189+
pub fn get_yamux_stream(&self, stream_id: StreamId) -> Option<&YamuxStreamState> {
190+
self.yamux_state()?.streams.get(&stream_id)
191+
}
192+
193+
pub fn incoming_streams_count(&self) -> usize {
194+
self.streams
195+
.iter()
196+
.filter(|(_, state)| state.select.is_incoming())
197+
.count()
198+
}
186199
}
187200

188201
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)]

p2p/src/network/yamux/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub use self::p2p_network_yamux_actions::*;
33

44
mod p2p_network_yamux_state;
55
pub use self::p2p_network_yamux_state::{
6-
P2pNetworkYamuxState, StreamId, YamuxFlags, YamuxPing, YamuxStreamKind,
6+
P2pNetworkYamuxState, StreamId, YamuxFlags, YamuxPing, YamuxStreamKind, YamuxStreamState,
77
};
88

99
#[cfg(feature = "p2p-libp2p")]

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateA
44

55
use crate::{
66
yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner},
7-
Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError,
8-
P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction,
9-
P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
7+
Data, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError, P2pNetworkConnectionMuxState,
8+
P2pNetworkNoiseAction, P2pNetworkSchedulerAction, P2pNetworkSchedulerState,
9+
P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
1010
};
1111

1212
use super::{
@@ -173,26 +173,19 @@ impl P2pNetworkYamuxState {
173173

174174
let (dispatcher, state) = state_context.into_dispatcher_and_state();
175175
let limits: &P2pLimits = state.substate()?;
176-
let max_streams = limits.max_streams();
177176
let connection_state =
178177
<State as SubstateAccess<P2pNetworkSchedulerState>>::substate(state)?
179178
.connection_state(&addr)
180179
.ok_or_else(|| format!("Connection not found {}", addr))?;
181180

182-
let stream = connection_state
183-
.yamux_state()
184-
.and_then(|yamux_state| yamux_state.streams.get(&frame.stream_id))
181+
let stream = connection_state.get_yamux_stream(frame.stream_id)
185182
.ok_or_else(|| format!("Stream with id {} not found for `P2pNetworkYamuxAction::IncomingFrame`", frame.stream_id))?;
186183

187-
let peer_id = match connection_state
188-
.auth
189-
.as_ref()
190-
.and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id())
191-
{
192-
Some(peer_id) => *peer_id,
193-
None => return Ok(()),
184+
let Some(peer_id) = connection_state.peer_id().cloned() else {
185+
return Ok(());
194186
};
195187

188+
// connection was reset by the peer
196189
if frame.flags.contains(YamuxFlags::RST) {
197190
dispatcher.push(P2pNetworkSchedulerAction::Error {
198191
addr,
@@ -201,34 +194,26 @@ impl P2pNetworkYamuxState {
201194
return Ok(());
202195
}
203196

197+
// if the peer tries to open more streams than allowed, close the stream
204198
if frame.flags.contains(YamuxFlags::SYN) && frame.stream_id != 0 {
205-
// count incoming streams
206-
let incoming_streams_number = connection_state
207-
.streams
208-
.values()
209-
.filter(|s| s.select.is_incoming())
210-
.count();
211-
212-
match (max_streams, incoming_streams_number) {
213-
(Limit::Some(limit), actual) if actual > limit => {
214-
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
215-
addr,
216-
frame: YamuxFrame {
217-
flags: YamuxFlags::FIN,
218-
stream_id: frame.stream_id,
219-
inner: YamuxFrameInner::Data(vec![].into()),
220-
},
221-
});
222-
}
223-
_ => {
224-
dispatcher.push(P2pNetworkSelectAction::Init {
225-
addr,
226-
kind: SelectKind::Stream(peer_id, frame.stream_id),
227-
incoming: true,
228-
});
229-
}
199+
if limits.max_streams() >= connection_state.incoming_streams_count() {
200+
dispatcher.push(P2pNetworkSelectAction::Init {
201+
addr,
202+
kind: SelectKind::Stream(peer_id, frame.stream_id),
203+
incoming: true,
204+
});
205+
} else {
206+
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
207+
addr,
208+
frame: YamuxFrame {
209+
flags: YamuxFlags::RST,
210+
stream_id: frame.stream_id,
211+
inner: YamuxFrameInner::Data(vec![].into()),
212+
},
213+
});
230214
}
231215
}
216+
232217
match &frame.inner {
233218
YamuxFrameInner::Data(data) => {
234219
// when our window size is less than half of the max window size send window update

0 commit comments

Comments
 (0)