Skip to content

Commit 7658139

Browse files
committed
tweak(yamux): Simplify reducer a bit more
1 parent e79c2f5 commit 7658139

File tree

2 files changed

+72
-55
lines changed

2 files changed

+72
-55
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 48 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -107,69 +107,21 @@ impl P2pNetworkYamuxState {
107107
Ok(())
108108
}
109109
P2pNetworkYamuxAction::IncomingFrame { addr } => {
110-
let mut pending_outgoing = VecDeque::default();
111110
let Some(frame) = yamux_state.incoming.pop_front() else {
112111
bug_condition!(
113112
"Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
114113
);
115114
return Ok(());
116115
};
117116

118-
if frame.flags.contains(YamuxFlags::SYN) {
119-
yamux_state
120-
.streams
121-
.insert(frame.stream_id, YamuxStreamState::incoming());
122-
123-
if frame.stream_id != 0 {
124-
connection_state.streams.insert(
125-
frame.stream_id,
126-
P2pNetworkStreamState::new_incoming(meta.time()),
127-
);
128-
}
129-
}
130-
if frame.flags.contains(YamuxFlags::ACK) {
131-
yamux_state
132-
.streams
133-
.entry(frame.stream_id)
134-
.or_default()
135-
.established = true;
136-
}
117+
YamuxStreamState::handle_frame_syn_ack_flags(
118+
&mut yamux_state.streams,
119+
&mut connection_state.streams,
120+
&frame,
121+
meta.time(),
122+
);
137123

138-
match &frame.inner {
139-
YamuxFrameInner::Data(_) => {
140-
if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
141-
// must not underflow
142-
// TODO: check it and disconnect peer that violates flow rules
143-
stream.window_ours =
144-
stream.window_ours.saturating_sub(frame.len_as_u32());
145-
}
146-
}
147-
YamuxFrameInner::WindowUpdate { difference } => {
148-
let stream = yamux_state
149-
.streams
150-
.entry(frame.stream_id)
151-
.or_insert_with(YamuxStreamState::incoming);
152-
153-
stream.window_theirs = stream.window_theirs.saturating_add(*difference);
154-
155-
if *difference > 0 {
156-
// have some fresh space in the window
157-
// try send as many frames as can
158-
let mut window = stream.window_theirs;
159-
while let Some(frame) = stream.pending.pop_front() {
160-
let len = frame.len_as_u32();
161-
pending_outgoing.push_back(frame);
162-
if let Some(new_window) = window.checked_sub(len) {
163-
window = new_window;
164-
} else {
165-
break;
166-
}
167-
}
168-
}
169-
}
170-
YamuxFrameInner::Ping { .. } => {}
171-
YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res),
172-
}
124+
let mut pending_outgoing = yamux_state.handle_frame_message(&frame);
173125

174126
let (dispatcher, state) = state_context.into_dispatcher_and_state();
175127
let limits: &P2pLimits = state.substate()?;
@@ -368,4 +320,45 @@ impl P2pNetworkYamuxState {
368320
}
369321
}
370322
}
323+
324+
fn handle_frame_message(&mut self, frame: &YamuxFrame) -> VecDeque<YamuxFrame> {
325+
let mut pending_outgoing = VecDeque::default();
326+
327+
match &frame.inner {
328+
YamuxFrameInner::Data(_) => {
329+
if let Some(stream) = self.streams.get_mut(&frame.stream_id) {
330+
// must not underflow
331+
// TODO: check it and disconnect peer that violates flow rules
332+
stream.window_ours = stream.window_ours.saturating_sub(frame.len_as_u32());
333+
}
334+
}
335+
YamuxFrameInner::WindowUpdate { difference } => {
336+
let stream = self
337+
.streams
338+
.entry(frame.stream_id)
339+
.or_insert_with(YamuxStreamState::incoming);
340+
341+
stream.window_theirs = stream.window_theirs.saturating_add(*difference);
342+
343+
if *difference > 0 {
344+
// have some fresh space in the window
345+
// try send as many frames as can
346+
let mut window = stream.window_theirs;
347+
while let Some(frame) = stream.pending.pop_front() {
348+
let len = frame.len_as_u32();
349+
pending_outgoing.push_back(frame);
350+
if let Some(new_window) = window.checked_sub(len) {
351+
window = new_window;
352+
} else {
353+
break;
354+
}
355+
}
356+
}
357+
}
358+
YamuxFrameInner::Ping { .. } => {}
359+
YamuxFrameInner::GoAway(res) => self.set_res(*res),
360+
}
361+
362+
pending_outgoing
363+
}
371364
}

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,30 @@ impl YamuxStreamState {
228228
..Default::default()
229229
}
230230
}
231+
232+
/// Processes SYN and ACK flags from an incoming Yamux frame and updates stream states accordingly.
233+
pub fn handle_frame_syn_ack_flags(
234+
yamux_streams: &mut BTreeMap<StreamId, YamuxStreamState>,
235+
connection_streams: &mut BTreeMap<u32, P2pNetworkStreamState>,
236+
frame: &YamuxFrame,
237+
time: redux::Timestamp,
238+
) {
239+
if frame.flags.contains(YamuxFlags::SYN) {
240+
yamux_streams.insert(frame.stream_id, YamuxStreamState::incoming());
241+
242+
// TODO: when is stream 0 used? why is it ignored?
243+
if frame.stream_id != 0 {
244+
connection_streams
245+
.insert(frame.stream_id, P2pNetworkStreamState::new_incoming(time));
246+
}
247+
}
248+
249+
if frame.flags.contains(YamuxFlags::ACK) {
250+
if let Some(stream) = yamux_streams.get_mut(&frame.stream_id) {
251+
stream.established = true;
252+
}
253+
}
254+
}
231255
}
232256

233257
bitflags::bitflags! {

0 commit comments

Comments
 (0)