Skip to content

Commit 7a942be

Browse files
committed
fix(yamux): introduce a queue that allows to respect peer window size
1 parent 1befb61 commit 7a942be

File tree

2 files changed

+101
-15
lines changed

2 files changed

+101
-15
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::VecDeque;
2+
13
use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
24

35
use crate::P2pLimits;
@@ -155,14 +157,14 @@ impl P2pNetworkYamuxState {
155157
data,
156158
mut flags,
157159
} => {
158-
let yamux_state = yamux_state
160+
let stream_state = yamux_state
159161
.streams
160162
.get(&stream_id)
161163
.ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;
162164

163-
if !yamux_state.incoming && !yamux_state.established && !yamux_state.syn_sent {
165+
if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
164166
flags.insert(YamuxFlags::SYN);
165-
} else if yamux_state.incoming && !yamux_state.established {
167+
} else if stream_state.incoming && !stream_state.established {
166168
flags.insert(YamuxFlags::ACK);
167169
}
168170

@@ -180,6 +182,7 @@ impl P2pNetworkYamuxState {
180182
Ok(())
181183
}
182184
P2pNetworkYamuxAction::IncomingFrame { addr, frame } => {
185+
let mut pending_outgoing = VecDeque::default();
183186
if let Some(frame) = yamux_state.incoming.pop_front() {
184187
if frame.flags.contains(YamuxFlags::SYN) {
185188
yamux_state
@@ -211,11 +214,37 @@ impl P2pNetworkYamuxState {
211214
}
212215
}
213216
YamuxFrameInner::WindowUpdate { difference } => {
214-
yamux_state
217+
let stream = yamux_state
215218
.streams
216219
.entry(frame.stream_id)
217-
.or_insert_with(YamuxStreamState::incoming)
218-
.update_window(false, difference);
220+
.or_insert_with(YamuxStreamState::incoming);
221+
stream.update_window(false, difference);
222+
if difference > 0 {
223+
// have some fresh space in the window
224+
// try send as many frames as can
225+
// last frame
226+
let mut window = stream.window_theirs;
227+
while let Some(mut frame) = stream.pending.pop_front() {
228+
let size = if let YamuxFrameInner::Data(data) = &frame.inner {
229+
data.len()
230+
} else {
231+
0
232+
} as u32;
233+
if let Some(new_window) = window.checked_sub(size) {
234+
pending_outgoing.push_back(frame);
235+
window = new_window;
236+
} else {
237+
if let Some(remaining) =
238+
frame.split_at((size - window) as usize)
239+
{
240+
stream.pending.push_front(remaining);
241+
}
242+
pending_outgoing.push_back(frame);
243+
244+
break;
245+
}
246+
}
247+
}
219248
}
220249
YamuxFrameInner::Ping { .. } => {}
221250
YamuxFrameInner::GoAway(res) => yamux_state.set_res(res),
@@ -282,15 +311,17 @@ impl P2pNetworkYamuxState {
282311
}
283312
match &frame.inner {
284313
YamuxFrameInner::Data(data) => {
314+
// here we are very permissive
315+
// always when our window is smaller 64 kb, just increase it by 256 kb
316+
// if we need fine grained back pressure, it should be implemented here
285317
if stream.window_ours < 64 * 1024 {
318+
let difference = 256 * 1024;
286319
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
287320
addr,
288321
frame: YamuxFrame {
289322
stream_id: frame.stream_id,
290323
flags: YamuxFlags::empty(),
291-
inner: YamuxFrameInner::WindowUpdate {
292-
difference: 256 * 1024,
293-
},
324+
inner: YamuxFrameInner::WindowUpdate { difference },
294325
},
295326
});
296327
}
@@ -318,21 +349,45 @@ impl P2pNetworkYamuxState {
318349
});
319350
}
320351
}
352+
YamuxFrameInner::WindowUpdate { .. } => {
353+
while let Some(frame) = pending_outgoing.pop_front() {
354+
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
355+
}
356+
}
321357
_ => {}
322358
}
323359

324360
Ok(())
325361
}
326-
P2pNetworkYamuxAction::OutgoingFrame { frame, addr } => {
362+
P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
327363
let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
328364
return Ok(());
329365
};
330-
match &frame.inner {
366+
match &mut frame.inner {
331367
YamuxFrameInner::Data(data) => {
332-
// must not underflow
333-
// the action must not dispatch if it doesn't fit in the window
334-
// TODO: add pending queue, where frames will wait for window increase
335-
stream.window_theirs = stream.window_theirs.wrapping_sub(data.len() as u32);
368+
if let Some(new_window) =
369+
stream.window_theirs.checked_sub(data.len() as u32)
370+
{
371+
// their window is big enough, decrease the size
372+
// and send the whole frame
373+
stream.window_theirs = new_window;
374+
} else if stream.window_theirs != 0 && stream.pending.is_empty() {
375+
// their window is not big enough, but has some space,
376+
// and the queue is empty,
377+
// do not send the whole frame,
378+
// split it and put remaining in the queue,
379+
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
380+
stream.pending.push_back(remaining);
381+
}
382+
// the window will be zero after sending
383+
stream.window_theirs = 0;
384+
} else {
385+
// either the window cannot accept any byte,
386+
// or the queue is already not empty
387+
// in both cases the whole frame goes in the queue and nothing to send
388+
stream.pending.push_back(frame);
389+
return Ok(());
390+
}
336391
}
337392
YamuxFrameInner::WindowUpdate { difference } => {
338393
stream.update_window(true, *difference);

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub struct YamuxStreamState {
5454
pub writable: bool,
5555
pub window_theirs: u32,
5656
pub window_ours: u32,
57+
pub pending: VecDeque<YamuxFrame>,
5758
}
5859

5960
impl Default for YamuxStreamState {
@@ -66,6 +67,7 @@ impl Default for YamuxStreamState {
6667
writable: false,
6768
window_theirs: 256 * 1024,
6869
window_ours: 256 * 1024,
70+
pending: VecDeque::default(),
6971
}
7072
}
7173
}
@@ -182,6 +184,35 @@ impl YamuxFrame {
182184

183185
vec
184186
}
187+
188+
/// If this data is bigger then `pos`, keep only first `pos` bytes and return some remaining
189+
/// otherwise return none
190+
pub fn split_at(&mut self, pos: usize) -> Option<Self> {
191+
use std::ops::Sub;
192+
193+
if let YamuxFrameInner::Data(data) = &mut self.inner {
194+
if data.len() == pos {
195+
return None;
196+
}
197+
let (keep, rest) = data.split_at(pos);
198+
let rest = Data(rest.to_vec().into_boxed_slice());
199+
*data = Data(keep.to_vec().into_boxed_slice());
200+
201+
let fin = if self.flags.contains(YamuxFlags::FIN) {
202+
self.flags.remove(YamuxFlags::FIN);
203+
YamuxFlags::FIN
204+
} else {
205+
YamuxFlags::empty()
206+
};
207+
Some(YamuxFrame {
208+
flags: self.flags.sub(YamuxFlags::SYN | YamuxFlags::ACK) | fin,
209+
stream_id: self.stream_id,
210+
inner: YamuxFrameInner::Data(rest),
211+
})
212+
} else {
213+
None
214+
}
215+
}
185216
}
186217

187218
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)