Skip to content

Commit 64b42cf

Browse files
committed
more refactor
1 parent 3a7a373 commit 64b42cf

File tree

2 files changed

+67
-33
lines changed

2 files changed

+67
-33
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -193,28 +193,21 @@ impl P2pNetworkYamuxState {
193193
P2pNetworkYamuxAction::IncomingFrameData { addr } => {
194194
let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail
195195
let YamuxFrameInner::Data(data) = &frame.inner else {
196-
bug_condition!(
197-
"Expected Data frame for action `P2pNetworkYamuxAction::IncomingFrameData`"
198-
);
196+
bug_condition!("Expected Data frame");
199197
return Ok(());
200198
};
201199

202200
let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
203201
return Ok(());
204202
};
205203

206-
// must not underflow
207-
// TODO: check it and disconnect peer that violates flow rules
208-
stream.window_ours = stream.window_ours.saturating_sub(frame.len_as_u32());
209-
210-
let window_update_info = if stream.window_ours < stream.max_window_size / 2 {
211-
Some((
212-
frame.stream_id,
213-
stream.max_window_size.saturating_mul(2).min(1024 * 1024),
214-
))
215-
} else {
216-
None
217-
};
204+
// Process incoming data and check if we need to update window
205+
let window_update_info =
206+
if let Some(window_increase) = stream.process_incoming_data(&frame) {
207+
Some((frame.stream_id, window_increase))
208+
} else {
209+
None
210+
};
218211

219212
let peer_id = connection_state
220213
.peer_id()
@@ -309,26 +302,21 @@ impl P2pNetworkYamuxState {
309302
let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
310303
return Ok(());
311304
};
312-
match &mut frame.inner {
305+
306+
match &frame.inner {
313307
YamuxFrameInner::Data(_) => {
314-
let frame_len = frame.len_as_u32();
315-
if !stream.try_consume_window(frame_len) {
316-
// Window too small, split frame and queue remaining
317-
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
318-
stream.pending.push_front(remaining);
319-
}
320-
stream.window_theirs = 0;
321-
322-
// Check pending queue size limit
323-
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
324-
> yamux_state.pending_outgoing_limit
325-
{
326-
let dispatcher = state_context.into_dispatcher();
327-
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
328-
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
329-
return Ok(());
330-
}
308+
let (accepted, remaining) =
309+
stream.queue_frame(frame, yamux_state.pending_outgoing_limit);
310+
311+
if remaining.is_some() {
312+
let dispatcher = state_context.into_dispatcher();
313+
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
314+
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
315+
return Ok(());
331316
}
317+
318+
frame =
319+
accepted.expect("frame should be accepted or error should be returned");
332320
}
333321
YamuxFrameInner::WindowUpdate { difference } => {
334322
stream.update_local_window(*difference);

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,52 @@ impl YamuxStreamState {
283283
None
284284
}
285285
}
286+
287+
/// Attempts to queue a frame, respecting window size and pending queue limits
288+
/// Returns (accepted_frame, remaining_frame) where:
289+
/// - accepted_frame is the portion that fits in the current window
290+
/// - remaining_frame is the portion that needs to be queued (if any)
291+
pub fn queue_frame(
292+
&mut self,
293+
frame: YamuxFrame,
294+
pending_limit: Limit<usize>,
295+
) -> (Option<YamuxFrame>, Option<YamuxFrame>) {
296+
let frame_len = frame.len_as_u32();
297+
298+
// Check if frame fits in current window
299+
if self.try_consume_window(frame_len) {
300+
return (Some(frame), None);
301+
}
302+
303+
// Split frame if needed
304+
let mut frame = frame;
305+
let remaining = frame.split_at(self.window_theirs as usize);
306+
self.window_theirs = 0;
307+
308+
// Check pending queue size limit
309+
let pending_size = self.pending.iter().map(YamuxFrame::len).sum::<usize>();
310+
if remaining.as_ref().map_or(0, |f| f.len()) + pending_size > pending_limit {
311+
// Queue is full
312+
return (None, Some(frame));
313+
}
314+
315+
if let Some(remaining) = remaining {
316+
self.pending.push_back(remaining);
317+
}
318+
319+
(Some(frame), None)
320+
}
321+
322+
/// Processes an incoming data frame and returns any necessary window updates
323+
pub fn process_incoming_data(&mut self, frame: &YamuxFrame) -> Option<u32> {
324+
// must not underflow
325+
// TODO: check it and disconnect peer that violates flow rules
326+
// Update our window
327+
self.window_ours = self.window_ours.saturating_sub(frame.len_as_u32());
328+
329+
// Check if window update needed
330+
self.should_update_window()
331+
}
286332
}
287333

288334
bitflags::bitflags! {

0 commit comments

Comments
 (0)