Merged
1 change: 1 addition & 0 deletions p2p/src/network/scheduler/p2p_network_scheduler_actions.rs
@@ -72,6 +72,7 @@ pub enum P2pNetworkSchedulerAction {
addr: ConnectionAddr,
peer_id: PeerId,
message_size_limit: Limit<usize>,
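/// Bound on outgoing data buffered for a peer whose receive window is
/// exhausted; taken from `P2pLimits::yamux_pending_outgoing_per_peer`.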
pending_outgoing_limit: Limit<usize>,
},

/// Action that initiates disconnection of the specified peer.
7 changes: 6 additions & 1 deletion p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs
@@ -249,8 +249,9 @@ impl P2pNetworkSchedulerState {
}
P2pNetworkSchedulerAction::YamuxDidInit {
addr,
- message_size_limit,
peer_id,
+ message_size_limit,
+ pending_outgoing_limit,
} => {
let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
bug_condition!(
@@ -261,6 +262,7 @@ impl P2pNetworkSchedulerState {
if let Some(P2pNetworkConnectionMuxState::Yamux(yamux)) = &mut cn.mux {
yamux.init = true;
yamux.message_size_limit = message_size_limit;
yamux.pending_outgoing_limit = pending_outgoing_limit;
}

let incoming = cn.incoming;
@@ -503,10 +505,13 @@ impl P2pNetworkSchedulerState {
return;
};
let message_size_limit = p2p_state.config.limits.yamux_message_size();
let pending_outgoing_limit =
p2p_state.config.limits.yamux_pending_outgoing_per_peer();
dispatcher.push(P2pNetworkSchedulerAction::YamuxDidInit {
addr,
peer_id,
message_size_limit,
pending_outgoing_limit,
});
}
Some(Protocol::Stream(kind)) => {
4 changes: 4 additions & 0 deletions p2p/src/network/scheduler/p2p_network_scheduler_state.rs
@@ -196,6 +196,10 @@ pub enum P2pNetworkConnectionError {
StreamReset(StreamId),
#[error("pubsub error: {0}")]
PubSubError(String),
#[error("peer make us keep too much data at stream {0}")]
YamuxOverflow(StreamId),
#[error("peer should not decrease window size at stream {0}")]
YamuxBadWindowUpdate(StreamId),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
98 changes: 82 additions & 16 deletions p2p/src/network/yamux/p2p_network_yamux_reducer.rs
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};

use crate::P2pLimits;
@@ -155,14 +157,14 @@ impl P2pNetworkYamuxState {
data,
mut flags,
} => {
- let yamux_state = yamux_state
+ let stream_state = yamux_state
.streams
.get(&stream_id)
.ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;

- if !yamux_state.incoming && !yamux_state.established && !yamux_state.syn_sent {
+ if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
flags.insert(YamuxFlags::SYN);
- } else if yamux_state.incoming && !yamux_state.established {
+ } else if stream_state.incoming && !stream_state.established {
flags.insert(YamuxFlags::ACK);
}

@@ -180,6 +182,7 @@
Ok(())
}
P2pNetworkYamuxAction::IncomingFrame { addr, frame } => {
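// frames released from per-stream pending queues by a window update;
// they are dispatched as outgoing frames after the state is updated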
let mut pending_outgoing = VecDeque::default();
if let Some(frame) = yamux_state.incoming.pop_front() {
if frame.flags.contains(YamuxFlags::SYN) {
yamux_state
@@ -211,11 +214,32 @@
}
}
YamuxFrameInner::WindowUpdate { difference } => {
- yamux_state
+ let stream = yamux_state
.streams
.entry(frame.stream_id)
- .or_insert_with(YamuxStreamState::incoming)
- .update_window(false, difference);
+ .or_insert_with(YamuxStreamState::incoming);
+ stream.update_window(false, difference);
if difference > 0 {
Collaborator: this can be 0 or negative? in which cases does that happen?

Contributor Author: In my implementation it is signed, but in the go implementation it is unsigned, so the window can only grow. So this is just a sanity check, maybe redundant. A peer cannot harm us by making its own window too big.

Collaborator: ok, please add a bug_condition! call in the else branch so that if this ever happens for some reason we will know about it. That is something we want to do in general, to avoid such unexpected (unenforced) cases happening silently without us noticing.

Contributor Author: I added a runtime error, not a bug_condition!, because the peer can send us this and we should not crash.

Collaborator: You are right, this is based on external input, so it is not exactly a bug_condition!. Btw, bug_condition! only panics if a flag is enabled in the environment, otherwise it just logs an error (but again, this specific case is not for it).

// we have some fresh space in the window,
// try to send as many frames as we can
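// worked example: with 10 bytes of window and pending frames of 8 and 25
// bytes, we send the 8-byte frame (window 10 -> 2), split the 25-byte frame
// at 2, send the 2-byte head, put the 23-byte tail back at the front of the
// queue, and stop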
let mut window = stream.window_theirs;
while let Some(mut frame) = stream.pending.pop_front() {
let len = frame.len() as u32;
if let Some(new_window) = window.checked_sub(len) {
pending_outgoing.push_back(frame);
window = new_window;
} else {
if let Some(remaining) =
frame.split_at(window as usize)
{
stream.pending.push_front(remaining);
}
pending_outgoing.push_back(frame);

break;
}
}
}
}
YamuxFrameInner::Ping { .. } => {}
YamuxFrameInner::GoAway(res) => yamux_state.set_res(res),
@@ -282,15 +306,17 @@ impl P2pNetworkYamuxState {
}
match &frame.inner {
YamuxFrameInner::Data(data) => {
// here we are very permissive:
// whenever our window is smaller than 64 KiB, just increase it by 256 KiB;
// if we need fine-grained back pressure, it should be implemented here
if stream.window_ours < 64 * 1024 {
let difference = 256 * 1024;
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
addr,
frame: YamuxFrame {
stream_id: frame.stream_id,
flags: YamuxFlags::empty(),
- inner: YamuxFrameInner::WindowUpdate {
- difference: 256 * 1024,
- },
+ inner: YamuxFrameInner::WindowUpdate { difference },
},
});
}
@@ -318,21 +344,61 @@ impl P2pNetworkYamuxState {
});
}
}
YamuxFrameInner::WindowUpdate { difference } => {
if *difference < 0 {
Collaborator: @vlad9486 what about 0? That case is not covered. The other condition uses > 0 and this one < 0, which means there is no handling for 0.

Contributor Author (vlad9486, Oct 17, 2024): @tizoc Yes, initially I wrote <=, but apparently the peer sends a window update with zero if it wants to open the stream but not send data, so it sends the SYN flag. Of course it could send data with a zero-length body, but it sends a window update.
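// note: `difference == 0` is deliberately allowed (the branch checks `< 0`,
// not `<= 0`): a peer may open a stream without sending data by sending
// `WindowUpdate { difference: 0 }` with the SYN flag set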

let error =
P2pNetworkConnectionError::YamuxBadWindowUpdate(frame.stream_id);
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
} else {
while let Some(frame) = pending_outgoing.pop_front() {
dispatcher
.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
}
}
}
_ => {}
}

Ok(())
}
- P2pNetworkYamuxAction::OutgoingFrame { frame, addr } => {
- let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
+ P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
+ let stream_id = frame.stream_id;
+ let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
return Ok(());
};
- match &frame.inner {
+ match &mut frame.inner {
YamuxFrameInner::Data(data) => {
- // must not underflow
- // the action must not dispatch if it doesn't fit in the window
- // TODO: add pending queue, where frames will wait for window increase
- stream.window_theirs = stream.window_theirs.wrapping_sub(data.len() as u32);
+ if let Some(new_window) =
+ stream.window_theirs.checked_sub(data.len() as u32)
+ {
// their window is big enough, decrease the size
// and send the whole frame
stream.window_theirs = new_window;
} else if stream.window_theirs != 0 && stream.pending.is_empty() {
// their window is not big enough but has some space,
// and the queue is empty:
// do not send the whole frame,
// split it and put the remainder in the queue
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
stream.pending.push_back(remaining);
}
// the window will be zero after sending
stream.window_theirs = 0;
} else {
// either the window cannot accept any bytes,
// or the queue is already non-empty;
// in both cases the whole frame goes into the queue and there is nothing to send
stream.pending.push_back(frame);
Contributor Author: Need to add a limit here. A malicious peer can keep its window small and send an RPC request in a loop.
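// bound the queue as noted above: if the bytes pending on this stream
// exceed the configured limit, report a connection error for this peer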

if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
> yamux_state.pending_outgoing_limit
{
let dispatcher = state_context.into_dispatcher();
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
}

return Ok(());
}
}
YamuxFrameInner::WindowUpdate { difference } => {
stream.update_window(true, *difference);
40 changes: 40 additions & 0 deletions p2p/src/network/yamux/p2p_network_yamux_state.rs
@@ -7,6 +7,7 @@ use super::super::*;
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct P2pNetworkYamuxState {
pub message_size_limit: Limit<usize>,
pub pending_outgoing_limit: Limit<usize>,
pub buffer: Vec<u8>,
pub incoming: VecDeque<YamuxFrame>,
pub streams: BTreeMap<StreamId, YamuxStreamState>,
@@ -54,6 +55,7 @@ pub struct YamuxStreamState {
pub writable: bool,
pub window_theirs: u32,
pub window_ours: u32,
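/// Outgoing data frames waiting for the peer's receive window to grow;
/// their total size is bounded by `pending_outgoing_limit`.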
pub pending: VecDeque<YamuxFrame>,
Contributor: We should make this bounded.

Contributor Author: Yes, I missed this.

}

impl Default for YamuxStreamState {
@@ -66,6 +68,7 @@ impl Default for YamuxStreamState {
writable: false,
window_theirs: 256 * 1024,
window_ours: 256 * 1024,
pending: VecDeque::default(),
}
}
}
@@ -182,6 +185,43 @@ impl YamuxFrame {

vec
}

pub fn len(&self) -> usize {
if let YamuxFrameInner::Data(data) = &self.inner {
data.len()
} else {
0
}
}

/// If this frame's data is bigger than `pos`, keep only the first `pos` bytes and return the remainder,
/// otherwise return `None`.
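/// For example, splitting a 10-byte data frame at `pos = 4` keeps the first
/// 4 bytes in this frame and returns a frame with the remaining 6 bytes;
/// SYN/ACK stay on this frame, while FIN (if set) moves to the returned one.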
pub fn split_at(&mut self, pos: usize) -> Option<Self> {
use std::ops::Sub;

if let YamuxFrameInner::Data(data) = &mut self.inner {
if data.len() <= pos {
return None;
}
let (keep, rest) = data.split_at(pos);
let rest = Data(rest.to_vec().into_boxed_slice());
*data = Data(keep.to_vec().into_boxed_slice());

let fin = if self.flags.contains(YamuxFlags::FIN) {
self.flags.remove(YamuxFlags::FIN);
YamuxFlags::FIN
} else {
YamuxFlags::empty()
};
Some(YamuxFrame {
flags: self.flags.sub(YamuxFlags::SYN | YamuxFlags::ACK) | fin,
stream_id: self.stream_id,
inner: YamuxFrameInner::Data(rest),
})
} else {
None
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
8 changes: 8 additions & 0 deletions p2p/src/p2p_config.rs
@@ -252,6 +252,7 @@ pub struct P2pLimits {
max_peers_in_state: Limit<usize>,
max_streams: Limit<usize>,
yamux_message_size: Limit<usize>,
yamux_pending_outgoing_per_peer: Limit<usize>,

identify_message: Limit<usize>,
kademlia_request: Limit<usize>,
@@ -319,6 +320,12 @@ impl P2pLimits {
/// Sets the maximum size of a Yamux message.
with_yamux_message_size
);
limit!(
/// Maximum pending outgoing data (in bytes) per peer.
yamux_pending_outgoing_per_peer,
/// Sets the maximum total size of frames that may be queued for a peer whose receive window is exhausted.
with_yamux_pending_outgoing_per_peer
);

limit!(
/// Minimum number of peers.
@@ -400,6 +407,7 @@ impl Default for P2pLimits {
max_peers_in_state,
max_streams,
yamux_message_size,
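// the default reuses the staged ledger RPC limit (assumed rationale: the
// largest single response we expect to sit in a peer's queue)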
yamux_pending_outgoing_per_peer: rpc_get_staged_ledger,

identify_message,
kademlia_request,