diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index f9290a60a..fd726cb83 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -460,6 +460,10 @@ pub enum ActionKind { P2pNetworkSelectTimeout, P2pNetworkYamuxIncomingData, P2pNetworkYamuxIncomingFrame, + P2pNetworkYamuxIncomingFrameData, + P2pNetworkYamuxIncomingFrameGoAway, + P2pNetworkYamuxIncomingFramePing, + P2pNetworkYamuxIncomingFrameWindowUpdate, P2pNetworkYamuxOpenStream, P2pNetworkYamuxOutgoingData, P2pNetworkYamuxOutgoingFrame, @@ -720,7 +724,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 610; + pub const COUNT: u16 = 614; } impl std::fmt::Display for ActionKind { @@ -1948,6 +1952,12 @@ impl ActionKindGet for P2pNetworkYamuxAction { Self::IncomingData { .. } => ActionKind::P2pNetworkYamuxIncomingData, Self::OutgoingData { .. } => ActionKind::P2pNetworkYamuxOutgoingData, Self::IncomingFrame { .. } => ActionKind::P2pNetworkYamuxIncomingFrame, + Self::IncomingFrameData { .. } => ActionKind::P2pNetworkYamuxIncomingFrameData, + Self::IncomingFramePing { .. } => ActionKind::P2pNetworkYamuxIncomingFramePing, + Self::IncomingFrameWindowUpdate { .. } => { + ActionKind::P2pNetworkYamuxIncomingFrameWindowUpdate + } + Self::IncomingFrameGoAway { .. } => ActionKind::P2pNetworkYamuxIncomingFrameGoAway, Self::OutgoingFrame { .. } => ActionKind::P2pNetworkYamuxOutgoingFrame, Self::PingStream { .. } => ActionKind::P2pNetworkYamuxPingStream, Self::OpenStream { .. } => ActionKind::P2pNetworkYamuxOpenStream, diff --git a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs index 6cae923c0..9d8ad03a0 100644 --- a/p2p/src/network/scheduler/p2p_network_scheduler_state.rs +++ b/p2p/src/network/scheduler/p2p_network_scheduler_state.rs @@ -8,7 +8,9 @@ use malloc_size_of_derive::MallocSizeOf; use redux::Timestamp; use serde::{Deserialize, Serialize}; -use crate::{disconnection::P2pDisconnectionReason, identity::PublicKey, PeerId}; +use crate::{ + disconnection::P2pDisconnectionReason, identity::PublicKey, yamux::YamuxStreamState, PeerId, +}; use super::super::*; @@ -183,6 +185,17 @@ impl P2pNetworkConnectionState { SelectKind::Stream(_, stream_id) => Some(&self.streams.get(stream_id)?.select), } } + + pub fn get_yamux_stream(&self, stream_id: StreamId) -> Option<&YamuxStreamState> { + self.yamux_state()?.streams.get(&stream_id) + } + + pub fn incoming_streams_count(&self) -> usize { + self.streams + .iter() + .filter(|(_, state)| state.select.is_incoming()) + .count() + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, thiserror::Error)] diff --git a/p2p/src/network/yamux/mod.rs b/p2p/src/network/yamux/mod.rs index d4c43b45b..41f12567e 100644 --- a/p2p/src/network/yamux/mod.rs +++ b/p2p/src/network/yamux/mod.rs @@ -3,8 +3,11 @@ pub use self::p2p_network_yamux_actions::*; mod p2p_network_yamux_state; pub use self::p2p_network_yamux_state::{ - P2pNetworkYamuxState, StreamId, YamuxFlags, YamuxPing, YamuxStreamKind, + P2pNetworkYamuxState, StreamId, YamuxFlags, YamuxPing, YamuxStreamKind, YamuxStreamState, }; #[cfg(feature = "p2p-libp2p")] mod p2p_network_yamux_reducer; + +#[cfg(test)] +mod tests; diff --git a/p2p/src/network/yamux/p2p_network_yamux_actions.rs b/p2p/src/network/yamux/p2p_network_yamux_actions.rs index f34166d42..a7772c5a6 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_actions.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_actions.rs @@ -22,6 +22,22 @@ pub enum P2pNetworkYamuxAction { addr: ConnectionAddr, }, #[action_event(level = trace)] + IncomingFrameData { + addr: ConnectionAddr, + }, + #[action_event(level = trace)] + IncomingFramePing { + addr: ConnectionAddr, + }, + #[action_event(level = trace)] + IncomingFrameWindowUpdate { + addr: ConnectionAddr, + }, + #[action_event(level = trace)] + IncomingFrameGoAway { + addr: ConnectionAddr, + }, + #[action_event(level = trace)] OutgoingFrame { addr: ConnectionAddr, frame: YamuxFrame, @@ -43,6 +59,10 @@ impl P2pNetworkYamuxAction { Self::IncomingData { addr, .. } => addr, Self::OutgoingData { addr, .. } => addr, Self::IncomingFrame { addr, .. } => addr, + Self::IncomingFrameData { addr, .. } => addr, + Self::IncomingFramePing { addr, .. } => addr, + Self::IncomingFrameWindowUpdate { addr, .. } => addr, + Self::IncomingFrameGoAway { addr, .. } => addr, Self::OutgoingFrame { addr, .. } => addr, Self::PingStream { addr, .. } => addr, Self::OpenStream { addr, .. } => addr, @@ -72,7 +92,11 @@ impl redux::EnablingCondition for P2pNetworkYamuxAction { P2pNetworkYamuxAction::OutgoingData { stream_id, .. } => { yamux_state.streams.contains_key(stream_id) } - P2pNetworkYamuxAction::IncomingFrame { .. } => true, + P2pNetworkYamuxAction::IncomingFrame { .. } + | P2pNetworkYamuxAction::IncomingFrameData { .. } + | P2pNetworkYamuxAction::IncomingFramePing { .. } + | P2pNetworkYamuxAction::IncomingFrameWindowUpdate { .. } + | P2pNetworkYamuxAction::IncomingFrameGoAway { .. } => !yamux_state.incoming.is_empty(), P2pNetworkYamuxAction::OutgoingFrame { .. } => true, P2pNetworkYamuxAction::PingStream { .. } => true, P2pNetworkYamuxAction::OpenStream { .. } => true, diff --git a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs index e3fbdb72c..9643b1826 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_reducer.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_reducer.rs @@ -1,16 +1,14 @@ -use std::collections::VecDeque; - use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess}; use crate::{ yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner}, - Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError, - P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction, - P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind, + Data, P2pLimits, P2pNetworkConnectionError, P2pNetworkConnectionMuxState, + P2pNetworkNoiseAction, P2pNetworkSchedulerAction, P2pNetworkSchedulerState, + P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind, }; use super::{ - p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE}, + p2p_network_yamux_state::{YamuxFrameKind, YamuxStreamState}, P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing, }; @@ -107,176 +105,201 @@ impl P2pNetworkYamuxState { Ok(()) } P2pNetworkYamuxAction::IncomingFrame { addr } => { - let mut pending_outgoing = VecDeque::default(); - let Some(frame) = yamux_state.incoming.pop_front() else { - bug_condition!( - "Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`" - ); - return Ok(()); - }; + let frame = yamux_state.incoming.front().unwrap(); // Cannot fail + let time = meta.time(); + let mut is_new_stream = false; if frame.flags.contains(YamuxFlags::SYN) { + // If there is a SYN flag, it means a new incoming stream is being opened yamux_state .streams .insert(frame.stream_id, YamuxStreamState::incoming()); - if frame.stream_id != 0 { - connection_state.streams.insert( - frame.stream_id, - P2pNetworkStreamState::new_incoming(meta.time()), - ); + is_new_stream = !frame.is_session_stream(); + if is_new_stream { + connection_state + .streams + .insert(frame.stream_id, P2pNetworkStreamState::new_incoming(time)); } } - if frame.flags.contains(YamuxFlags::ACK) { - yamux_state - .streams - .entry(frame.stream_id) - .or_default() - .established = true; - } - match &frame.inner { - YamuxFrameInner::Data(_) => { - if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) { - // must not underflow - // TODO: check it and disconnect peer that violates flow rules - stream.window_ours = - stream.window_ours.saturating_sub(frame.len_as_u32()); - } - } - YamuxFrameInner::WindowUpdate { difference } => { - let stream = yamux_state - .streams - .entry(frame.stream_id) - .or_insert_with(YamuxStreamState::incoming); - - stream.window_theirs = stream.window_theirs.saturating_add(*difference); - - if *difference > 0 { - // have some fresh space in the window - // try send as many frames as can - let mut window = stream.window_theirs; - while let Some(frame) = stream.pending.pop_front() { - let len = frame.len_as_u32(); - pending_outgoing.push_back(frame); - if let Some(new_window) = window.checked_sub(len) { - window = new_window; - } else { - break; - } - } - } + // If there is an ACK flag, it means the outgoing stream has been established + if frame.flags.contains(YamuxFlags::ACK) { + // TODO: what if the stream doesn't exist? + if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) { + stream.established = true; } - YamuxFrameInner::Ping { .. } => {} - YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res), } + let frame_stream_id = frame.stream_id; + let frame_flags = frame.flags; + let frame_kind = frame.kind(); + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let limits: &P2pLimits = state.substate()?; - let max_streams = limits.max_streams(); let connection_state = >::substate(state)? .connection_state(&addr) .ok_or_else(|| format!("Connection not found {}", addr))?; - let stream = connection_state - .yamux_state() - .and_then(|yamux_state| yamux_state.streams.get(&frame.stream_id)) - .ok_or_else(|| format!("Stream with id {} not found for `P2pNetworkYamuxAction::IncomingFrame`", frame.stream_id))?; - - let peer_id = match connection_state - .auth - .as_ref() - .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id()) - { + let stream_exists = connection_state.get_yamux_stream(frame_stream_id).is_some(); + let peer_id = match connection_state.peer_id() { Some(peer_id) => *peer_id, - None => return Ok(()), + None => { + bug_condition!("Peer id must exist"); + return Ok(()); + } }; - if frame.flags.contains(YamuxFlags::RST) { + // Peer reset this stream + if stream_exists && frame_flags.contains(YamuxFlags::RST) { dispatcher.push(P2pNetworkSchedulerAction::Error { addr, - error: P2pNetworkConnectionError::StreamReset(frame.stream_id), + error: P2pNetworkConnectionError::StreamReset(frame_stream_id), }); return Ok(()); } - if frame.flags.contains(YamuxFlags::SYN) && frame.stream_id != 0 { - // count incoming streams - let incoming_streams_number = connection_state - .streams - .values() - .filter(|s| s.select.is_incoming()) - .count(); - - match (max_streams, incoming_streams_number) { - (Limit::Some(limit), actual) if actual > limit => { - dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { - addr, - frame: YamuxFrame { - flags: YamuxFlags::FIN, - stream_id: frame.stream_id, - inner: YamuxFrameInner::Data(vec![].into()), - }, - }); - } - _ => { - dispatcher.push(P2pNetworkSelectAction::Init { - addr, - kind: SelectKind::Stream(peer_id, frame.stream_id), - incoming: true, - }); - } - } - } - match &frame.inner { - YamuxFrameInner::Data(data) => { - // when our window size is less than half of the max window size send window update - if stream.window_ours < stream.max_window_size / 2 { - let difference = - stream.max_window_size.saturating_mul(2).min(1024 * 1024); - - dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { - addr, - frame: YamuxFrame { - stream_id: frame.stream_id, - flags: YamuxFlags::empty(), - inner: YamuxFrameInner::WindowUpdate { difference }, - }, - }); - } - - dispatcher.push(P2pNetworkSelectAction::IncomingData { + // Enforce stream limits + if is_new_stream { + if connection_state.incoming_streams_count() > limits.max_streams() { + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { + addr, + frame: YamuxFrame { + flags: YamuxFlags::FIN, + stream_id: frame_stream_id, + inner: YamuxFrameInner::WindowUpdate { difference: 0 }, + }, + }); + } else { + dispatcher.push(P2pNetworkSelectAction::Init { addr, - peer_id, - stream_id: frame.stream_id, - data: data.clone(), - fin: frame.flags.contains(YamuxFlags::FIN), + kind: SelectKind::Stream(peer_id, frame_stream_id), + incoming: true, }); } - YamuxFrameInner::Ping { opaque } => { - let response = frame.flags.contains(YamuxFlags::ACK); - // if this ping is not response create our response - if !response { - let ping = YamuxPing { - stream_id: frame.stream_id, - opaque: *opaque, - response: true, - }; - dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { - addr, - frame: ping.into_frame(), - }); - } + } + + dispatcher.push(match frame_kind { + YamuxFrameKind::Data => P2pNetworkYamuxAction::IncomingFrameData { addr }, + YamuxFrameKind::WindowUpdate => { + P2pNetworkYamuxAction::IncomingFrameWindowUpdate { addr } } - YamuxFrameInner::WindowUpdate { .. } => { - while let Some(frame) = pending_outgoing.pop_front() { - dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame }); - } + YamuxFrameKind::Ping => P2pNetworkYamuxAction::IncomingFramePing { addr }, + YamuxFrameKind::GoAway => P2pNetworkYamuxAction::IncomingFrameGoAway { addr }, + }); + + Ok(()) + } + P2pNetworkYamuxAction::IncomingFrameData { addr } => { + let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail + let YamuxFrameInner::Data(data) = &frame.inner else { + bug_condition!("Expected Data frame"); + return Ok(()); + }; + + let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else { + return Ok(()); + }; + + // Process incoming data and check if we need to update window + let window_update_info = + if let Some(window_increase) = stream.process_incoming_data(&frame) { + Some((frame.stream_id, window_increase)) + } else { + None + }; + + let peer_id = match connection_state.peer_id() { + Some(peer_id) => *peer_id, + None => { + bug_condition!("Peer id must exist"); + return Ok(()); } - _ => {} + }; + + let dispatcher = state_context.into_dispatcher(); + + if let Some((update_stream_id, difference)) = window_update_info { + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { + addr, + frame: YamuxFrame { + stream_id: update_stream_id, + flags: YamuxFlags::empty(), + inner: YamuxFrameInner::WindowUpdate { difference }, + }, + }); } + dispatcher.push(P2pNetworkSelectAction::IncomingData { + addr, + peer_id, + stream_id: frame.stream_id, + data: data.clone(), + fin: frame.flags.contains(YamuxFlags::FIN), + }); + Ok(()) + } + P2pNetworkYamuxAction::IncomingFramePing { addr } => { + let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail + + // If the frame has an ACK flag, it means the ping was a response, nothing to do + if frame.flags.contains(YamuxFlags::ACK) { + return Ok(()); + } + + let YamuxFrameInner::Ping { opaque } = frame.inner else { + bug_condition!( + "Expected Ping frame for action `P2pNetworkYamuxAction::IncomingFramePing`" + ); + return Ok(()); + }; + + let ping = YamuxPing { + stream_id: frame.stream_id, + opaque, + response: true, + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { + addr, + frame: ping.into_frame(), + }); + + Ok(()) + } + P2pNetworkYamuxAction::IncomingFrameWindowUpdate { addr } => { + let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail + let YamuxFrameInner::WindowUpdate { difference } = frame.inner else { + bug_condition!("Expected WindowUpdate frame for action `P2pNetworkYamuxAction::IncomingFrameWindowUpdate`"); + return Ok(()); + }; + + let stream = yamux_state + .streams + .entry(frame.stream_id) + .or_insert_with(YamuxStreamState::incoming); + + let sendable_frames = stream.update_remote_window(difference); + + let dispatcher = state_context.into_dispatcher(); + + for frame in sendable_frames { + dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame }); + } + + Ok(()) + } + P2pNetworkYamuxAction::IncomingFrameGoAway { .. } => { + let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail + let YamuxFrameInner::GoAway(res) = frame.inner else { + bug_condition!("Expected GoAway frame for action `P2pNetworkYamuxAction::IncomingFrameGoAway`"); + return Ok(()); + }; + + yamux_state.set_res(res); Ok(()) } P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => { @@ -284,40 +307,24 @@ impl P2pNetworkYamuxState { let Some(stream) = yamux_state.streams.get_mut(&stream_id) else { return Ok(()); }; - match &mut frame.inner { + + match &frame.inner { YamuxFrameInner::Data(_) => { - if let Some(new_window) = - stream.window_theirs.checked_sub(frame.len_as_u32()) - { - // their window is big enough, decrease the size - // and send the whole frame - stream.window_theirs = new_window; - } else { - // their window is not big enough - // split the frame to send as much as you can and put the rest in the queue - if let Some(remaining) = frame.split_at(stream.window_theirs as usize) { - stream.pending.push_front(remaining); - } - - // the window will be zero after sending - stream.window_theirs = 0; - - // if size of pending that is above the limit, ignore the peer - if stream.pending.iter().map(YamuxFrame::len).sum::() - > yamux_state.pending_outgoing_limit - { - let dispatcher = state_context.into_dispatcher(); - let error = P2pNetworkConnectionError::YamuxOverflow(stream_id); - dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); - return Ok(()); - } + let (accepted, remaining) = + stream.queue_frame(frame, yamux_state.pending_outgoing_limit); + + if remaining.is_some() { + let dispatcher = state_context.into_dispatcher(); + let error = P2pNetworkConnectionError::YamuxOverflow(stream_id); + dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error }); + return Ok(()); } + + frame = + accepted.expect("frame should be accepted or error should be returned"); } YamuxFrameInner::WindowUpdate { difference } => { - stream.window_ours = stream.window_ours.saturating_add(*difference); - if stream.window_ours > stream.max_window_size { - stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE); - } + stream.update_local_window(*difference); } _ => {} } @@ -364,13 +371,12 @@ impl P2pNetworkYamuxState { P2pNetworkStreamState::new(stream_kind, meta.time()), ); - let peer_id = match connection_state - .auth - .as_ref() - .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id()) - { + let peer_id = match connection_state.peer_id() { Some(peer_id) => *peer_id, - None => return Ok(()), + None => { + bug_condition!("Peer id must exist"); + return Ok(()); + } }; let dispatcher = state_context.into_dispatcher(); diff --git a/p2p/src/network/yamux/p2p_network_yamux_state.rs b/p2p/src/network/yamux/p2p_network_yamux_state.rs index 938ab3d68..89bfe5a78 100644 --- a/p2p/src/network/yamux/p2p_network_yamux_state.rs +++ b/p2p/src/network/yamux/p2p_network_yamux_state.rs @@ -7,7 +7,7 @@ use super::super::*; pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb pub const INITIAL_WINDOW_SIZE: u32 = INITIAL_RECV_BUFFER_CAPACITY as u32; -pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; // 16mb +pub const MAX_WINDOW_SIZE: u32 = INITIAL_RECV_BUFFER_CAPACITY as u32; #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct P2pNetworkYamuxState { @@ -66,6 +66,8 @@ impl P2pNetworkYamuxState { return None; } + // Version 0 is the only supported version as per Yamux specification. + // Any other version should be rejected. let _version = match buf[0] { 0 => 0, unknown => { @@ -162,7 +164,7 @@ impl P2pNetworkYamuxState { self.shift_and_compact_buffer(offset); } - fn shift_and_compact_buffer(&mut self, offset: usize) { + pub(crate) fn shift_and_compact_buffer(&mut self, offset: usize) { let new_len = self.buffer.len() - offset; if self.buffer.capacity() > INITIAL_RECV_BUFFER_CAPACITY * 2 && new_len < INITIAL_RECV_BUFFER_CAPACITY / 2 @@ -228,14 +230,117 @@ impl YamuxStreamState { ..Default::default() } } + + /// Updates the remote window size and returns any frames that can now be sent + /// Returns frames that were pending and can now be sent due to increased window size + pub fn update_remote_window(&mut self, difference: u32) -> VecDeque { + self.window_theirs = self.window_theirs.saturating_add(difference); + let mut sendable_frames = VecDeque::new(); + + if difference > 0 { + let mut available_window = self.window_theirs; + while let Some(frame) = self.pending.pop_front() { + let frame_len = frame.len_as_u32(); + if frame_len > available_window { + // Put frame back and stop + self.pending.push_front(frame); + break; + } + available_window -= frame_len; + sendable_frames.push_back(frame); + } + } + + sendable_frames + } + + /// Updates the local window size and possibly increases max window size + pub fn update_local_window(&mut self, difference: u32) { + self.window_ours = self.window_ours.saturating_add(difference); + if self.window_ours > self.max_window_size { + self.max_window_size = self.window_ours.min(MAX_WINDOW_SIZE); + } + } + + /// Consumes window space for outgoing data + /// Returns true if the frame can be sent immediately, + /// false if it needs to be queued (window too small) + pub fn try_consume_window(&mut self, frame_len: u32) -> bool { + if let Some(new_window) = self.window_theirs.checked_sub(frame_len) { + self.window_theirs = new_window; + true + } else { + false + } + } + + /// Checks if window should be updated based on current size + /// Returns the amount by which the window should be increased, if any + pub fn should_update_window(&self) -> Option { + if self.window_ours < self.max_window_size / 2 { + Some(self.max_window_size.saturating_mul(2).min(1024 * 1024)) + } else { + None + } + } + + /// Attempts to queue a frame, respecting window size and pending queue limits + /// Returns (accepted_frame, remaining_frame) where: + /// - accepted_frame is the portion that fits in the current window + /// - remaining_frame is the portion that needs to be queued (if any) + pub fn queue_frame( + &mut self, + frame: YamuxFrame, + pending_limit: Limit, + ) -> (Option, Option) { + let frame_len = frame.len_as_u32(); + + // Check if frame fits in current window + if self.try_consume_window(frame_len) { + return (Some(frame), None); + } + + // Split frame if needed + let mut frame = frame; + let remaining = frame.split_at(self.window_theirs as usize); + self.window_theirs = 0; + + // Check pending queue size limit + let pending_size = self.pending.iter().map(YamuxFrame::len).sum::(); + if remaining.as_ref().map_or(0, |f| f.len()) + pending_size > pending_limit { + // Queue is full + return (None, Some(frame)); + } + + if let Some(remaining) = remaining { + self.pending.push_back(remaining); + } + + (Some(frame), None) + } + + /// Processes an incoming data frame and returns any necessary window updates + pub fn process_incoming_data(&mut self, frame: &YamuxFrame) -> Option { + // must not underflow + // TODO: check it and disconnect peer that violates flow rules + // Update our window + self.window_ours = self.window_ours.saturating_sub(frame.len_as_u32()); + + // Check if window update needed + self.should_update_window() + } } bitflags::bitflags! { #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)] pub struct YamuxFlags: u16 { + /// Signals the start of a new stream. May be sent with a data or window update message. Also sent with a ping to indicate outbound. const SYN = 0b0001; + /// Acknowledges the start of a new stream. May be sent with a data or window update message. Also sent with a ping to indicate response. const ACK = 0b0010; + /// Performs a half-close of a stream. May be sent with a data message or window update. const FIN = 0b0100; + /// Reset a stream immediately. May be sent with a data or window update message. const RST = 0b1000; } } @@ -380,6 +485,27 @@ impl YamuxFrame { None } } + + pub fn is_session_stream(&self) -> bool { + self.stream_id == 0 + } + + pub fn kind(&self) -> YamuxFrameKind { + match self.inner { + YamuxFrameInner::Data(_) => YamuxFrameKind::Data, + YamuxFrameInner::WindowUpdate { .. } => YamuxFrameKind::WindowUpdate, + YamuxFrameInner::Ping { .. } => YamuxFrameKind::Ping, + YamuxFrameInner::GoAway(_) => YamuxFrameKind::GoAway, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum YamuxFrameKind { + Data, + WindowUpdate, + Ping, + GoAway, } #[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)] @@ -390,7 +516,7 @@ pub enum YamuxFrameInner { GoAway(#[ignore_malloc_size_of = "doesn't allocate"] Result<(), YamuxSessionError>), } -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)] pub enum YamuxSessionError { Protocol, Internal, diff --git a/p2p/src/network/yamux/tests.rs b/p2p/src/network/yamux/tests.rs new file mode 100644 index 000000000..0fe919bdf --- /dev/null +++ b/p2p/src/network/yamux/tests.rs @@ -0,0 +1,592 @@ +use crate::{yamux::p2p_network_yamux_state::*, Data, Limit}; + +/// Tests frame serialization following the spec's framing requirements: +/// - 12-byte header (Version, Type, Flags, StreamID, Length) +/// - Proper big-endian encoding +/// - Correct payload length handling +#[test] +fn test_frame_serialization() { + // Test data frame + let data_frame = YamuxFrame { + flags: YamuxFlags::SYN | YamuxFlags::ACK, + stream_id: 1, + inner: YamuxFrameInner::Data(Data::from(vec![1, 2, 3, 4])), + }; + let bytes = data_frame.into_bytes(); + assert_eq!(bytes.len(), 16); // 12 bytes header + 4 bytes data + assert_eq!( + &bytes[..12], + &[ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x03, // flags (SYN | ACK) + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x04, // length + ] + ); + assert_eq!(&bytes[12..], &[1, 2, 3, 4]); + + // Test window update frame + let window_frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id: 2, + inner: YamuxFrameInner::WindowUpdate { difference: 1024 }, + }; + let bytes = window_frame.into_bytes(); + assert_eq!(bytes.len(), 12); + assert_eq!( + &bytes[..], + &[ + 0x00, // version + 0x01, // type (WINDOW_UPDATE) + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x02, // stream_id + 0x00, 0x00, 0x04, 0x00, // difference (1024) + ] + ); +} + +/// Tests frame parsing according to spec's requirements: +/// - Version field validation (must be 0) +/// - Proper flag combinations (SYN | ACK) +/// - Stream ID handling +/// - Length field interpretation +/// - Payload extraction +#[test] +fn test_frame_parsing() { + let mut state = P2pNetworkYamuxState { + message_size_limit: Limit::Some(1024), + ..P2pNetworkYamuxState::default() + }; + + // Valid data frame + let data = vec![ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x03, // flags (SYN | ACK) + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x04, // length + 0x01, 0x02, 0x03, 0x04, // payload + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert_eq!(state.incoming.len(), 1); + let frame = state.incoming.pop_front().unwrap(); + assert!(frame.flags.contains(YamuxFlags::SYN)); + assert!(frame.flags.contains(YamuxFlags::ACK)); + assert_eq!(frame.stream_id, 1); + match frame.inner { + YamuxFrameInner::Data(data) => assert_eq!(&*data, &[1, 2, 3, 4]), + _ => panic!("Expected Data frame"), + } +} + +/// Tests version field validation as per spec: +/// "The version field is used for future backward compatibility. +/// At the current time, the field is always set to 0" +#[test] +fn test_invalid_version() { + let mut state = P2pNetworkYamuxState::default(); + + // Invalid version + let data = vec![ + 0x01, // invalid version + 0x00, // type + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x00, // length + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert!(matches!( + state.terminated, + Some(Err(YamuxFrameParseError::Version(1))) + )); +} + +/// Tests window management as specified: +/// - Window size tracking +/// - Frame splitting when exceeding window +/// - Window update mechanism +/// - Pending queue behavior +#[test] +fn test_window_management() { + let mut stream = YamuxStreamState::default(); + assert_eq!(stream.window_ours, INITIAL_WINDOW_SIZE); + assert_eq!(stream.window_theirs, INITIAL_WINDOW_SIZE); + + // Test consuming window space + let frame_len = 1000; + assert!(stream.try_consume_window(frame_len)); + assert_eq!(stream.window_theirs, INITIAL_WINDOW_SIZE - frame_len); + + // Test window update + let update_size = 2048; + let sendable = stream.update_remote_window(update_size); + assert_eq!( + stream.window_theirs, + INITIAL_WINDOW_SIZE - frame_len + update_size + ); + assert!(sendable.is_empty()); // No pending frames yet + + // Test window auto-tuning + assert_eq!(stream.window_ours, INITIAL_WINDOW_SIZE); + stream.window_ours = stream.max_window_size / 3; // Simulate consumed window + assert!(stream.should_update_window().is_some()); // Should trigger update + + // Test pending queue + stream.window_theirs = 10; // Set small window + let large_frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id: 1, + inner: YamuxFrameInner::Data(Data::from(vec![1; 20])), + }; + stream.pending.push_back(large_frame); + + // Update window and check if frame gets sent + let sendable = stream.update_remote_window(15); + assert_eq!(sendable.len(), 1); + assert!(stream.pending.is_empty()); +} + +/// Tests stream ID allocation rules from spec: +/// "The client side should use odd ID's, and the server even. +/// This prevents any collisions." +#[test] +fn test_stream_id_allocation() { + // Test client (odd) IDs + assert_eq!(YamuxStreamKind::Rpc.stream_id(false), 1); + assert_eq!(YamuxStreamKind::Gossipsub.stream_id(false), 3); + + // Test server (even) IDs + assert_eq!(YamuxStreamKind::Rpc.stream_id(true), 2); + assert_eq!(YamuxStreamKind::Gossipsub.stream_id(true), 4); +} + +/// Tests message size limiting (implementation-specific safeguard +/// not in spec but required for security) +#[test] +fn test_message_size_limit() { + let mut state = P2pNetworkYamuxState { + message_size_limit: Limit::Some(10), + ..P2pNetworkYamuxState::default() + }; + + // Message exceeding size limit + let data = vec![ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x20, // length (32 bytes, exceeds limit) + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert!(matches!( + state.terminated, + Some(Ok(Err(YamuxSessionError::Internal))) + )); +} + +/// Tests ping messages as per spec: +/// "Used to measure RTT. It can also be used to heart-beat +/// and do keep-alives over TCP." +/// - SYN flag for outbound +/// - ACK flag for response +#[test] +fn test_ping_pong() { + let ping = YamuxPing { + stream_id: 0, + opaque: 12345, + response: false, + }; + + let frame = ping.into_frame(); + assert!(frame.flags.contains(YamuxFlags::SYN)); + assert!(!frame.flags.contains(YamuxFlags::ACK)); + + match frame.inner { + YamuxFrameInner::Ping { opaque } => assert_eq!(opaque, 12345), + _ => panic!("Expected Ping frame"), + } + + // Test ping response + let response = YamuxPing { + stream_id: 0, + opaque: 12345, + response: true, + }; + + let frame = response.into_frame(); + assert!(frame.flags.contains(YamuxFlags::ACK)); + assert!(!frame.flags.contains(YamuxFlags::SYN)); +} + +/// Tests handling of incomplete frames: +/// Verifies that parser correctly handles partial frame data +/// and waits for complete frame before processing +#[test] +fn test_partial_frame_parsing() { + let mut state = P2pNetworkYamuxState { + message_size_limit: Limit::Some(1024), + ..P2pNetworkYamuxState::default() + }; + + // Send header only first + let header = vec![ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x03, // flags (SYN | ACK) + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x04, // length + ]; + state.extend_buffer(&header); + state.parse_frames(); + assert_eq!(state.incoming.len(), 0); // Should not parse incomplete frame + + // Send payload + let payload = vec![0x01, 0x02, 0x03, 0x04]; + state.extend_buffer(&payload); + state.parse_frames(); + assert_eq!(state.incoming.len(), 1); // Should now parse complete frame +} + +/// Tests parsing of multiple consecutive frames +/// Ensures correct frame boundary detection and sequential processing +#[test] +fn test_multiple_frames_parsing() { + let mut state = P2pNetworkYamuxState { + message_size_limit: Limit::Some(1024), + ..P2pNetworkYamuxState::default() + }; + + // Two consecutive frames + let frames = vec![ + 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x01, + 0x02, // Frame 1 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x03, + 0x04, // Frame 2 + ]; + state.extend_buffer(&frames); + state.parse_frames(); + assert_eq!(state.incoming.len(), 2); +} + +/// Tests flow control as per spec: +/// - Window size tracking +/// - Frame splitting when exceeding window +/// - Window update mechanism +#[test] +fn test_flow_control() { + let mut stream = YamuxStreamState { + window_theirs: 10, // Small window for testing + ..YamuxStreamState::default() + }; + + // Create frame larger than window + let large_frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id: 1, + inner: YamuxFrameInner::Data(Data::from(vec![1; 20])), + }; + + // Test frame splitting and queueing + let (accepted, _) = stream.queue_frame(large_frame, Limit::Some(1000)); + assert!(accepted.is_some()); + assert_eq!(stream.pending.len(), 1); + assert_eq!(stream.window_theirs, 0); + + // Test window update triggering incoming data + let data_frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id: 1, + inner: YamuxFrameInner::Data(Data::from(vec![1; (INITIAL_WINDOW_SIZE / 2 + 1) as usize])), + }; + + let update = stream.process_incoming_data(&data_frame); + assert!(update.is_some()); // Should trigger update since window_ours < max_window_size/2 + + // Test pending frames get sent when window is updated + let sendable = stream.update_remote_window(15); + assert_eq!(sendable.len(), 1); + assert!(stream.pending.is_empty()); +} + +/// Tests invalid flag combinations +/// Ensures proper error handling for undefined flag bits +#[test] +fn test_invalid_flags() { + let mut state = P2pNetworkYamuxState::default(); + + // Invalid flags value + let data = vec![ + 0x00, // version + 0x00, // type + 0xFF, 0xFF, // invalid flags + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x00, // length + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert!(matches!( + state.terminated, + Some(Err(YamuxFrameParseError::Flags(0xFFFF))) + )); +} + +/// Tests invalid frame type handling as per spec: +/// Only types 0x0 (Data), 0x1 (Window Update), +/// 0x2 (Ping), and 0x3 (Go Away) are valid +#[test] +fn test_invalid_frame_type() { + let mut state = P2pNetworkYamuxState::default(); + + // Invalid frame type + let data = vec![ + 0x00, // version + 0xFF, // invalid type + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x00, // length + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert!(matches!( + state.terminated, + Some(Err(YamuxFrameParseError::Type(0xFF))) + )); +} + +/// Tests GoAway error codes as specified: +/// - 0x0 Normal termination +/// - 0x1 Protocol error +/// - 0x2 Internal error +#[test] +fn test_goaway_error_codes() { + let mut state = P2pNetworkYamuxState::default(); + + // Test each GoAway error code + for (code, expected_result) in &[ + (0u8, Ok(())), + (1u8, Err(YamuxSessionError::Protocol)), + (2u8, Err(YamuxSessionError::Internal)), + ] { + let data = vec![ + 0x00, // version + 0x03, // GoAway type + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x00, // stream_id (0 for session) + 0x00, 0x00, 0x00, *code, // error code + ]; + state.extend_buffer(&data); + state.parse_frames(); + + assert_eq!(state.incoming.len(), 1); + let frame = state.incoming.pop_front().unwrap(); + match frame.inner { + YamuxFrameInner::GoAway(result) => assert_eq!(&result, expected_result), + _ => panic!("Expected GoAway frame"), + } + } +} + +/// Tests buffer capacity management for received data +/// Implementation-specific but critical for memory management +#[test] +fn test_buffer_capacity_management() { + let mut state = P2pNetworkYamuxState::default(); + + // Fill buffer with some data + let initial_data = vec![0; INITIAL_RECV_BUFFER_CAPACITY]; + state.extend_buffer(&initial_data); + + // Check initial capacity + assert_eq!(state.buffer.capacity(), INITIAL_RECV_BUFFER_CAPACITY); + + // Add more data to trigger buffer growth + state.extend_buffer(&[0; INITIAL_RECV_BUFFER_CAPACITY]); + + // Verify buffer has grown + assert!(state.buffer.capacity() > INITIAL_RECV_BUFFER_CAPACITY); + + // Consume most of the data and compact + state.shift_and_compact_buffer(INITIAL_RECV_BUFFER_CAPACITY + INITIAL_RECV_BUFFER_CAPACITY / 2); + + // Check if buffer was compacted + // Note: The actual capacity might be larger than INITIAL_RECV_BUFFER_CAPACITY + // because Vec doesn't automatically shrink unless specifically requested + assert!(state.buffer.capacity() >= INITIAL_RECV_BUFFER_CAPACITY); + assert_eq!(state.buffer.len(), INITIAL_RECV_BUFFER_CAPACITY / 2); +} + +/// Tests complete stream lifecycle as per spec: +/// - Stream establishment (SYN/ACK handshake) +/// - Data transfer +/// - Stream closure (FIN) +#[test] +fn test_stream_lifecycle() { + // TODO: This test is incomplete: + // - Doesn't verify stream state changes after each frame + // - Should test both sides of the handshake + // - Needs to verify cleanup after FIN + // - Should test RST handling + let mut state = P2pNetworkYamuxState::default(); + let stream_id = 1; + + // Test stream establishment (SYN -> ACK) + let stream = YamuxStreamState::default(); + assert!(!stream.established); + state.streams.insert(stream_id, stream); + + // Simulate SYN + let syn_frame = YamuxFrame { + flags: YamuxFlags::SYN, + stream_id, + inner: YamuxFrameInner::Data(Data::from(vec![1, 2])), + }; + state.incoming.push_back(syn_frame); + + // Simulate ACK + let ack_frame = YamuxFrame { + flags: YamuxFlags::ACK, + stream_id, + inner: YamuxFrameInner::WindowUpdate { difference: 0 }, + }; + state.incoming.push_back(ack_frame); + + // Test stream closure (FIN) + let fin_frame = YamuxFrame { + flags: YamuxFlags::FIN, + stream_id, + inner: YamuxFrameInner::Data(Data::from(vec![])), + }; + state.incoming.push_back(fin_frame); +} + +/// Tests window size overflow protection: +/// Ensures window updates don't exceed MAX_WINDOW_SIZE +/// Implementation-specific safety measure +#[test] +fn test_window_overflow() { + let mut state = P2pNetworkYamuxState::default(); + let stream_id = 1; + + let stream = YamuxStreamState { + window_theirs: MAX_WINDOW_SIZE, + ..YamuxStreamState::default() + }; + state.streams.insert(stream_id, stream); + + // Try to update beyond MAX_WINDOW_SIZE + let update_frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id, + inner: YamuxFrameInner::WindowUpdate { + difference: MAX_WINDOW_SIZE, + }, + }; + state.incoming.push_back(update_frame); + + let stream = state.streams.get(&stream_id).unwrap(); + assert!(stream.window_theirs <= MAX_WINDOW_SIZE); +} + +/// Tests handling of malformed frames: +/// Ensures proper error handling when frame length +/// doesn't match actual data length +#[test] +fn test_malformed_frame() { + let mut state = P2pNetworkYamuxState { + message_size_limit: Limit::Some(1024), + ..P2pNetworkYamuxState::default() + }; + + // Frame with length field larger than actual data + let data = vec![ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x01, // stream_id + 0x00, 0x00, 0x00, 0x10, // length (16 bytes) + 0x01, 0x02, 0x03, // only 3 bytes of payload + ]; + state.extend_buffer(&data); + state.parse_frames(); + assert_eq!(state.incoming.len(), 0); // Should not parse malformed frame +} + +/// Tests session stream (ID 0) rules per spec: +/// "The 0 ID is reserved to represent the session." +/// Verifies proper handling of session-level messages +#[test] +fn test_session_stream_rules() { + // TODO: This test is incomplete: + // - Should verify rejection of data frames on session stream + // - Needs to test all allowed frame types (ping and go-away) + // - Should verify session stream handling in both directions + let mut state = P2pNetworkYamuxState::default(); + + // Test session stream (ID 0) with data frame (should be invalid) + let data = vec![ + 0x00, // version + 0x00, // type (DATA) + 0x00, 0x00, // flags + 0x00, 0x00, 0x00, 0x00, // stream_id 0 (session) + 0x00, 0x00, 0x00, 0x01, // length + 0x00, // payload + ]; + state.extend_buffer(&data); + state.parse_frames(); + + // Verify only ping and go-away frames are allowed on session stream + let frame = YamuxFrame { + flags: YamuxFlags::empty(), + stream_id: 0, + inner: YamuxFrameInner::Ping { opaque: 1 }, + }; + assert!(frame.is_session_stream()); +} + +/// Tests stream state transitions: +/// - Initial state +/// - SYN received +/// - Establishment +/// - Readable/writable states +#[test] +fn test_stream_state_transitions() { + // TODO: This test is incomplete: + // - Doesn't verify state transitions are atomic + // - Should test invalid state transitions + // - Needs to verify readable/writable states after various events + // - Should test concurrent operations on stream + let mut state = P2pNetworkYamuxState::default(); + let stream_id = 1; + + // Initial state + let stream = YamuxStreamState::default(); + assert!(!stream.established); + assert!(!stream.readable); + assert!(!stream.writable); + state.streams.insert(stream_id, stream); + + // Test SYN -> established transition + let syn_frame = YamuxFrame { + flags: YamuxFlags::SYN, + stream_id, + inner: YamuxFrameInner::Data(Data::from(vec![])), + }; + state.incoming.push_back(syn_frame); + + // Test readable/writable state after establishment + if let Some(stream) = state.streams.get_mut(&stream_id) { + stream.established = true; + stream.readable = true; + stream.writable = true; + assert!(stream.established); + } +}