Skip to content

Commit 07fe1c0

Browse files
committed
more refactor
1 parent 3a7a373 commit 07fe1c0

File tree

2 files changed

+81
-39
lines changed

2 files changed

+81
-39
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 35 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,12 @@ impl P2pNetworkYamuxState {
144144
.ok_or_else(|| format!("Connection not found {}", addr))?;
145145

146146
let stream_exists = connection_state.get_yamux_stream(frame_stream_id).is_some();
147-
// TODO: can this ever be None?
148147
let peer_id = match connection_state.peer_id() {
149148
Some(peer_id) => *peer_id,
150-
None => return Ok(()),
149+
None => {
150+
bug_condition!("Peer id must exist");
151+
return Ok(());
152+
}
151153
};
152154

153155
// Peer reset this stream
@@ -193,34 +195,30 @@ impl P2pNetworkYamuxState {
193195
P2pNetworkYamuxAction::IncomingFrameData { addr } => {
194196
let frame = yamux_state.incoming.pop_front().unwrap(); // Cannot fail
195197
let YamuxFrameInner::Data(data) = &frame.inner else {
196-
bug_condition!(
197-
"Expected Data frame for action `P2pNetworkYamuxAction::IncomingFrameData`"
198-
);
198+
bug_condition!("Expected Data frame");
199199
return Ok(());
200200
};
201201

202202
let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) else {
203203
return Ok(());
204204
};
205205

206-
// must not underflow
207-
// TODO: check it and disconnect peer that violates flow rules
208-
stream.window_ours = stream.window_ours.saturating_sub(frame.len_as_u32());
206+
// Process incoming data and check if we need to update window
207+
let window_update_info =
208+
if let Some(window_increase) = stream.process_incoming_data(&frame) {
209+
Some((frame.stream_id, window_increase))
210+
} else {
211+
None
212+
};
209213

210-
let window_update_info = if stream.window_ours < stream.max_window_size / 2 {
211-
Some((
212-
frame.stream_id,
213-
stream.max_window_size.saturating_mul(2).min(1024 * 1024),
214-
))
215-
} else {
216-
None
214+
let peer_id = match connection_state.peer_id() {
215+
Some(peer_id) => *peer_id,
216+
None => {
217+
bug_condition!("Peer id must exist");
218+
return Ok(());
219+
}
217220
};
218221

219-
let peer_id = connection_state
220-
.peer_id()
221-
.expect("peer id must exist")
222-
.clone();
223-
224222
let dispatcher = state_context.into_dispatcher();
225223

226224
if let Some((update_stream_id, difference)) = window_update_info {
@@ -309,26 +307,21 @@ impl P2pNetworkYamuxState {
309307
let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
310308
return Ok(());
311309
};
312-
match &mut frame.inner {
310+
311+
match &frame.inner {
313312
YamuxFrameInner::Data(_) => {
314-
let frame_len = frame.len_as_u32();
315-
if !stream.try_consume_window(frame_len) {
316-
// Window too small, split frame and queue remaining
317-
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
318-
stream.pending.push_front(remaining);
319-
}
320-
stream.window_theirs = 0;
321-
322-
// Check pending queue size limit
323-
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
324-
> yamux_state.pending_outgoing_limit
325-
{
326-
let dispatcher = state_context.into_dispatcher();
327-
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
328-
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
329-
return Ok(());
330-
}
313+
let (accepted, remaining) =
314+
stream.queue_frame(frame, yamux_state.pending_outgoing_limit);
315+
316+
if remaining.is_some() {
317+
let dispatcher = state_context.into_dispatcher();
318+
let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
319+
dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
320+
return Ok(());
331321
}
322+
323+
frame =
324+
accepted.expect("frame should be accepted or error should be returned");
332325
}
333326
YamuxFrameInner::WindowUpdate { difference } => {
334327
stream.update_local_window(*difference);
@@ -380,7 +373,10 @@ impl P2pNetworkYamuxState {
380373

381374
let peer_id = match connection_state.peer_id() {
382375
Some(peer_id) => *peer_id,
383-
None => return Ok(()),
376+
None => {
377+
bug_condition!("Peer id must exist");
378+
return Ok(());
379+
}
384380
};
385381

386382
let dispatcher = state_context.into_dispatcher();

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,52 @@ impl YamuxStreamState {
283283
None
284284
}
285285
}
286+
287+
/// Attempts to queue a frame, respecting window size and pending queue limits
288+
/// Returns (accepted_frame, remaining_frame) where:
289+
/// - accepted_frame is the portion that fits in the current window
290+
/// - remaining_frame is the portion that needs to be queued (if any)
291+
pub fn queue_frame(
292+
&mut self,
293+
frame: YamuxFrame,
294+
pending_limit: Limit<usize>,
295+
) -> (Option<YamuxFrame>, Option<YamuxFrame>) {
296+
let frame_len = frame.len_as_u32();
297+
298+
// Check if frame fits in current window
299+
if self.try_consume_window(frame_len) {
300+
return (Some(frame), None);
301+
}
302+
303+
// Split frame if needed
304+
let mut frame = frame;
305+
let remaining = frame.split_at(self.window_theirs as usize);
306+
self.window_theirs = 0;
307+
308+
// Check pending queue size limit
309+
let pending_size = self.pending.iter().map(YamuxFrame::len).sum::<usize>();
310+
if remaining.as_ref().map_or(0, |f| f.len()) + pending_size > pending_limit {
311+
// Queue is full
312+
return (None, Some(frame));
313+
}
314+
315+
if let Some(remaining) = remaining {
316+
self.pending.push_back(remaining);
317+
}
318+
319+
(Some(frame), None)
320+
}
321+
322+
/// Processes an incoming data frame and returns any necessary window updates
323+
pub fn process_incoming_data(&mut self, frame: &YamuxFrame) -> Option<u32> {
324+
// must not underflow
325+
// TODO: check it and disconnect peer that violates flow rules
326+
// Update our window
327+
self.window_ours = self.window_ours.saturating_sub(frame.len_as_u32());
328+
329+
// Check if window update needed
330+
self.should_update_window()
331+
}
286332
}
287333

288334
bitflags::bitflags! {

0 commit comments

Comments
 (0)