From f6d3edafa68ff3a2126e2abf43247ce5ba66cb67 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 12 Dec 2025 17:50:43 +0100 Subject: [PATCH 01/24] split up poll_transmit per path and per space --- quinn-proto/src/connection/mod.rs | 538 +++++++++++++++++------------- 1 file changed, 302 insertions(+), 236 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 64d44a87e..c268223a4 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -322,6 +322,19 @@ pub struct Connection { qlog: QlogSink, } +#[derive(Debug)] +enum PathSendStatus { + SendTransmit { transmit: Option }, + NothingToSend { congestion_blocked: bool }, +} + +enum PathSpaceStatus { + NextSpace, + NothingToSend { congestion_blocked: bool }, + Send, + SendTransmit { transmit: Transmit }, +} + impl Connection { pub(crate) fn new( endpoint_config: Arc, @@ -985,6 +998,123 @@ impl Connection { && self.peer_supports_ack_frequency(); } + // If there is any open, validated and available path we only want to send frames to + // any backup path that must be sent on that backup path exclusively. + let have_available_path = self.paths.iter().any(|(id, path)| { + path.data.validated + && path.data.local_status() == PathStatus::Available + && self.rem_cids.contains_key(id) + }); + + // TODO: how to avoid the allocation? 
Cannot use a for loop because of borrowing + let path_ids: Vec<_> = self.paths.keys().copied().collect(); + + let mut transmit = TransmitBuf::new( + buf, + max_datagrams, + self.path_data(path_ids[0]).current_mtu().into(), + ); + + let mut congestion_blocked = false; + + for &path_id in &path_ids { + // Update per path state + transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); + + if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) { + return Some(challenge); + } + + match self.poll_transmit_path(now, &mut transmit, path_id, have_available_path, close) { + PathSendStatus::SendTransmit { transmit: t } => match t { + None => { + let transmit = self.build_transmit(path_id, transmit); + return Some(transmit); + } + Some(transmit) => { + return Some(transmit); + } + }, + PathSendStatus::NothingToSend { + congestion_blocked: cb, + } => { + // TODO: congestion blocked should be per path? how to deal with this better + + // move to next path + if cb { + // note congestion block + congestion_blocked = true; + } + } + } + // Nothing more to send. 
+ trace!(%path_id, "nothing to send on path"); + } + + // We didn't produce any application data packet + debug_assert!( + transmit.is_empty(), + "there was data in the transmit, but it was not sent" + ); + self.app_limited = !congestion_blocked; + + if self.state.is_established() { + // Try MTU probing now + for path_id in path_ids { + self.poll_transmit_mtu_probe(now, &mut transmit, path_id); + if !transmit.is_empty() { + let transmit = self.build_transmit(path_id, transmit); + return Some(transmit); + } + } + } + + None + } + + fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit { + let network_path = self.path_data(path_id).network_path; + trace!( + segment_size = transmit.segment_size(), + last_datagram_len = transmit.len() % transmit.segment_size(), + %network_path, + "sending {} bytes in {} datagrams", + transmit.len(), + transmit.num_datagrams() + ); + self.path_data_mut(path_id) + .inc_total_sent(transmit.len() as u64); + + self.stats + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); + + Transmit { + destination: network_path.remote, + size: transmit.len(), + ecn: if self.path_data(path_id).sending_ecn { + Some(EcnCodepoint::Ect0) + } else { + None + }, + segment_size: match transmit.num_datagrams() { + 1 => None, + _ => Some(transmit.segment_size()), + }, + src_ip: network_path.local_ip, + } + } + + /// poll_transmit logic for a specific path + #[must_use] + fn poll_transmit_path( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + have_available_path: bool, + close: bool, + ) -> PathSendStatus { // Whether this packet can be coalesced with another one in the same datagram. let mut coalesce = true; @@ -992,40 +1122,86 @@ impl Connection { // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller. let mut pad_datagram = PadDatagram::No; - // Whether congestion control stopped the next packet from being sent. 
Further - // packets could still be built, as e.g. tail-loss probes are not congestion - // limited. - let mut congestion_blocked = false; - // The packet number of the last built packet. let mut last_packet_number = None; - let mut path_id = *self.paths.first_key_value().expect("one path must exist").0; + // Iterate over the available spaces + for space_id in SpaceId::iter() { + // Only Path0 uses non Data space ids + if path_id != PathId::ZERO && space_id != SpaceId::Data { + continue; + } - // If there is any open, validated and available path we only want to send frames to - // any backup path that must be sent on that backup path exclusively. - let have_available_path = self.paths.iter().any(|(id, path)| { - path.data.validated - && path.data.local_status() == PathStatus::Available - && self.rem_cids.contains_key(id) - }); + let res = self.poll_transmit_path_space( + now, + transmit, + path_id, + space_id, + have_available_path, + close, + &mut coalesce, + &mut pad_datagram, + &mut last_packet_number, + ); + match res { + PathSpaceStatus::Send => { + break; + } + PathSpaceStatus::SendTransmit { transmit } => { + return PathSendStatus::SendTransmit { + transmit: Some(transmit), + }; + } + PathSpaceStatus::NothingToSend { congestion_blocked } => { + return PathSendStatus::NothingToSend { congestion_blocked }; + } + PathSpaceStatus::NextSpace => { + // moving onto the next space + } + } + } - // Setup for the first path_id - let mut transmit = TransmitBuf::new( - buf, - max_datagrams, - self.path_data(path_id).current_mtu().into(), + if let Some(last_packet_number) = last_packet_number { + // Note that when sending in multiple packet spaces the last packet number will + // be the one from the highest packet space. 
+ self.path_data_mut(path_id).congestion.on_sent( + now, + transmit.len() as u64, + last_packet_number, + ); + } + + self.qlog.emit_recovery_metrics( + path_id, + &mut self.paths.get_mut(&path_id).unwrap().data, + now, ); - if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) { - return Some(challenge); + + if transmit.is_empty() { + PathSendStatus::NothingToSend { + congestion_blocked: false, + } + } else { + PathSendStatus::SendTransmit { transmit: None } } - let mut space_id = match path_id { - PathId::ZERO => SpaceId::Initial, - _ => SpaceId::Data, - }; + } + /// poll_transmit logic for a path_id - space_id combination + #[must_use] + fn poll_transmit_path_space( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + space_id: SpaceId, + have_available_path: bool, + close: bool, + coalesce: &mut bool, + pad_datagram: &mut PadDatagram, + last_packet_number: &mut Option, + ) -> PathSpaceStatus { loop { - // check if there is at least one active CID to use for sending + // Check if there is at least one active CID to use for sending let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { let err = PathError::RemoteCidsExhausted; if !self.abandoned_paths.contains(&path_id) { @@ -1051,36 +1227,12 @@ impl Connection { trace!(%path_id, "remote CIDs retired for abandoned path"); } - match self.paths.keys().find(|&&next| next > path_id) { - Some(next_path_id) => { - // See if this next path can send anything. - path_id = *next_path_id; - space_id = SpaceId::Data; - - // update per path state - transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); - if let Some(challenge) = - self.send_prev_path_challenge(now, &mut transmit, path_id) - { - return Some(challenge); - } - - continue; - } - None => { - // Nothing more to send. 
- trace!( - ?space_id, - %path_id, - "no CIDs to send on path, no more paths" - ); - break; - } - } + return PathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; }; - // Determine if anything can be sent in this packet number space (SpaceId + - // PathId). + // Determine if anything can be sent in this packet number space (SpaceId + PathId). let max_packet_size = if transmit.datagram_remaining_mut() > 0 { // We are trying to coalesce another packet into this datagram. transmit.datagram_remaining_mut() @@ -1088,7 +1240,9 @@ impl Connection { // A new datagram needs to be started. transmit.segment_size() }; + let can_send = self.space_can_send(space_id, path_id, max_packet_size, close); + let path_should_send = { let path_exclusive_only = space_id == SpaceId::Data && have_available_path @@ -1106,25 +1260,29 @@ impl Connection { if self.spaces[space_id].crypto.is_some() { trace!(?space_id, %path_id, "nothing to send in space"); } - space_id = space_id.next(); - continue; + return PathSpaceStatus::NextSpace; } let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 { // Only check congestion control if a new datagram is needed. - self.path_congestion_check(space_id, path_id, &transmit, &can_send, now) + self.path_congestion_check(space_id, path_id, transmit, &can_send, now) } else { PathBlocked::No }; - if send_blocked != PathBlocked::No { + + // Whether congestion control stopped the next packet from being sent. Further + // packets could still be built, as e.g. tail-loss probes are not congestion + // limited. + let congestion_blocked = if send_blocked != PathBlocked::No { trace!(?space_id, %path_id, ?send_blocked, "congestion blocked"); - congestion_blocked = true; - } + true + } else { + false + }; if send_blocked != PathBlocked::No && space_id < SpaceId::Data { // Higher spaces might still have tail-loss probes to send, which are not // congestion blocked. 
- space_id = space_id.next(); - continue; + return PathSpaceStatus::NextSpace; } if !path_should_send || send_blocked != PathBlocked::No { // Nothing more to send on this path, check the next path if possible. @@ -1132,49 +1290,16 @@ impl Connection { // If there are any datagrams in the transmit, packets for another path can // not be built. if transmit.num_datagrams() > 0 { - break; + return PathSpaceStatus::Send; } - match self.paths.keys().find(|&&next| next > path_id) { - Some(next_path_id) => { - // See if this next path can send anything. - trace!( - ?space_id, - %path_id, - %next_path_id, - "nothing to send on path" - ); - path_id = *next_path_id; - space_id = SpaceId::Data; - - // update per path state - transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); - if let Some(challenge) = - self.send_prev_path_challenge(now, &mut transmit, path_id) - { - return Some(challenge); - } - - continue; - } - None => { - // Nothing more to send. - trace!( - ?space_id, - %path_id, - next_path_id=?None::, - "nothing to send on path" - ); - break; - } - } + return PathSpaceStatus::NothingToSend { congestion_blocked }; } - // If the datagram is full, we need to start a new one. 
if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { // No more datagrams allowed - break; + return PathSpaceStatus::Send; } match self.spaces[space_id].for_path(path_id).loss_probes { @@ -1201,8 +1326,8 @@ impl Connection { } } trace!(count = transmit.num_datagrams(), "new datagram started"); - coalesce = true; - pad_datagram = PadDatagram::No; + *coalesce = true; + *pad_datagram = PadDatagram::No; } // If coalescing another packet into the existing datagram, there should @@ -1227,24 +1352,26 @@ impl Connection { prev.update_unacked = false; } - let mut builder = PacketBuilder::new( + let Some(mut builder) = PacketBuilder::new( now, space_id, path_id, remote_cid, - &mut transmit, + transmit, can_send.other, self, - )?; - last_packet_number = Some(builder.exact_number); - coalesce = coalesce && !builder.short_header; + ) else { + return PathSpaceStatus::NothingToSend { congestion_blocked }; + }; + *last_packet_number = Some(builder.exact_number); + *coalesce = *coalesce && !builder.short_header; if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 - pad_datagram |= PadDatagram::ToMinMtu; + *pad_datagram |= PadDatagram::ToMinMtu; } if space_id == SpaceId::Data && self.config.pad_to_mtu { - pad_datagram |= PadDatagram::ToSegmentSize; + *pad_datagram |= PadDatagram::ToSegmentSize; } if can_send.close { @@ -1300,19 +1427,18 @@ impl Connection { }; builder.write_frame(close.encoder(max_frame_size), stats); } - builder.finish_and_track(now, self, path_id, pad_datagram); + builder.finish_and_track(now, self, path_id, *pad_datagram); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. 
self.close = false; // `CONNECTION_CLOSE` is the final packet - break; + return PathSpaceStatus::Send; } else { // Send a close frame in every possible space for robustness, per // RFC9000 "Immediate Close during the Handshake". Don't bother trying // to send anything else. - space_id = space_id.next(); - continue; + return PathSpaceStatus::NextSpace; } } @@ -1332,13 +1458,16 @@ impl Connection { builder.write_frame_with_log_msg(frame, stats, Some("(off-path)")); builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu); self.stats.udp_tx.on_sent(1, transmit.len()); - return Some(Transmit { - destination: network_path.remote, - size: transmit.len(), - ecn: None, - segment_size: None, - src_ip: network_path.local_ip, - }); + + return PathSpaceStatus::SendTransmit { + transmit: Transmit { + destination: network_path.remote, + size: transmit.len(), + ecn: None, + segment_size: None, + src_ip: network_path.local_ip, + }, + }; } } @@ -1362,7 +1491,7 @@ impl Connection { "SendableFrames was {can_send:?}, but only ACKs have been written" ); if builder.sent_frames().requires_padding { - pad_datagram |= PadDatagram::ToMinMtu; + *pad_datagram |= PadDatagram::ToMinMtu; } for (path_id, _pn) in builder.sent_frames().largest_acked.iter() { @@ -1383,7 +1512,7 @@ impl Connection { // Are we allowed to coalesce AND is there enough space for another *packet* in // this datagram AND is there another packet to send in this or the next space? - if coalesce + if *coalesce && builder .buf .datagram_remaining_mut() @@ -1416,144 +1545,81 @@ impl Connection { builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); builder.finish_and_track(now, self, path_id, PadDatagram::No); - break; + return PathSpaceStatus::Send; } // Pad the current datagram to GSO segment size so it can be // included in the GSO batch. 
builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize); } else { - builder.finish_and_track(now, self, path_id, pad_datagram); + builder.finish_and_track(now, self, path_id, *pad_datagram); } if transmit.num_datagrams() == 1 { transmit.clip_datagram_size(); } } } + } - if let Some(last_packet_number) = last_packet_number { - // Note that when sending in multiple packet spaces the last packet number will - // be the one from the highest packet space. - self.path_data_mut(path_id).congestion.on_sent( - now, - transmit.len() as u64, - last_packet_number, - ); - } - - self.qlog.emit_recovery_metrics( - path_id, - &mut self.paths.get_mut(&path_id).unwrap().data, - now, - ); - - self.app_limited = transmit.is_empty() && !congestion_blocked; - - // Send MTU probe if necessary - if transmit.is_empty() && self.state.is_established() { - // MTU probing happens only in Data space. - let space_id = SpaceId::Data; - path_id = *self.paths.first_key_value().expect("one path must exist").0; - let probe_data = loop { - // We MTU probe all paths for which all of the following is true: - // - We have an active destination CID for the path. - // - The remote address *and* path are validated. - // - The path is not abandoned. - // - The MTU Discovery subsystem wants to probe the path. - let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active); - let eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - let probe_size = eligible - .then(|| { - let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number(); - self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn) - }) - .flatten(); - match (active_cid, probe_size) { - (Some(active_cid), Some(probe_size)) => { - // Let's send an MTUD probe! - break Some((active_cid, probe_size)); - } - _ => { - // Find the next path to check if it needs an MTUD probe. 
- match self.paths.keys().find(|&&next| next > path_id) { - Some(next) => { - path_id = *next; - continue; - } - None => break None, - } - } - } - }; - if let Some((active_cid, probe_size)) = probe_data { - // We are definitely sending a DPLPMTUD probe. - debug_assert_eq!(transmit.num_datagrams(), 0); - transmit.start_new_datagram_with_size(probe_size as usize); + fn poll_transmit_mtu_probe( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + ) { + // MTU probing happens only in Data space. + let space_id = SpaceId::Data; + let probe_data = { + // We MTU probe all paths for which all of the following is true: + // - We have an active destination CID for the path. + // - The remote address *and* path are validated. + // - The path is not abandoned. + // - The MTU Discovery subsystem wants to probe the path. + let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active); + let eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); + let probe_size = eligible + .then(|| { + let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number(); + self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn) + }) + .flatten(); - let mut builder = PacketBuilder::new( - now, - space_id, - path_id, - active_cid, - &mut transmit, - true, - self, - )?; + match (active_cid, probe_size) { + (Some(active_cid), Some(probe_size)) => Some((active_cid, probe_size)), + _ => None, + } + }; - // We implement MTU probes as ping packets padded up to the probe size - trace!(?probe_size, "writing MTUD probe"); - builder.write_frame(frame::Ping, &mut self.stats.frame_tx); + // Let's send an MTUD probe! + if let Some((active_cid, probe_size)) = probe_data { + // We are definitely sending a DPLPMTUD probe. 
+ debug_assert_eq!(transmit.num_datagrams(), 0); + transmit.start_new_datagram_with_size(probe_size as usize); - // If supported by the peer, we want no delays to the probe's ACK - if self.peer_supports_ack_frequency() { - builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); - } + let Some(mut builder) = + PacketBuilder::new(now, space_id, path_id, active_cid, transmit, true, self) + else { + return; + }; - builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); + // We implement MTU probes as ping packets padded up to the probe size + trace!(?probe_size, "writing MTUD probe"); + builder.write_frame(frame::Ping, &mut self.stats.frame_tx); - self.path_stats - .entry(path_id) - .or_default() - .sent_plpmtud_probes += 1; + // If supported by the peer, we want no delays to the probe's ACK + if self.peer_supports_ack_frequency() { + builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); } - } - - if transmit.is_empty() { - return None; - } - - let network_path = self.path_data(path_id).network_path; - trace!( - segment_size = transmit.segment_size(), - last_datagram_len = transmit.len() % transmit.segment_size(), - %network_path, - "sending {} bytes in {} datagrams", - transmit.len(), - transmit.num_datagrams() - ); - self.path_data_mut(path_id) - .inc_total_sent(transmit.len() as u64); - self.stats - .udp_tx - .on_sent(transmit.num_datagrams() as u64, transmit.len()); + builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); - Some(Transmit { - destination: network_path.remote, - size: transmit.len(), - ecn: if self.path_data(path_id).sending_ecn { - Some(EcnCodepoint::Ect0) - } else { - None - }, - segment_size: match transmit.num_datagrams() { - 1 => None, - _ => Some(transmit.segment_size()), - }, - src_ip: network_path.local_ip, - }) + self.path_stats + .entry(path_id) + .or_default() + .sent_plpmtud_probes += 1; + } } /// Returns the [`SpaceId`] of the next packet space which has data to 
send From d5f83860181fae698822754c0e308d3db517cc9c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 13 Dec 2025 23:22:11 +0100 Subject: [PATCH 02/24] refactor: return last_packet_number --- quinn-proto/src/connection/mod.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index c268223a4..4d780958f 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -331,7 +331,7 @@ enum PathSendStatus { enum PathSpaceStatus { NextSpace, NothingToSend { congestion_blocked: bool }, - Send, + Send { last_packet_number: Option }, SendTransmit { transmit: Transmit }, } @@ -1141,10 +1141,12 @@ impl Connection { close, &mut coalesce, &mut pad_datagram, - &mut last_packet_number, ); match res { - PathSpaceStatus::Send => { + PathSpaceStatus::Send { + last_packet_number: lp, + } => { + last_packet_number = lp; break; } PathSpaceStatus::SendTransmit { transmit } => { @@ -1198,8 +1200,8 @@ impl Connection { close: bool, coalesce: &mut bool, pad_datagram: &mut PadDatagram, - last_packet_number: &mut Option, ) -> PathSpaceStatus { + let mut last_packet_number = None; loop { // Check if there is at least one active CID to use for sending let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { @@ -1290,7 +1292,7 @@ impl Connection { // If there are any datagrams in the transmit, packets for another path can // not be built. 
if transmit.num_datagrams() > 0 { - return PathSpaceStatus::Send; + return PathSpaceStatus::Send { last_packet_number }; } return PathSpaceStatus::NothingToSend { congestion_blocked }; @@ -1299,7 +1301,7 @@ impl Connection { if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { // No more datagrams allowed - return PathSpaceStatus::Send; + return PathSpaceStatus::Send { last_packet_number }; } match self.spaces[space_id].for_path(path_id).loss_probes { @@ -1363,7 +1365,7 @@ impl Connection { ) else { return PathSpaceStatus::NothingToSend { congestion_blocked }; }; - *last_packet_number = Some(builder.exact_number); + last_packet_number = Some(builder.exact_number); *coalesce = *coalesce && !builder.short_header; if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { @@ -1433,7 +1435,7 @@ impl Connection { // CONNECTION_CLOSE on a single path since we expect our paths to work. self.close = false; // `CONNECTION_CLOSE` is the final packet - return PathSpaceStatus::Send; + return PathSpaceStatus::Send { last_packet_number }; } else { // Send a close frame in every possible space for robustness, per // RFC9000 "Immediate Close during the Handshake". 
Don't bother trying @@ -1545,7 +1547,7 @@ impl Connection { builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); builder.finish_and_track(now, self, path_id, PadDatagram::No); - return PathSpaceStatus::Send; + return PathSpaceStatus::Send { last_packet_number }; } // Pad the current datagram to GSO segment size so it can be From 382ad3d11c491195ff5b1edbbd334d2b05a6d15c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 13 Dec 2025 23:26:11 +0100 Subject: [PATCH 03/24] cleanup status enums --- quinn-proto/src/connection/mod.rs | 73 +++++++++++++++---------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4d780958f..f2987d7b5 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -323,16 +323,18 @@ pub struct Connection { } #[derive(Debug)] -enum PathSendStatus { - SendTransmit { transmit: Option }, +enum PollPathStatus { + Send, + SendTransmit { transmit: Transmit }, NothingToSend { congestion_blocked: bool }, } -enum PathSpaceStatus { +#[derive(Debug)] +enum PollPathSpaceStatus { NextSpace, - NothingToSend { congestion_blocked: bool }, Send { last_packet_number: Option }, SendTransmit { transmit: Transmit }, + NothingToSend { congestion_blocked: bool }, } impl Connection { @@ -1026,16 +1028,14 @@ impl Connection { } match self.poll_transmit_path(now, &mut transmit, path_id, have_available_path, close) { - PathSendStatus::SendTransmit { transmit: t } => match t { - None => { - let transmit = self.build_transmit(path_id, transmit); - return Some(transmit); - } - Some(transmit) => { - return Some(transmit); - } - }, - PathSendStatus::NothingToSend { + PollPathStatus::SendTransmit { transmit } => { + return Some(transmit); + } + PollPathStatus::Send => { + let transmit = self.build_transmit(path_id, transmit); + return Some(transmit); + } + PollPathStatus::NothingToSend { congestion_blocked: cb, } => { // TODO: 
congestion blocked should be per path? how to deal with this better @@ -1114,7 +1114,7 @@ impl Connection { path_id: PathId, have_available_path: bool, close: bool, - ) -> PathSendStatus { + ) -> PollPathStatus { // Whether this packet can be coalesced with another one in the same datagram. let mut coalesce = true; @@ -1143,21 +1143,19 @@ impl Connection { &mut pad_datagram, ); match res { - PathSpaceStatus::Send { + PollPathSpaceStatus::Send { last_packet_number: lp, } => { last_packet_number = lp; break; } - PathSpaceStatus::SendTransmit { transmit } => { - return PathSendStatus::SendTransmit { - transmit: Some(transmit), - }; + PollPathSpaceStatus::SendTransmit { transmit } => { + return PollPathStatus::SendTransmit { transmit }; } - PathSpaceStatus::NothingToSend { congestion_blocked } => { - return PathSendStatus::NothingToSend { congestion_blocked }; + PollPathSpaceStatus::NothingToSend { congestion_blocked } => { + return PollPathStatus::NothingToSend { congestion_blocked }; } - PathSpaceStatus::NextSpace => { + PollPathSpaceStatus::NextSpace => { // moving onto the next space } } @@ -1180,11 +1178,11 @@ impl Connection { ); if transmit.is_empty() { - PathSendStatus::NothingToSend { + PollPathStatus::NothingToSend { congestion_blocked: false, } } else { - PathSendStatus::SendTransmit { transmit: None } + PollPathStatus::Send } } @@ -1200,7 +1198,7 @@ impl Connection { close: bool, coalesce: &mut bool, pad_datagram: &mut PadDatagram, - ) -> PathSpaceStatus { + ) -> PollPathSpaceStatus { let mut last_packet_number = None; loop { // Check if there is at least one active CID to use for sending @@ -1229,7 +1227,7 @@ impl Connection { trace!(%path_id, "remote CIDs retired for abandoned path"); } - return PathSpaceStatus::NothingToSend { + return PollPathSpaceStatus::NothingToSend { congestion_blocked: false, }; }; @@ -1262,7 +1260,7 @@ impl Connection { if self.spaces[space_id].crypto.is_some() { trace!(?space_id, %path_id, "nothing to send in space"); } - return 
PathSpaceStatus::NextSpace; + return PollPathSpaceStatus::NextSpace; } let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 { @@ -1284,7 +1282,7 @@ impl Connection { if send_blocked != PathBlocked::No && space_id < SpaceId::Data { // Higher spaces might still have tail-loss probes to send, which are not // congestion blocked. - return PathSpaceStatus::NextSpace; + return PollPathSpaceStatus::NextSpace; } if !path_should_send || send_blocked != PathBlocked::No { // Nothing more to send on this path, check the next path if possible. @@ -1292,16 +1290,16 @@ impl Connection { // If there are any datagrams in the transmit, packets for another path can // not be built. if transmit.num_datagrams() > 0 { - return PathSpaceStatus::Send { last_packet_number }; + return PollPathSpaceStatus::Send { last_packet_number }; } - return PathSpaceStatus::NothingToSend { congestion_blocked }; + return PollPathSpaceStatus::NothingToSend { congestion_blocked }; } if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { // No more datagrams allowed - return PathSpaceStatus::Send { last_packet_number }; + return PollPathSpaceStatus::Send { last_packet_number }; } match self.spaces[space_id].for_path(path_id).loss_probes { @@ -1363,7 +1361,7 @@ impl Connection { can_send.other, self, ) else { - return PathSpaceStatus::NothingToSend { congestion_blocked }; + return PollPathSpaceStatus::NothingToSend { congestion_blocked }; }; last_packet_number = Some(builder.exact_number); *coalesce = *coalesce && !builder.short_header; @@ -1435,12 +1433,12 @@ impl Connection { // CONNECTION_CLOSE on a single path since we expect our paths to work. 
self.close = false; // `CONNECTION_CLOSE` is the final packet - return PathSpaceStatus::Send { last_packet_number }; + return PollPathSpaceStatus::Send { last_packet_number }; } else { // Send a close frame in every possible space for robustness, per // RFC9000 "Immediate Close during the Handshake". Don't bother trying // to send anything else. - return PathSpaceStatus::NextSpace; + return PollPathSpaceStatus::NextSpace; } } @@ -1460,8 +1458,7 @@ impl Connection { builder.write_frame_with_log_msg(frame, stats, Some("(off-path)")); builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu); self.stats.udp_tx.on_sent(1, transmit.len()); - - return PathSpaceStatus::SendTransmit { + return PollPathSpaceStatus::SendTransmit { transmit: Transmit { destination: network_path.remote, size: transmit.len(), @@ -1547,7 +1544,7 @@ impl Connection { builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); builder.finish_and_track(now, self, path_id, PadDatagram::No); - return PathSpaceStatus::Send { last_packet_number }; + return PollPathSpaceStatus::Send { last_packet_number }; } // Pad the current datagram to GSO segment size so it can be From 9daa159cf037a06e90ab584da1ae8e3d53e7c470 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 13 Dec 2025 23:36:20 +0100 Subject: [PATCH 04/24] refactor: streamline mtu probe building --- quinn-proto/src/connection/mod.rs | 102 +++++++++++++++++------------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index f2987d7b5..7668f8e00 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1061,6 +1061,8 @@ impl Connection { if self.state.is_established() { // Try MTU probing now for path_id in path_ids { + // Update per path state + transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); self.poll_transmit_mtu_probe(now, &mut transmit, path_id); if !transmit.is_empty() { let 
transmit = self.build_transmit(path_id, transmit); @@ -1073,6 +1075,11 @@ impl Connection { } fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit { + debug_assert!( + !transmit.is_empty(), + "must not be called with an empty transmit buffer" + ); + let network_path = self.path_data(path_id).network_path; trace!( segment_size = transmit.segment_size(), @@ -1566,59 +1573,66 @@ impl Connection { transmit: &mut TransmitBuf<'_>, path_id: PathId, ) { - // MTU probing happens only in Data space. - let space_id = SpaceId::Data; - let probe_data = { - // We MTU probe all paths for which all of the following is true: - // - We have an active destination CID for the path. - // - The remote address *and* path are validated. - // - The path is not abandoned. - // - The MTU Discovery subsystem wants to probe the path. - let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active); - let eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - let probe_size = eligible - .then(|| { - let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number(); - self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn) - }) - .flatten(); + let Some((active_cid, probe_size)) = self.get_mtu_probe_data(now, path_id) else { + return; + }; - match (active_cid, probe_size) { - (Some(active_cid), Some(probe_size)) => Some((active_cid, probe_size)), - _ => None, - } + // We are definitely sending a DPLPMTUD probe. + debug_assert_eq!(transmit.num_datagrams(), 0); + transmit.start_new_datagram_with_size(probe_size as usize); + + let Some(mut builder) = PacketBuilder::new( + now, + SpaceId::Data, + path_id, + active_cid, + transmit, + true, + self, + ) else { + return; }; - // Let's send an MTUD probe! - if let Some((active_cid, probe_size)) = probe_data { - // We are definitely sending a DPLPMTUD probe. 
- debug_assert_eq!(transmit.num_datagrams(), 0); - transmit.start_new_datagram_with_size(probe_size as usize); + // We implement MTU probes as ping packets padded up to the probe size + trace!(?probe_size, "writing MTUD probe"); + builder.write_frame(frame::Ping, &mut self.stats.frame_tx); - let Some(mut builder) = - PacketBuilder::new(now, space_id, path_id, active_cid, transmit, true, self) - else { - return; - }; + // If supported by the peer, we want no delays to the probe's ACK + if self.peer_supports_ack_frequency() { + builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); + } - // We implement MTU probes as ping packets padded up to the probe size - trace!(?probe_size, "writing MTUD probe"); - builder.write_frame(frame::Ping, &mut self.stats.frame_tx); + builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); - // If supported by the peer, we want no delays to the probe's ACK - if self.peer_supports_ack_frequency() { - builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); - } + self.path_stats + .entry(path_id) + .or_default() + .sent_plpmtud_probes += 1; + } - builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); + fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { + // We MTU probe all paths for which all of the following is true: + // - We have an active destination CID for the path. + // - The remote address *and* path are validated. + // - The path is not abandoned. + // - The MTU Discovery subsystem wants to probe the path. 
+ let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active)?; + let is_eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); - self.path_stats - .entry(path_id) - .or_default() - .sent_plpmtud_probes += 1; + if !is_eligible { + return None; } + let next_pn = self.spaces[SpaceId::Data] + .for_path(path_id) + .peek_tx_number(); + let probe_size = self + .path_data_mut(path_id) + .mtud + .poll_transmit(now, next_pn)?; + + Some((active_cid, probe_size)) } /// Returns the [`SpaceId`] of the next packet space which has data to send From 6e3106d4287a687664f63e72895dd808838521d6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 13 Dec 2025 23:41:35 +0100 Subject: [PATCH 05/24] refactor: extract remote cid exhaustion handling into function --- quinn-proto/src/connection/mod.rs | 49 ++++++++++++++++--------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 7668f8e00..b9ee7090c 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1210,29 +1210,7 @@ impl Connection { loop { // Check if there is at least one active CID to use for sending let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { - let err = PathError::RemoteCidsExhausted; - if !self.abandoned_paths.contains(&path_id) { - debug!(?err, %path_id, "no active CID for path"); - self.events.push_back(Event::Path(PathEvent::LocallyClosed { - id: path_id, - error: err, - })); - // Locally we should have refused to open this path, the remote should - // have given us CIDs for this path before opening it. So we can always - // abandon this here. 
- self.close_path( - now, - path_id, - TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(), - ) - .ok(); - self.spaces[SpaceId::Data] - .pending - .path_cids_blocked - .insert(path_id); - } else { - trace!(%path_id, "remote CIDs retired for abandoned path"); - } + self.on_remote_cids_exhausted(now, path_id); return PollPathSpaceStatus::NothingToSend { congestion_blocked: false, @@ -1567,6 +1545,31 @@ impl Connection { } } + fn on_remote_cids_exhausted(&mut self, now: Instant, path_id: PathId) { + if self.abandoned_paths.contains(&path_id) { + trace!(%path_id, "remote CIDs retired for abandoned path"); + return; + } + + let error = PathError::RemoteCidsExhausted; + debug!(?error, %path_id, "no active CID for path"); + self.events + .push_back(Event::Path(PathEvent::LocallyClosed { id: path_id, error })); + // Locally we should have refused to open this path, the remote should + // have given us CIDs for this path before opening it. So we can always + // abandon this here. + self.close_path( + now, + path_id, + TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(), + ) + .ok(); + self.spaces[SpaceId::Data] + .pending + .path_cids_blocked + .insert(path_id); + } + fn poll_transmit_mtu_probe( &mut self, now: Instant, From c4d87c920d1e71a46dd753a88cd620b4cb2febc4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sat, 13 Dec 2025 23:49:53 +0100 Subject: [PATCH 06/24] refactor: move remote_cid check per path --- quinn-proto/src/connection/mod.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index b9ee7090c..991f028a3 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1122,6 +1122,15 @@ impl Connection { have_available_path: bool, close: bool, ) -> PollPathStatus { + // Check if there is at least one active CID to use for sending + let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { + 
self.on_remote_cids_exhausted(now, path_id); + + return PollPathStatus::NothingToSend { + congestion_blocked: false, + }; + }; + // Whether this packet can be coalesced with another one in the same datagram. let mut coalesce = true; @@ -1144,6 +1153,7 @@ impl Connection { transmit, path_id, space_id, + remote_cid, have_available_path, close, &mut coalesce, @@ -1201,22 +1211,15 @@ impl Connection { transmit: &mut TransmitBuf<'_>, path_id: PathId, space_id: SpaceId, + remote_cid: ConnectionId, have_available_path: bool, close: bool, coalesce: &mut bool, pad_datagram: &mut PadDatagram, ) -> PollPathSpaceStatus { let mut last_packet_number = None; - loop { - // Check if there is at least one active CID to use for sending - let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { - self.on_remote_cids_exhausted(now, path_id); - - return PollPathSpaceStatus::NothingToSend { - congestion_blocked: false, - }; - }; + loop { // Determine if anything can be sent in this packet number space (SpaceId + PathId). let max_packet_size = if transmit.datagram_remaining_mut() > 0 { // We are trying to coalesce another packet into this datagram. 
From f3c8a40c94ac293d945e150556b8ddcbef1828c6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 14 Dec 2025 00:14:30 +0100 Subject: [PATCH 07/24] refactor: simplify send space check --- quinn-proto/src/connection/mod.rs | 23 ++++++++++------------- quinn-proto/src/packet.rs | 10 +++++----- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 991f028a3..fc811f282 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1505,9 +1505,7 @@ impl Connection { .datagram_remaining_mut() .saturating_sub(builder.predict_packet_end()) > MIN_PACKET_SPACE - && self - .next_send_space(space_id, path_id, builder.buf, close) - .is_some() + && self.has_next_send_space(space_id, path_id, builder.buf, close) { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. @@ -1641,17 +1639,17 @@ impl Connection { Some((active_cid, probe_size)) } - /// Returns the [`SpaceId`] of the next packet space which has data to send + /// Returns if there is anext packet space which has data to send /// /// This takes into account the space available to frames in the next datagram. // TODO(flub): This duplication is not nice. - fn next_send_space( + fn has_next_send_space( &mut self, current_space_id: SpaceId, path_id: PathId, buf: &TransmitBuf<'_>, close: bool, - ) -> Option { + ) -> bool { // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed // to be able to send an individual frame at least this large in the next 1-RTT // packet. 
This could be generalized to support every space, but it's only needed to @@ -1662,15 +1660,14 @@ impl Connection { loop { let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close); if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { - return Some(space_id); - } - space_id = match space_id { - SpaceId::Initial => SpaceId::Handshake, - SpaceId::Handshake => SpaceId::Data, - SpaceId::Data => break, + return true; } + let Some(next_space) = space_id.next() else { + break; + }; + space_id = next_space; } - None + false } /// Checks if creating a new datagram would be blocked by congestion control diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index b9e7c51cf..6c40b04c1 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -934,12 +934,12 @@ impl SpaceId { /// Returns the next higher packet space. /// - /// Keeps returning [`SpaceId::Data`] as the highest space. - pub(crate) fn next(&self) -> Self { + /// Returns `None` if at [`SpaceId::Data`]. + pub fn next(&self) -> Option { match self { - Self::Initial => Self::Handshake, - Self::Handshake => Self::Data, - Self::Data => Self::Data, + Self::Initial => Some(Self::Handshake), + Self::Handshake => Some(Self::Data), + Self::Data => None, } } } From 3aac39dae155324308e7b9f53b3763934c2b5204 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 13 Jan 2026 16:28:45 +0100 Subject: [PATCH 08/24] fixup --- quinn-proto/src/packet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index 6c40b04c1..4f3b0ba69 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -935,7 +935,7 @@ impl SpaceId { /// Returns the next higher packet space. /// /// Returns `None` if at [`SpaceId::Data`]. 
- pub fn next(&self) -> Option { + pub(crate) fn next(&self) -> Option { match self { Self::Initial => Some(Self::Handshake), Self::Handshake => Some(Self::Data), From b5c2cce39cdca2b5a3e5b43e0709c133199858b6 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 14 Jan 2026 18:08:56 +0100 Subject: [PATCH 09/24] docs(proto): describe enums --- quinn-proto/src/connection/mod.rs | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index fc811f282..667c343a3 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -322,19 +322,42 @@ pub struct Connection { qlog: QlogSink, } +/// Return value for `poll_transmit_path`. #[derive(Debug)] enum PollPathStatus { + /// We have something to send, and data has been accumulated + /// on the passed in `transmit`. Send, - SendTransmit { transmit: Transmit }, - NothingToSend { congestion_blocked: bool }, + /// A transmit is ready to be sent out. + SendTransmit { + /// The transmit to send. + transmit: Transmit, + }, + /// Nothing to send currently. + NothingToSend { + /// Set to `true` if we consider the current inability to send something because of congestion control + congestion_blocked: bool, + }, } +/// Return value for `poll_transmit_path_space`. #[derive(Debug)] enum PollPathSpaceStatus { + /// Nothing to send on this space, continue to the next one. NextSpace, + /// We have something to send, and data has been accumulated + /// on the passed in `transmit`. Send { last_packet_number: Option }, - SendTransmit { transmit: Transmit }, - NothingToSend { congestion_blocked: bool }, + /// A transmit is ready to be sent out. + SendTransmit { + /// The transmit to send. + transmit: Transmit, + }, + /// Nothing to send currently. 
+ NothingToSend { + /// Set to `true` if we consider the current inability to send something because of congestion control + congestion_blocked: bool, + }, } impl Connection { From b7acb73c210c56deb241f0347475314a82af38f8 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 16 Jan 2026 15:26:09 +0100 Subject: [PATCH 10/24] wip --- quinn-proto/src/connection/mod.rs | 60 +++++++++++++++++-------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 667c343a3..10369ce90 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -975,20 +975,7 @@ impl Connection { // Each call to poll_transmit can only send datagrams to one destination, because // all datagrams in a GSO batch are for the same destination. Therefore only - // datagrams for one Path ID are produced for each poll_transmit call. - - // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it - // should be: - - // First, if we have to send a close, select a path for that. - // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. - - // For all, open, validated and AVAILABLE paths: - // - Is the path congestion blocked or pacing blocked? - // - call maybe_queue_ to ensure a tail-loss probe would be sent? - // - do we need to send a close message? - // - call can_send - // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths + // datagrams for one destination address are produced for each poll_transmit call. // Check whether we need to send a close message let close = match self.state.as_type() { @@ -1008,7 +995,7 @@ impl Connection { _ => false, }; - // Check whether we need to send an ACK_FREQUENCY frame + // Schedule an ACK_FREQUENCY frame if a new one needs to be sent. 
if let Some(config) = &self.config.ack_frequency_config { let rtt = self .paths @@ -1023,8 +1010,23 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // If there is any open, validated and available path we only want to send frames to - // any backup path that must be sent on that backup path exclusively. + // Path scheduling logic is currently as such: + // + // - For any un-validated paths we only send frames that *must* be sent on that + // path. E.g. PATH_CHALLENGE, PATH_RESPONSE. + // + // - If there are any paths with PathStatus::Available we only send any frames that + // can be sent on any path, e.g. STREAM, DATAGRAM, on these available paths. + // + // - If there are any paths with PathStatus::Available we only send frames that + // *must* be sent on a specific path to any other paths, e.g. a tail-loss probe for + // that path, PMTU probe, a keep-alive PING. + // + // For all this we use the *path_exclusive_only* boolean: If set to true, only + // frames that must be sent on the path will be built into the packet. + + // Is there any open, validated and status available path with dst CIDs? If so we'll + // want to set path_exclusive_only for any other paths. let have_available_path = self.paths.iter().any(|(id, path)| { path.data.validated && path.data.local_status() == PathStatus::Available @@ -1040,10 +1042,12 @@ impl Connection { self.path_data(path_ids[0]).current_mtu().into(), ); + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. let mut congestion_blocked = false; for &path_id in &path_ids { - // Update per path state + // Set the segment size to this path's MTU. 
transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) { @@ -1061,13 +1065,7 @@ impl Connection { PollPathStatus::NothingToSend { congestion_blocked: cb, } => { - // TODO: congestion blocked should be per path? how to deal with this better - - // move to next path - if cb { - // note congestion block - congestion_blocked = true; - } + congestion_blocked |= cb; } } // Nothing more to send. @@ -1079,6 +1077,7 @@ impl Connection { transmit.is_empty(), "there was data in the transmit, but it was not sent" ); + self.app_limited = !congestion_blocked; if self.state.is_established() { @@ -1118,6 +1117,7 @@ impl Connection { self.stats .udp_tx .on_sent(transmit.num_datagrams() as u64, transmit.len()); + // TODO(flub): check https://github.com/n0-computer/quinn/pull/332 changes Transmit { destination: network_path.remote, @@ -1135,7 +1135,13 @@ impl Connection { } } - /// poll_transmit logic for a specific path + /// poll_transmit logic for a specific [`PathState::data`]. + /// + /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`] + /// has 3 packet spaces, which this handles. + /// + /// This does not handle sending for [`PathState::prev`] which is handled by + /// [`Self::send_prev_path_challenge`] called directly from [`Self::poll_transmit`]. #[must_use] fn poll_transmit_path( &mut self, @@ -1149,6 +1155,8 @@ impl Connection { let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { self.on_remote_cids_exhausted(now, path_id); + // TODO(flub): on_remote_cids_exhausted might have scheduled a PATH_CIDS_BLOCKED + // frame which could have been sent on an earlier path again. return PollPathStatus::NothingToSend { congestion_blocked: false, }; From d2cad6da79ee9a8f949a5d2883b8f534bc2e87ef Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sat, 17 Jan 2026 19:07:23 +0100 Subject: [PATCH 11/24] friday was too short. 
make it work, maybe --- quinn-proto/src/connection/mod.rs | 368 ++++++++++++--------- quinn-proto/src/connection/transmit_buf.rs | 4 +- 2 files changed, 220 insertions(+), 152 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 10369ce90..20a1ff1ad 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -340,22 +340,28 @@ enum PollPathStatus { }, } -/// Return value for `poll_transmit_path_space`. #[derive(Debug)] -enum PollPathSpaceStatus { - /// Nothing to send on this space, continue to the next one. - NextSpace, - /// We have something to send, and data has been accumulated - /// on the passed in `transmit`. - Send { last_packet_number: Option }, - /// A transmit is ready to be sent out. - SendTransmit { - /// The transmit to send. - transmit: Transmit, +enum PollPathSpaceOutcome { + /// One or more datagrams have been written into the [`TransmitBuf`]. + Send { + /// The highest packet number. + last_packet_number: u64, + /// Whether a next packet may be coalesced into the current datagram. + /// + /// If `Some` this signals that there is still enough space in the current datagram + /// of the [`TransmitBuf`] for another packet and the packets written so far allow + /// coalescing. + /// + /// The [`PadDatagram`] value indicates whether the final datagram containing the + /// coalesced packets must have an exact size. + may_coalesce: Option, }, - /// Nothing to send currently. + /// Send a transmit directly, usually a hack to send off-path datagrams. + SendTransmit { transmit: Transmit }, + /// Nothing to send, nothing was written into the [`TransmitBuf`]. + // TODO: consider splitting this: NothingToSend, CongestionBlocked, TransmitFull NothingToSend { - /// Set to `true` if we consider the current inability to send something because of congestion control + /// If true there was data to send but congestion control did not allow so. 
congestion_blocked: bool, }, } @@ -1033,7 +1039,8 @@ impl Connection { && self.rem_cids.contains_key(id) }); - // TODO: how to avoid the allocation? Cannot use a for loop because of borrowing + // TODO: how to avoid the allocation? Cannot use a for loop because of + // borrowing. Maybe SmallVec or similar. let path_ids: Vec<_> = self.paths.keys().copied().collect(); let mut transmit = TransmitBuf::new( @@ -1050,10 +1057,12 @@ impl Connection { // Set the segment size to this path's MTU. transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); + // Poll (some) off-path transmits first. if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) { return Some(challenge); } + // Poll for (mostly) on-path transmits. match self.poll_transmit_path(now, &mut transmit, path_id, have_available_path, close) { PollPathStatus::SendTransmit { transmit } => { return Some(transmit); @@ -1066,10 +1075,14 @@ impl Connection { congestion_blocked: cb, } => { congestion_blocked |= cb; + // Continue checking other paths, tail-loss probes may need to be sent + // in all spaces. + debug_assert!( + transmit.is_empty(), + "nothing to send on path but transmit not empty" + ); } } - // Nothing more to send. - trace!(%path_id, "nothing to send on path"); } // We didn't produce any application data packet @@ -1153,31 +1166,37 @@ impl Connection { ) -> PollPathStatus { // Check if there is at least one active CID to use for sending let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { - self.on_remote_cids_exhausted(now, path_id); - - // TODO(flub): on_remote_cids_exhausted might have scheduled a PATH_CIDS_BLOCKED - // frame which could have been sent on an earlier path again. 
+ if self.abandoned_paths.contains(&path_id) { + trace!(%path_id, "remote CIDs retired for abandoned path"); + } else { + debug!(%path_id, "no remote CIDs for path"); + } return PollPathStatus::NothingToSend { congestion_blocked: false, }; }; - // Whether this packet can be coalesced with another one in the same datagram. - let mut coalesce = true; - // Whether the last packet in the datagram must be padded so the datagram takes up - // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller. + // an exact size. let mut pad_datagram = PadDatagram::No; - // The packet number of the last built packet. + // The packet number of the last built packet. This is kept kept across spaces. This + // is only relevant for the Initial and Handshake spaces as only those can coalesce + // packets with the next space. QUIC is supposed to have a single congestion + // controller for the Initial, Handshake and Data(PathId::ZERO) spaces. let mut last_packet_number = None; + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. 
+ let mut congestion_blocked = false; + // Iterate over the available spaces for space_id in SpaceId::iter() { // Only Path0 uses non Data space ids if path_id != PathId::ZERO && space_id != SpaceId::Data { continue; } + tracing::warn!(?space_id, %path_id, "A SPACE"); let res = self.poll_transmit_path_space( now, @@ -1187,54 +1206,64 @@ impl Connection { remote_cid, have_available_path, close, - &mut coalesce, - &mut pad_datagram, + pad_datagram, ); + trace!(%path_id, ?space_id, ?res, "poll_transmit_space"); match res { - PollPathSpaceStatus::Send { + PollPathSpaceOutcome::Send { last_packet_number: lp, + may_coalesce, } => { - last_packet_number = lp; - break; + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(lp); + match may_coalesce { + Some(should_pad) => { + tracing::warn!("LETS MOVE ON"); + pad_datagram = should_pad; + continue; + } + None => break, + } } - PollPathSpaceStatus::SendTransmit { transmit } => { + PollPathSpaceOutcome::SendTransmit { transmit } => { return PollPathStatus::SendTransmit { transmit }; } - PollPathSpaceStatus::NothingToSend { congestion_blocked } => { - return PollPathStatus::NothingToSend { congestion_blocked }; - } - PollPathSpaceStatus::NextSpace => { - // moving onto the next space + PollPathSpaceOutcome::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked = cb; + // Continue checking other spaces, tail-loss probes may need to be sent + // in all spaces. } } } - if let Some(last_packet_number) = last_packet_number { - // Note that when sending in multiple packet spaces the last packet number will - // be the one from the highest packet space. 
- self.path_data_mut(path_id).congestion.on_sent( + if last_packet_number.is_some() || congestion_blocked { + self.qlog.emit_recovery_metrics( + path_id, + &mut self.paths.get_mut(&path_id).unwrap().data, now, - transmit.len() as u64, - last_packet_number, ); } - self.qlog.emit_recovery_metrics( - path_id, - &mut self.paths.get_mut(&path_id).unwrap().data, - now, - ); + match last_packet_number { + Some(last_packet_number) => { + debug_assert!(!transmit.is_empty(), "packet number but transmit is empty"); + // Note that when sending in multiple packet spaces the last packet number will + // be the one from the highest packet space. + self.path_data_mut(path_id).congestion.on_sent( + now, + transmit.len() as u64, + last_packet_number, + ); - if transmit.is_empty() { - PollPathStatus::NothingToSend { - congestion_blocked: false, + PollPathStatus::Send } - } else { - PollPathStatus::Send + None => PollPathStatus::NothingToSend { congestion_blocked }, } } - /// poll_transmit logic for a path_id - space_id combination + /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId). #[must_use] fn poll_transmit_path_space( &mut self, @@ -1243,17 +1272,29 @@ impl Connection { path_id: PathId, space_id: SpaceId, remote_cid: ConnectionId, + // If any other packet space has a usable path with PathStatus::Available. have_available_path: bool, + // If we need to send a CONNECTION_CLOSE frame. close: bool, - coalesce: &mut bool, - pad_datagram: &mut PadDatagram, - ) -> PollPathSpaceStatus { + // Whether the current datagram needs to be padded if no more packets can be + // coalesced into it. + mut pad_datagram: PadDatagram, + ) -> PollPathSpaceOutcome { let mut last_packet_number = None; - + let mut coalesce = true; + tracing::warn!("HELLO"); + + // Each loop of this may build one packet. 
If the packet allows coalescing and there + // is still enough space in the current datagram the function will return and the + // caller will call us again for the next space to fill up the remainder of the + // datagram. Otherwise the loop will restart and if there is still more data to send + // and more space in the GSO batch new datagrams will be started and new packets + // will be written. loop { // Determine if anything can be sent in this packet number space (SpaceId + PathId). let max_packet_size = if transmit.datagram_remaining_mut() > 0 { - // We are trying to coalesce another packet into this datagram. + // A datagram is started already, we are trying to coalesce another packet + // into this datagram. transmit.datagram_remaining_mut() } else { // A new datagram needs to be started. @@ -1262,7 +1303,9 @@ impl Connection { let can_send = self.space_can_send(space_id, path_id, max_packet_size, close); - let path_should_send = { + // Whether we would like to send any frames on this packet space. See the packet + // scheduling described in poll_transmit. + let space_should_send = { let path_exclusive_only = space_id == SpaceId::Data && have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; @@ -1275,50 +1318,70 @@ impl Connection { path_should_send || needs_loss_probe }; - if !path_should_send && space_id < SpaceId::Data { - if self.spaces[space_id].crypto.is_some() { - trace!(?space_id, %path_id, "nothing to send in space"); - } - return PollPathSpaceStatus::NextSpace; - } - - let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 { - // Only check congestion control if a new datagram is needed. - self.path_congestion_check(space_id, path_id, transmit, &can_send, now) - } else { - PathBlocked::No - }; - - // Whether congestion control stopped the next packet from being sent. Further - // packets could still be built, as e.g. tail-loss probes are not congestion - // limited. 
- let congestion_blocked = if send_blocked != PathBlocked::No { - trace!(?space_id, %path_id, ?send_blocked, "congestion blocked"); - true - } else { - false - }; - if send_blocked != PathBlocked::No && space_id < SpaceId::Data { - // Higher spaces might still have tail-loss probes to send, which are not - // congestion blocked. - return PollPathSpaceStatus::NextSpace; + if !space_should_send { + // Nothing more to send. Previous iterations of this loop may have built + // packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceOutcome::Send { + last_packet_number: pn, + may_coalesce: if coalesce { Some(pad_datagram) } else { None }, + }, + None => { + // If the crypto for the Initial and Handshake spaces is None then those + // spaces are done with forever, no need to log them. + if space_id == SpaceId::Data || self.spaces[space_id].crypto.is_some() { + trace!(?space_id, %path_id, "nothing to send in space"); + } + return PollPathSpaceOutcome::NothingToSend { + congestion_blocked: false, + }; + } + }; } - if !path_should_send || send_blocked != PathBlocked::No { - // Nothing more to send on this path, check the next path if possible. - // If there are any datagrams in the transmit, packets for another path can - // not be built. - if transmit.num_datagrams() > 0 { - return PollPathSpaceStatus::Send { last_packet_number }; + // We want to send on this space, check congestion control if we can. But only + // if we will need to start a new datagram. If we are coalescing into an already + // started datagram we do not need to check congestion control again. + if transmit.datagram_remaining_mut() == 0 { + let congestion_blocked = + self.path_congestion_check(space_id, path_id, transmit, &can_send, now); + if congestion_blocked != PathBlocked::No { + trace!(?space_id, %path_id, ?congestion_blocked, "congestion blocked"); + // Previous iterations of this loop may have built packets already. 
+ return match last_packet_number { + Some(pn) => PollPathSpaceOutcome::Send { + last_packet_number: pn, + may_coalesce: if coalesce { Some(pad_datagram) } else { None }, + }, + None => { + // If the crypto for the Initial and Handshake spaces is None then those + // spaces are done with forever, no need to log them. + if space_id == SpaceId::Data || self.spaces[space_id].crypto.is_some() { + trace!(?space_id, %path_id, "nothing to send in space"); + } + return PollPathSpaceOutcome::NothingToSend { + congestion_blocked: true, + }; + } + }; } - - return PollPathSpaceStatus::NothingToSend { congestion_blocked }; } + // If the datagram is full (or there never was one started), we need to start a + // new one. if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { - // No more datagrams allowed - return PollPathSpaceStatus::Send { last_packet_number }; + // No more datagrams allowed. + return match last_packet_number { + Some(pn) => PollPathSpaceOutcome::Send { + last_packet_number: pn, + may_coalesce: None, + }, + // TODO: Could consider a new return value here? + None => PollPathSpaceOutcome::NothingToSend { + congestion_blocked: false, + }, + }; } match self.spaces[space_id].for_path(path_id).loss_probes { @@ -1345,8 +1408,11 @@ impl Connection { } } trace!(count = transmit.num_datagrams(), "new datagram started"); - *coalesce = true; - *pad_datagram = PadDatagram::No; + + // We started a new datagram, initialise as allowing coalescing packets and + // no need for padding the datagram. + coalesce = true; + pad_datagram = PadDatagram::No; } // If coalescing another packet into the existing datagram, there should @@ -1380,17 +1446,27 @@ impl Connection { can_send.other, self, ) else { - return PollPathSpaceStatus::NothingToSend { congestion_blocked }; + // Confidentiality limit is exceeded and the connection has been killed. We + // should not send any other packets. 
This works in a roundabout way: We + // have started a datagram but not written anything into it. So even if we + // get called again for another space we will see an already started + // datagram and try and start another packet here. Then be stopped by the + // same confidentiality limit. + return PollPathSpaceOutcome::NothingToSend { + congestion_blocked: false, + }; }; last_packet_number = Some(builder.exact_number); - *coalesce = *coalesce && !builder.short_header; + let last_pn = builder.exact_number; + // TODO(flub): Use builder.can_coalesce after merging main. + coalesce = coalesce && !builder.short_header; if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 - *pad_datagram |= PadDatagram::ToMinMtu; + pad_datagram |= PadDatagram::ToMinMtu; } if space_id == SpaceId::Data && self.config.pad_to_mtu { - *pad_datagram |= PadDatagram::ToSegmentSize; + pad_datagram |= PadDatagram::ToSegmentSize; } if can_send.close { @@ -1446,23 +1522,29 @@ impl Connection { }; builder.write_frame(close.encoder(max_frame_size), stats); } - builder.finish_and_track(now, self, path_id, *pad_datagram); + builder.finish_and_track(now, self, path_id, pad_datagram); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. self.close = false; - // `CONNECTION_CLOSE` is the final packet - return PollPathSpaceStatus::Send { last_packet_number }; - } else { - // Send a close frame in every possible space for robustness, per - // RFC9000 "Immediate Close during the Handshake". Don't bother trying - // to send anything else. - return PollPathSpaceStatus::NextSpace; } + // Send a close frame in every possible space for robustness, per + // RFC9000 "Immediate Close during the Handshake". Don't bother trying + // to send anything else. 
+ // TODO(flub): This breaks during the handshake if we can not coalesce + // packets due to space reasons: the next space would either fail a + // debug_assert checking for enough packet space or produce an invalid + // packet. That is an existing bug however. + return PollPathSpaceOutcome::Send { + last_packet_number: last_pn, + may_coalesce: Some(pad_datagram), + }; } // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that // path validation can occur while the link is saturated. + // TODO(flub): This needs to be done outside of this function, e.g. together + // with the off-path path challenges of Self::send_prev_path_challenge. if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { let path = self.path_data_mut(path_id); if let Some((token, network_path)) = @@ -1477,7 +1559,7 @@ impl Connection { builder.write_frame_with_log_msg(frame, stats, Some("(off-path)")); builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu); self.stats.udp_tx.on_sent(1, transmit.len()); - return PollPathSpaceStatus::SendTransmit { + return PollPathSpaceOutcome::SendTransmit { transmit: Transmit { destination: network_path.remote, size: transmit.len(), @@ -1489,8 +1571,11 @@ impl Connection { } } + // If this boolean is true we only want to send frames which can not be sent on + // any other path. See the path scheduling notes in Self::poll_transmit. let path_exclusive_only = have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; + self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder); // ACK-only packets should only be sent when explicitly allowed. 
If we write them due to @@ -1509,7 +1594,7 @@ impl Connection { "SendableFrames was {can_send:?}, but only ACKs have been written" ); if builder.sent_frames().requires_padding { - *pad_datagram |= PadDatagram::ToMinMtu; + pad_datagram |= PadDatagram::ToMinMtu; } for (path_id, _pn) in builder.sent_frames().largest_acked.iter() { @@ -1529,8 +1614,10 @@ impl Connection { // might be needed because of the packet type, or to fill the GSO segment size. // Are we allowed to coalesce AND is there enough space for another *packet* in - // this datagram AND is there another packet to send in this or the next space? - if *coalesce + // this datagram AND is there another packet to send in the next space? + // TODO(flub): We should only check this for the Initial and Handshake space. In + // the Data spaces we never coalesce. + if coalesce && builder .buf .datagram_remaining_mut() @@ -1540,10 +1627,14 @@ impl Connection { { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. + tracing::warn!("we think we can coalesce"); builder.finish_and_track(now, self, path_id, PadDatagram::No); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. + // TODO(flub): if there isn't any more data to be sent, this will still pad + // to the segment size and only discover there is nothing to send before + // starting the next packet. That is wasting up to 32 bytes. if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) { // If too many padding bytes would be required to continue the // GSO batch after this packet, end the GSO batch here. 
Ensures @@ -1561,47 +1652,28 @@ impl Connection { builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); builder.finish_and_track(now, self, path_id, PadDatagram::No); - return PollPathSpaceStatus::Send { last_packet_number }; + return PollPathSpaceOutcome::Send { + last_packet_number: last_pn, + may_coalesce: None, + }; } // Pad the current datagram to GSO segment size so it can be // included in the GSO batch. builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize); } else { - builder.finish_and_track(now, self, path_id, *pad_datagram); + builder.finish_and_track(now, self, path_id, pad_datagram); } + + // If this is the first datagram we set the segment size to the size of the + // first datagram. if transmit.num_datagrams() == 1 { - transmit.clip_datagram_size(); + transmit.clip_segment_size(); } } } } - fn on_remote_cids_exhausted(&mut self, now: Instant, path_id: PathId) { - if self.abandoned_paths.contains(&path_id) { - trace!(%path_id, "remote CIDs retired for abandoned path"); - return; - } - - let error = PathError::RemoteCidsExhausted; - debug!(?error, %path_id, "no active CID for path"); - self.events - .push_back(Event::Path(PathEvent::LocallyClosed { id: path_id, error })); - // Locally we should have refused to open this path, the remote should - // have given us CIDs for this path before opening it. So we can always - // abandon this here. - self.close_path( - now, - path_id, - TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(), - ) - .ok(); - self.spaces[SpaceId::Data] - .pending - .path_cids_blocked - .insert(path_id); - } - fn poll_transmit_mtu_probe( &mut self, now: Instant, @@ -1670,13 +1742,13 @@ impl Connection { Some((active_cid, probe_size)) } - /// Returns if there is anext packet space which has data to send + /// Returns if there is a next packet space on the same path, which has data to send. /// /// This takes into account the space available to frames in the next datagram. 
// TODO(flub): This duplication is not nice. fn has_next_send_space( &mut self, - current_space_id: SpaceId, + mut current_space_id: SpaceId, path_id: PathId, buf: &TransmitBuf<'_>, close: bool, @@ -1687,16 +1759,12 @@ impl Connection { // handle large fixed-size frames, which only exist in 1-RTT (application // datagrams). We don't account for coalesced packets potentially occupying space // because frames can always spill into the next datagram. - let mut space_id = current_space_id; - loop { + while let Some(space_id) = current_space_id.next() { let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close); if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { return true; } - let Some(next_space) = space_id.next() else { - break; - }; - space_id = next_space; + current_space_id = space_id; } false } diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index ce5c40c4d..57326aa76 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -140,7 +140,7 @@ impl<'a> TransmitBuf<'a> { self.num_datagrams += 1; } - /// Clips the datagram size to the current size + /// Clips the segment size to the current size /// /// Only valid for the first datagram, when the datagram might be smaller than the /// segment size. Needed before estimating the available space in the next datagram @@ -148,7 +148,7 @@ impl<'a> TransmitBuf<'a> { /// /// Use [`TransmitBuf::start_new_datagram_with_size`] if you need to reduce the size of /// the last datagram in a batch. 
- pub(super) fn clip_datagram_size(&mut self) { + pub(super) fn clip_segment_size(&mut self) { debug_assert_eq!(self.num_datagrams, 1); if self.buf.len() < self.segment_size { trace!( From fa271d48a7db763d53f83661443623bca5f14e4c Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 15:19:02 +0100 Subject: [PATCH 12/24] Do coalescing right --- quinn-proto/src/connection/mod.rs | 302 +++++++++++-------- quinn-proto/src/connection/packet_builder.rs | 18 +- 2 files changed, 182 insertions(+), 138 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 76eab5fc6..b63fdb494 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -226,7 +226,7 @@ pub struct Connection { // Queued non-retransmittable 1-RTT data // /// If the CONNECTION_CLOSE frame needs to be sent - close: bool, + connection_close_pending: bool, // // ACK frequency @@ -342,28 +342,42 @@ enum PollPathStatus { #[derive(Debug)] enum PollPathSpaceOutcome { - /// One or more datagrams have been written into the [`TransmitBuf`]. - Send { + /// Nothing to send in the space, nothing was written into the [`TransmitBuf`]. + NothingToSend { + /// If true there was data to send but congestion control did not allow so. + congestion_blocked: bool, + }, + /// One or more packets have been written into the [`TransmitBuf`]. + WrotePacket { /// The highest packet number. last_packet_number: u64, - /// Whether a next packet may be coalesced into the current datagram. + /// Whether to pad an already started datagram in the next packet. + /// + /// When packets in Initial, 0-RTT or Handhake packet do not fill the entire + /// datagram they may decide to coalesce with the next packet from a higher + /// encryption level on the same path. But the earlier packet may require specific + /// size requirements for the datagram they are sent in. 
/// - /// If `Some` this signals that there is still enough space in the current datagram - /// of the [`TransmitBuf`] for another packet and the packets written so far allow - /// coalescing. + /// If a space did not complete the datagram, they use this to request the correct + /// padding in the final packet of the datagram so that the final datagram will have + /// the correct size. /// - /// The [`PadDatagram`] value indicates whether the final datagram containing the - /// coalesced packets must have an exact size. - may_coalesce: Option, + /// If a space did fill an entire datagram, it leaves this to the default of + /// [`PadDatagram::No`]. + pad_datagram: PadDatagram, + }, + /// Send the contents of the transmit immediately. + /// + /// Packets were written and the GSO batch must end now, regardless from whether higher + /// spaces still have frames to write. This is used when the last datagram written would + /// require too much padding to continue a GSO batch, which would waste space on the + /// wire. + Send { + /// The highest packet number written into the transmit. + last_packet_number: u64, }, /// Send a transmit directly, usually a hack to send off-path datagrams. SendTransmit { transmit: Transmit }, - /// Nothing to send, nothing was written into the [`TransmitBuf`]. - // TODO: consider splitting this: NothingToSend, CongestionBlocked, TransmitFull - NothingToSend { - /// If true there was data to send but congestion control did not allow so. 
- congestion_blocked: bool, - }, } impl Connection { @@ -467,7 +481,7 @@ impl Connection { }, timers: TimerTable::default(), authentication_failures: 0, - close: false, + connection_close_pending: false, ack_frequency: AckFrequencyState::new(get_max_ack_delay( &TransportParameters::default(), @@ -992,7 +1006,7 @@ impl Connection { StateType::Draining | StateType::Closed => { // self.close is only reset once the associated packet had been // encoded successfully - if !self.close { + if !self.connection_close_pending { self.app_limited = true; return None; } @@ -1154,7 +1168,9 @@ impl Connection { /// has 3 packet spaces, which this handles. /// /// This does not handle sending for [`PathState::prev`] which is handled by - /// [`Self::send_prev_path_challenge`] called directly from [`Self::poll_transmit`]. + /// [`Self::send_prev_path_challenge`] called directly from + /// [`Self::poll_transmit`]. Eventually we'd like this to only produce on-path + /// transmits. #[must_use] fn poll_transmit_path( &mut self, @@ -1177,27 +1193,27 @@ impl Connection { }; // Whether the last packet in the datagram must be padded so the datagram takes up - // an exact size. + // an exact size. An earlier space can decide to not fill an entire datagram and + // require the next space to fill it further. But may need a specific size of the + // datagram containing the packet. The final packet built in the datagram must pad + // to this size. let mut pad_datagram = PadDatagram::No; - // The packet number of the last built packet. This is kept kept across spaces. This - // is only relevant for the Initial and Handshake spaces as only those can coalesce - // packets with the next space. QUIC is supposed to have a single congestion - // controller for the Initial, Handshake and Data(PathId::ZERO) spaces. + // The packet number of the last built packet. This is kept kept across spaces. 
+ // QUIC is supposed to have a single congestion controller for the Initial, + // Handshake and Data(PathId::ZERO) spaces. let mut last_packet_number = None; // If we end up not sending anything, we need to know if that was because there was // nothing to send or because we were congestion blocked. let mut congestion_blocked = false; - // Iterate over the available spaces + // Iterate over the available spaces. for space_id in SpaceId::iter() { - // Only Path0 uses non Data space ids + // Only PathId::ZERO uses non Data space ids. if path_id != PathId::ZERO && space_id != SpaceId::Data { continue; } - tracing::warn!(?space_id, %path_id, "A SPACE"); - let res = self.poll_transmit_path_space( now, transmit, @@ -1210,31 +1226,36 @@ impl Connection { ); trace!(%path_id, ?space_id, ?res, "poll_transmit_space"); match res { + PollPathSpaceOutcome::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked |= cb; + // Continue checking other spaces, tail-loss probes may need to be sent + // in all spaces. + } + PollPathSpaceOutcome::WrotePacket { + last_packet_number: pn, + pad_datagram: pad, + } => { + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(pn); + pad_datagram = pad; + // Always check higher spaces. If the transmit is full or they have + // nothing to send they will not write packets. But if they can, they + // should always be allowed to add to this transmit because it is all + // sent to the same remote. 
+ continue; + } PollPathSpaceOutcome::Send { - last_packet_number: lp, - may_coalesce, + last_packet_number: pn, } => { debug_assert!(!transmit.is_empty(), "transmit must contain packets"); - last_packet_number = Some(lp); - match may_coalesce { - Some(should_pad) => { - tracing::warn!("LETS MOVE ON"); - pad_datagram = should_pad; - continue; - } - None => break, - } + last_packet_number = Some(pn); + break; } PollPathSpaceOutcome::SendTransmit { transmit } => { return PollPathStatus::SendTransmit { transmit }; } - PollPathSpaceOutcome::NothingToSend { - congestion_blocked: cb, - } => { - congestion_blocked = cb; - // Continue checking other spaces, tail-loss probes may need to be sent - // in all spaces. - } } } @@ -1248,9 +1269,8 @@ impl Connection { match last_packet_number { Some(last_packet_number) => { - debug_assert!(!transmit.is_empty(), "packet number but transmit is empty"); - // Note that when sending in multiple packet spaces the last packet number will - // be the one from the highest packet space. + // Note that when sending in multiple spaces the last packet number will be + // the one from the highest space. self.path_data_mut(path_id).congestion.on_sent( now, transmit.len() as u64, @@ -1275,33 +1295,40 @@ impl Connection { // If any other packet space has a usable path with PathStatus::Available. have_available_path: bool, // If we need to send a CONNECTION_CLOSE frame. - close: bool, - // Whether the current datagram needs to be padded if no more packets can be - // coalesced into it. + connection_close_pending: bool, + // Whether the current datagram needs to be padded to a certain size. mut pad_datagram: PadDatagram, ) -> PollPathSpaceOutcome { + // Keep track of the last packet number we wrote. If None we did not write any + // packets. let mut last_packet_number = None; - let mut coalesce = true; - tracing::warn!("HELLO"); - - // Each loop of this may build one packet. 
If the packet allows coalescing and there - // is still enough space in the current datagram the function will return and the - // caller will call us again for the next space to fill up the remainder of the - // datagram. Otherwise the loop will restart and if there is still more data to send - // and more space in the GSO batch new datagrams will be started and new packets - // will be written. + + // Each loop of this may build one packet. It works logically as follows: + // + // - Check if something *needs* to be sent in this space and *can* be sent. + // - If not, return to the caller who will call us again for the next space. + // - Start a new datagram. + // - Unless coalescing the packet into an existing datagram. + // - Write the packet header and payload. + // - Check if coalescing a next packet into the datagram is possible. + // - If coalescing, finish packet without padding to leave space in the datagram. + // - If not coalescing, complete the datagram: + // - Finish packet with padding. + // - Set the transmit segment size if this is the first datagram. + // - Loop: next iteration will exit the loop if nothing more to send in this + // space. The TransmitBuf will contain a started datagram with space if + // coalescing, or completely filled datagram if not coalescing. loop { // Determine if anything can be sent in this packet number space (SpaceId + PathId). let max_packet_size = if transmit.datagram_remaining_mut() > 0 { - // A datagram is started already, we are trying to coalesce another packet - // into this datagram. + // A datagram is started already, we are coalescing another packet into it. transmit.datagram_remaining_mut() } else { // A new datagram needs to be started. transmit.segment_size() }; - - let can_send = self.space_can_send(space_id, path_id, max_packet_size, close); + let can_send = + self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending); // Whether we would like to send any frames on this packet space. 
See the packet // scheduling described in poll_transmit. @@ -1322,9 +1349,9 @@ impl Connection { // Nothing more to send. Previous iterations of this loop may have built // packets already. return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::Send { + Some(pn) => PollPathSpaceOutcome::WrotePacket { last_packet_number: pn, - may_coalesce: if coalesce { Some(pad_datagram) } else { None }, + pad_datagram, }, None => { // If the crypto for the Initial and Handshake spaces is None then those @@ -1346,19 +1373,13 @@ impl Connection { let congestion_blocked = self.path_congestion_check(space_id, path_id, transmit, &can_send, now); if congestion_blocked != PathBlocked::No { - trace!(?space_id, %path_id, ?congestion_blocked, "congestion blocked"); // Previous iterations of this loop may have built packets already. return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::Send { + Some(pn) => PollPathSpaceOutcome::WrotePacket { last_packet_number: pn, - may_coalesce: if coalesce { Some(pad_datagram) } else { None }, + pad_datagram, }, None => { - // If the crypto for the Initial and Handshake spaces is None then those - // spaces are done with forever, no need to log them. - if space_id == SpaceId::Data || self.spaces[space_id].crypto.is_some() { - trace!(?space_id, %path_id, "nothing to send in space"); - } return PollPathSpaceOutcome::NothingToSend { congestion_blocked: true, }; @@ -1372,15 +1393,17 @@ impl Connection { if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { // No more datagrams allowed. + // Previous iterations of this loop may have built packets already. return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::Send { + Some(pn) => PollPathSpaceOutcome::WrotePacket { last_packet_number: pn, - may_coalesce: None, - }, - // TODO: Could consider a new return value here? 
- None => PollPathSpaceOutcome::NothingToSend { - congestion_blocked: false, + pad_datagram, }, + None => { + return PollPathSpaceOutcome::NothingToSend { + congestion_blocked: false, + }; + } }; } @@ -1409,9 +1432,7 @@ impl Connection { } trace!(count = transmit.num_datagrams(), "new datagram started"); - // We started a new datagram, initialise as allowing coalescing packets and - // no need for padding the datagram. - coalesce = true; + // We started a new datagram, we decide later if it needs padding. pad_datagram = PadDatagram::No; } @@ -1456,9 +1477,7 @@ impl Connection { congestion_blocked: false, }; }; - last_packet_number = Some(builder.exact_number); - let last_pn = builder.exact_number; - coalesce = coalesce && builder.can_coalesce; + last_packet_number = Some(builder.packet_number); if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 @@ -1497,6 +1516,7 @@ impl Connection { // Since there only 64 ACK frames there will always be enough space // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. + // TODO(flub): This needs fixing for multipath. debug_assert!( builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND, "ACKs should leave space for ConnectionClose" @@ -1521,11 +1541,12 @@ impl Connection { }; builder.write_frame(close.encoder(max_frame_size), stats); } + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, pad_datagram); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. - self.close = false; + self.connection_close_pending = false; } // Send a close frame in every possible space for robustness, per // RFC9000 "Immediate Close during the Handshake". 
Don't bother trying @@ -1533,10 +1554,15 @@ impl Connection { // TODO(flub): This breaks during the handshake if we can not coalesce // packets due to space reasons: the next space would either fail a // debug_assert checking for enough packet space or produce an invalid - // packet. That is an existing bug however. - return PollPathSpaceOutcome::Send { + // packet. We need to keep track of per-space pending CONNECTION_CLOSE to + // be able to send these across multiple calls to poll_transmit. Then + // check for coalescing space here because initial packets need to be in + // padded datagrams. And also add space checks for CONNECTION_CLOSE in + // space_can_send so it would stop a GSO batch if the datagram is too + // small for another CONNECTION_CLOSE packet. + return PollPathSpaceOutcome::WrotePacket { last_packet_number: last_pn, - may_coalesce: Some(pad_datagram), + pad_datagram, }; } @@ -1618,20 +1644,18 @@ impl Connection { // might be needed because of the packet type, or to fill the GSO segment size. // Are we allowed to coalesce AND is there enough space for another *packet* in - // this datagram AND is there another packet to send in the next space? - // TODO(flub): We should only check this for the Initial and Handshake space. In - // the Data spaces we never coalesce. - if coalesce - && builder + // this datagram AND will we definitely send another packet? + if builder.can_coalesce && path_id == PathId::ZERO && { + let max_packet_size = builder .buf .datagram_remaining_mut() - .saturating_sub(builder.predict_packet_end()) - > MIN_PACKET_SPACE - && self.has_next_send_space(space_id, path_id, builder.buf, close) - { + .saturating_sub(builder.predict_packet_end()); + max_packet_size > MIN_PACKET_SPACE + && self.has_pending_packet(space_id, max_packet_size, connection_close_pending) + } { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. 
- tracing::warn!("we think we can coalesce"); + trace!("will coalesce with next packet"); builder.finish_and_track(now, self, path_id, PadDatagram::No); } else { // We need a new datagram for the next packet. Finish the current @@ -1655,10 +1679,10 @@ impl Connection { "GSO truncated by demand for {} padding bytes", builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, PadDatagram::No); return PollPathSpaceOutcome::Send { last_packet_number: last_pn, - may_coalesce: None, }; } @@ -1746,29 +1770,43 @@ impl Connection { Some((active_cid, probe_size)) } - /// Returns if there is a next packet space on the same path, which has data to send. + /// Returns true if there is a further packet to send on [`PathId::ZERO`]. /// - /// This takes into account the space available to frames in the next datagram. - // TODO(flub): This duplication is not nice. - fn has_next_send_space( + /// In other words this is predicting whether the next call to + /// [`Connection::space_can_send`] issued will return some frames to be sent. Including + /// having to predict which packet number space it will be invoked with. This depends on + /// how both [`Connection::poll_transmit_path`] and + /// [`Connection::poll_transmit_path_space`] behave. + /// + /// This is needed to determine if packet coalescing can happen. Because the last packet + /// in a datagram may need to be padded and thus we must know if another packet will + /// follow or not. + /// + /// The next packet can be either in the same space, or in one of the following spaces + /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and + /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because + /// packets are built from Initial to Handshake to Data spaces. 
+ fn has_pending_packet( &mut self, - mut current_space_id: SpaceId, - path_id: PathId, - buf: &TransmitBuf<'_>, - close: bool, + current_space_id: SpaceId, + max_packet_size: usize, + connection_close_pending: bool, ) -> bool { - // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed - // to be able to send an individual frame at least this large in the next 1-RTT - // packet. This could be generalized to support every space, but it's only needed to - // handle large fixed-size frames, which only exist in 1-RTT (application - // datagrams). We don't account for coalesced packets potentially occupying space - // because frames can always spill into the next datagram. - while let Some(space_id) = current_space_id.next() { - let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close); - if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { + let mut space_id = current_space_id; + loop { + let can_send = self.space_can_send( + space_id, + PathId::ZERO, + max_packet_size, + connection_close_pending, + ); + if !can_send.is_empty() { return true; } - current_space_id = space_id; + match space_id.next() { + Some(next_space_id) => space_id = next_space_id, + None => break, + } } false } @@ -1894,13 +1932,14 @@ impl Connection { /// Indicate what types of frames are ready to send for the given space /// /// *packet_size* is the number of bytes available to build the next packet. - /// *close* indicates whether a CONNECTION_CLOSE frame needs to be sent. + /// *connection_close_pending* indicates whether a CONNECTION_CLOSE frame needs to be + /// sent. 
fn space_can_send( &mut self, space_id: SpaceId, path_id: PathId, packet_size: usize, - close: bool, + connection_close_pending: bool, ) -> SendableFrames { let space = &mut self.spaces[space_id]; let space_has_crypto = space.crypto.is_some(); @@ -1916,15 +1955,20 @@ impl Connection { let mut can_send = space.can_send(path_id, &self.streams); + // Check for 1RTT space. if space_id == SpaceId::Data { - // Check for 1RTT space. let pn = space.for_path(path_id).peek_tx_number(); + // Number of bytes available for frames if this is a 1-RTT packet. We're + // guaranteed to be able to send an individual frame at least this large in the + // next 1-RTT packet. This could be generalized to support every space, but it's + // only needed to handle large fixed-size frames, which only exist in 1-RTT + // (application datagrams). let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id)); can_send |= self.can_send_1rtt(path_id, frame_space_1rtt); } - can_send.close = close && space_has_crypto; + can_send.close = connection_close_pending && space_has_crypto; can_send } @@ -2254,7 +2298,7 @@ impl Connection { if !was_closed { self.close_common(); self.set_close_timer(now); - self.close = true; + self.connection_close_pending = true; self.state.move_to_closed_local(reason); } } @@ -3315,7 +3359,7 @@ impl Connection { )); self.close_common(); self.set_close_timer(now); - self.close = true; + self.connection_close_pending = true; } return; } @@ -3962,7 +4006,7 @@ impl Connection { .get(&path_id) .map(|p| p.data.network_path) .unwrap_or(network_path); - self.close = network_path == path_remote; + self.connection_close_pending = network_path == path_remote; } } @@ -5035,7 +5079,7 @@ impl Connection { if let Some(reason) = close { self.state.move_to_draining(Some(reason.into())); - self.close = true; + self.connection_close_pending = true; } if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet @@ -5229,7 +5273,7 @@ impl Connection { 
path_exclusive_only: bool, builder: &mut PacketBuilder<'a, 'b>, ) { - let pn = builder.exact_number; + let pn = builder.packet_number; let is_multipath_negotiated = self.is_multipath_negotiated(); let stats = &mut self.stats.frame_tx; let space = &mut self.spaces[space_id]; diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 80f0d990d..a22792b75 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -25,7 +25,7 @@ pub(super) struct PacketBuilder<'a, 'b> { path: PathId, pub(super) partial_encode: PartialEncode, pub(super) ack_eliciting: bool, - pub(super) exact_number: u64, + pub(super) packet_number: u64, /// Is this packet allowed to be coalesced? pub(super) can_coalesce: bool, /// Smallest absolute position in the associated buffer that must be occupied by this packet's @@ -93,11 +93,11 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { } let space = &mut conn.spaces[space_id]; - let exact_number = space.for_path(path_id).get_tx_number(&mut conn.rng); - let span = trace_span!("send", space = ?space_id, pn = exact_number, %path_id).entered(); + let packet_number = space.for_path(path_id).get_tx_number(&mut conn.rng); + let span = trace_span!("send", space = ?space_id, pn = packet_number, %path_id).entered(); let number = PacketNumber::new( - exact_number, + packet_number, space.for_path(path_id).largest_acked_packet.unwrap_or(0), ); let header = match space_id { @@ -171,7 +171,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { qlog.header( &header, - Some(exact_number), + Some(packet_number), space_id, space_id == SpaceId::Data && conn.spaces[SpaceId::Data].crypto.is_none(), path_id, @@ -182,7 +182,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { space: space_id, path: path_id, partial_encode, - exact_number, + packet_number, can_coalesce: header.can_coalesce(), min_size, tag_len, @@ -201,7 +201,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { path: PathId::ZERO, partial_encode: 
PartialEncode::no_header(), ack_eliciting: true, - exact_number: 0, + packet_number: 0, can_coalesce: true, min_size: 0, tag_len: 0, @@ -291,7 +291,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { PadDatagram::ToMinMtu => self.pad_to(MIN_INITIAL_SIZE), } let ack_eliciting = self.ack_eliciting; - let exact_number = self.exact_number; + let exact_number = self.packet_number; let space_id = self.space; let (size, padded, sent) = self.finish(conn, now); @@ -372,7 +372,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { self.partial_encode.finish( packet_buf, header_crypto, - Some((self.exact_number, self.path, packet_crypto)), + Some((self.packet_number, self.path, packet_crypto)), ); let packet_len = self.buf.len() - encode_start; From 07b5423a85acc40c04d5af16dde7cd39ec143c1c Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 16:15:30 +0100 Subject: [PATCH 13/24] tweak comments, logs --- quinn-proto/src/connection/mod.rs | 34 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index b63fdb494..ce05f6824 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -322,24 +322,23 @@ pub struct Connection { qlog: QlogSink, } -/// Return value for `poll_transmit_path`. +/// Return value for [`Connection::poll_transmit_path`]. #[derive(Debug)] enum PollPathStatus { - /// We have something to send, and data has been accumulated - /// on the passed in `transmit`. - Send, - /// A transmit is ready to be sent out. - SendTransmit { - /// The transmit to send. - transmit: Transmit, - }, - /// Nothing to send currently. + /// Nothing to send on the path, nothing was written into the [`TransmitBuf`]. NothingToSend { - /// Set to `true` if we consider the current inability to send something because of congestion control + /// If true there was data to send but congestion control did not allow so. 
congestion_blocked: bool, }, + /// One or more packets have been written into the [`TransmitBuf`] and should be sent. + Send, + /// Send the contents of the transmit without using the [`TransmitBuf`]. + /// + /// Indicates off-path data being sent. + SendTransmit { transmit: Transmit }, } +/// Return value for [`Connection::poll_transmit_path_space`]. #[derive(Debug)] enum PollPathSpaceOutcome { /// Nothing to send in the space, nothing was written into the [`TransmitBuf`]. @@ -1214,7 +1213,7 @@ impl Connection { if path_id != PathId::ZERO && space_id != SpaceId::Data { continue; } - let res = self.poll_transmit_path_space( + match self.poll_transmit_path_space( now, transmit, path_id, @@ -1223,9 +1222,7 @@ impl Connection { have_available_path, close, pad_datagram, - ); - trace!(%path_id, ?space_id, ?res, "poll_transmit_space"); - match res { + ) { PollPathSpaceOutcome::NothingToSend { congestion_blocked: cb, } => { @@ -1354,9 +1351,10 @@ impl Connection { pad_datagram, }, None => { - // If the crypto for the Initial and Handshake spaces is None then those - // spaces are done with forever, no need to log them. - if space_id == SpaceId::Data || self.spaces[space_id].crypto.is_some() { + // Only log for spaces which have crypto. 
+ if self.spaces[space_id].crypto.is_some() + || (space_id == SpaceId::Data && self.zero_rtt_crypto.is_some()) + { trace!(?space_id, %path_id, "nothing to send in space"); } return PollPathSpaceOutcome::NothingToSend { From f7301a8764a12ae5c7c8cf219b2a6d34147b88c4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 16:48:57 +0100 Subject: [PATCH 14/24] remove redundant size-setting --- quinn-proto/src/connection/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index ce05f6824..f97832e7d 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1109,8 +1109,6 @@ impl Connection { if self.state.is_established() { // Try MTU probing now for path_id in path_ids { - // Update per path state - transmit.set_segment_size(self.path_data(path_id).current_mtu().into()); self.poll_transmit_mtu_probe(now, &mut transmit, path_id); if !transmit.is_empty() { let transmit = self.build_transmit(path_id, transmit); From 33b9d34acbfa3140f089a1ed8c919bb52bf77025 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 17:04:27 +0100 Subject: [PATCH 15/24] fix path scheduling description, add bug description --- quinn-proto/src/connection/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index f97832e7d..59f087272 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1029,17 +1029,22 @@ impl Connection { && self.peer_supports_ack_frequency(); } + // TODO(flub): path scheduling logic might be buggy if there are only un-validated + // paths and PATH_STATUS_BACKUP paths. + // Path scheduling logic is currently as such: // // - For any un-validated paths we only send frames that *must* be sent on that // path. E.g. PATH_CHALLENGE, PATH_RESPONSE. 
// - // - If there are any paths with PathStatus::Available we only send any frames that - // can be sent on any path, e.g. STREAM, DATAGRAM, on these available paths. + // - If there are any validated paths with CIDs and PathStatus::Available: + // - Frames that can be sent on any path, e.g. STREAM, DATAGRAM, are only sent on + // these available paths. + // - All other paths only send frames that *must* be sent on those paths, + // e.g. PATH_CHALLENGE, PATH_RESPONSE, tail-loss probes, keep alive PING. // - // - If there are any paths with PathStatus::Available we only send frames that - // *must* be sent on a specific path to any other paths, e.g. a tail-loss probe for - // that path, PMTU probe, a keep-alive PING. + // - If there are no validated paths with CIDs and PathStatus::Available all frames + // are sent on the earliest possible path. // // For all this we use the *path_exclusive_only* boolean: If set to true, only // frames that must be sent on the path will be built into the packet.
From 6d461ef36fa45ae09cb79753ce2f700e3131728b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 17:14:08 +0100 Subject: [PATCH 16/24] fixup path stats --- quinn-proto/src/connection/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 59f087272..4ce80a8db 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1146,7 +1146,11 @@ impl Connection { self.stats .udp_tx .on_sent(transmit.num_datagrams() as u64, transmit.len()); - // TODO(flub): check https://github.com/n0-computer/quinn/pull/332 changes + self.path_stats + .entry(path_id) + .or_default() + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); Transmit { destination: network_path.remote, From b12e786e96eb0d5b32df972572c78b180ae9c65b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Sun, 18 Jan 2026 17:38:36 +0100 Subject: [PATCH 17/24] wording, wording --- quinn-proto/src/connection/mod.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4ce80a8db..02e2ecdfc 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -352,7 +352,7 @@ enum PollPathSpaceOutcome { last_packet_number: u64, /// Whether to pad an already started datagram in the next packet. /// - /// When packets in Initial, 0-RTT or Handhake packet do not fill the entire + /// When packets in Initial, 0-RTT or Handshake packet do not fill the entire /// datagram they may decide to coalesce with the next packet from a higher /// encryption level on the same path. But the earlier packet may require specific /// size requirements for the datagram they are sent in. @@ -1246,8 +1246,8 @@ impl Connection { pad_datagram = pad; // Always check higher spaces. 
If the transmit is full or they have // nothing to send they will not write packets. But if they can, they - // should always be allowed to add to this transmit because it is all - // sent to the same remote. + // must always be allowed to add to this transmit because coalescing may + // be required. continue; } PollPathSpaceOutcome::Send { @@ -1750,12 +1750,14 @@ impl Connection { .sent_plpmtud_probes += 1; } + /// Returns the CID and probe size if a DPLPMTUD probe is needed. + /// + /// We MTU probe all paths for which all of the following is true: + /// - We have an active destination CID for the path. + /// - The remote address *and* path are validated. + /// - The path is not abandoned. + /// - The MTU Discovery subsystem wants to probe the path. fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { - // We MTU probe all paths for which all of the following is true: - // - We have an active destination CID for the path. - // - The remote address *and* path are validated. - // - The path is not abandoned. - // - The MTU Discovery subsystem wants to probe the path. let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active)?; let is_eligible = self.path_data(path_id).validated && !self.path_data(path_id).is_validating_path() From ff91de9c3029db16ce6f1e3f5f45260442b4e29d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diva=20Mart=C3=ADnez?= Date: Mon, 19 Jan 2026 13:43:03 -0500 Subject: [PATCH 18/24] weird off path enum variants are no longer needed! --- quinn-proto/src/connection/mod.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 34882f8bc..4ddaf3d40 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -335,10 +335,6 @@ enum PollPathStatus { }, /// One or more packets have been written into the [`TransmitBuf`] and should be sent. 
Send, - /// Send the contents of the transmit without using the [`TransmitBuf`]. - /// - /// Indicates off-path data being sent. - SendTransmit { transmit: Transmit }, } /// Return value for [`Connection::poll_transmit_path_space`]. @@ -378,8 +374,6 @@ enum PollPathSpaceOutcome { /// The highest packet number written into the transmit. last_packet_number: u64, }, - /// Send a transmit directly, usually a hack to send off-path datagrams. - SendTransmit { transmit: Transmit }, } impl Connection { @@ -1093,9 +1087,6 @@ impl Connection { // Poll for on-path transmits. match self.poll_transmit_path(now, &mut transmit, path_id, have_available_path, close) { - PollPathStatus::SendTransmit { transmit } => { - return Some(transmit); - } PollPathStatus::Send => { let transmit = self.build_transmit(path_id, transmit); return Some(transmit); @@ -1268,9 +1259,6 @@ impl Connection { last_packet_number = Some(pn); break; } - PollPathSpaceOutcome::SendTransmit { transmit } => { - return PollPathStatus::SendTransmit { transmit }; - } } } From 08751b8ea7c96c36fa3264d12e8bcfbab085e36f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diva=20Mart=C3=ADnez?= Date: Mon, 19 Jan 2026 13:44:17 -0500 Subject: [PATCH 19/24] fix fmt --- quinn-proto/src/connection/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 4ddaf3d40..df81be915 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1078,11 +1078,10 @@ impl Connection { } else if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { return Some(response); } - { - // Set the segment size to this path's MTU for on-path data. - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) - } + + // Set the segment size to this path's MTU for on-path data. 
+ let pmtu = self.path_data(path_id).current_mtu().into(); + TransmitBuf::new(buf, max_datagrams, pmtu) }; // Poll for on-path transmits. From ae89bc51d1d9faf2861d10bd98f049eb5396a2e3 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 20 Jan 2026 13:26:29 +0100 Subject: [PATCH 20/24] Make TransmitBuf in each loop and collate off-path work This makes the construction of the TransmitBuf a little less weird, but still a bit weird. --- quinn-proto/src/connection/mod.rs | 65 ++++++++++++---------- quinn-proto/src/connection/transmit_buf.rs | 12 ---- 2 files changed, 37 insertions(+), 40 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index df81be915..aa04d587f 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1058,34 +1058,27 @@ impl Connection { // borrowing. Maybe SmallVec or similar. let path_ids: Vec<_> = self.paths.keys().copied().collect(); - let mut transmit = TransmitBuf::new( - buf, - max_datagrams, - self.path_data(path_ids[0]).current_mtu().into(), - ); - // If we end up not sending anything, we need to know if that was because there was // nothing to send or because we were congestion blocked. let mut congestion_blocked = false; for &path_id in &path_ids { - transmit = { - let (buf, _) = transmit.finish(); - - // Poll off-path transmits first. - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } else if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { - return Some(response); - } + if let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id) { + return Some(transmit); + } - // Set the segment size to this path's MTU for on-path data. - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) - }; + // Set the segment size to this path's MTU for on-path data.
+ let pmtu = self.path_data(path_id).current_mtu().into(); + let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu); // Poll for on-path transmits. - match self.poll_transmit_path(now, &mut transmit, path_id, have_available_path, close) { + match self.poll_transmit_on_path( + now, + &mut transmit, + path_id, + have_available_path, + close, + ) { PollPathStatus::Send => { let transmit = self.build_transmit(path_id, transmit); return Some(transmit); @@ -1106,8 +1099,8 @@ impl Connection { // We didn't produce any application data packet debug_assert!( - transmit.is_empty(), - "there was data in the transmit, but it was not sent" + buf.is_empty(), + "there was data in the buffer, but it was not sent" ); self.app_limited = !congestion_blocked; @@ -1115,6 +1108,9 @@ impl Connection { if self.state.is_established() { // Try MTU probing now for path_id in path_ids { + // The MTU parameter here is really a dummy one, poll_transmit_mut_probe + // explicitly sets the size. + let mut transmit = TransmitBuf::new(buf, max_datagrams, MIN_INITIAL_SIZE.into()); self.poll_transmit_mtu_probe(now, &mut transmit, path_id); if !transmit.is_empty() { let transmit = self.build_transmit(path_id, transmit); @@ -1169,17 +1165,30 @@ impl Connection { } } - /// poll_transmit logic for a specific [`PathState::data`]. + /// poll_transmit logic for off-path data. + fn poll_transmit_off_path( + &mut self, + now: Instant, + buf: &mut Vec, + path_id: PathId, + ) -> Option { + if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { + return Some(challenge); + } + if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { + return Some(response); + } + None + } + + /// poll_transmit logic for on-path data. /// /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`] /// has 3 packet spaces, which this handles. 
/// - /// This does not handle sending for [`PathState::prev`] which is handled by - /// [`Self::send_prev_path_challenge`] called directly from - /// [`Self::poll_transmit`]. Eventually we'd like this to only produce on-path - /// transmits. + /// See [`Self::poll_transmit_off_path`] for off-path data. #[must_use] - fn poll_transmit_path( + fn poll_transmit_on_path( &mut self, now: Instant, transmit: &mut TransmitBuf<'_>, diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index 9908fe3a5..0e63cfdbc 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -213,18 +213,6 @@ impl<'a> TransmitBuf<'a> { pub(super) fn as_mut_slice(&mut self) -> &mut [u8] { self.buf.as_mut_slice() } - - /// Returns the underlying buffer and the GSO segment size, if any. - /// - /// Note that the GSO segment size is only defined if there is more than one segment. - pub(super) fn finish(self) -> (&'a mut Vec, Option) { - let gso_segment_size = if self.num_datagrams() > 1 { - Some(self.segment_size) - } else { - None - }; - (self.buf, gso_segment_size) - } } unsafe impl BufMut for TransmitBuf<'_> { From 99c1f8cc873c36bf3bb983caf9188b522a148622 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 20 Jan 2026 13:40:12 +0100 Subject: [PATCH 21/24] Consistent naming for now --- quinn-proto/src/connection/mod.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index aa04d587f..c63d66d9b 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -339,7 +339,7 @@ enum PollPathStatus { /// Return value for [`Connection::poll_transmit_path_space`]. #[derive(Debug)] -enum PollPathSpaceOutcome { +enum PollPathSpaceStatus { /// Nothing to send in the space, nothing was written into the [`TransmitBuf`]. 
NothingToSend { /// If true there was data to send but congestion control did not allow so. @@ -1240,14 +1240,14 @@ impl Connection { close, pad_datagram, ) { - PollPathSpaceOutcome::NothingToSend { + PollPathSpaceStatus::NothingToSend { congestion_blocked: cb, } => { congestion_blocked |= cb; // Continue checking other spaces, tail-loss probes may need to be sent // in all spaces. } - PollPathSpaceOutcome::WrotePacket { + PollPathSpaceStatus::WrotePacket { last_packet_number: pn, pad_datagram: pad, } => { @@ -1260,7 +1260,7 @@ impl Connection { // be required. continue; } - PollPathSpaceOutcome::Send { + PollPathSpaceStatus::Send { last_packet_number: pn, } => { debug_assert!(!transmit.is_empty(), "transmit must contain packets"); @@ -1309,7 +1309,7 @@ impl Connection { connection_close_pending: bool, // Whether the current datagram needs to be padded to a certain size. mut pad_datagram: PadDatagram, - ) -> PollPathSpaceOutcome { + ) -> PollPathSpaceStatus { // Keep track of the last packet number we wrote. If None we did not write any // packets. let mut last_packet_number = None; @@ -1360,7 +1360,7 @@ impl Connection { // Nothing more to send. Previous iterations of this loop may have built // packets already. return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::WrotePacket { + Some(pn) => PollPathSpaceStatus::WrotePacket { last_packet_number: pn, pad_datagram, }, @@ -1371,7 +1371,7 @@ impl Connection { { trace!(?space_id, %path_id, "nothing to send in space"); } - return PollPathSpaceOutcome::NothingToSend { + return PollPathSpaceStatus::NothingToSend { congestion_blocked: false, }; } @@ -1387,12 +1387,12 @@ impl Connection { if congestion_blocked != PathBlocked::No { // Previous iterations of this loop may have built packets already. 
return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::WrotePacket { + Some(pn) => PollPathSpaceStatus::WrotePacket { last_packet_number: pn, pad_datagram, }, None => { - return PollPathSpaceOutcome::NothingToSend { + return PollPathSpaceStatus::NothingToSend { congestion_blocked: true, }; } @@ -1407,12 +1407,12 @@ impl Connection { // No more datagrams allowed. // Previous iterations of this loop may have built packets already. return match last_packet_number { - Some(pn) => PollPathSpaceOutcome::WrotePacket { + Some(pn) => PollPathSpaceStatus::WrotePacket { last_packet_number: pn, pad_datagram, }, None => { - return PollPathSpaceOutcome::NothingToSend { + return PollPathSpaceStatus::NothingToSend { congestion_blocked: false, }; } @@ -1485,7 +1485,7 @@ impl Connection { // get called again for another space we will see an already started // datagram and try and start another packet here. Then be stopped by the // same confidentiality limit. - return PollPathSpaceOutcome::NothingToSend { + return PollPathSpaceStatus::NothingToSend { congestion_blocked: false, }; }; @@ -1572,7 +1572,7 @@ impl Connection { // padded datagrams. And also add space checks for CONNECTION_CLOSE in // space_can_send so it would stop a GSO batch if the datagram is too // small for another CONNECTION_CLOSE packet. 
- return PollPathSpaceOutcome::WrotePacket { + return PollPathSpaceStatus::WrotePacket { last_packet_number: last_pn, pad_datagram, }; @@ -1658,7 +1658,7 @@ impl Connection { ); let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, PadDatagram::No); - return PollPathSpaceOutcome::Send { + return PollPathSpaceStatus::Send { last_packet_number: last_pn, }; } From b8238c46cd46b0c893a5500539233b52ab491851 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 20 Jan 2026 13:42:16 +0100 Subject: [PATCH 22/24] missing rename --- quinn-proto/src/connection/packet_builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index a22792b75..3c0228523 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -291,7 +291,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { PadDatagram::ToMinMtu => self.pad_to(MIN_INITIAL_SIZE), } let ack_eliciting = self.ack_eliciting; - let exact_number = self.packet_number; + let packet_number = self.packet_number; let space_id = self.space; let (size, padded, sent) = self.finish(conn, now); @@ -311,7 +311,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { }; conn.paths.get_mut(&path_id).unwrap().data.sent( - exact_number, + packet_number, packet, conn.spaces[space_id].for_path(path_id), ); From 6798a525f98602c2eeac86c5ca7d15220dcef48d Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 20 Jan 2026 19:57:19 +0100 Subject: [PATCH 23/24] refactor: Introduce explicit TransmitBuf::first_datagram Having to initialise an TransmitBuf with the MTU makes no sense in the multipath world. Because the MTU may change depending on which path the buffer is populated for. By passing in the MTU when creating the first datagram, this becomes a lot clearer. Additionally it makes starting datagrams for MTU probes or off-path packets more straightforward. 
Finally it makes it obvious where the MTU size otherwise sneaks in. --- quinn-proto/src/connection/mod.rs | 94 ++++++----- quinn-proto/src/connection/packet_builder.rs | 20 +-- quinn-proto/src/connection/streams/state.rs | 4 +- quinn-proto/src/connection/transmit_buf.rs | 161 +++++++++---------- quinn-proto/src/tests/mod.rs | 1 + quinn-proto/src/tests/util.rs | 1 + 6 files changed, 139 insertions(+), 142 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index cdfbd5878..6bca6330b 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1068,9 +1068,9 @@ impl Connection { return Some(transmit); } - // Set the segment size to this path's MTU for on-path data. - let pmtu = self.path_data(path_id).current_mtu().into(); - let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu); + // // Set the segment size to this path's MTU for on-path data. + // let pmtu = self.path_data(path_id).current_mtu().into(); + let mut transmit = TransmitBuf::new(buf, max_datagrams); // Poll for on-path transmits. match self.poll_transmit_on_path( @@ -1111,7 +1111,7 @@ impl Connection { for path_id in path_ids { // The MTU parameter here is really a dummy one, poll_transmit_mut_probe // explicitly sets the size.
- let mut transmit = TransmitBuf::new(buf, max_datagrams, MIN_INITIAL_SIZE.into()); + let mut transmit = TransmitBuf::new(buf, max_datagrams); self.poll_transmit_mtu_probe(now, &mut transmit, path_id); if !transmit.is_empty() { let transmit = self.build_transmit(path_id, transmit); @@ -1130,9 +1130,13 @@ impl Connection { ); let network_path = self.path_data(path_id).network_path; + let last_datagram_len = transmit + .segment_size() + .map(|segment_size| transmit.len() % segment_size) + .unwrap_or_else(|| transmit.len()); trace!( segment_size = transmit.segment_size(), - last_datagram_len = transmit.len() % transmit.segment_size(), + last_datagram_len, %network_path, "sending {} bytes in {} datagrams", transmit.len(), @@ -1158,10 +1162,7 @@ impl Connection { } else { None }, - segment_size: match transmit.num_datagrams() { - 1 => None, - _ => Some(transmit.segment_size()), - }, + segment_size: transmit.segment_size(), src_ip: network_path.local_ip, } } @@ -1336,9 +1337,15 @@ impl Connection { // A datagram is started already, we are coalescing another packet into it. transmit.datagram_remaining_mut() } else { - // A new datagram needs to be started. - transmit.segment_size() + // A new datagram needs to be started. The first datagram is sized to the + // MTU. All other datagrams can be maximum the size of the previous + // datagram. + match transmit.num_datagrams { + 0 => self.path_data(path_id).current_mtu().into(), + _ => transmit.max_datagram_size(), + } }; + tracing::warn!(max_packet_size, "XXXXXXXXXXXXXX"); let can_send = self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending); @@ -1420,30 +1427,29 @@ impl Connection { }; } - match self.spaces[space_id].for_path(path_id).loss_probes { - 0 => transmit.start_new_datagram(), - _ => { - // We need something to send for a tail-loss probe. 
- let request_immediate_ack = - space_id == SpaceId::Data && self.peer_supports_ack_frequency(); - self.spaces[space_id].maybe_queue_probe( - path_id, - request_immediate_ack, - &self.streams, - ); + if transmit.num_datagrams() == 0 { + let mtu = match self.spaces[space_id].for_path(path_id).loss_probes { + 0 => self.path_data(path_id).current_mtu(), + _ => INITIAL_MTU, + }; + transmit.start_first_datagram(mtu.into()); + } else { + transmit.start_datagram(); + } + trace!(count = transmit.num_datagrams(), "new datagram started"); - self.spaces[space_id].for_path(path_id).loss_probes -= 1; + // We need something to send for a tail-loss probe. + if self.spaces[space_id].for_path(path_id).loss_probes > 0 { + let request_immediate_ack = + space_id == SpaceId::Data && self.peer_supports_ack_frequency(); + self.spaces[space_id].maybe_queue_probe( + path_id, + request_immediate_ack, + &self.streams, + ); - // Clamp the datagram to at most the minimum MTU to ensure that loss - // probes can get through and enable recovery even if the path MTU - // has shrank unexpectedly. - transmit.start_new_datagram_with_size(std::cmp::min( - usize::from(INITIAL_MTU), - transmit.segment_size(), - )); - } + self.spaces[space_id].for_path(path_id).loss_probes -= 1; } - trace!(count = transmit.num_datagrams(), "new datagram started"); // We started a new datagram, we decide later if it needs padding. 
pad_datagram = PadDatagram::No; @@ -1497,7 +1503,7 @@ impl Connection { pad_datagram |= PadDatagram::ToMinMtu; } if space_id == SpaceId::Data && self.config.pad_to_mtu { - pad_datagram |= PadDatagram::ToSegmentSize; + pad_datagram |= PadDatagram::ToMaxSize; } if can_send.close { @@ -1596,7 +1602,7 @@ impl Connection { !(builder.sent_frames().is_ack_only(&self.streams) && !can_send.acks && can_send.other - && builder.buf.segment_size() + && builder.buf.max_datagram_size() == self.path_data(path_id).current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" @@ -1666,7 +1672,7 @@ impl Connection { // Pad the current datagram to GSO segment size so it can be // included in the GSO batch. - builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize); + builder.finish_and_track(now, self, path_id, PadDatagram::ToMaxSize); } else { builder.finish_and_track(now, self, path_id, pad_datagram); } @@ -1674,7 +1680,7 @@ impl Connection { // If this is the first datagram we set the segment size to the size of the // first datagram. if transmit.num_datagrams() == 1 { - transmit.clip_segment_size(); + transmit.end_first_datagram(); } } } @@ -1691,8 +1697,7 @@ impl Connection { }; // We are definitely sending a DPLPMTUD probe. - debug_assert_eq!(transmit.num_datagrams(), 0); - transmit.start_new_datagram_with_size(probe_size as usize); + transmit.start_first_datagram(probe_size.into()); let Some(mut builder) = PacketBuilder::new( now, @@ -1816,7 +1821,10 @@ impl Connection { // Congestion control check. // Tail loss probes must not be blocked by congestion, or a deadlock could arise. 
- let bytes_to_send = transmit.segment_size() as u64; + let bytes_to_send = match transmit.num_datagrams { + 0 => self.path_data(path_id).current_mtu().into(), + _ => transmit.max_datagram_size(), + } as u64; let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; if can_send.other && !need_loss_probe && !can_send.close { @@ -1872,8 +1880,8 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into()); - buf.start_new_datagram(); + let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN); + buf.start_first_datagram(MIN_INITIAL_SIZE.into()); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is @@ -1927,8 +1935,8 @@ impl Connection { let frame = frame::PathResponse(token); - let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into()); - buf.start_new_datagram(); + let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN); + buf.start_first_datagram(MIN_INITIAL_SIZE.into()); let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?; let stats = &mut self.stats.frame_tx; diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 3c0228523..f3f766ddb 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -287,7 +287,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { match pad_datagram { PadDatagram::No => (), PadDatagram::ToSize(size) => self.pad_to(size), - PadDatagram::ToSegmentSize => self.pad_to(self.buf.segment_size() as u16), + PadDatagram::ToMaxSize => self.pad_to(self.buf.max_datagram_size() as u16), PadDatagram::ToMinMtu => self.pad_to(MIN_INITIAL_SIZE), } let ack_eliciting = self.ack_eliciting; @@ -414,11 +414,11 @@ pub(super) enum PadDatagram { No, /// To a specific size 
ToSize(u16), - /// Pad to the current MTU/segment size + /// Pad to the maximum allowed size for this datagram. /// - /// For the first datagram in a transmit the MTU is the same as the - /// [`TransmitBuf::segment_size`]. - ToSegmentSize, + /// Usually this will be the path MTU as initialised by + /// [`TransmitBuf::start_first_datagram`]. + ToMaxSize, /// Pad to [`MIN_INITIAL_SIZE`], the minimal QUIC MTU of 1200 bytes ToMinMtu, } @@ -437,12 +437,12 @@ impl std::ops::BitOr for PadDatagram { (Self::No, rhs) => rhs, (Self::ToSize(size), Self::No) => Self::ToSize(size), (Self::ToSize(a), Self::ToSize(b)) => Self::ToSize(a.max(b)), - (Self::ToSize(_), Self::ToSegmentSize) => Self::ToSegmentSize, + (Self::ToSize(_), Self::ToMaxSize) => Self::ToMaxSize, (Self::ToSize(_), Self::ToMinMtu) => Self::ToMinMtu, - (Self::ToSegmentSize, Self::No) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToSize(_)) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToSegmentSize) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToMinMtu) => Self::ToMinMtu, + (Self::ToMaxSize, Self::No) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToSize(_)) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToMaxSize) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToMinMtu) => Self::ToMinMtu, (Self::ToMinMtu, _) => Self::ToMinMtu, } } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 5ed48991a..bfb5d59fb 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -565,8 +565,8 @@ impl StreamsState { #[cfg(test)] fn write_frames_for_test(&mut self, capacity: usize, fair: bool) -> frame::StreamMetaVec { let buf = &mut Vec::with_capacity(capacity); - let mut tbuf = crate::connection::TransmitBuf::new(buf, std::num::NonZeroUsize::MIN, 1_200); - tbuf.start_new_datagram_with_size(capacity); + let mut tbuf = crate::connection::TransmitBuf::new(buf, std::num::NonZeroUsize::MIN); + 
tbuf.start_first_datagram(capacity); let builder = &mut PacketBuilder::simple_data_buf(&mut tbuf); let stats = &mut FrameStats::default(); self.write_stream_frames(builder, fair, stats); diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index 0e63cfdbc..18fba768e 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -35,138 +35,125 @@ pub(super) struct TransmitBuf<'a> { /// /// Note that when coalescing packets this might be before the start of the current /// packet. - datagram_start: usize, + datagram_start_offset: usize, /// The maximum offset allowed to be used for the current datagram in the buffer /// /// The first and last datagram in a batch are allowed to be smaller then the maximum /// size. All datagrams in between need to be exactly this size. - buf_capacity: usize, + datagram_max_offset: usize, /// The maximum number of datagrams allowed to write into [`TransmitBuf::buf`] max_datagrams: NonZeroUsize, /// The number of datagrams already (partially) written into the buffer /// /// Incremented by a call to [`TransmitBuf::start_new_datagram`]. pub(super) num_datagrams: usize, - /// The segment size of this GSO batch + /// The segment size of this GSO batch, set once the second datagram is started. /// /// The segment size is the size of each datagram in the GSO batch, only the last /// datagram in the batch may be smaller. /// - /// For the first datagram this is set to the maximum size a datagram is allowed to be: - /// the current path MTU. After the first datagram is finished this is reduced to the - /// size of the first datagram and can no longer change. - segment_size: usize, + /// Only set once there is more than one datagram. 
+ segment_size: Option, } impl<'a> TransmitBuf<'a> { - pub(super) fn new(buf: &'a mut Vec, max_datagrams: NonZeroUsize, mtu: usize) -> Self { + pub(super) fn new(buf: &'a mut Vec, max_datagrams: NonZeroUsize) -> Self { buf.clear(); Self { buf, - datagram_start: 0, - buf_capacity: 0, + datagram_start_offset: 0, + datagram_max_offset: 0, max_datagrams, num_datagrams: 0, - segment_size: mtu, + segment_size: None, } } - /// Starts a datagram with a custom datagram size + /// Starts the first datagram in the GSO batch. /// - /// This is a specialized version of [`TransmitBuf::start_new_datagram`] which sets the - /// datagram size. Useful for e.g. PATH_CHALLENGE, tail-loss probes or MTU probes. - /// - /// After the first datagram you can never increase the segment size. If you decrease - /// the size of a datagram in a batch, it must be the last datagram of the batch. - pub(super) fn start_new_datagram_with_size(&mut self, datagram_size: usize) { - // Only reserve space for this datagram, usually it is the last one in the batch. - let max_capacity_hint = datagram_size; - self.new_datagram_inner(datagram_size, max_capacity_hint) + /// The size of the first datagram sets the segment size of the GSO batch. + pub(super) fn start_first_datagram(&mut self, max_size: usize) { + debug_assert_eq!(self.num_datagrams, 0, "No datagram can be stared yet"); + debug_assert!( + self.buf.is_empty(), + "Buffer must be empty for first datagram" + ); + self.datagram_max_offset = max_size; + self.num_datagrams = 1; + if self.datagram_max_offset > self.buf.capacity() { + // Reserve all remaining capacity right away. + let max_batch_capacity = max_size * self.max_datagrams.get(); + self.buf + .reserve_exact(max_batch_capacity.saturating_sub(self.buf.capacity())); + } } - /// Starts a new datagram in the transmit buffer - /// - /// If this starts the second datagram the segment size will be set to the size of the - /// first datagram. 
- /// - /// If the underlying buffer does not have enough capacity yet this will allocate enough - /// capacity for all the datagrams allowed in a single batch. Use - /// [`TransmitBuf::start_new_datagram_with_size`] if you know you will need less. - pub(super) fn start_new_datagram(&mut self) { - // We reserve the maximum space for sending `max_datagrams` upfront to avoid any - // reallocations if more datagrams have to be appended later on. Benchmarks have - // shown a 5-10% throughput improvement compared to continuously resizing the - // datagram buffer. While this will lead to over-allocation for small transmits - // (e.g. purely containing ACKs), modern memory allocators (e.g. mimalloc and - // jemalloc) will pool certain allocation sizes and therefore this is still rather - // efficient. - let max_capacity_hint = self.max_datagrams.get() * self.segment_size; - self.new_datagram_inner(self.segment_size, max_capacity_hint) - } - - fn new_datagram_inner(&mut self, datagram_size: usize, max_capacity_hint: usize) { - debug_assert!(self.num_datagrams < self.max_datagrams.into()); - if self.num_datagrams == 1 { - // Set the segment size to the size of the first datagram. - self.segment_size = self.buf.len(); - } - if self.num_datagrams >= 1 { - debug_assert!(datagram_size <= self.segment_size); - if datagram_size < self.segment_size { - // If this is a GSO batch and this datagram is smaller than the segment - // size, this must be the last datagram in the batch. - self.max_datagrams = NonZeroUsize::MIN.saturating_add(self.num_datagrams); - } - } - self.datagram_start = self.buf.len(); - debug_assert_eq!( - self.datagram_start % self.segment_size, - 0, - "datagrams in a GSO batch must be aligned to the segment size" + /// Starts a subsequent datagram in the GSO batch. + pub(super) fn start_datagram(&mut self) { + // Could be enforced with typestate, but that's probably also meh. 
+ debug_assert!( + self.num_datagrams >= 1, + "Use start_first_datagram for first datagram" ); - self.buf_capacity = self.datagram_start + datagram_size; - if self.buf_capacity > self.buf.capacity() { - self.buf - .reserve_exact(max_capacity_hint.saturating_sub(self.buf.capacity())); + debug_assert!( + self.buf.len() <= self.datagram_max_offset, + "Datagram exceeded max offset" + ); + let segment_size = self + .segment_size + .get_or_insert_with(|| self.buf.len() - self.datagram_start_offset); + let segment_size = *segment_size; + if self.num_datagrams > 1 { + debug_assert_eq!( + self.buf.len(), + self.datagram_max_offset, + "Subsequent datagrams must be exactly the segment size" + ); } + self.num_datagrams += 1; + self.datagram_start_offset = self.buf.len(); + self.datagram_max_offset = self.buf.len() + segment_size; + if self.datagram_max_offset > self.buf.capacity() { + // Reserve all remaining capacity right away. + let max_batch_capacity = segment_size * self.max_datagrams.get(); + self.buf + .reserve_exact(max_batch_capacity.saturating_sub(self.buf.capacity())); + } } - /// Clips the segment size to the current size + /// Mark the first datagram as completely written, setting its size. /// /// Only valid for the first datagram, when the datagram might be smaller than the - /// segment size. Needed before estimating the available space in the next datagram - /// based on [`TransmitBuf::segment_size`]. + /// maximum size it was allowed to be. Needed before estimating the available space in + /// the next datagram based on [`TransmitBuf::segment_size`]. /// /// Use [`TransmitBuf::start_new_datagram_with_size`] if you need to reduce the size of /// the last datagram in a batch. 
- pub(super) fn clip_segment_size(&mut self) { + pub(super) fn end_first_datagram(&mut self) { debug_assert_eq!(self.num_datagrams, 1); - if self.buf.len() < self.segment_size { + if self.buf.len() < self.datagram_max_offset { trace!( - segment_size = self.buf.len(), - prev_segment_size = self.segment_size, + size = self.buf.len(), + max_size = self.datagram_max_offset, "clipped datagram size" ); } - self.segment_size = self.buf.len(); - self.buf_capacity = self.buf.len(); + self.datagram_max_offset = self.buf.len(); } - /// Returns the GSO segment size - /// - /// This is also the maximum size datagrams are allowed to be. The first and last - /// datagram in a batch are allowed to be smaller however. After the first datagram the - /// segment size is clipped to the size of the first datagram. - /// - /// If the last datagram was created using [`TransmitBuf::start_new_datagram_with_size`] - /// the the segment size will be greater than the current datagram is allowed to be. - /// Thus [`TransmitBuf::datagram_remaining_mut`] should be used if you need to know the - /// amount of data that can be written into the datagram. - pub(super) fn segment_size(&self) -> usize { + /// Returns the GSO segment size. + pub(super) fn segment_size(&self) -> Option { self.segment_size } + /// Returns the maximum size this datagram is allowed to be. + /// + /// Once a second datagram is started this is equivalent to the segment size. + pub(super) fn max_datagram_size(&self) -> usize { + self.datagram_max_offset - self.datagram_start_offset + } + /// Returns the number of datagrams written into the buffer /// /// The last datagram is not necessarily finished yet. @@ -183,7 +170,7 @@ impl<'a> TransmitBuf<'a> { /// /// In other words, this offset contains the first byte of the current datagram. 
pub(super) fn datagram_start_offset(&self) -> usize { - self.datagram_start + self.datagram_start_offset } /// Returns the maximum offset in the buffer allowed for the current datagram @@ -191,12 +178,12 @@ impl<'a> TransmitBuf<'a> { /// The first and last datagram in a batch are allowed to be smaller then the maximum /// size. All datagrams in between need to be exactly this size. pub(super) fn datagram_max_offset(&self) -> usize { - self.buf_capacity + self.datagram_max_offset } /// Returns the number of bytes that may still be written into this datagram pub(super) fn datagram_remaining_mut(&self) -> usize { - self.buf_capacity.saturating_sub(self.buf.len()) + self.datagram_max_offset.saturating_sub(self.buf.len()) } /// Returns `true` if the buffer did not have anything written into it diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index dd3b3fe11..909a9d28d 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -3485,6 +3485,7 @@ fn large_datagram_with_acks() { pair.drive_server(); } + info!("sending datagrams"); let max_size = pair.client_datagrams(client_ch).max_size().unwrap(); let msg = Bytes::from(vec![0; max_size]); pair.client_datagrams(client_ch) diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index d9eb56782..adff80efb 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -318,6 +318,7 @@ impl Pair { self.client_conn_mut(client_ch).poll(), Some(Event::HandshakeConfirmed) ); + info!("connected"); } pub(super) fn client_conn_mut(&mut self, ch: ConnectionHandle) -> &mut Connection { From 3bddedaca9a582d7ea047f4ec7ac4f51384973e1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 21 Jan 2026 12:02:36 +0100 Subject: [PATCH 24/24] small cleanups --- quinn-proto/src/connection/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 
6bca6330b..d0a2cafad 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1338,14 +1338,13 @@ impl Connection { transmit.datagram_remaining_mut() } else { // A new datagram needs to be started. The first datagram is sized to the - // MTU. All other datagrams can be maximum the size of the previous + // MTU. All other datagrams can be at most the size of the previous // datagram. match transmit.num_datagrams { 0 => self.path_data(path_id).current_mtu().into(), _ => transmit.max_datagram_size(), } }; - tracing::warn!(max_packet_size, "XXXXXXXXXXXXXX"); let can_send = self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending); @@ -1677,8 +1676,8 @@ impl Connection { builder.finish_and_track(now, self, path_id, pad_datagram); } - // If this is the first datagram we set the segment size to the size of the - // first datagram. + // If this is the first datagram we set its maximum size to this finished + // packet. if transmit.num_datagrams() == 1 { transmit.end_first_datagram(); }