Skip to content

Commit ef1868f

Browse files
committed
feat(yamux): Abstract the reduction of the incoming state, manage recv buffer size growth
1 parent d297e05 commit ef1868f

File tree

2 files changed

+171
-128
lines changed

2 files changed

+171
-128
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 27 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,35 @@ use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateA
44

55
use crate::P2pLimits;
66

7-
use self::p2p_network_yamux_state::{
8-
YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxFrameParseError, YamuxSessionError,
9-
YamuxStreamState,
10-
};
7+
use self::p2p_network_yamux_state::{YamuxFlags, YamuxFrame, YamuxFrameInner, YamuxStreamState};
118

129
use super::{super::*, *};
1310

1411
impl P2pNetworkYamuxState {
15-
pub fn set_err(&mut self, err: YamuxFrameParseError) {
16-
self.terminated = Some(Err(err));
17-
}
18-
19-
pub fn set_res(&mut self, res: Result<(), YamuxSessionError>) {
20-
self.terminated = Some(Ok(res));
21-
}
22-
23-
/// Substate is accessed
12+
/// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing
13+
/// data, selects appropriate behavior based on frame types, and manages the state of streams
14+
/// within a Yamux session.
15+
///
16+
/// # High-Level Overview
17+
///
18+
/// - When data arrives, it is appended to an internal buffer. The buffer is then parsed for
19+
/// valid Yamux frames (using protocol-specific header fields and logic). Incomplete data
20+
/// remains in the buffer for future parsing.
21+
/// - On successful parsing, frames are enqueued for further handling (e.g., dispatching
22+
/// actions to notify higher-level protocols or responding to pings).
23+
/// - If protocol inconsistencies or invalid headers are encountered, it marks an error or
24+
/// terminates gracefully, preventing further processing of unexpected data.
25+
/// - Outgoing data is prepared as frames that respect the window constraints and established
26+
/// flags (e.g., SYN, ACK, FIN), and they are dispatched for transmission.
27+
/// - Once frames are processed, the function checks if the buffer has grown beyond a certain
28+
/// threshold relative to its initial capacity. If so, and if the remaining data is small,
29+
/// it resets the buffer capacity to a default size to avoid excessive memory usage.
30+
/// - The function also manages streams and their states, ensuring that proper handshake
31+
/// flags are set (SYN, ACK) when a new stream is opened or accepted, enforcing limits on
32+
/// the number of streams, and notifying higher-level components about events like
33+
/// incoming data or connection errors.
2434
pub fn reducer<State, Action>(
35+
// Substate is accessed
2536
mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
2637
action: redux::ActionWithMeta<P2pNetworkYamuxAction>,
2738
) -> Result<(), String>
@@ -47,107 +58,10 @@ impl P2pNetworkYamuxState {
4758

4859
match action {
4960
P2pNetworkYamuxAction::IncomingData { data, addr } => {
50-
// TODO: this may grow over the capacity, if that happens make sure to eventually
51-
// truncate it to a reasonable size.
52-
yamux_state.buffer.extend_from_slice(&data);
53-
let mut offset = 0;
54-
loop {
55-
let buf = &yamux_state.buffer[offset..];
56-
if buf.len() >= 12 {
57-
let _version = match buf[0] {
58-
0 => 0,
59-
unknown => {
60-
yamux_state.set_err(YamuxFrameParseError::Version(unknown));
61-
break;
62-
}
63-
};
64-
let flags = u16::from_be_bytes(buf[2..4].try_into().expect("cannot fail"));
65-
let Some(flags) = YamuxFlags::from_bits(flags) else {
66-
yamux_state.set_err(YamuxFrameParseError::Flags(flags));
67-
break;
68-
};
69-
let stream_id =
70-
u32::from_be_bytes(buf[4..8].try_into().expect("cannot fail"));
71-
let b = buf[8..12].try_into().expect("cannot fail");
72-
73-
match buf[1] {
74-
0 => {
75-
let len = u32::from_be_bytes(b) as usize;
76-
if len > yamux_state.message_size_limit {
77-
yamux_state.set_res(Err(YamuxSessionError::Internal));
78-
break;
79-
}
80-
if buf.len() >= 12 + len {
81-
let frame = YamuxFrame {
82-
flags,
83-
stream_id,
84-
inner: YamuxFrameInner::Data(
85-
buf[12..(12 + len)].to_vec().into(),
86-
),
87-
};
88-
yamux_state.incoming.push_back(frame);
89-
offset += 12 + len;
90-
continue;
91-
}
92-
}
93-
1 => {
94-
let difference = u32::from_be_bytes(b);
95-
let frame = YamuxFrame {
96-
flags,
97-
stream_id,
98-
inner: YamuxFrameInner::WindowUpdate { difference },
99-
};
100-
yamux_state.incoming.push_back(frame);
101-
offset += 12;
102-
continue;
103-
}
104-
2 => {
105-
let opaque = u32::from_be_bytes(b);
106-
let frame = YamuxFrame {
107-
flags,
108-
stream_id,
109-
inner: YamuxFrameInner::Ping { opaque },
110-
};
111-
yamux_state.incoming.push_back(frame);
112-
offset += 12;
113-
continue;
114-
}
115-
3 => {
116-
let code = u32::from_be_bytes(b);
117-
let result = match code {
118-
0 => Ok(()),
119-
1 => Err(YamuxSessionError::Protocol),
120-
2 => Err(YamuxSessionError::Internal),
121-
unknown => {
122-
yamux_state
123-
.set_err(YamuxFrameParseError::ErrorCode(unknown));
124-
break;
125-
}
126-
};
127-
let frame = YamuxFrame {
128-
flags,
129-
stream_id,
130-
inner: YamuxFrameInner::GoAway(result),
131-
};
132-
yamux_state.incoming.push_back(frame);
133-
offset += 12;
134-
continue;
135-
}
136-
unknown => {
137-
yamux_state.set_err(YamuxFrameParseError::Type(unknown));
138-
break;
139-
}
140-
}
141-
}
142-
143-
break;
144-
}
145-
146-
let new_len = yamux_state.buffer.len() - offset;
147-
yamux_state.buffer.copy_within(offset.., 0);
148-
yamux_state.buffer.truncate(new_len);
61+
yamux_state.extend_buffer(&data);
62+
yamux_state.parse_frames();
14963

150-
let frame_count = yamux_state.incoming.len();
64+
let frame_count = yamux_state.incoming_frame_count();
15165
let dispatcher = state_context.into_dispatcher();
15266

15367
for _ in 0..frame_count {

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 144 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use serde::{Deserialize, Serialize};
44

55
use super::super::*;
66

7-
#[derive(Serialize, Deserialize, Debug, Clone)]
7+
pub const INITIAL_RECV_BUFFER_CAPACITY: usize = 0x40000; // 256kb
8+
9+
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
810
pub struct P2pNetworkYamuxState {
911
pub message_size_limit: Limit<usize>,
1012
pub pending_outgoing_limit: Limit<usize>,
@@ -15,20 +17,6 @@ pub struct P2pNetworkYamuxState {
1517
pub init: bool,
1618
}
1719

18-
impl Default for P2pNetworkYamuxState {
19-
fn default() -> Self {
20-
Self {
21-
message_size_limit: Default::default(),
22-
pending_outgoing_limit: Default::default(),
23-
buffer: Vec::with_capacity(0x40000), // 256kb
24-
incoming: VecDeque::with_capacity(10), // TODO: measure and see what is a good default
25-
streams: Default::default(),
26-
terminated: Default::default(),
27-
init: Default::default(),
28-
}
29-
}
30-
}
31-
3220
impl P2pNetworkYamuxState {
3321
/// Calculates and returns the next available stream ID for outgoing
3422
/// communication.
@@ -58,6 +46,147 @@ impl P2pNetworkYamuxState {
5846

5947
windows + headers * SIZE_OF_HEADER
6048
}
49+
50+
pub fn set_err(&mut self, err: YamuxFrameParseError) {
51+
self.terminated = Some(Err(err));
52+
}
53+
54+
pub fn set_res(&mut self, res: Result<(), YamuxSessionError>) {
55+
self.terminated = Some(Ok(res));
56+
}
57+
58+
/// Attempts to parse a Yamux frame from the buffer starting at the given offset.
59+
/// Returns the number of bytes consumed if a frame was successfully parsed.
60+
pub fn try_parse_frame(&mut self, offset: usize) -> Option<usize> {
61+
let buf = &self.buffer[offset..];
62+
if buf.len() < 12 {
63+
return None;
64+
}
65+
66+
let _version = match buf[0] {
67+
0 => 0,
68+
unknown => {
69+
self.set_err(YamuxFrameParseError::Version(unknown));
70+
return None;
71+
}
72+
};
73+
74+
let flags = u16::from_be_bytes(buf[2..4].try_into().expect("cannot fail"));
75+
let Some(flags) = YamuxFlags::from_bits(flags) else {
76+
self.set_err(YamuxFrameParseError::Flags(flags));
77+
return None;
78+
};
79+
let stream_id = u32::from_be_bytes(buf[4..8].try_into().expect("cannot fail"));
80+
let b = buf[8..12].try_into().expect("cannot fail");
81+
82+
match buf[1] {
83+
// Data frame - contains actual payload data for the stream
84+
0 => {
85+
let len = u32::from_be_bytes(b) as usize;
86+
if len > self.message_size_limit {
87+
self.set_res(Err(YamuxSessionError::Internal));
88+
return None;
89+
}
90+
if buf.len() >= 12 + len {
91+
let frame = YamuxFrame {
92+
flags,
93+
stream_id,
94+
inner: YamuxFrameInner::Data(buf[12..(12 + len)].to_vec().into()),
95+
};
96+
self.incoming.push_back(frame);
97+
Some(12 + len)
98+
} else {
99+
None
100+
}
101+
}
102+
// Window Update frame - used for flow control, updates available window size
103+
1 => {
104+
let difference = u32::from_be_bytes(b);
105+
let frame = YamuxFrame {
106+
flags,
107+
stream_id,
108+
inner: YamuxFrameInner::WindowUpdate { difference },
109+
};
110+
self.incoming.push_back(frame);
111+
Some(12)
112+
}
113+
// Ping frame - used for keepalive and round-trip time measurements
114+
2 => {
115+
let opaque = u32::from_be_bytes(b);
116+
let frame = YamuxFrame {
117+
flags,
118+
stream_id,
119+
inner: YamuxFrameInner::Ping { opaque },
120+
};
121+
self.incoming.push_back(frame);
122+
Some(12)
123+
}
124+
// GoAway frame - signals session termination with optional error code
125+
3 => {
126+
let code = u32::from_be_bytes(b);
127+
let result = match code {
128+
0 => Ok(()), // Normal termination
129+
1 => Err(YamuxSessionError::Protocol), // Protocol error
130+
2 => Err(YamuxSessionError::Internal), // Internal error
131+
unknown => {
132+
self.set_err(YamuxFrameParseError::ErrorCode(unknown));
133+
return None;
134+
}
135+
};
136+
let frame = YamuxFrame {
137+
flags,
138+
stream_id,
139+
inner: YamuxFrameInner::GoAway(result),
140+
};
141+
self.incoming.push_back(frame);
142+
Some(12)
143+
}
144+
// Unknown frame type
145+
unknown => {
146+
self.set_err(YamuxFrameParseError::Type(unknown));
147+
None
148+
}
149+
}
150+
}
151+
152+
/// Attempts to parse all available complete frames from the buffer,
153+
/// then shifts and compacts the buffer as needed.
154+
pub fn parse_frames(&mut self) {
155+
let mut offset = 0;
156+
while let Some(consumed) = self.try_parse_frame(offset) {
157+
offset += consumed;
158+
}
159+
self.shift_and_compact_buffer(offset);
160+
}
161+
162+
fn shift_and_compact_buffer(&mut self, offset: usize) {
163+
let new_len = self.buffer.len() - offset;
164+
if self.buffer.capacity() > INITIAL_RECV_BUFFER_CAPACITY * 2
165+
&& new_len < INITIAL_RECV_BUFFER_CAPACITY / 2
166+
{
167+
let old_buffer = &self.buffer;
168+
let mut new_buffer = Vec::with_capacity(INITIAL_RECV_BUFFER_CAPACITY);
169+
new_buffer.extend_from_slice(&old_buffer[offset..]);
170+
self.buffer = new_buffer;
171+
} else {
172+
self.buffer.copy_within(offset.., 0);
173+
self.buffer.truncate(new_len);
174+
}
175+
}
176+
177+
/// Extends the internal buffer with new data, ensuring it has appropriate capacity.
178+
/// On first use, reserves the initial capacity.
179+
pub fn extend_buffer(&mut self, data: &[u8]) {
180+
if self.buffer.capacity() == 0 {
181+
self.buffer.reserve(INITIAL_RECV_BUFFER_CAPACITY);
182+
}
183+
self.buffer.extend_from_slice(data);
184+
}
185+
186+
/// Returns the number of incoming frames that have been parsed and are ready for processing.
187+
pub fn incoming_frame_count(&self) -> usize {
188+
self.incoming.len()
189+
}
61190
}
62191

63192
#[derive(Serialize, Deserialize, Debug, Clone)]

0 commit comments

Comments
 (0)