Skip to content

Commit d95811a

Browse files
committed
feat(heartbeats): Move some state update logic to state methods
1 parent 9955f49 commit d95811a

File tree

2 files changed

+61
-36
lines changed

2 files changed

+61
-36
lines changed

p2p/src/network/yamux/p2p_network_yamux_reducer.rs

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::collections::VecDeque;
2-
31
use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
42

53
use crate::{
@@ -10,7 +8,7 @@ use crate::{
108
};
119

1210
use super::{
13-
p2p_network_yamux_state::{YamuxFrameKind, YamuxStreamState, MAX_WINDOW_SIZE},
11+
p2p_network_yamux_state::{YamuxFrameKind, YamuxStreamState},
1412
P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing,
1513
};
1614

@@ -286,26 +284,11 @@ impl P2pNetworkYamuxState {
286284
.entry(frame.stream_id)
287285
.or_insert_with(YamuxStreamState::incoming);
288286

289-
stream.window_theirs = stream.window_theirs.saturating_add(difference);
290-
291-
let mut pending_frames = VecDeque::new();
292-
293-
if difference > 0 {
294-
let mut window = stream.window_theirs;
295-
while let Some(frame) = stream.pending.pop_front() {
296-
let len = frame.len_as_u32();
297-
pending_frames.push_back(frame);
298-
if let Some(new_window) = window.checked_sub(len) {
299-
window = new_window;
300-
} else {
301-
break;
302-
}
303-
}
304-
}
287+
let sendable_frames = stream.update_remote_window(difference);
305288

306289
let dispatcher = state_context.into_dispatcher();
307290

308-
while let Some(frame) = pending_frames.pop_front() {
291+
for frame in sendable_frames {
309292
dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
310293
}
311294

@@ -328,23 +311,15 @@ impl P2pNetworkYamuxState {
328311
};
329312
match &mut frame.inner {
330313
YamuxFrameInner::Data(_) => {
331-
if let Some(new_window) =
332-
stream.window_theirs.checked_sub(frame.len_as_u32())
333-
{
334-
// their window is big enough, decrease the size
335-
// and send the whole frame
336-
stream.window_theirs = new_window;
337-
} else {
338-
// their window is not big enough
339-
// split the frame to send as much as you can and put the rest in the queue
314+
let frame_len = frame.len_as_u32();
315+
if !stream.try_consume_window(frame_len) {
316+
// Window too small, split frame and queue remaining
340317
if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
341318
stream.pending.push_front(remaining);
342319
}
343-
344-
// the window will be zero after sending
345320
stream.window_theirs = 0;
346321

347-
// if size of pending that is above the limit, ignore the peer
322+
// Check pending queue size limit
348323
if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
349324
> yamux_state.pending_outgoing_limit
350325
{
@@ -356,10 +331,7 @@ impl P2pNetworkYamuxState {
356331
}
357332
}
358333
YamuxFrameInner::WindowUpdate { difference } => {
359-
stream.window_ours = stream.window_ours.saturating_add(*difference);
360-
if stream.window_ours > stream.max_window_size {
361-
stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE);
362-
}
334+
stream.update_local_window(*difference);
363335
}
364336
_ => {}
365337
}

p2p/src/network/yamux/p2p_network_yamux_state.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,59 @@ impl YamuxStreamState {
230230
..Default::default()
231231
}
232232
}
233+
234+
/// Updates the remote window size and returns any frames that can now be sent
235+
/// Returns frames that were pending and can now be sent due to increased window size
236+
pub fn update_remote_window(&mut self, difference: u32) -> VecDeque<YamuxFrame> {
237+
self.window_theirs = self.window_theirs.saturating_add(difference);
238+
let mut sendable_frames = VecDeque::new();
239+
240+
if difference > 0 {
241+
let mut available_window = self.window_theirs;
242+
while let Some(frame) = self.pending.pop_front() {
243+
let frame_len = frame.len_as_u32();
244+
if frame_len > available_window {
245+
// Put frame back and stop
246+
self.pending.push_front(frame);
247+
break;
248+
}
249+
available_window -= frame_len;
250+
sendable_frames.push_back(frame);
251+
}
252+
}
253+
254+
sendable_frames
255+
}
256+
257+
/// Updates the local window size and possibly increases max window size
258+
pub fn update_local_window(&mut self, difference: u32) {
259+
self.window_ours = self.window_ours.saturating_add(difference);
260+
if self.window_ours > self.max_window_size {
261+
self.max_window_size = self.window_ours.min(MAX_WINDOW_SIZE);
262+
}
263+
}
264+
265+
/// Consumes window space for outgoing data
266+
/// Returns true if the frame can be sent immediately,
267+
/// false if it needs to be queued (window too small)
268+
pub fn try_consume_window(&mut self, frame_len: u32) -> bool {
269+
if let Some(new_window) = self.window_theirs.checked_sub(frame_len) {
270+
self.window_theirs = new_window;
271+
true
272+
} else {
273+
false
274+
}
275+
}
276+
277+
/// Checks if window should be updated based on current size
278+
/// Returns the amount by which the window should be increased, if any
279+
pub fn should_update_window(&self) -> Option<u32> {
280+
if self.window_ours < self.max_window_size / 2 {
281+
Some(self.max_window_size.saturating_mul(2).min(1024 * 1024))
282+
} else {
283+
None
284+
}
285+
}
233286
}
234287

235288
bitflags::bitflags! {

0 commit comments

Comments
 (0)