Skip to content

Commit 2d5bc2c

Browse files
authored
Merge pull request #1032 from openmina/optimization/reuse-yamux-recv-buffer
feat(yamux): More efficient management of the recv buffer
2 parents 6d8f1d9 + ef1868f commit 2d5bc2c

File tree

2 files changed

+170
-109
lines changed

2 files changed

+170
-109
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 27 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,35 @@ use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateA
44

55
use crate::P2pLimits;
66

7-
use self::p2p_network_yamux_state::{
8-
YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxFrameParseError, YamuxSessionError,
9-
YamuxStreamState,
10-
};
7+
use self::p2p_network_yamux_state::{YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxStreamState};
118

129
use super::{super::*, *};
1310

1411
impl P2pNetworkYamuxState {
15-
pub fn set_err(&mut self, err: YamuxFrameParseError) {
16-
self.terminated = Some(Err(err));
17-
}
18-
19-
pub fn set_res(&mut self, res: Result<(), YamuxSessionError>) {
20-
self.terminated = Some(Ok(res));
21-
}
22-
23-
/// Substate is accessed
12+
/// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing
13+
/// data, selects appropriate behavior based on frame types, and manages the state of streams
14+
/// within a Yamux session.
15+
///
16+
/// # High-Level Overview
17+
///
18+
/// - When data arrives, it is appended to an internal buffer. The buffer is then parsed for
19+
/// valid Yamux frames (using protocol-specific header fields and logic). Incomplete data
20+
/// remains in the buffer for future parsing.
21+
/// - On successful parsing, frames are enqueued for further handling (e.g., dispatching
22+
/// actions to notify higher-level protocols or responding to pings).
23+
/// - If protocol inconsistencies or invalid headers are encountered, it marks an error or
24+
/// terminates gracefully, preventing further processing of unexpected data.
25+
/// - Outgoing data is prepared as frames that respect the window constraints and established
26+
/// flags (e.g., SYN, ACK, FIN), and they are dispatched for transmission.
27+
/// - Once frames are processed, the function checks if the buffer has grown beyond a certain
28+
/// threshold relative to its initial capacity. If so, and if the remaining data is small,
29+
/// it resets the buffer capacity to a default size to avoid excessive memory usage.
30+
/// - The function also manages streams and their states, ensuring that proper handshake
31+
/// flags are set (SYN, ACK) when a new stream is opened or accepted, enforcing limits on
32+
/// the number of streams, and notifying higher-level components about events like
33+
/// incoming data or connection errors.
2434
pub fn reducer<State, Action>(
35+
// Substate is accessed
2536
mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
2637
action: redux::ActionWithMeta<P2pNetworkYamuxAction>,
2738
) -> Result<(), String>
@@ -47,103 +58,10 @@ impl P2pNetworkYamuxState {
4758

4859
match action {
4960
P2pNetworkYamuxAction::IncomingData { data, addr } => {
50-
yamux_state.buffer.extend_from_slice(&data);
51-
let mut offset = 0;
52-
loop {
53-
let buf = &yamux_state.buffer[offset..];
54-
if buf.len() >= 12 {
55-
let _version = match buf[0] {
56-
0 => 0,
57-
unknown => {
58-
yamux_state.set_err(YamuxFrameParseError::Version(unknown));
59-
break;
60-
}
61-
};
62-
let flags = u16::from_be_bytes(buf[2..4].try_into().expect("cannot fail"));
63-
let Some(flags) = YamuxFlags::from_bits(flags) else {
64-
yamux_state.set_err(YamuxFrameParseError::Flags(flags));
65-
break;
66-
};
67-
let stream_id =
68-
u32::from_be_bytes(buf[4..8].try_into().expect("cannot fail"));
69-
let b = buf[8..12].try_into().expect("cannot fail");
70-
71-
match buf[1] {
72-
0 => {
73-
let len = u32::from_be_bytes(b) as usize;
74-
if len > yamux_state.message_size_limit {
75-
yamux_state.set_res(Err(YamuxSessionError::Internal));
76-
break;
77-
}
78-
if buf.len() >= 12 + len {
79-
let frame = YamuxFrame {
80-
flags,
81-
stream_id,
82-
inner: YamuxFrameInner::Data(
83-
buf[12..(12 + len)].to_vec().into(),
84-
),
85-
};
86-
yamux_state.incoming.push_back(frame);
87-
offset += 12 + len;
88-
continue;
89-
}
90-
}
91-
1 => {
92-
let difference = u32::from_be_bytes(b);
93-
let frame = YamuxFrame {
94-
flags,
95-
stream_id,
96-
inner: YamuxFrameInner::WindowUpdate { difference },
97-
};
98-
yamux_state.incoming.push_back(frame);
99-
offset += 12;
100-
continue;
101-
}
102-
2 => {
103-
let opaque = u32::from_be_bytes(b);
104-
let frame = YamuxFrame {
105-
flags,
106-
stream_id,
107-
inner: YamuxFrameInner::Ping { opaque },
108-
};
109-
yamux_state.incoming.push_back(frame);
110-
offset += 12;
111-
continue;
112-
}
113-
3 => {
114-
let code = u32::from_be_bytes(b);
115-
let result = match code {
116-
0 => Ok(()),
117-
1 => Err(YamuxSessionError::Protocol),
118-
2 => Err(YamuxSessionError::Internal),
119-
unknown => {
120-
yamux_state
121-
.set_err(YamuxFrameParseError::ErrorCode(unknown));
122-
break;
123-
}
124-
};
125-
let frame = YamuxFrame {
126-
flags,
127-
stream_id,
128-
inner: YamuxFrameInner::GoAway(result),
129-
};
130-
yamux_state.incoming.push_back(frame);
131-
offset += 12;
132-
continue;
133-
}
134-
unknown => {
135-
yamux_state.set_err(YamuxFrameParseError::Type(unknown));
136-
break;
137-
}
138-
}
139-
}
140-
141-
break;
142-
}
143-
144-
yamux_state.buffer = yamux_state.buffer[offset..].to_vec();
61+
yamux_state.extend_buffer(&data);
62+
yamux_state.parse_frames();
14563

146-
let frame_count = yamux_state.incoming.len();
64+
let frame_count = yamux_state.incoming_frame_count();
14765
let dispatcher = state_context.into_dispatcher();
14866

14967
for _ in 0..frame_count {

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
44

55
use super::super::*;
66

7+
pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb
8+
79
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
810
pub struct P2pNetworkYamuxState {
911
pub message_size_limit: Limit<usize>,
@@ -44,6 +46,147 @@ impl P2pNetworkYamuxState {
4446

4547
windows + headers * SIZE_OF_HEADER
4648
}
49+
50+
pub fn set_err(&mut self, err: YamuxFrameParseError) {
51+
self.terminated = Some(Err(err));
52+
}
53+
54+
pub fn set_res(&mut self, res: Result<(), YamuxSessionError>) {
55+
self.terminated = Some(Ok(res));
56+
}
57+
58+
/// Attempts to parse a Yamux frame from the buffer starting at the given offset.
59+
/// Returns the number of bytes consumed if a frame was successfully parsed.
60+
pub fn try_parse_frame(&mut self, offset: usize) -> Option<usize> {
61+
let buf = &self.buffer[offset..];
62+
if buf.len() < 12 {
63+
return None;
64+
}
65+
66+
let _version = match buf[0] {
67+
0 => 0,
68+
unknown => {
69+
self.set_err(YamuxFrameParseError::Version(unknown));
70+
return None;
71+
}
72+
};
73+
74+
let flags = u16::from_be_bytes(buf[2..4].try_into().expect("cannot fail"));
75+
let Some(flags) = YamuxFlags::from_bits(flags) else {
76+
self.set_err(YamuxFrameParseError::Flags(flags));
77+
return None;
78+
};
79+
let stream_id = u32::from_be_bytes(buf[4..8].try_into().expect("cannot fail"));
80+
let b = buf[8..12].try_into().expect("cannot fail");
81+
82+
match buf[1] {
83+
// Data frame - contains actual payload data for the stream
84+
0 => {
85+
let len = u32::from_be_bytes(b) as usize;
86+
if len > self.message_size_limit {
87+
self.set_res(Err(YamuxSessionError::Internal));
88+
return None;
89+
}
90+
if buf.len() >= 12 + len {
91+
let frame = YamuxFrame {
92+
flags,
93+
stream_id,
94+
inner: YamuxFrameInner::Data(buf[12..(12 + len)].to_vec().into()),
95+
};
96+
self.incoming.push_back(frame);
97+
Some(12 + len)
98+
} else {
99+
None
100+
}
101+
}
102+
// Window Update frame - used for flow control, updates available window size
103+
1 => {
104+
let difference = u32::from_be_bytes(b);
105+
let frame = YamuxFrame {
106+
flags,
107+
stream_id,
108+
inner: YamuxFrameInner::WindowUpdate { difference },
109+
};
110+
self.incoming.push_back(frame);
111+
Some(12)
112+
}
113+
// Ping frame - used for keepalive and round-trip time measurements
114+
2 => {
115+
let opaque = u32::from_be_bytes(b);
116+
let frame = YamuxFrame {
117+
flags,
118+
stream_id,
119+
inner: YamuxFrameInner::Ping { opaque },
120+
};
121+
self.incoming.push_back(frame);
122+
Some(12)
123+
}
124+
// GoAway frame - signals session termination with optional error code
125+
3 => {
126+
let code = u32::from_be_bytes(b);
127+
let result = match code {
128+
0 => Ok(()), // Normal termination
129+
1 => Err(YamuxSessionError::Protocol), // Protocol error
130+
2 => Err(YamuxSessionError::Internal), // Internal error
131+
unknown => {
132+
self.set_err(YamuxFrameParseError::ErrorCode(unknown));
133+
return None;
134+
}
135+
};
136+
let frame = YamuxFrame {
137+
flags,
138+
stream_id,
139+
inner: YamuxFrameInner::GoAway(result),
140+
};
141+
self.incoming.push_back(frame);
142+
Some(12)
143+
}
144+
// Unknown frame type
145+
unknown => {
146+
self.set_err(YamuxFrameParseError::Type(unknown));
147+
None
148+
}
149+
}
150+
}
151+
152+
/// Attempts to parse all available complete frames from the buffer,
153+
/// then shifts and compacts the buffer as needed.
154+
pub fn parse_frames(&mut self) {
155+
let mut offset = 0;
156+
while let Some(consumed) = self.try_parse_frame(offset) {
157+
offset += consumed;
158+
}
159+
self.shift_and_compact_buffer(offset);
160+
}
161+
162+
fn shift_and_compact_buffer(&mut self, offset: usize) {
163+
let new_len = self.buffer.len() - offset;
164+
if self.buffer.capacity() > INITIAL_RECV_BUFFER_CAPACITY * 2
165+
&& new_len < INITIAL_RECV_BUFFER_CAPACITY / 2
166+
{
167+
let old_buffer = &self.buffer;
168+
let mut new_buffer = Vec::with_capacity(INITIAL_RECV_BUFFER_CAPACITY);
169+
new_buffer.extend_from_slice(&old_buffer[offset..]);
170+
self.buffer = new_buffer;
171+
} else {
172+
self.buffer.copy_within(offset.., 0);
173+
self.buffer.truncate(new_len);
174+
}
175+
}
176+
177+
/// Extends the internal buffer with new data, ensuring it has appropriate capacity.
178+
/// On first use, reserves the initial capacity.
179+
pub fn extend_buffer(&mut self, data: &[u8]) {
180+
if self.buffer.capacity() == 0 {
181+
self.buffer.reserve(INITIAL_RECV_BUFFER_CAPACITY);
182+
}
183+
self.buffer.extend_from_slice(data);
184+
}
185+
186+
/// Returns the number of incoming frames that have been parsed and are ready for processing.
187+
pub fn incoming_frame_count(&self) -> usize {
188+
self.incoming.len()
189+
}
47190
}
48191

49192
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)