Skip to content

Commit 555bc51

Browse files
authored
Merge pull request #817 from openmina/fix/yamux-backpressure
Introduce a queue that allows to respect peer window size
2 parents 07fa6a1 + e54fb26 commit 555bc51

File tree

6 files changed

+141
-17
lines changed

6 files changed

+141
-17
lines changed

p2p/src/network/scheduler/p2p_network_scheduler_actions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub enum P2pNetworkSchedulerAction {
7272
addr: ConnectionAddr,
7373
peer_id: PeerId,
7474
message_size_limit: Limit<usize>,
75+
pending_outgoing_limit: Limit<usize>,
7576
},
7677

7778
/// Action that initiate the specified peer disconnection.

p2p/src/network/scheduler/p2p_network_scheduler_reducer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,9 @@ impl P2pNetworkSchedulerState {
249249
}
250250
P2pNetworkSchedulerAction::YamuxDidInit {
251251
addr,
252-
message_size_limit,
253252
peer_id,
253+
message_size_limit,
254+
pending_outgoing_limit,
254255
} => {
255256
let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
256257
bug_condition!(
@@ -261,6 +262,7 @@ impl P2pNetworkSchedulerState {
261262
if let Some(P2pNetworkConnectionMuxState::Yamux(yamux)) = &mut cn.mux {
262263
yamux.init = true;
263264
yamux.message_size_limit = message_size_limit;
265+
yamux.pending_outgoing_limit = pending_outgoing_limit;
264266
}
265267

266268
let incoming = cn.incoming;
@@ -503,10 +505,13 @@ impl P2pNetworkSchedulerState {
503505
return;
504506
};
505507
let message_size_limit = p2p_state.config.limits.yamux_message_size();
508+
let pending_outgoing_limit =
509+
p2p_state.config.limits.yamux_pending_outgoing_per_peer();
506510
dispatcher.push(P2pNetworkSchedulerAction::YamuxDidInit {
507511
addr,
508512
peer_id,
509513
message_size_limit,
514+
pending_outgoing_limit,
510515
});
511516
}
512517
Some(Protocol::Stream(kind)) => {

p2p/src/network/scheduler/p2p_network_scheduler_state.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ pub enum P2pNetworkConnectionError {
196196
StreamReset(StreamId),
197197
#[error("pubsub error: {0}")]
198198
PubSubError(String),
199+
#[error("peer make us keep too much data at stream {0}")]
200+
YamuxOverflow(StreamId),
201+
#[error("peer should not decrease window size at stream {0}")]
202+
YamuxBadWindowUpdate(StreamId),
199203
}
200204

201205
#[derive(Serialize, Deserialize, Debug, Clone)]

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 82 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::VecDeque;
2+
13
use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
24

35
use crate::P2pLimits;
@@ -155,14 +157,14 @@ impl P2pNetworkYamuxState {
155157
data,
156158
mut flags,
157159
} => {
158-
let yamux_state = yamux_state
160+
let stream_state = yamux_state
159161
.streams
160162
.get(&stream_id)
161163
.ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;
162164

163-
if !yamux_state.incoming && !yamux_state.established && !yamux_state.syn_sent {
165+
if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
164166
flags.insert(YamuxFlags::SYN);
165-
} else if yamux_state.incoming && !yamux_state.established {
167+
} else if stream_state.incoming && !stream_state.established {
166168
flags.insert(YamuxFlags::ACK);
167169
}
168170

@@ -180,6 +182,7 @@ impl P2pNetworkYamuxState {
180182
Ok(())
181183
}
182184
P2pNetworkYamuxAction::IncomingFrame { addr, frame } => {
185+
let mut pending_outgoing = VecDeque::default();
183186
if let Some(frame) = yamux_state.incoming.pop_front() {
184187
if frame.flags.contains(YamuxFlags::SYN) {
185188
yamux_state
@@ -211,11 +214,32 @@ impl P2pNetworkYamuxState {
211214
}
212215
}
213216
YamuxFrameInner::WindowUpdate { difference } => {
214-
yamux_state
217+
let stream = yamux_state
215218
.streams
216219
.entry(frame.stream_id)
217-
.or_insert_with(YamuxStreamState::incoming)
218-
.update_window(false, difference);
220+
.or_insert_with(YamuxStreamState::incoming);
221+
stream.update_window(false, difference);
222+
if difference > 0 {
223+
// have some fresh space in the window
224+
// try send as many frames as can
225+
let mut window = stream.window_theirs;
226+
while let Some(mut frame) = stream.pending.pop_front() {
227+
let len = frame.len() as u32;
228+
if let Some(new_window) = window.checked_sub(len) {
229+
pending_outgoing.push_back(frame);
230+
window = new_window;
231+
} else {
232+
if let Some(remaining) =
233+
frame.split_at((len - window) as usize)
234+
{
235+
stream.pending.push_front(remaining);
236+
}
237+
pending_outgoing.push_back(frame);
238+
239+
break;
240+
}
241+
}
242+
}
219243
}
220244
YamuxFrameInner::Ping { .. } => {}
221245
YamuxFrameInner::GoAway(res) => yamux_state.set_res(res),
@@ -282,15 +306,17 @@ impl P2pNetworkYamuxState {
282306
}
283307
match &frame.inner {
284308
YamuxFrameInner::Data(data) => {
309+
// here we are very permissive
310+
// always when our window is smaller 64 kb, just increase it by 256 kb
311+
// if we need fine grained back pressure, it should be implemented here
285312
if stream.window_ours < 64 * 1024 {
313+
let difference = 256 * 1024;
286314
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
287315
addr,
288316
frame: YamuxFrame {
289317
stream_id: frame.stream_id,
290318
flags: YamuxFlags::empty(),
291-
inner: YamuxFrameInner::WindowUpdate {
292-
difference: 256 * 1024,
293-
},
319+
inner: YamuxFrameInner::WindowUpdate { difference },
294320
},
295321
});
296322
}
@@ -318,21 +344,61 @@ impl P2pNetworkYamuxState {
318344
});
319345
}
320346
}
347+
YamuxFrameInner::WindowUpdate { difference } => {
348+
if *difference < 0 {
349+
let error =
350+
P2pNetworkConnectionError::YamuxBadWindowUpdate(frame.stream_id);
351+
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
352+
} else {
353+
while let Some(frame) = pending_outgoing.pop_front() {
354+
dispatcher
355+
.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
356+
}
357+
}
358+
}
321359
_ => {}
322360
}
323361

324362
Ok(())
325363
}
326-
P2pNetworkYamuxAction::OutgoingFrame { frame, addr } => {
327-
let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
364+
P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
365+
let stream_id = frame.stream_id;
366+
let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
328367
return Ok(());
329368
};
330-
match &frame.inner {
369+
match &mut frame.inner {
331370
YamuxFrameInner::Data(data) => {
332-
// must not underflow
333-
// the action must not dispatch if it doesn't fit in the window
334-
// TODO: add pending queue, where frames will wait for window increase
335-
stream.window_theirs = stream.window_theirs.wrapping_sub(data.len() as u32);
371+
if let Some(new_window) =
372+
stream.window_theirs.checked_sub(data.len() as u32)
373+
{
374+
// their window is big enough, decrease the size
375+
// and send the whole frame
376+
stream.window_theirs = new_window;
377+
} else if stream.window_theirs != 0 && stream.pending.is_empty() {
378+
// their window is not big enough, but has some space,
379+
// and the queue is empty,
380+
// do not send the whole frame,
381+
// split it and put remaining in the queue,
382+
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
383+
stream.pending.push_back(remaining);
384+
}
385+
// the window will be zero after sending
386+
stream.window_theirs = 0;
387+
} else {
388+
// either the window cannot accept any byte,
389+
// or the queue is already not empty
390+
// in both cases the whole frame goes in the queue and nothing to send
391+
stream.pending.push_back(frame);
392+
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
393+
> yamux_state.pending_outgoing_limit
394+
{
395+
let dispatcher = state_context.into_dispatcher();
396+
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
397+
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
398+
}
399+
400+
return Ok(());
401+
}
336402
}
337403
YamuxFrameInner::WindowUpdate { difference } => {
338404
stream.update_window(true, *difference);

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::super::*;
77
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
88
pub struct P2pNetworkYamuxState {
99
pub message_size_limit: Limit<usize>,
10+
pub pending_outgoing_limit: Limit<usize>,
1011
pub buffer: Vec<u8>,
1112
pub incoming: VecDeque<YamuxFrame>,
1213
pub streams: BTreeMap<StreamId, YamuxStreamState>,
@@ -54,6 +55,7 @@ pub struct YamuxStreamState {
5455
pub writable: bool,
5556
pub window_theirs: u32,
5657
pub window_ours: u32,
58+
pub pending: VecDeque<YamuxFrame>,
5759
}
5860

5961
impl Default for YamuxStreamState {
@@ -66,6 +68,7 @@ impl Default for YamuxStreamState {
6668
writable: false,
6769
window_theirs: 256 * 1024,
6870
window_ours: 256 * 1024,
71+
pending: VecDeque::default(),
6972
}
7073
}
7174
}
@@ -182,6 +185,43 @@ impl YamuxFrame {
182185

183186
vec
184187
}
188+
189+
pub fn len(&self) -> usize {
190+
if let YamuxFrameInner::Data(data) = &self.inner {
191+
data.len()
192+
} else {
193+
0
194+
}
195+
}
196+
197+
/// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining
198+
/// otherwise return none
199+
pub fn split_at(&mut self, pos: usize) -> Option<Self> {
200+
use std::ops::Sub;
201+
202+
if let YamuxFrameInner::Data(data) = &mut self.inner {
203+
if data.len() <= pos {
204+
return None;
205+
}
206+
let (keep, rest) = data.split_at(pos);
207+
let rest = Data(rest.to_vec().into_boxed_slice());
208+
*data = Data(keep.to_vec().into_boxed_slice());
209+
210+
let fin = if self.flags.contains(YamuxFlags::FIN) {
211+
self.flags.remove(YamuxFlags::FIN);
212+
YamuxFlags::FIN
213+
} else {
214+
YamuxFlags::empty()
215+
};
216+
Some(YamuxFrame {
217+
flags: self.flags.sub(YamuxFlags::SYN | YamuxFlags::ACK) | fin,
218+
stream_id: self.stream_id,
219+
inner: YamuxFrameInner::Data(rest),
220+
})
221+
} else {
222+
None
223+
}
224+
}
185225
}
186226

187227
#[derive(Serialize, Deserialize, Debug, Clone)]

p2p/src/p2p_config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ pub struct P2pLimits {
252252
max_peers_in_state: Limit<usize>,
253253
max_streams: Limit<usize>,
254254
yamux_message_size: Limit<usize>,
255+
yamux_pending_outgoing_per_peer: Limit<usize>,
255256

256257
identify_message: Limit<usize>,
257258
kademlia_request: Limit<usize>,
@@ -319,6 +320,12 @@ impl P2pLimits {
319320
/// Sets the maximum number of streams that a peer is allowed to open simultaneously.
320321
with_yamux_message_size
321322
);
323+
limit!(
324+
/// Maximum number of streams from a peer.
325+
yamux_pending_outgoing_per_peer,
326+
/// Sets the maximum number of streams that a peer is allowed to open simultaneously.
327+
with_yamux_pending_outgoing_per_peer
328+
);
322329

323330
limit!(
324331
/// Minimum number of peers.
@@ -400,6 +407,7 @@ impl Default for P2pLimits {
400407
max_peers_in_state,
401408
max_streams,
402409
yamux_message_size,
410+
yamux_pending_outgoing_per_peer: rpc_get_staged_ledger,
403411

404412
identify_message,
405413
kademlia_request,

0 commit comments

Comments
 (0)