diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs b/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs index 43263a09b..8c5140c67 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_actions.rs @@ -72,6 +72,7 @@ pub enum P2pNetworkSchedulerAction { addr: ConnectionAddr, peer_id: PeerId, message_size_limit: Limit, + pending_outgoing_limit: Limit, }, /// Action that initiate the specified peer disconnection. diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs index 566041f34..5f4832293 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs @@ -249,8 +249,9 @@ impl P2pNetworkSchedulerState { } P2pNetworkSchedulerAction::YamuxDidInit { addr, - message_size_limit, peer_id, + message_size_limit, + pending_outgoing_limit, } => { let Some(cn) = scheduler_state.connections.get_mut(&addr) else { bug_condition!( @@ -261,6 +262,7 @@ impl P2pNetworkSchedulerState { if let Some(P2pNetworkConnectionMuxState::Yamux(yamux)) = &mut cn.mux { yamux.init = true; yamux.message_size_limit = message_size_limit; + yamux.pending_outgoing_limit = pending_outgoing_limit; } let incoming = cn.incoming; @@ -503,10 +505,13 @@ impl P2pNetworkSchedulerState { return; }; let message_size_limit = p2p_state.config.limits.yamux_message_size(); + let pending_outgoing_limit = + p2p_state.config.limits.yamux_pending_outgoing_per_peer(); dispatcher.push(P2pNetworkSchedulerAction::YamuxDidInit { addr, peer_id, message_size_limit, + pending_outgoing_limit, }); } Some(Protocol::Stream(kind)) => { diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs index 2dc330145..63e51ab04 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs +++ 
b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs @@ -196,6 +196,10 @@ pub enum P2pNetworkConnectionError { StreamReset(StreamId), #[error("pubsub error: {0}")] PubSubError(String), + #[error("peer make us keep too much data at stream {0}")] + YamuxOverflow(StreamId), + #[error("peer should not decrease window size at stream {0}")] + YamuxBadWindowUpdate(StreamId), } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs index 48454331b..ff8074f88 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess}; use crate::P2pLimits; @@ -155,14 +157,14 @@ impl P2pNetworkYamuxState { data, mut flags, } => { - let yamux_state = yamux_state + let stream_state = yamux_state .streams .get(&stream_id) .ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?; - if !yamux_state.incoming && !yamux_state.established && !yamux_state.syn_sent { + if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent { flags.insert(YamuxFlags::SYN); - } else if yamux_state.incoming && !yamux_state.established { + } else if stream_state.incoming && !stream_state.established { flags.insert(YamuxFlags::ACK); } @@ -180,6 +182,7 @@ impl P2pNetworkYamuxState { Ok(()) } P2pNetworkYamuxAction::IncomingFrame { addr, frame } => { + let mut pending_outgoing = VecDeque::default(); if let Some(frame) = yamux_state.incoming.pop_front() { if frame.flags.contains(YamuxFlags::SYN) { yamux_state @@ -211,11 +214,32 @@ impl P2pNetworkYamuxState { } } YamuxFrameInner::WindowUpdate { difference } => { - yamux_state + let stream = yamux_state .streams .entry(frame.stream_id) - .or_insert_with(YamuxStreamState::incoming) - 
.update_window(false, difference); + .or_insert_with(YamuxStreamState::incoming); + stream.update_window(false, difference); + if difference > 0 { + // we have some fresh space in the window, + // try to send as many pending frames as we can + let mut window = stream.window_theirs; + while let Some(mut frame) = stream.pending.pop_front() { + let len = frame.len() as u32; + if let Some(new_window) = window.checked_sub(len) { + pending_outgoing.push_back(frame); + window = new_window; + } else { + if let Some(remaining) = + frame.split_at((len - window) as usize) + { + stream.pending.push_front(remaining); + } + pending_outgoing.push_back(frame); + + break; + } + } + } } YamuxFrameInner::Ping { .. } => {} YamuxFrameInner::GoAway(res) => yamux_state.set_res(res), @@ -282,15 +306,17 @@ } match &frame.inner { YamuxFrameInner::Data(data) => { + // here we are very permissive: + // whenever our window is smaller than 64 kb, just increase it by 256 kb; + // if we need fine-grained back pressure, it should be implemented here if stream.window_ours < 64 * 1024 { + let difference = 256 * 1024; dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame: YamuxFrame { stream_id: frame.stream_id, flags: YamuxFlags::empty(), - inner: YamuxFrameInner::WindowUpdate { - difference: 256 * 1024, - }, + inner: YamuxFrameInner::WindowUpdate { difference }, }, }); } @@ -318,21 +344,61 @@ }); } } + YamuxFrameInner::WindowUpdate { difference } => { + if *difference < 0 { + let error = + P2pNetworkConnectionError::YamuxBadWindowUpdate(frame.stream_id); + dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); + } else { + while let Some(frame) = pending_outgoing.pop_front() { + dispatcher + .push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame }); + } + } + } _ => {} } Ok(()) } - P2pNetworkYamuxAction::OutgoingFrame { frame, addr } => { - let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else { + 
P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => { + let stream_id = frame.stream_id; + let Some(stream) = yamux_state.streams.get_mut(&stream_id) else { return Ok(()); }; - match &frame.inner { + match &mut frame.inner { YamuxFrameInner::Data(data) => { - // must not underflow - // the action must not dispatch if it doesn't fit in the window - // TODO: add pending queue, where frames will wait for window increase - stream.window_theirs = stream.window_theirs.wrapping_sub(data.len() as u32); + if let Some(new_window) = + stream.window_theirs.checked_sub(data.len() as u32) + { + // their window is big enough, decrease the size + // and send the whole frame + stream.window_theirs = new_window; + } else if stream.window_theirs != 0 && stream.pending.is_empty() { + // their window is not big enough, but has some space, + // and the queue is empty: + // do not send the whole frame, + // split it and put the remainder in the queue + if let Some(remaining) = frame.split_at(stream.window_theirs as usize) { + stream.pending.push_back(remaining); + } + // the window will be zero after sending + stream.window_theirs = 0; + } else { + // either the window cannot accept any byte, + // or the queue is already not empty; + // in both cases the whole frame goes in the queue and nothing is sent + stream.pending.push_back(frame); + if stream.pending.iter().map(YamuxFrame::len).sum::<usize>() + > yamux_state.pending_outgoing_limit + { + let dispatcher = state_context.into_dispatcher(); + let error = P2pNetworkConnectionError::YamuxOverflow(stream_id); + dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); + } + + return Ok(()); + } } YamuxFrameInner::WindowUpdate { difference } => { stream.update_window(true, *difference); diff --git a/p2p/src/network/yamux/p2p_network_yamux_state.rs b/p2p/src/network/yamux/p2p_network_yamux_state.rs index 010d8449b..766465a78 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_state.rs +++ 
b/p2p/src/network/yamux/p2p_network_yamux_state.rs @@ -7,6 +7,7 @@ use super::super::*; #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct P2pNetworkYamuxState { pub message_size_limit: Limit, + pub pending_outgoing_limit: Limit, pub buffer: Vec<u8>, pub incoming: VecDeque<YamuxFrame>, pub streams: BTreeMap<StreamId, YamuxStreamState>, @@ -54,6 +55,7 @@ pub struct YamuxStreamState { pub writable: bool, pub window_theirs: u32, pub window_ours: u32, + pub pending: VecDeque<YamuxFrame>, } impl Default for YamuxStreamState { @@ -66,6 +68,7 @@ writable: false, window_theirs: 256 * 1024, window_ours: 256 * 1024, + pending: VecDeque::default(), } } } @@ -182,6 +185,43 @@ impl YamuxFrame { vec } + + pub fn len(&self) -> usize { + if let YamuxFrameInner::Data(data) = &self.inner { + data.len() + } else { + 0 + } + } + + /// If this frame's data is bigger than `pos`, keep only the first `pos` bytes and return the remainder as a new frame, + /// otherwise return `None`. + pub fn split_at(&mut self, pos: usize) -> Option<YamuxFrame> { + use std::ops::Sub; + + if let YamuxFrameInner::Data(data) = &mut self.inner { + if data.len() <= pos { + return None; + } + let (keep, rest) = data.split_at(pos); + let rest = Data(rest.to_vec().into_boxed_slice()); + *data = Data(keep.to_vec().into_boxed_slice()); + + let fin = if self.flags.contains(YamuxFlags::FIN) { + self.flags.remove(YamuxFlags::FIN); + YamuxFlags::FIN + } else { + YamuxFlags::empty() + }; + Some(YamuxFrame { + flags: self.flags.sub(YamuxFlags::SYN | YamuxFlags::ACK) | fin, + stream_id: self.stream_id, + inner: YamuxFrameInner::Data(rest), + }) + } else { + None + } + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 349a69b21..ed1cc4c93 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -252,6 +252,7 @@ pub struct P2pLimits { max_peers_in_state: Limit, max_streams: Limit, yamux_message_size: Limit, + yamux_pending_outgoing_per_peer: Limit, identify_message: Limit, kademlia_request: 
Limit, @@ -319,6 +320,12 @@ impl P2pLimits { /// Sets the maximum number of streams that a peer is allowed to open simultaneously. with_yamux_message_size ); + limit!( + /// Maximum total size of outgoing yamux frames queued for a peer. + yamux_pending_outgoing_per_peer, + /// Sets the maximum total size of outgoing yamux frames that may be queued for a peer while its receive window is exhausted. + with_yamux_pending_outgoing_per_peer + ); limit!( /// Minimum number of peers. @@ -400,6 +407,7 @@ impl Default for P2pLimits { max_peers_in_state, max_streams, yamux_message_size, + yamux_pending_outgoing_per_peer: rpc_get_staged_ledger, // NOTE(review): default reuses the staged-ledger RPC limit — confirm this is intended identify_message, kademlia_request,