Skip to content

Commit d64cdb7

Browse files
authored
Merge pull request #1040 from 0xMimir/bugfix/yamux-queue
Fixed the issue with pending queue in yamux
2 parents 9819090 + dc40eba commit d64cdb7

File tree

2 files changed

+66
-62
lines changed

2 files changed

+66
-62
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 51 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@ use std::collections::VecDeque;
22

33
use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
44

5-
use crate::P2pLimits;
6-
7-
use self::p2p_network_yamux_state::{YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxStreamState};
8-
9-
use super::{super::*, *};
5+
use crate::{
6+
yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner},
7+
Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError,
8+
P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction,
9+
P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
10+
};
11+
12+
use super::{
13+
p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE},
14+
P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing,
15+
};
1016

1117
impl P2pNetworkYamuxState {
1218
/// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing
@@ -130,12 +136,12 @@ impl P2pNetworkYamuxState {
130136
}
131137

132138
match &frame.inner {
133-
YamuxFrameInner::Data(data) => {
139+
YamuxFrameInner::Data(_) => {
134140
if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
135141
// must not underflow
136142
// TODO: check it and disconnect peer that violates flow rules
137143
stream.window_ours =
138-
stream.window_ours.saturating_sub(data.len() as u32);
144+
stream.window_ours.saturating_sub(frame.len_as_u32());
139145
}
140146
}
141147
YamuxFrameInner::WindowUpdate { difference } => {
@@ -150,18 +156,12 @@ impl P2pNetworkYamuxState {
150156
// have some fresh space in the window
151157
// try send as many frames as can
152158
let mut window = stream.window_theirs;
153-
while let Some(mut frame) = stream.pending.pop_front() {
154-
let len = frame.len() as u32;
159+
while let Some(frame) = stream.pending.pop_front() {
160+
let len = frame.len_as_u32();
161+
pending_outgoing.push_back(frame);
155162
if let Some(new_window) = window.checked_sub(len) {
156-
pending_outgoing.push_back(frame);
157163
window = new_window;
158164
} else {
159-
if let Some(remaining) = frame.split_at((len - window) as usize)
160-
{
161-
stream.pending.push_front(remaining);
162-
}
163-
pending_outgoing.push_back(frame);
164-
165165
break;
166166
}
167167
}
@@ -231,11 +231,11 @@ impl P2pNetworkYamuxState {
231231
}
232232
match &frame.inner {
233233
YamuxFrameInner::Data(data) => {
234-
// here we are very permissive
235-
// always when our window is smaller 64 kb, just increase it by 256 kb
236-
// if we need fine grained back pressure, it should be implemented here
237-
if stream.window_ours < 64 * 1024 {
238-
let difference = 256 * 1024;
234+
// when our window size is less than half of the max window size send window update
235+
if stream.window_ours < stream.max_window_size / 2 {
236+
let difference =
237+
stream.max_window_size.saturating_mul(2).min(1024 * 1024);
238+
239239
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
240240
addr,
241241
frame: YamuxFrame {
@@ -285,48 +285,39 @@ impl P2pNetworkYamuxState {
285285
return Ok(());
286286
};
287287
match &mut frame.inner {
288-
YamuxFrameInner::Data(data) => {
289-
stream.window_theirs = stream
290-
.window_theirs
291-
.checked_sub(data.len() as u32)
292-
.unwrap_or_default();
293-
294-
// TODO: code bellow include splitting the frame so that data which doesn't fit inside the window doesn't get sent, this breaks the bootstrap to rust
295-
296-
// if let Some(new_window) =
297-
// stream.window_theirs.checked_sub(data.len() as u32)
298-
// {
299-
// // their window is big enough, decrease the size
300-
// // and send the whole frame
301-
// stream.window_theirs = new_window;
302-
// } else if stream.window_theirs != 0 && stream.pending.is_empty() {
303-
// // their window is not big enough, but has some space,
304-
// // and the queue is empty,
305-
// // do not send the whole frame,
306-
// // split it and put remaining in the queue,
307-
// if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
308-
// stream.pending.push_back(remaining);
309-
// }
310-
// // the window will be zero after sending
311-
// stream.window_theirs = 0;
312-
// } else {
313-
// // either the window cannot accept any byte,
314-
// // or the queue is already not empty
315-
// // in both cases the whole frame goes in the queue and nothing to send
316-
// stream.pending.push_back(frame);
317-
// if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
318-
// > yamux_state.pending_outgoing_limit
319-
// {
320-
// let dispatcher = state_context.into_dispatcher();
321-
// let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
322-
// dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
323-
// }
324-
325-
// return Ok(());
326-
// }
288+
YamuxFrameInner::Data(_) => {
289+
if let Some(new_window) =
290+
stream.window_theirs.checked_sub(frame.len_as_u32())
291+
{
292+
// their window is big enough, decrease the size
293+
// and send the whole frame
294+
stream.window_theirs = new_window;
295+
} else {
296+
// their window is not big enough
297+
// split the frame to send as much as you can and put the rest in the queue
298+
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
299+
stream.pending.push_front(remaining);
300+
}
301+
302+
// the window will be zero after sending
303+
stream.window_theirs = 0;
304+
305+
// if size of pending that is above the limit, ignore the peer
306+
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
307+
> yamux_state.pending_outgoing_limit
308+
{
309+
let dispatcher = state_context.into_dispatcher();
310+
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
311+
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
312+
return Ok(());
313+
}
314+
}
327315
}
328316
YamuxFrameInner::WindowUpdate { difference } => {
329317
stream.window_ours = stream.window_ours.saturating_add(*difference);
318+
if stream.window_ours > stream.max_window_size {
319+
stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE);
320+
}
330321
}
331322
_ => {}
332323
}

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
66
use super::super::*;
77

88
pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb
9+
pub const INITIAL_WINDOW_SIZE: u32 = INITIAL_RECV_BUFFER_CAPACITY as u32;
10+
pub const MAX_WINDOW_SIZE: u32 = 16 * 1024 * 1024; // 16mb
911

1012
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1113
pub struct P2pNetworkYamuxState {
@@ -199,6 +201,7 @@ pub struct YamuxStreamState {
199201
pub writable: bool,
200202
pub window_theirs: u32,
201203
pub window_ours: u32,
204+
pub max_window_size: u32,
202205
pub pending: VecDeque<YamuxFrame>,
203206
}
204207

@@ -210,8 +213,9 @@ impl Default for YamuxStreamState {
210213
established: false,
211214
readable: false,
212215
writable: false,
213-
window_theirs: 256 * 1024,
214-
window_ours: 256 * 1024,
216+
window_theirs: INITIAL_WINDOW_SIZE,
217+
window_ours: INITIAL_WINDOW_SIZE,
218+
max_window_size: INITIAL_WINDOW_SIZE,
215219
pending: VecDeque::default(),
216220
}
217221
}
@@ -339,6 +343,15 @@ impl YamuxFrame {
339343
}
340344
}
341345

346+
// When we parse the frame we parse length as u32 and so `data.len()` should always be representable as u32
347+
pub fn len_as_u32(&self) -> u32 {
348+
if let YamuxFrameInner::Data(data) = &self.inner {
349+
u32::try_from(data.len()).unwrap_or(u32::MAX)
350+
} else {
351+
0
352+
}
353+
}
354+
342355
/// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining
343356
/// otherwise return none
344357
pub fn split_at(&mut self, pos: usize) -> Option<Self> {

0 commit comments

Comments
 (0)