Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 70 additions & 15 deletions p2p/src/network/yamux/p2p_network_yamux_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};

use crate::P2pLimits;
Expand Down Expand Up @@ -155,14 +157,14 @@ impl P2pNetworkYamuxState {
data,
mut flags,
} => {
let yamux_state = yamux_state
let stream_state = yamux_state
.streams
.get(&stream_id)
.ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;

if !yamux_state.incoming && !yamux_state.established && !yamux_state.syn_sent {
if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
flags.insert(YamuxFlags::SYN);
} else if yamux_state.incoming && !yamux_state.established {
} else if stream_state.incoming && !stream_state.established {
flags.insert(YamuxFlags::ACK);
}

Expand All @@ -180,6 +182,7 @@ impl P2pNetworkYamuxState {
Ok(())
}
P2pNetworkYamuxAction::IncomingFrame { addr, frame } => {
let mut pending_outgoing = VecDeque::default();
if let Some(frame) = yamux_state.incoming.pop_front() {
if frame.flags.contains(YamuxFlags::SYN) {
yamux_state
Expand Down Expand Up @@ -211,11 +214,37 @@ impl P2pNetworkYamuxState {
}
}
YamuxFrameInner::WindowUpdate { difference } => {
yamux_state
let stream = yamux_state
.streams
.entry(frame.stream_id)
.or_insert_with(YamuxStreamState::incoming)
.update_window(false, difference);
.or_insert_with(YamuxStreamState::incoming);
stream.update_window(false, difference);
if difference > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be 0 or negative? in which cases does that happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my implementation it is signed, but in go implementation it is unsigned, so the window could only grow. So, this is just sanity check, maybe redundant. Peer cannot harm us by making its own window too big.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, please add a bug_condition! call in the else branch so that if this ever happens for some reason we will know about it. That is something we want to do in general, to avoid such unexpected (unenforced) cases happening silently without us noticing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a runtime error, not bug_condition, because peer can send us this and we should not crash.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, this is based on external input, so it is not exactly a bug_condition!. Btw bug_condition only panics if a flag is enabled in the environment, otherwise it just logs an error (but again this specific case is not for it)

// have some fresh space in the window
// try send as many frames as can
// last frame
let mut window = stream.window_theirs;
while let Some(mut frame) = stream.pending.pop_front() {
let size = if let YamuxFrameInner::Data(data) = &frame.inner {
data.len()
} else {
0
} as u32;
if let Some(new_window) = window.checked_sub(size) {
pending_outgoing.push_back(frame);
window = new_window;
} else {
if let Some(remaining) =
frame.split_at((size - window) as usize)
{
stream.pending.push_front(remaining);
}
pending_outgoing.push_back(frame);

break;
}
}
}
}
YamuxFrameInner::Ping { .. } => {}
YamuxFrameInner::GoAway(res) => yamux_state.set_res(res),
Expand Down Expand Up @@ -282,15 +311,17 @@ impl P2pNetworkYamuxState {
}
match &frame.inner {
YamuxFrameInner::Data(data) => {
// here we are very permissive
// always when our window is smaller 64 kb, just increase it by 256 kb
// if we need fine grained back pressure, it should be implemented here
if stream.window_ours < 64 * 1024 {
let difference = 256 * 1024;
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
addr,
frame: YamuxFrame {
stream_id: frame.stream_id,
flags: YamuxFlags::empty(),
inner: YamuxFrameInner::WindowUpdate {
difference: 256 * 1024,
},
inner: YamuxFrameInner::WindowUpdate { difference },
},
});
}
Expand Down Expand Up @@ -318,21 +349,45 @@ impl P2pNetworkYamuxState {
});
}
}
YamuxFrameInner::WindowUpdate { .. } => {
while let Some(frame) = pending_outgoing.pop_front() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any bound or practical limit in how many elements we can have in pending_outgoing?
I'm concerned that someone could try to fill the dispatch queue and and cause blockings in the state machine

dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
}
}
_ => {}
}

Ok(())
}
P2pNetworkYamuxAction::OutgoingFrame { frame, addr } => {
P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
return Ok(());
};
match &frame.inner {
match &mut frame.inner {
YamuxFrameInner::Data(data) => {
// must not underflow
// the action must not dispatch if it doesn't fit in the window
// TODO: add pending queue, where frames will wait for window increase
stream.window_theirs = stream.window_theirs.wrapping_sub(data.len() as u32);
if let Some(new_window) =
stream.window_theirs.checked_sub(data.len() as u32)
{
// their window is big enough, decrease the size
// and send the whole frame
stream.window_theirs = new_window;
} else if stream.window_theirs != 0 && stream.pending.is_empty() {
// their window is not big enough, but has some space,
// and the queue is empty,
// do not send the whole frame,
// split it and put remaining in the queue,
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
stream.pending.push_back(remaining);
}
// the window will be zero after sending
stream.window_theirs = 0;
} else {
// either the window cannot accept any byte,
// or the queue is already not empty
// in both cases the whole frame goes in the queue and nothing to send
stream.pending.push_back(frame);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add a limit here. Malicious peer can keep its window small and send an RPC request in a loop.

return Ok(());
}
}
YamuxFrameInner::WindowUpdate { difference } => {
stream.update_window(true, *difference);
Expand Down
31 changes: 31 additions & 0 deletions p2p/src/network/yamux/p2p_network_yamux_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub struct YamuxStreamState {
pub writable: bool,
pub window_theirs: u32,
pub window_ours: u32,
pub pending: VecDeque<YamuxFrame>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this bounded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I missed this

}

impl Default for YamuxStreamState {
Expand All @@ -66,6 +67,7 @@ impl Default for YamuxStreamState {
writable: false,
window_theirs: 256 * 1024,
window_ours: 256 * 1024,
pending: VecDeque::default(),
}
}
}
Expand Down Expand Up @@ -182,6 +184,35 @@ impl YamuxFrame {

vec
}

/// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining
/// otherwise return none
pub fn split_at(&mut self, pos: usize) -> Option<Self> {
use std::ops::Sub;

if let YamuxFrameInner::Data(data) = &mut self.inner {
if data.len() == pos {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe here should be if data.len() <= pos

return None;
}
let (keep, rest) = data.split_at(pos);
let rest = Data(rest.to_vec().into_boxed_slice());
*data = Data(keep.to_vec().into_boxed_slice());

let fin = if self.flags.contains(YamuxFlags::FIN) {
self.flags.remove(YamuxFlags::FIN);
YamuxFlags::FIN
} else {
YamuxFlags::empty()
};
Some(YamuxFrame {
flags: self.flags.sub(YamuxFlags::SYN | YamuxFlags::ACK) | fin,
stream_id: self.stream_id,
inner: YamuxFrameInner::Data(rest),
})
} else {
None
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down