From 42487095234e65098b5fb2a21fde73234ed6ae49 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 11 Mar 2025 15:37:13 +0100 Subject: [PATCH 01/19] Introduce a TransmitBuf struct This TransmitBuf is a wrapper around the buffer in which datagrams are being created. It keeps track of the state required to know the boundaries of the datagrams in the buffer. --- quinn-proto/src/connection/mod.rs | 153 +++++++++++---------- quinn-proto/src/connection/transmit_buf.rs | 91 ++++++++++++ 2 files changed, 173 insertions(+), 71 deletions(-) create mode 100644 quinn-proto/src/connection/transmit_buf.rs diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 9f10ec103c..11a5c093fe 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -21,6 +21,7 @@ use crate::{ cid_queue::CidQueue, coding::BufMutExt, config::{ServerConfig, TransportConfig}, + congestion::Controller, crypto::{self, KeyPair, Keys, PacketKey}, frame::{self, Close, Datagram, FrameStruct, NewToken}, packet::{ @@ -85,9 +86,11 @@ pub use streams::{ }; mod timer; -use crate::congestion::Controller; use timer::{Timer, TimerTable}; +mod transmit_buf; +use transmit_buf::TransmitBuf; + /// Protocol state and logic for a single QUIC connection /// /// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application @@ -455,14 +458,9 @@ impl Connection { false => 1, true => max_datagrams, }; + let mut buf = TransmitBuf::new(buf, max_datagrams, self.path.current_mtu().into()); - let mut num_datagrams = 0; - // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC - // packets, this can be earlier than the start of the current QUIC packet. - let mut datagram_start = 0; - let mut segment_size = usize::from(self.path.current_mtu()); - - if let Some(challenge) = self.send_path_challenge(now, buf) { + if let Some(challenge) = self.send_path_challenge(now, &mut buf) { return Some(challenge); } @@ -500,11 +498,6 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // Reserving capacity can provide more capacity than we asked for. However, we are not - // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked - // separately. - let mut buf_capacity = 0; - let mut coalesce = true; let mut builder_storage: Option = None; let mut sent_frames = None; @@ -525,8 +518,9 @@ impl Connection { // don't account for coalesced packets potentially occupying space because frames can // always spill into the next datagram. let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let frame_space_1rtt = buf + .segment_size + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); // Is there data or a close message to send in this space? let can_send = self.space_can_send(space_id, frame_space_1rtt); @@ -560,11 +554,15 @@ impl Connection { } else { unreachable!("tried to send {:?} packet without keys", space_id) }; - if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len { + + // We are NOT coalescing (the default is we are, so this was turned off in an + // earlier iteration) OR there is not enough space for another *packet* in this + // datagram (buf_capacity - buf_end == unused space in datagram). + if !coalesce || buf.buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len { // We need to send 1 more datagram and extend the buffer for that. // Is 1 more datagram allowed? 
- if num_datagrams >= max_datagrams { + if buf.num_datagrams >= buf.max_datagrams { // No more datagrams allowed break; } @@ -577,7 +575,7 @@ impl Connection { // (see https://github.com/quinn-rs/quinn/issues/1082) if self .path - .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1) + .anti_amplification_blocked((buf.segment_size * buf.num_datagrams) as u64 + 1) { trace!("blocked by anti-amplification"); break; @@ -588,13 +586,13 @@ impl Connection { if ack_eliciting && self.spaces[space_id].loss_probes == 0 { // Assume the current packet will get padded to fill the segment let untracked_bytes = if let Some(builder) = &builder_storage { - buf_capacity - builder.partial_encode.start + buf.buf_capacity - builder.partial_encode.start } else { 0 } as u64; - debug_assert!(untracked_bytes <= segment_size as u64); + debug_assert!(untracked_bytes <= buf.segment_size as u64); - let bytes_to_send = segment_size as u64 + untracked_bytes; + let bytes_to_send = buf.segment_size as u64 + untracked_bytes; if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { space_idx += 1; congestion_blocked = true; @@ -628,7 +626,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } - if num_datagrams > 1 { + if buf.num_datagrams > 1 { // If too many padding bytes would be required to continue the GSO batch // after this packet, end the GSO batch here. Ensures that fixed-size frames // with heterogeneous sizes (e.g. application datagrams) won't inadvertently @@ -643,14 +641,14 @@ impl Connection { // `buf_capacity` by less than `segment_size`. const MAX_PADDING: usize = 16; let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - datagram_start + - buf.datagram_start + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < segment_size - || datagram_start + segment_size > buf_capacity + if packet_len_unpadded + MAX_PADDING < buf.segment_size + || buf.datagram_start + buf.segment_size > buf.buf_capacity { trace!( "GSO truncated by demand for {} padding bytes or loss probe", - segment_size - packet_len_unpadded + buf.segment_size - packet_len_unpadded ); builder_storage = Some(builder); break; @@ -658,22 +656,22 @@ impl Connection { // Pad the current datagram to GSO segment size so it can be included in the // GSO batch. - builder.pad_to(segment_size as u16); + builder.pad_to(buf.segment_size as u16); } - builder.finish_and_track(now, self, sent_frames.take(), buf); + builder.finish_and_track(now, self, sent_frames.take(), buf.buf); - if num_datagrams == 1 { + if buf.num_datagrams == 1 { // Set the segment size for this GSO batch to the size of the first UDP // datagram in the batch. Larger data that cannot be fragmented // (e.g. application datagrams) will be included in a future batch. When // sending large enough volumes of data for GSO to be useful, we expect // packet sizes to usually be consistent, e.g. populated by max-size STREAM // frames or uniformly sized datagrams. - segment_size = buf.len(); + buf.segment_size = buf.len(); // Clip the unused capacity out of the buffer so future packets don't // overrun - buf_capacity = buf.len(); + buf.buf_capacity = buf.len(); // Check whether the data we planned to send will fit in the reduced segment // size. If not, bail out and leave it for the next GSO batch so we don't @@ -682,8 +680,9 @@ impl Connection { // that time we haven't determined whether we're going to coalesce with the // first datagram or potentially pad it to `MIN_INITIAL_SIZE`. 
if space_id == SpaceId::Data { - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let frame_space_1rtt = buf + .segment_size + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); if self.space_can_send(space_id, frame_space_1rtt).is_empty() { break; } @@ -693,17 +692,17 @@ impl Connection { // Allocate space for another datagram let next_datagram_size_limit = match self.spaces[space_id].loss_probes { - 0 => segment_size, + 0 => buf.segment_size, _ => { self.spaces[space_id].loss_probes -= 1; // Clamp the datagram to at most the minimum MTU to ensure that loss probes // can get through and enable recovery even if the path MTU has shrank // unexpectedly. - std::cmp::min(segment_size, usize::from(INITIAL_MTU)) + std::cmp::min(buf.segment_size, usize::from(INITIAL_MTU)) } }; - buf_capacity += next_datagram_size_limit; - if buf.capacity() < buf_capacity { + buf.buf_capacity += next_datagram_size_limit; + if buf.buf.capacity() < buf.buf_capacity { // We reserve the maximum space for sending `max_datagrams` upfront // to avoid any reallocations if more datagrams have to be appended later on. // Benchmarks have shown shown a 5-10% throughput improvement @@ -712,15 +711,15 @@ impl Connection { // (e.g. purely containing ACKs), modern memory allocators // (e.g. mimalloc and jemalloc) will pool certain allocation sizes // and therefore this is still rather efficient. - buf.reserve(max_datagrams * segment_size); + buf.buf.reserve(buf.max_datagrams * buf.segment_size); } - num_datagrams += 1; + buf.num_datagrams += 1; coalesce = true; pad_datagram = false; - datagram_start = buf.len(); + buf.datagram_start = buf.len(); debug_assert_eq!( - datagram_start % segment_size, + buf.datagram_start % buf.segment_size, 0, "datagrams in a GSO batch must be aligned to the segment size" ); @@ -729,11 +728,11 @@ impl Connection { // datagram. // Finish current packet without adding extra padding if let Some(builder) = builder_storage.take() { - builder.finish_and_track(now, self, sent_frames.take(), buf); + builder.finish_and_track(now, self, sent_frames.take(), buf.buf); } } - debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE); + debug_assert!(buf.buf_capacity - buf.len() >= MIN_PACKET_SPACE); // // From here on, we've determined that a packet will definitely be sent. @@ -760,9 +759,9 @@ impl Connection { now, space_id, self.rem_cids.active(), - buf, - buf_capacity, - datagram_start, + buf.buf, + buf.buf_capacity, + buf.datagram_start, ack_eliciting, self, )?); @@ -784,7 +783,7 @@ impl Connection { self.receiving_ecn, &mut SentFrames::default(), &mut self.spaces[space_id], - buf, + buf.buf, &mut self.stats, ); } @@ -801,14 +800,14 @@ impl Connection { match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(buf, max_frame_size) + reason.encode(&mut buf, max_frame_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size) + .encode(&mut buf, max_frame_size) } } State::Draining => frame::ConnectionClose { @@ -816,7 +815,7 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size), + .encode(&mut buf, max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), @@ -838,7 +837,7 @@ impl Connection { // Send an off-path PATH_RESPONSE. 
Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. - if space_id == SpaceId::Data && num_datagrams == 1 { + if space_id == SpaceId::Data && buf.num_datagrams == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { // `unwrap` guaranteed to succeed because `builder_storage` was populated just // above. @@ -855,7 +854,7 @@ impl Connection { non_retransmits: true, ..SentFrames::default() }), - buf, + buf.buf, ); self.stats.udp_tx.on_sent(1, buf.len()); return Some(Transmit { @@ -868,8 +867,13 @@ impl Connection { } } - let sent = - self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number); + let sent = self.populate_packet( + now, + space_id, + buf.buf, + builder.max_size, + builder.exact_number, + ); // ACK-only packets should only be sent when explicitly allowed. If we write them due to // any other reason, there is a bug which leads to one component announcing write @@ -881,7 +885,8 @@ impl Connection { !(sent.is_ack_only(&self.streams) && !can_send.acks && can_send.other - && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize + && (buf.buf_capacity - builder.datagram_start) + == self.path.current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" ); @@ -905,7 +910,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, buf); + builder.finish_and_track(now, self, sent_frames, buf.buf); self.path .congestion .on_sent(now, buf.len() as u64, last_packet_number); @@ -921,15 +926,15 @@ impl Connection { .mtud .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?; - let buf_capacity = probe_size as usize; - buf.reserve(buf_capacity); + buf.buf_capacity = probe_size as usize; + buf.buf.reserve(buf.buf_capacity); let mut builder = PacketBuilder::new( now, space_id, self.rem_cids.active(), - buf, - buf_capacity, + buf.buf, + buf.buf_capacity, 0, true, self, @@ -950,10 +955,10 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), buf); + builder.finish_and_track(now, self, Some(sent_frames), buf.buf); self.stats.path.sent_plpmtud_probes += 1; - num_datagrams = 1; + buf.num_datagrams = 1; trace!(?probe_size, "writing MTUD probe"); } @@ -962,10 +967,16 @@ impl Connection { return None; } - trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams); + trace!( + "sending {} bytes in {} datagrams", + buf.len(), + buf.num_datagrams + ); self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64); - self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len()); + self.stats + .udp_tx + .on_sent(buf.num_datagrams as u64, buf.len()); Some(Transmit { destination: self.path.remote, @@ -975,16 +986,16 @@ impl Connection { } else { None }, - segment_size: match num_datagrams { + segment_size: match buf.num_datagrams { 1 => None, - _ => Some(segment_size), + _ => Some(buf.segment_size), }, src_ip: self.local_ip, }) } /// Send PATH_CHALLENGE for a previous path if necessary - fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec) -> Option { + fn send_path_challenge(&mut self, now: Instant, buf: &mut TransmitBuf<'_>) -> Option { let (prev_cid, prev_path) = self.prev_path.as_mut()?; if !prev_path.challenge_pending { return None; @@ -999,9 +1010,9 @@ impl Connection { SpaceId::Data, 
"PATH_CHALLENGE queued without 1-RTT keys" ); - buf.reserve(MIN_INITIAL_SIZE as usize); + buf.buf.reserve(MIN_INITIAL_SIZE as usize); - let buf_capacity = buf.capacity(); + let buf_capacity = buf.buf.capacity(); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is @@ -1012,7 +1023,7 @@ impl Connection { now, SpaceId::Data, *prev_cid, - buf, + buf.buf, buf_capacity, 0, false, @@ -1029,7 +1040,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, buf); + builder.finish(self, buf.buf); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs new file mode 100644 index 0000000000..de6da0877b --- /dev/null +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -0,0 +1,91 @@ +use bytes::BufMut; + +/// The buffer in which to write datagrams for [`Connection::poll_transmit`] +/// +/// The `poll_transmit` function writes zero or more datagrams to a buffer. Multiple +/// datagrams are possible in case GSO (Generic Segmentation Offload) is supported. +/// +/// This buffer tracks datagrams being written to it. There is always a "current" datagram, +/// which is started by calling [`TransmitBuf::start_new_datagram`]. Writing to the buffer +/// is done through the [`BufMut`] interface. +/// +/// Usually a datagram contains one QUIC packet, though QUIC-TRANSPORT 12.2 Coalescing +/// Packets allows for placing multiple packets into a single datagram provided all but the +/// last packet uses long headers. This is normally used during connection setup where often +/// the initial, handshake and sometimes even a 1-RTT packet can be coalesced into a single +/// datagram. +/// +/// Inside a single packet multiple QUIC frames are written. +/// +/// The buffer managed here is passed straight to the OS' `sendmsg` call (or variant) once +/// `poll_transmit` returns. So needs to contain the datagrams as they are sent on the +/// wire. +/// +/// [`Connection::poll_transmit`]: super::Connection::poll_transmit +#[derive(Debug)] +pub(super) struct TransmitBuf<'a> { + /// The buffer itself, packets are written to this buffer + pub(super) buf: &'a mut Vec, + /// Offset into the buffer at which the current datagram starts + /// + /// Note that when coalescing packets this might be before the start of the current + /// packet. + pub(super) datagram_start: usize, + /// The maximum offset allowed to be used for the current datagram in the buffer + /// + /// The first and last datagram in a batch are allowed to be smaller then the maximum + /// size. All datagrams in between need to be exactly this size. + pub(super) buf_capacity: usize, + /// The maximum number of datagrams allowed to write into [`TransmitBuf::buf`] + pub(super) max_datagrams: usize, + /// The number of datagrams already (partially) written into the buffer + /// + /// Incremented by a call to [`TransmitBuf::start_new_datagram`]. + pub(super) num_datagrams: usize, + /// The segment size of this GSO batch + /// + /// The segment size is the size of each datagram in the GSO batch, only the last + /// datagram in the batch may be smaller. + /// + /// For the first datagram this is set to the maximum size a datagram is allowed to be: + /// the current path MTU. After the first datagram is finished this is reduced to the + /// size of the first datagram and can no longer change. 
+ pub(super) segment_size: usize, +} + +impl<'a> TransmitBuf<'a> { + pub(super) fn new(buf: &'a mut Vec, max_datagrams: usize, mtu: usize) -> Self { + Self { + buf, + datagram_start: 0, + buf_capacity: 0, + max_datagrams, + num_datagrams: 0, + segment_size: mtu, + } + } + + /// Returns `true` if the buffer did not have anything written into it + pub(super) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// The number of bytes written into the buffer so far + pub(super) fn len(&self) -> usize { + self.buf.len() + } +} + +unsafe impl BufMut for TransmitBuf<'_> { + fn remaining_mut(&self) -> usize { + self.buf.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.buf.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.buf.chunk_mut() + } +} From 3cc8ffc94ff469454b2a7720888806f8fecde607 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 12 Mar 2025 12:42:09 +0100 Subject: [PATCH 02/19] Pass the TransmitBuf directly to the PacketBuilder This removes the extra arguments from Packetbuilder::new that tell the builder about datagram boundaries in favour of using the state of TransmitBuf directly. --- quinn-proto/src/connection/mod.rs | 31 +++++--------------- quinn-proto/src/connection/packet_builder.rs | 14 ++++----- 2 files changed, 13 insertions(+), 32 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 11a5c093fe..d43c0d71ba 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -759,9 +759,7 @@ impl Connection { now, space_id, self.rem_cids.active(), - buf.buf, - buf.buf_capacity, - buf.datagram_start, + &mut buf, ack_eliciting, self, )?); @@ -929,16 +927,9 @@ impl Connection { buf.buf_capacity = probe_size as usize; buf.buf.reserve(buf.buf_capacity); - let mut builder = PacketBuilder::new( - now, - space_id, - self.rem_cids.active(), - buf.buf, - buf.buf_capacity, - 0, - true, - self, - )?; + debug_assert_eq!(buf.datagram_start, 0); + let mut builder = + PacketBuilder::new(now, space_id, self.rem_cids.active(), &mut buf, true, self)?; // We implement MTU probes as ping packets padded up to the probe size buf.write(frame::FrameType::PING); @@ -1012,23 +1003,15 @@ impl Connection { ); buf.buf.reserve(MIN_INITIAL_SIZE as usize); - let buf_capacity = buf.buf.capacity(); + buf.buf_capacity = buf.buf.capacity(); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is // sent once, immediately after migration, when the CID is known to be valid. Even // if a post-migration packet caused the CID to be retired, it's fair to pretend // this is sent first. 
- let mut builder = PacketBuilder::new( - now, - SpaceId::Data, - *prev_cid, - buf.buf, - buf_capacity, - 0, - false, - self, - )?; + debug_assert_eq!(buf.datagram_start, 0); + let mut builder = PacketBuilder::new(now, SpaceId::Data, *prev_cid, buf, false, self)?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); buf.write(frame::FrameType::PATH_CHALLENGE); buf.write(token); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index d99f012b1a..dac1f9c9e2 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use rand::Rng; use tracing::{trace, trace_span}; -use super::{Connection, SentFrames, spaces::SentPacket}; +use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket}; use crate::{ ConnectionId, Instant, TransportError, TransportErrorCode, connection::ConnectionSide, @@ -36,9 +36,7 @@ impl PacketBuilder { now: Instant, space_id: SpaceId, dst_cid: ConnectionId, - buffer: &mut Vec, - buffer_capacity: usize, - datagram_start: usize, + buffer: &mut TransmitBuf<'_>, ack_eliciting: bool, conn: &mut Connection, ) -> Option { @@ -122,9 +120,9 @@ impl PacketBuilder { version, }), }; - let partial_encode = header.encode(buffer); + let partial_encode = header.encode(buffer.buf); if conn.peer_params.grease_quic_bit && conn.rng.random() { - buffer[partial_encode.start] ^= FIXED_BIT; + buffer.buf[partial_encode.start] ^= FIXED_BIT; } let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto { @@ -151,11 +149,11 @@ impl PacketBuilder { buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len), partial_encode.start + dst_cid.len() + 6, ); - let max_size = buffer_capacity - tag_len; + let max_size = buffer.buf_capacity - tag_len; debug_assert!(max_size >= min_size); Some(Self { - datagram_start, + datagram_start: buffer.datagram_start, space: space_id, partial_encode, exact_number, From f2683b7085c7d82bf6e4694313f0c6446fccd5de Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 12 Mar 2025 18:16:15 +0100 Subject: [PATCH 03/19] Move allocating new datagrams into TransmitBuf This moves the logic of updating the buffer for new datagrams into a function on the TransmitBuf. This centralises all the manipulation of these variables into one logical place, ensuring all the invariants are upheld. --- quinn-proto/src/connection/mod.rs | 81 +++++++--------------- quinn-proto/src/connection/transmit_buf.rs | 75 ++++++++++++++++++++ 2 files changed, 101 insertions(+), 55 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index d43c0d71ba..a6468bd805 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -661,68 +661,42 @@ impl Connection { builder.finish_and_track(now, self, sent_frames.take(), buf.buf); - if buf.num_datagrams == 1 { - // Set the segment size for this GSO batch to the size of the first UDP - // datagram in the batch. Larger data that cannot be fragmented - // (e.g. application datagrams) will be included in a future batch. When - // sending large enough volumes of data for GSO to be useful, we expect - // packet sizes to usually be consistent, e.g. populated by max-size STREAM - // frames or uniformly sized datagrams. 
- buf.segment_size = buf.len(); - // Clip the unused capacity out of the buffer so future packets don't - // overrun - buf.buf_capacity = buf.len(); - - // Check whether the data we planned to send will fit in the reduced segment - // size. If not, bail out and leave it for the next GSO batch so we don't - // end up trying to send an empty packet. We can't easily compute the right - // segment size before the original call to `space_can_send`, because at - // that time we haven't determined whether we're going to coalesce with the - // first datagram or potentially pad it to `MIN_INITIAL_SIZE`. - if space_id == SpaceId::Data { - let frame_space_1rtt = buf - .segment_size - .saturating_sub(self.predict_1rtt_overhead(Some(pn))); - if self.space_can_send(space_id, frame_space_1rtt).is_empty() { - break; - } + if buf.num_datagrams == 1 && space_id == SpaceId::Data { + // Now that we know the size of the first datagram, check whether + // the data we planned to send will fit in the next segment. If + // not, bails out and leave it for the next GSO batch. We can't + // easily compute the right segment size before the original call to + // `space_can_send`, because at that time we haven't determined + // whether we're going to coalesce with the first datagram or + // potentially pad it to `MIN_INITIAL_SIZE`. + + buf.clip_datagram_size(); + + let frame_space_1rtt = buf + .segment_size + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + if self.space_can_send(space_id, frame_space_1rtt).is_empty() { + break; } } } - // Allocate space for another datagram - let next_datagram_size_limit = match self.spaces[space_id].loss_probes { - 0 => buf.segment_size, + // Start the next datagram + match self.spaces[space_id].loss_probes { + 0 => buf.start_new_datagram(), _ => { self.spaces[space_id].loss_probes -= 1; // Clamp the datagram to at most the minimum MTU to ensure that loss probes // can get through and enable recovery even if the path MTU has shrank // unexpectedly. - std::cmp::min(buf.segment_size, usize::from(INITIAL_MTU)) + buf.start_new_datagram_with_size(std::cmp::min( + usize::from(INITIAL_MTU), + buf.segment_size, + )); } }; - buf.buf_capacity += next_datagram_size_limit; - if buf.buf.capacity() < buf.buf_capacity { - // We reserve the maximum space for sending `max_datagrams` upfront - // to avoid any reallocations if more datagrams have to be appended later on. - // Benchmarks have shown shown a 5-10% throughput improvement - // compared to continuously resizing the datagram buffer. - // While this will lead to over-allocation for small transmits - // (e.g. purely containing ACKs), modern memory allocators - // (e.g. mimalloc and jemalloc) will pool certain allocation sizes - // and therefore this is still rather efficient. - buf.buf.reserve(buf.max_datagrams * buf.segment_size); - } - buf.num_datagrams += 1; coalesce = true; pad_datagram = false; - buf.datagram_start = buf.len(); - - debug_assert_eq!( - buf.datagram_start % buf.segment_size, - 0, - "datagrams in a GSO batch must be aligned to the segment size" - ); } else { // We can append/coalesce the next packet into the current // datagram. 
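For reference, the datagram-size decision that the hunk above now routes through
start_new_datagram()/start_new_datagram_with_size() boils down to the rule sketched
below. This is an illustrative restatement, not code from the patch: the helper name
next_datagram_size is invented, the INITIAL_MTU value is a stand-in for the real
constant, and the PacketSpace loss-probe bookkeeping is reduced to a bare counter.

    // Illustrative restatement of the loss-probe clamping in the hunk above.
    const INITIAL_MTU: usize = 1200; // stand-in for quinn-proto's INITIAL_MTU

    fn next_datagram_size(loss_probes: &mut u32, segment_size: usize) -> usize {
        match *loss_probes {
            // No loss probes outstanding: the next datagram gets a full segment.
            0 => segment_size,
            // Tail-loss probe: clamp to the minimum MTU so the probe can still
            // get through and recovery works even if the path MTU has shrunk.
            _ => {
                *loss_probes -= 1;
                INITIAL_MTU.min(segment_size)
            }
        }
    }

    fn main() {
        let mut loss_probes = 1u32;
        assert_eq!(next_datagram_size(&mut loss_probes, 1452), INITIAL_MTU);
        assert_eq!(next_datagram_size(&mut loss_probes, 1452), 1452);
    }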
@@ -924,8 +898,8 @@ impl Connection { .mtud .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?; - buf.buf_capacity = probe_size as usize; - buf.buf.reserve(buf.buf_capacity); + debug_assert_eq!(buf.num_datagrams, 0); + buf.start_new_datagram_with_size(probe_size as usize); debug_assert_eq!(buf.datagram_start, 0); let mut builder = @@ -949,7 +923,6 @@ impl Connection { builder.finish_and_track(now, self, Some(sent_frames), buf.buf); self.stats.path.sent_plpmtud_probes += 1; - buf.num_datagrams = 1; trace!(?probe_size, "writing MTUD probe"); } @@ -1001,9 +974,7 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - buf.buf.reserve(MIN_INITIAL_SIZE as usize); - - buf.buf_capacity = buf.buf.capacity(); + buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index de6da0877b..b42fb48960 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -65,6 +65,81 @@ impl<'a> TransmitBuf<'a> { } } + /// Starts a datagram with a custom datagram size + /// + /// This is a specialized version of [`TransmitBuf::start_new_datagram`] which sets the + /// datagram size. Useful for e.g. PATH_CHALLENGE, tail-loss probes or MTU probes. + /// + /// After the first datagram you can never increase the segment size. If you decrease + /// the size of a datagram in a batch, it must be the last datagram of the batch. + pub(super) fn start_new_datagram_with_size(&mut self, datagram_size: usize) { + // Only reserve space for this datagram, usually it is the last one in the batch. + let max_capacity_hint = datagram_size; + self.new_datagram_inner(datagram_size, max_capacity_hint) + } + + /// Starts a new datagram in the transmit buffer + /// + /// If this starts the second datagram the segment size will be set to the size of the + /// first datagram. + /// + /// If the underlying buffer does not have enough capacity yet this will allocate enough + /// capacity for all the datagrams allowed in a single batch. Use + /// [`TransmitBuf::start_new_datagram_with_size`] if you know you will need less. + pub(super) fn start_new_datagram(&mut self) { + // We reserve the maximum space for sending `max_datagrams` upfront to avoid any + // reallocations if more datagrams have to be appended later on. Benchmarks have + // shown a 5-10% throughput improvement compared to continuously resizing the + // datagram buffer. While this will lead to over-allocation for small transmits + // (e.g. purely containing ACKs), modern memory allocators (e.g. mimalloc and + // jemalloc) will pool certain allocation sizes and therefore this is still rather + // efficient. + let max_capacity_hint = self.max_datagrams * self.segment_size; + self.new_datagram_inner(self.segment_size, max_capacity_hint) + } + + fn new_datagram_inner(&mut self, datagram_size: usize, max_capacity_hint: usize) { + debug_assert!(self.num_datagrams < self.max_datagrams); + if self.num_datagrams == 1 { + // Set the segment size to the size of the first datagram. 
+ self.segment_size = self.buf.len(); + } + if self.num_datagrams >= 1 { + debug_assert!(datagram_size <= self.segment_size); + if datagram_size < self.segment_size { + // If this is a GSO batch and this datagram is smaller than the segment + // size, this must be the last datagram in the batch. + self.max_datagrams = self.num_datagrams + 1; + } + } + self.datagram_start = self.buf.len(); + debug_assert_eq!( + self.datagram_start % self.segment_size, + 0, + "datagrams in a GSO batch must be aligned to the segment size" + ); + self.buf_capacity = self.datagram_start + datagram_size; + if self.buf_capacity > self.buf.capacity() { + self.buf + .reserve_exact(max_capacity_hint.saturating_sub(self.buf.capacity())); + } + self.num_datagrams += 1; + } + + /// Clips the datagram size to the current size + /// + /// Only valid for the first datagram, when the datagram might be smaller than the + /// segment size. Needed before estimating the available space in the next datagram + /// based on [`TransmitBuf::segment_size`]. + /// + /// Use [`TransmitBuf::start_new_datagram_with_size`] if you need to reduce the size of + /// the last datagram in a batch. + pub(super) fn clip_datagram_size(&mut self) { + debug_assert_eq!(self.num_datagrams, 1); + self.segment_size = self.buf.len(); + self.buf_capacity = self.buf.len(); + } + /// Returns `true` if the buffer did not have anything written into it pub(super) fn is_empty(&self) -> bool { self.len() == 0 From 75ed1dcc9855a0388a545d95a2fd79de42826b70 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 12 Mar 2025 18:28:07 +0100 Subject: [PATCH 04/19] Move simple field access to accessor functions This helps encapsulation by ensuring that no outside users can mutate these fields. Reducing the amount of logic that needs to be reasoned about. --- quinn-proto/src/connection/mod.rs | 86 ++++++++++---------- quinn-proto/src/connection/packet_builder.rs | 4 +- quinn-proto/src/connection/transmit_buf.rs | 44 +++++++++- 3 files changed, 86 insertions(+), 48 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index a6468bd805..34469fdb66 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -519,7 +519,7 @@ impl Connection { // always spill into the next datagram. let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); let frame_space_1rtt = buf - .segment_size + .segment_size() .saturating_sub(self.predict_1rtt_overhead(Some(pn))); // Is there data or a close message to send in this space? @@ -558,11 +558,11 @@ impl Connection { // We are NOT coalescing (the default is we are, so this was turned off in an // earlier iteration) OR there is not enough space for another *packet* in this // datagram (buf_capacity - buf_end == unused space in datagram). - if !coalesce || buf.buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len { + if !coalesce || buf.datagram_max_offset() - buf_end < MIN_PACKET_SPACE + tag_len { // We need to send 1 more datagram and extend the buffer for that. // Is 1 more datagram allowed? - if buf.num_datagrams >= buf.max_datagrams { + if buf.num_datagrams() >= buf.max_datagrams() { // No more datagrams allowed break; } @@ -573,10 +573,9 @@ impl Connection { // for starting another datagram. 
If there is any anti-amplification // budget left, we always allow a full MTU to be sent // (see https://github.com/quinn-rs/quinn/issues/1082) - if self - .path - .anti_amplification_blocked((buf.segment_size * buf.num_datagrams) as u64 + 1) - { + if self.path.anti_amplification_blocked( + (buf.segment_size() * buf.num_datagrams()) as u64 + 1, + ) { trace!("blocked by anti-amplification"); break; } @@ -586,13 +585,13 @@ impl Connection { if ack_eliciting && self.spaces[space_id].loss_probes == 0 { // Assume the current packet will get padded to fill the segment let untracked_bytes = if let Some(builder) = &builder_storage { - buf.buf_capacity - builder.partial_encode.start + buf.datagram_max_offset() - builder.partial_encode.start } else { 0 } as u64; - debug_assert!(untracked_bytes <= buf.segment_size as u64); + debug_assert!(untracked_bytes <= buf.segment_size() as u64); - let bytes_to_send = buf.segment_size as u64 + untracked_bytes; + let bytes_to_send = buf.segment_size() as u64 + untracked_bytes; if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { space_idx += 1; congestion_blocked = true; @@ -626,7 +625,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } - if buf.num_datagrams > 1 { + if buf.num_datagrams() > 1 { // If too many padding bytes would be required to continue the GSO batch // after this packet, end the GSO batch here. Ensures that fixed-size frames // with heterogeneous sizes (e.g. application datagrams) won't inadvertently @@ -641,14 +640,15 @@ impl Connection { // `buf_capacity` by less than `segment_size`. const MAX_PADDING: usize = 16; let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - buf.datagram_start + - buf.datagram_start_offset() + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < buf.segment_size - || buf.datagram_start + buf.segment_size > buf.buf_capacity + if packet_len_unpadded + MAX_PADDING < buf.segment_size() + || buf.datagram_start_offset() + buf.segment_size() + > buf.datagram_max_offset() { trace!( "GSO truncated by demand for {} padding bytes or loss probe", - buf.segment_size - packet_len_unpadded + buf.segment_size() - packet_len_unpadded ); builder_storage = Some(builder); break; @@ -656,27 +656,29 @@ impl Connection { // Pad the current datagram to GSO segment size so it can be included in the // GSO batch. - builder.pad_to(buf.segment_size as u16); + builder.pad_to(buf.segment_size() as u16); } builder.finish_and_track(now, self, sent_frames.take(), buf.buf); - if buf.num_datagrams == 1 && space_id == SpaceId::Data { - // Now that we know the size of the first datagram, check whether - // the data we planned to send will fit in the next segment. If - // not, bails out and leave it for the next GSO batch. We can't - // easily compute the right segment size before the original call to - // `space_can_send`, because at that time we haven't determined - // whether we're going to coalesce with the first datagram or - // potentially pad it to `MIN_INITIAL_SIZE`. - + if buf.num_datagrams() == 1 { buf.clip_datagram_size(); - - let frame_space_1rtt = buf - .segment_size - .saturating_sub(self.predict_1rtt_overhead(Some(pn))); - if self.space_can_send(space_id, frame_space_1rtt).is_empty() { - break; + if space_id == SpaceId::Data { + // Now that we know the size of the first datagram, check + // whether the data we planned to send will fit in the next + // segment. If not, bails out and leave it for the next GSO + // batch. 
We can't easily compute the right segment size before + // the original call to `space_can_send`, because at that time + // we haven't determined whether we're going to coalesce with + // the first datagram or potentially pad it to + // `MIN_INITIAL_SIZE`. + + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + if self.space_can_send(space_id, frame_space_1rtt).is_empty() { + break; + } } } } @@ -691,7 +693,7 @@ impl Connection { // unexpectedly. buf.start_new_datagram_with_size(std::cmp::min( usize::from(INITIAL_MTU), - buf.segment_size, + buf.segment_size(), )); } }; @@ -706,7 +708,7 @@ impl Connection { } } - debug_assert!(buf.buf_capacity - buf.len() >= MIN_PACKET_SPACE); + debug_assert!(buf.datagram_max_offset() - buf.len() >= MIN_PACKET_SPACE); // // From here on, we've determined that a packet will definitely be sent. @@ -809,7 +811,7 @@ impl Connection { // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. - if space_id == SpaceId::Data && buf.num_datagrams == 1 { + if space_id == SpaceId::Data && buf.num_datagrams() == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { // `unwrap` guaranteed to succeed because `builder_storage` was populated just // above. @@ -857,7 +859,7 @@ impl Connection { !(sent.is_ack_only(&self.streams) && !can_send.acks && can_send.other - && (buf.buf_capacity - builder.datagram_start) + && (buf.datagram_max_offset() - builder.datagram_start) == self.path.current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" @@ -898,10 +900,10 @@ impl Connection { .mtud .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?; - debug_assert_eq!(buf.num_datagrams, 0); + debug_assert_eq!(buf.num_datagrams(), 0); buf.start_new_datagram_with_size(probe_size as usize); - debug_assert_eq!(buf.datagram_start, 0); + debug_assert_eq!(buf.datagram_start_offset(), 0); let mut builder = PacketBuilder::new(now, space_id, self.rem_cids.active(), &mut buf, true, self)?; @@ -934,13 +936,13 @@ impl Connection { trace!( "sending {} bytes in {} datagrams", buf.len(), - buf.num_datagrams + buf.num_datagrams() ); self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64); self.stats .udp_tx - .on_sent(buf.num_datagrams as u64, buf.len()); + .on_sent(buf.num_datagrams() as u64, buf.len()); Some(Transmit { destination: self.path.remote, @@ -950,9 +952,9 @@ impl Connection { } else { None }, - segment_size: match buf.num_datagrams { + segment_size: match buf.num_datagrams() { 1 => None, - _ => Some(buf.segment_size), + _ => Some(buf.segment_size()), }, src_ip: self.local_ip, }) @@ -981,7 +983,7 @@ impl Connection { // sent once, immediately after migration, when the CID is known to be valid. Even // if a post-migration packet caused the CID to be retired, it's fair to pretend // this is sent first. 
- debug_assert_eq!(buf.datagram_start, 0); + debug_assert_eq!(buf.datagram_start_offset(), 0); let mut builder = PacketBuilder::new(now, SpaceId::Data, *prev_cid, buf, false, self)?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); buf.write(frame::FrameType::PATH_CHALLENGE); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index dac1f9c9e2..48a5b3b8f0 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -149,11 +149,11 @@ impl PacketBuilder { buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len), partial_encode.start + dst_cid.len() + 6, ); - let max_size = buffer.buf_capacity - tag_len; + let max_size = buffer.datagram_max_offset() - tag_len; debug_assert!(max_size >= min_size); Some(Self { - datagram_start: buffer.datagram_start, + datagram_start: buffer.datagram_start_offset(), space: space_id, partial_encode, exact_number, diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index b42fb48960..5054232563 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -30,14 +30,14 @@ pub(super) struct TransmitBuf<'a> { /// /// Note that when coalescing packets this might be before the start of the current /// packet. - pub(super) datagram_start: usize, + datagram_start: usize, /// The maximum offset allowed to be used for the current datagram in the buffer /// /// The first and last datagram in a batch are allowed to be smaller then the maximum /// size. All datagrams in between need to be exactly this size. - pub(super) buf_capacity: usize, + buf_capacity: usize, /// The maximum number of datagrams allowed to write into [`TransmitBuf::buf`] - pub(super) max_datagrams: usize, + max_datagrams: usize, /// The number of datagrams already (partially) written into the buffer /// /// Incremented by a call to [`TransmitBuf::start_new_datagram`]. @@ -50,7 +50,7 @@ pub(super) struct TransmitBuf<'a> { /// For the first datagram this is set to the maximum size a datagram is allowed to be: /// the current path MTU. After the first datagram is finished this is reduced to the /// size of the first datagram and can no longer change. - pub(super) segment_size: usize, + segment_size: usize, } impl<'a> TransmitBuf<'a> { @@ -140,6 +140,42 @@ impl<'a> TransmitBuf<'a> { self.buf_capacity = self.buf.len(); } + /// Returns the GSO segment size + /// + /// This is also the maximum size datagrams are allowed to be. The first and last + /// datagram in a batch are allowed to be smaller however. After the first datagram the + /// segment size is clipped to the size of the first datagram. + pub(super) fn segment_size(&self) -> usize { + self.segment_size + } + + /// Returns the number of datagrams written into the buffer + /// + /// The last datagram is not necessarily finished yet. + pub(super) fn num_datagrams(&self) -> usize { + self.num_datagrams + } + + /// Returns the maximum number of datagrams allowed to be written into the buffer + pub(super) fn max_datagrams(&self) -> usize { + self.max_datagrams + } + + /// Returns the start offset of the current datagram in the buffer + /// + /// In other words, this offset contains the first byte of the current datagram. 
+ pub(super) fn datagram_start_offset(&self) -> usize { + self.datagram_start + } + + /// Returns the maximum offset in the buffer allowed for the current datagram + /// + /// The first and last datagram in a batch are allowed to be smaller then the maximum + /// size. All datagrams in between need to be exactly this size. + pub(super) fn datagram_max_offset(&self) -> usize { + self.buf_capacity + } + /// Returns `true` if the buffer did not have anything written into it pub(super) fn is_empty(&self) -> bool { self.len() == 0 From 76f0d0c5d8a5503fdb11bda3ba38b046cdfba3b3 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 13 Mar 2025 13:20:13 +0100 Subject: [PATCH 05/19] Do not pass buffer around as a Vec We want to hide the actual buffer implementation from most of these locations. Currently it is a TransmitBuf but it likely will become something else in the future. So make this generic. --- quinn-proto/src/connection/datagrams.rs | 6 ++--- quinn-proto/src/connection/mod.rs | 27 +++++++++++++++++---- quinn-proto/src/connection/streams/state.rs | 6 ++--- quinn-proto/src/connection/transmit_buf.rs | 8 ++++++ quinn-proto/src/frame.rs | 4 +-- quinn-proto/src/packet.rs | 3 ++- 6 files changed, 40 insertions(+), 14 deletions(-) diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index c22e8d7155..8e9fea0bdc 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -1,10 +1,10 @@ use std::collections::VecDeque; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use thiserror::Error; use tracing::{debug, trace}; -use super::Connection; +use super::{BufLen, Connection}; use crate::{ TransportError, frame::{Datagram, FrameStruct}, @@ -163,7 +163,7 @@ impl DatagramState { /// /// Returns whether a frame was written. At most `max_size` bytes will be written, including /// framing. 
- pub(super) fn write(&mut self, buf: &mut Vec, max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut (impl BufMut + BufLen), max_size: usize) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 34469fdb66..f36640d55d 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use frame::StreamMetaVec; use rand::{Rng, SeedableRng, rngs::StdRng}; use thiserror::Error; @@ -757,7 +757,7 @@ impl Connection { self.receiving_ecn, &mut SentFrames::default(), &mut self.spaces[space_id], - buf.buf, + &mut buf, &mut self.stats, ); } @@ -844,7 +844,7 @@ impl Connection { let sent = self.populate_packet( now, space_id, - buf.buf, + &mut buf, builder.max_size, builder.exact_number, ); @@ -3022,7 +3022,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut Vec, + buf: &mut (impl BufMut + BufLen), max_size: usize, pn: u64, ) -> SentFrames { @@ -3288,7 +3288,7 @@ impl Connection { receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, - buf: &mut Vec, + buf: &mut impl BufMut, stats: &mut ConnectionStats, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); @@ -3958,6 +3958,23 @@ fn negotiate_max_idle_timeout(x: Option, y: Option) -> Option usize; +} + +impl BufLen for Vec { + fn len(&self) -> usize { + self.len() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e05311423d..d6be209236 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -15,7 +15,7 @@ use super::{ use crate::{ Dir, MAX_STREAM_COUNT, Side, StreamId, TransportError, VarInt, coding::BufMutExt, - connection::stats::FrameStats, + connection::{BufLen, stats::FrameStats}, frame::{self, FrameStruct, StreamMetaVec}, transport_parameters::TransportParameters, }; @@ -411,7 +411,7 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut Vec, + buf: &mut (impl BufMut + BufLen), pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, @@ -541,7 +541,7 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut Vec, + buf: &mut (impl BufMut + BufLen), max_buf_size: usize, fair: bool, ) -> StreamMetaVec { diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index 5054232563..a8188e156c 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -1,5 +1,7 @@ use bytes::BufMut; +use super::BufLen; + /// The buffer in which to write datagrams for [`Connection::poll_transmit`] /// /// The `poll_transmit` function writes zero or more datagrams to a buffer. 
Multiple @@ -200,3 +202,9 @@ unsafe impl BufMut for TransmitBuf<'_> { self.buf.chunk_mut() } } + +impl BufLen for TransmitBuf<'_> { + fn len(&self) -> usize { + self.len() + } +} diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 21fe0340c2..9c2ca47ca3 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -899,13 +899,13 @@ impl FrameStruct for Datagram { } impl Datagram { - pub(crate) fn encode(&self, length: bool, out: &mut Vec) { + pub(crate) fn encode(&self, length: bool, out: &mut impl BufMut) { out.write(FrameType(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte if length { // Safe to unwrap because we check length sanity before queueing datagrams out.write(VarInt::from_u64(self.data.len() as u64).unwrap()); // <= 8 bytes } - out.extend_from_slice(&self.data); + out.put_slice(&self.data); } pub(crate) fn size(&self, length: bool) -> usize { diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index b5ef0c4026..d092b0124d 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -6,6 +6,7 @@ use thiserror::Error; use crate::{ ConnectionId, coding::{self, BufExt, BufMutExt}, + connection::BufLen, crypto, }; @@ -281,7 +282,7 @@ pub(crate) enum Header { } impl Header { - pub(crate) fn encode(&self, w: &mut Vec) -> PartialEncode { + pub(crate) fn encode(&self, w: &mut (impl BufMut + BufLen)) -> PartialEncode { use Header::*; let start = w.len(); match *self { From 86bcf5258d1cb38b7b4dc294136118179718ebd9 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Thu, 13 Mar 2025 13:22:06 +0100 Subject: [PATCH 06/19] Switch PacketBuilder to use TransmitBuf This allows making TransmitBuf::buf private at last. Now all the logic it handles is fully encapsulated. --- quinn-proto/src/connection/mod.rs | 12 ++++++------ quinn-proto/src/connection/packet_builder.rs | 20 ++++++++++++-------- quinn-proto/src/connection/transmit_buf.rs | 7 ++++++- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index f36640d55d..3a39a43346 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -659,7 +659,7 @@ impl Connection { builder.pad_to(buf.segment_size() as u16); } - builder.finish_and_track(now, self, sent_frames.take(), buf.buf); + builder.finish_and_track(now, self, sent_frames.take(), &mut buf); if buf.num_datagrams() == 1 { buf.clip_datagram_size(); @@ -704,7 +704,7 @@ impl Connection { // datagram. 
// Finish current packet without adding extra padding if let Some(builder) = builder_storage.take() { - builder.finish_and_track(now, self, sent_frames.take(), buf.buf); + builder.finish_and_track(now, self, sent_frames.take(), &mut buf); } } @@ -828,7 +828,7 @@ impl Connection { non_retransmits: true, ..SentFrames::default() }), - buf.buf, + &mut buf, ); self.stats.udp_tx.on_sent(1, buf.len()); return Some(Transmit { @@ -884,7 +884,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, buf.buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); self.path .congestion .on_sent(now, buf.len() as u64, last_packet_number); @@ -922,7 +922,7 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), buf.buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); self.stats.path.sent_plpmtud_probes += 1; @@ -996,7 +996,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, buf.buf); + builder.finish(self, buf); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 48a5b3b8f0..91713d0a6a 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,4 +1,4 @@ -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use rand::Rng; use tracing::{trace, trace_span}; @@ -120,9 +120,9 @@ impl PacketBuilder { version, }), }; - let partial_encode = header.encode(buffer.buf); + let partial_encode = header.encode(buffer); if conn.peer_params.grease_quic_bit && conn.rng.random() { - buffer.buf[partial_encode.start] ^= FIXED_BIT; + buffer.as_mut_slice()[partial_encode.start] ^= FIXED_BIT; } let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto { @@ -183,7 +183,7 @@ impl PacketBuilder { now: Instant, conn: &mut Connection, sent: Option, - buffer: &mut Vec, + buffer: &mut TransmitBuf<'_>, ) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; @@ -226,11 +226,15 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec) -> (usize, bool) { + pub(super) fn finish( + self, + conn: &mut Connection, + buffer: &mut TransmitBuf<'_>, + ) -> (usize, bool) { let pad = buffer.len() < self.min_size; if pad { trace!("PADDING * {}", self.min_size - buffer.len()); - buffer.resize(self.min_size, 0); + buffer.put_bytes(0, self.min_size - buffer.len()); } let space = &conn.spaces[self.space]; @@ -249,9 +253,9 @@ impl PacketBuilder { "Mismatching crypto tag len" ); - buffer.resize(buffer.len() + packet_crypto.tag_len(), 0); + buffer.put_bytes(0, packet_crypto.tag_len()); let encode_start = self.partial_encode.start; - let packet_buf = &mut buffer[encode_start..]; + let packet_buf = &mut buffer.as_mut_slice()[encode_start..]; self.partial_encode.finish( packet_buf, header_crypto, diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index a8188e156c..61d3217c34 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -27,7 +27,7 @@ use super::BufLen; #[derive(Debug)] pub(super) struct TransmitBuf<'a> { /// The buffer itself, packets are written to this buffer - pub(super) 
buf: &'a mut Vec, + buf: &'a mut Vec, /// Offset into the buffer at which the current datagram starts /// /// Note that when coalescing packets this might be before the start of the current @@ -187,6 +187,11 @@ impl<'a> TransmitBuf<'a> { pub(super) fn len(&self) -> usize { self.buf.len() } + + /// Returns the already written bytes in the buffer + pub(super) fn as_mut_slice(&mut self) -> &mut [u8] { + self.buf.as_mut_slice() + } } unsafe impl BufMut for TransmitBuf<'_> { From e7ea03832f5ad7dc29067899efb94b068d83e06b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 28 Mar 2025 14:50:11 +0100 Subject: [PATCH 07/19] Finish packets at end of each poll_transmit loop This re-arranges the loop in poll_transmit to always finish the packet before going to the next iteration. This primarily enables to mutably borrow the TransmitBuf into a packet-specific buffer while the packet is being built. But this is not yet utilised in this commit. It does however remove the need of the mutable builder_storage Option, which makes reasoning over packet building slightly easier. - The logic to know on which packet space to send next, or whether there is no longer anything to send, has been moved to the next_send_space method. - The logic to decide whether to pad a packet before finishing it is moved to the end of the loop. - The logic to check the congestion controller and pacing is kept at the start of the loop. Before a new packet is started. Starting a new datagram also stays there. --- quinn-proto/src/connection/mod.rs | 351 ++++++++++++++++-------------- quinn-proto/src/packet.rs | 11 + 2 files changed, 202 insertions(+), 160 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 3a39a43346..cb0d933bdb 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -499,68 +499,31 @@ impl Connection { } let mut coalesce = true; - let mut builder_storage: Option = None; let mut sent_frames = None; let mut pad_datagram = false; let mut congestion_blocked = false; + let mut last_packet_number = None; // Iterate over all spaces and find data to send - let mut space_idx = 0; - let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data]; - // This loop will potentially spend multiple iterations in the same `SpaceId`, - // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`. - while space_idx < spaces.len() { - let space_id = spaces[space_idx]; - // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to - // be able to send an individual frame at least this large in the next 1-RTT - // packet. This could be generalized to support every space, but it's only needed to - // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We - // don't account for coalesced packets potentially occupying space because frames can - // always spill into the next datagram. - let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(Some(pn))); - - // Is there data or a close message to send in this space? - let can_send = self.space_can_send(space_id, frame_space_1rtt); - if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) { - space_idx += 1; - continue; - } - + // + // Each loop builds one packet. When packets are coalesced a datagram is filled + // over multiple loops. 
+ let mut next_space_id = self.next_send_space(SpaceId::Initial, &buf, close); + while let Some(space_id) = next_space_id { + // Whether the next packet will contain ack-eliciting frames. let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams) || self.spaces[space_id].ping_pending || self.spaces[space_id].immediate_ack_pending; if space_id == SpaceId::Data { + let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); ack_eliciting |= self.can_send_1rtt(frame_space_1rtt); } - // Can we append more data into the current buffer? - // It is not safe to assume that `buf.len()` is the end of the data, - // since the last packet might not have been finished. - let buf_end = if let Some(builder) = &builder_storage { - buf.len().max(builder.min_size) + builder.tag_len - } else { - buf.len() - }; - - let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto { - crypto.packet.local.tag_len() - } else if space_id == SpaceId::Data { - self.zero_rtt_crypto.as_ref().expect( - "sending packets in the application data space requires known 0-RTT or 1-RTT keys", - ).packet.tag_len() - } else { - unreachable!("tried to send {:?} packet without keys", space_id) - }; - - // We are NOT coalescing (the default is we are, so this was turned off in an - // earlier iteration) OR there is not enough space for another *packet* in this - // datagram (buf_capacity - buf_end == unused space in datagram). - if !coalesce || buf.datagram_max_offset() - buf_end < MIN_PACKET_SPACE + tag_len { - // We need to send 1 more datagram and extend the buffer for that. - + // If the datagram is full, we need to start a new one. + if buf.len() == buf.datagram_max_offset() { // Is 1 more datagram allowed? if buf.num_datagrams() >= buf.max_datagrams() { // No more datagrams allowed @@ -583,22 +546,20 @@ impl Connection { // Congestion control and pacing checks // Tail loss probes must not be blocked by congestion, or a deadlock could arise if ack_eliciting && self.spaces[space_id].loss_probes == 0 { - // Assume the current packet will get padded to fill the segment - let untracked_bytes = if let Some(builder) = &builder_storage { - buf.datagram_max_offset() - builder.partial_encode.start - } else { - 0 - } as u64; - debug_assert!(untracked_bytes <= buf.segment_size() as u64); - - let bytes_to_send = buf.segment_size() as u64 + untracked_bytes; + let bytes_to_send = buf.segment_size() as u64; if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { - space_idx += 1; + next_space_id = self.next_send_space(space_id.next(), &buf, close); congestion_blocked = true; - // We continue instead of breaking here in order to avoid - // blocking loss probes queued for higher spaces. trace!("blocked by congestion control"); - continue; + if next_space_id == Some(space_id) { + // We are in the highest space, nothing more to do. + break; + } else { + // We continue looking for packets in higher spaces because we + // might still have to send loss probes in them, which are not + // congestion controlled. 
+ continue; + } } // Check whether the next datagram is blocked by pacing @@ -619,70 +580,6 @@ impl Connection { } } - // Finish current packet - if let Some(mut builder) = builder_storage.take() { - if pad_datagram { - builder.pad_to(MIN_INITIAL_SIZE); - } - - if buf.num_datagrams() > 1 { - // If too many padding bytes would be required to continue the GSO batch - // after this packet, end the GSO batch here. Ensures that fixed-size frames - // with heterogeneous sizes (e.g. application datagrams) won't inadvertently - // waste large amounts of bandwidth. The exact threshold is a bit arbitrary - // and might benefit from further tuning, though there's no universally - // optimal value. - // - // Additionally, if this datagram is a loss probe and `segment_size` is - // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue - // the GSO batch would risk failure to recover from a reduction in path - // MTU. Loss probes are the only packets for which we might grow - // `buf_capacity` by less than `segment_size`. - const MAX_PADDING: usize = 16; - let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - buf.datagram_start_offset() - + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < buf.segment_size() - || buf.datagram_start_offset() + buf.segment_size() - > buf.datagram_max_offset() - { - trace!( - "GSO truncated by demand for {} padding bytes or loss probe", - buf.segment_size() - packet_len_unpadded - ); - builder_storage = Some(builder); - break; - } - - // Pad the current datagram to GSO segment size so it can be included in the - // GSO batch. - builder.pad_to(buf.segment_size() as u16); - } - - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); - - if buf.num_datagrams() == 1 { - buf.clip_datagram_size(); - if space_id == SpaceId::Data { - // Now that we know the size of the first datagram, check - // whether the data we planned to send will fit in the next - // segment. If not, bails out and leave it for the next GSO - // batch. We can't easily compute the right segment size before - // the original call to `space_can_send`, because at that time - // we haven't determined whether we're going to coalesce with - // the first datagram or potentially pad it to - // `MIN_INITIAL_SIZE`. - - let frame_space_1rtt = buf - .segment_size() - .saturating_sub(self.predict_1rtt_overhead(Some(pn))); - if self.space_can_send(space_id, frame_space_1rtt).is_empty() { - break; - } - } - } - } - // Start the next datagram match self.spaces[space_id].loss_probes { 0 => buf.start_new_datagram(), @@ -699,13 +596,6 @@ impl Connection { }; coalesce = true; pad_datagram = false; - } else { - // We can append/coalesce the next packet into the current - // datagram. 
- // Finish current packet without adding extra padding - if let Some(builder) = builder_storage.take() { - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); - } } debug_assert!(buf.datagram_max_offset() - buf.len() >= MIN_PACKET_SPACE); @@ -727,18 +617,19 @@ impl Connection { } debug_assert!( - builder_storage.is_none() && sent_frames.is_none(), + sent_frames.is_none(), "Previous packet must have been finished" ); - let builder = builder_storage.insert(PacketBuilder::new( + let mut builder = PacketBuilder::new( now, space_id, self.rem_cids.active(), &mut buf, ack_eliciting, self, - )?); + )?; + last_packet_number = Some(builder.exact_number); coalesce = coalesce && !builder.short_header; // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1 @@ -751,11 +642,12 @@ impl Connection { // a better approximate on what data has been processed. This is // especially important with ack delay, since the peer might not // have gotten any other ACK for the data earlier on. + let mut sent_frames = SentFrames::default(); if !self.spaces[space_id].pending_acks.ranges().is_empty() { Self::populate_acks( now, self.receiving_ecn, - &mut SentFrames::default(), + &mut sent_frames, &mut self.spaces[space_id], &mut buf, &mut self.stats, @@ -795,6 +687,10 @@ impl Connection { ), } } + if pad_datagram { + builder.pad_to(MIN_INITIAL_SIZE); + } + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); if space_id == self.highest_space { // Don't send another close packet self.close = false; @@ -804,7 +700,7 @@ impl Connection { // Send a close frame in every possible space for robustness, per RFC9000 // "Immediate Close during the Handshake". Don't bother trying to send anything // else. - space_idx += 1; + next_space_id = self.next_send_space(space_id.next(), &buf, close); continue; } } @@ -813,9 +709,6 @@ impl Connection { // validation can occur while the link is saturated. if space_id == SpaceId::Data && buf.num_datagrams() == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { - // `unwrap` guaranteed to succeed because `builder_storage` was populated just - // above. - let mut builder = builder_storage.take().unwrap(); trace!("PATH_RESPONSE {:08x} (off-path)", token); buf.write(frame::FrameType::PATH_RESPONSE); buf.write(token); @@ -855,15 +748,26 @@ impl Connection { // only checked if the full MTU is available and when potentially large fixed-size // frames aren't queued, so that lack of space in the datagram isn't the reason for just // writing ACKs. 
- debug_assert!( - !(sent.is_ack_only(&self.streams) - && !can_send.acks - && can_send.other - && (buf.datagram_max_offset() - builder.datagram_start) - == self.path.current_mtu() as usize - && self.datagrams.outgoing.is_empty()), - "SendableFrames was {can_send:?}, but only ACKs have been written" - ); + { + let pn = if builder.space == SpaceId::Data { + builder.exact_number + } else { + self.packet_number_filter.peek(&self.spaces[SpaceId::Data]) + }; + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let can_send = self.space_can_send(space_id, frame_space_1rtt); + debug_assert!( + !(sent.is_ack_only(&self.streams) + && !can_send.acks + && can_send.other + && (buf.datagram_max_offset() - builder.datagram_start) + == self.path.current_mtu() as usize + && self.datagrams.outgoing.is_empty()), + "SendableFrames was {can_send:?}, but only ACKs have been written" + ); + } pad_datagram |= sent.requires_padding; if sent.largest_acked.is_some() { @@ -874,17 +778,110 @@ impl Connection { // Keep information about the packet around until it gets finalized sent_frames = Some(sent); - // Don't increment space_idx. - // We stay in the current space and check if there is more data to send. - } + // Now we need to finish the packet. Before we do so we need to know if we will + // be coalescing the next packet into this one, or will be ending the datagram + // as well. Because if this is the last packet in the datagram more padding + // might be needed because of the packet type, or to fill the GSO segment size. + next_space_id = self.next_send_space(space_id, &buf, close); + if let Some(next_space_id) = next_space_id { + // Can we append another packet into the current datagram? + let buf_end = buf.len().max(builder.min_size) + builder.tag_len; + let tag_len = if let Some(ref crypto) = self.spaces[next_space_id].crypto { + crypto.packet.local.tag_len() + } else if next_space_id == SpaceId::Data { + self.zero_rtt_crypto.as_ref().expect( + "sending packets in the application data space requires known 0-RTT or 1-RTT keys", + ).packet.tag_len() + } else { + unreachable!("tried to send {:?} packet without keys", next_space_id); + }; + + // Are we allowed to coalesce AND is there enough space for another *packet* + // in this datagram? + if coalesce && buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len { + // We can append/coalesce the next packet into the current + // datagram. Finish the current packet without adding extra padding. + builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + } else { + // We need a new datagram for the next packet. Finish the current + // packet with padding. + if pad_datagram { + builder.pad_to(MIN_INITIAL_SIZE); + } + if buf.num_datagrams() > 1 { + // If too many padding bytes would be required to continue the + // GSO batch after this packet, end the GSO batch here. Ensures + // that fixed-size frames with heterogeneous sizes + // (e.g. application datagrams) won't inadvertently waste large + // amounts of bandwidth. The exact threshold is a bit arbitrary + // and might benefit from further tuning, though there's no + // universally optimal value. + // + // Additionally, if this datagram is a loss probe and + // `segment_size` is larger than `INITIAL_MTU`, then padding it + // to `segment_size` to continue the GSO batch would risk + // failure to recover from a reduction in path MTU. Loss probes + // are the only packets for which we might grow `buf_capacity` + // by less than `segment_size`. 
+ const MAX_PADDING: usize = 16; + let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) + - buf.datagram_start_offset() + + builder.tag_len; + if packet_len_unpadded + MAX_PADDING < buf.segment_size() + || buf.datagram_start_offset() + buf.segment_size() + > buf.datagram_max_offset() + { + trace!( + "GSO truncated by demand for {} padding bytes or loss probe", + buf.segment_size() - packet_len_unpadded + ); + builder.finish_and_track(now, self, sent_frames, &mut buf); + break; + } - // Finish the last packet - if let Some(mut builder) = builder_storage { - if pad_datagram { - builder.pad_to(MIN_INITIAL_SIZE); + // Pad the current datagram to GSO segment size so it can be + // included in the GSO batch. + builder.pad_to(buf.segment_size() as u16); + } + + builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + + if buf.num_datagrams() == 1 { + buf.clip_datagram_size(); + if next_space_id == SpaceId::Data { + // Now that we know the size of the first datagram, check whether + // the data we planned to send will fit in the next segment. If + // not, bail out and leave it for the next GSO batch. We can't + // easily compute the right segment size before the original call to + // `space_can_send`, because at that time we haven't determined + // whether we're going to coalesce with the first datagram or + // potentially pad it to `MIN_INITIAL_SIZE`. + let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + if self + .space_can_send(next_space_id, frame_space_1rtt) + .is_empty() + { + break; + } + } + } + } + } else { + // Nothing more to send. This was the last packet. + if pad_datagram { + builder.pad_to(MIN_INITIAL_SIZE); + } + builder.finish_and_track(now, self, sent_frames, &mut buf); + break; } - let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, &mut buf); + } + + if let Some(last_packet_number) = last_packet_number { + // Note that when sending in multiple packet spaces the last packet number will + // be the one from the highest packet space. self.path .congestion .on_sent(now, buf.len() as u64, last_packet_number); @@ -960,6 +957,40 @@ impl Connection { }) } + /// Returns the [`SpaceId`] of the next packet space which has data to send + /// + /// This takes into account the space available to frames in the next datagram. + fn next_send_space( + &self, + current_space_id: SpaceId, + buf: &TransmitBuf<'_>, + close: bool, + ) -> Option { + // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed + // to be able to send an individual frame at least this large in the next 1-RTT + // packet. This could be generalized to support every space, but it's only needed to + // handle large fixed-size frames, which only exist in 1-RTT (application + // datagrams). We don't account for coalesced packets potentially occupying space + // because frames can always spill into the next datagram. 
+ let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let mut space_id = current_space_id; + loop { + let can_send = self.space_can_send(space_id, frame_space_1rtt); + if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { + return Some(space_id); + } + space_id = match space_id { + SpaceId::Initial => SpaceId::Handshake, + SpaceId::Handshake => SpaceId::Data, + SpaceId::Data => break, + } + } + None + } + /// Send PATH_CHALLENGE for a previous path if necessary fn send_path_challenge(&mut self, now: Instant, buf: &mut TransmitBuf<'_>) -> Option { let (prev_cid, prev_path) = self.prev_path.as_mut()?; diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index d092b0124d..d5bfd38f6c 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -895,6 +895,17 @@ impl SpaceId { pub fn iter() -> impl Iterator { [Self::Initial, Self::Handshake, Self::Data].iter().cloned() } + + /// Returns the next higher packet space. + /// + /// Keeps returning [`SpaceId::Data`] as the highest space. + pub fn next(&self) -> Self { + match self { + Self::Initial => Self::Handshake, + Self::Handshake => Self::Data, + Self::Data => Self::Data, + } + } } #[cfg(test)] From 22dc951a1f47e77d630193b34276d903b639921b Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 28 Mar 2025 15:04:53 +0100 Subject: [PATCH 08/19] Remove mutable send_frames Option Now that the packet is always finished at the end of the loop we no longer need to carry around the mutable SentFrames. Reducing further the number of things to keep track of. --- quinn-proto/src/connection/mod.rs | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index cb0d933bdb..5900b4bce2 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -499,7 +499,6 @@ impl Connection { } let mut coalesce = true; - let mut sent_frames = None; let mut pad_datagram = false; let mut congestion_blocked = false; let mut last_packet_number = None; @@ -616,11 +615,6 @@ impl Connection { prev.update_unacked = false; } - debug_assert!( - sent_frames.is_none(), - "Previous packet must have been finished" - ); - let mut builder = PacketBuilder::new( now, space_id, @@ -734,7 +728,7 @@ impl Connection { } } - let sent = self.populate_packet( + let sent_frames = self.populate_packet( now, space_id, &mut buf, @@ -759,7 +753,7 @@ impl Connection { .saturating_sub(self.predict_1rtt_overhead(Some(pn))); let can_send = self.space_can_send(space_id, frame_space_1rtt); debug_assert!( - !(sent.is_ack_only(&self.streams) + !(sent_frames.is_ack_only(&self.streams) && !can_send.acks && can_send.other && (buf.datagram_max_offset() - builder.datagram_start) @@ -768,16 +762,13 @@ impl Connection { "SendableFrames was {can_send:?}, but only ACKs have been written" ); } - pad_datagram |= sent.requires_padding; + pad_datagram |= sent_frames.requires_padding; - if sent.largest_acked.is_some() { + if sent_frames.largest_acked.is_some() { self.spaces[space_id].pending_acks.acks_sent(); self.timers.stop(Timer::MaxAckDelay); } - // Keep information about the packet around until it gets finalized - sent_frames = Some(sent); - // Now we need to finish the packet. 
Before we do so we need to know if we will // be coalescing the next packet into this one, or will be ending the datagram // as well. Because if this is the last packet in the datagram more padding @@ -801,7 +792,7 @@ impl Connection { if coalesce && buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. @@ -835,7 +826,7 @@ impl Connection { "GSO truncated by demand for {} padding bytes or loss probe", buf.segment_size() - packet_len_unpadded ); - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); break; } @@ -844,7 +835,7 @@ impl Connection { builder.pad_to(buf.segment_size() as u16); } - builder.finish_and_track(now, self, sent_frames.take(), &mut buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); if buf.num_datagrams() == 1 { buf.clip_datagram_size(); @@ -874,7 +865,7 @@ impl Connection { if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut buf); break; } } From cce06c4615dc2b1db68d6a2e134e3a74a3de9b60 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 28 Mar 2025 15:05:57 +0100 Subject: [PATCH 09/19] Remove Option from SentFrames argument for finish_and_track The PacketBuilder::finish_and_track function took a SentFrames argument as an Option. However this was an artifact of the poll_transmit loop tracking it in an Option before, and it not being clear this was always Some when things were going correctly. Now all the callers clearly always pass this in so we can remove the Option. --- quinn-proto/src/connection/mod.rs | 16 ++++++++-------- quinn-proto/src/connection/packet_builder.rs | 6 +----- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 5900b4bce2..d544834bd1 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -684,7 +684,7 @@ impl Connection { if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); if space_id == self.highest_space { // Don't send another close packet self.close = false; @@ -711,10 +711,10 @@ impl Connection { builder.finish_and_track( now, self, - Some(SentFrames { + SentFrames { non_retransmits: true, ..SentFrames::default() - }), + }, &mut buf, ); self.stats.udp_tx.on_sent(1, buf.len()); @@ -792,7 +792,7 @@ impl Connection { if coalesce && buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. 
@@ -826,7 +826,7 @@ impl Connection { "GSO truncated by demand for {} padding bytes or loss probe", buf.segment_size() - packet_len_unpadded ); - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); break; } @@ -835,7 +835,7 @@ impl Connection { builder.pad_to(buf.segment_size() as u16); } - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); if buf.num_datagrams() == 1 { buf.clip_datagram_size(); @@ -865,7 +865,7 @@ impl Connection { if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); break; } } @@ -910,7 +910,7 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), &mut buf); + builder.finish_and_track(now, self, sent_frames, &mut buf); self.stats.path.sent_plpmtud_probes += 1; diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 91713d0a6a..e87de2cc2e 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -182,17 +182,13 @@ impl PacketBuilder { self, now: Instant, conn: &mut Connection, - sent: Option, + sent: SentFrames, buffer: &mut TransmitBuf<'_>, ) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; let space_id = self.space; let (size, padded) = self.finish(conn, buffer); - let sent = match sent { - Some(sent) => sent, - None => return, - }; let size = match padded || ack_eliciting { true => size as u16, From 5abaf0fbda3d6bd069cc6430f760970a6fd73971 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 28 Mar 2025 18:13:23 +0100 Subject: [PATCH 10/19] Make the PacketBuilder own the TransmitBuf Now the lifetimes allow for this the PacketBuilder can own the TransmitBuf. This is gives it more control over the buffer into which the packet can be written. This commit itself does nothing interesting with this yet. It merely moves the buffer ownership in a mechanical way. However, this enables future changes to reduce the use of offsets in so many places. --- quinn-proto/src/connection/mod.rs | 84 ++++++++++---------- quinn-proto/src/connection/packet_builder.rs | 74 +++++++++++------ 2 files changed, 91 insertions(+), 67 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index d544834bd1..022ec66fda 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -643,7 +643,7 @@ impl Connection { self.receiving_ecn, &mut sent_frames, &mut self.spaces[space_id], - &mut buf, + &mut builder, &mut self.stats, ); } @@ -652,22 +652,22 @@ impl Connection { // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. 
debug_assert!( - buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, + builder.buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, "ACKs should leave space for ConnectionClose" ); - if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { - let max_frame_size = builder.max_size - buf.len(); + if builder.buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { + let max_frame_size = builder.max_size - builder.buf.len(); match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(&mut buf, max_frame_size) + reason.encode(&mut builder, max_frame_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(&mut buf, max_frame_size) + .encode(&mut builder, max_frame_size) } } State::Draining => frame::ConnectionClose { @@ -675,7 +675,7 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(&mut buf, max_frame_size), + .encode(&mut builder, max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), @@ -684,7 +684,7 @@ impl Connection { if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); if space_id == self.highest_space { // Don't send another close packet self.close = false; @@ -701,11 +701,11 @@ impl Connection { // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. - if space_id == SpaceId::Data && buf.num_datagrams() == 1 { + if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { trace!("PATH_RESPONSE {:08x} (off-path)", token); - buf.write(frame::FrameType::PATH_RESPONSE); - buf.write(token); + builder.write(frame::FrameType::PATH_RESPONSE); + builder.write(token); self.stats.frame_tx.path_response += 1; builder.pad_to(MIN_INITIAL_SIZE); builder.finish_and_track( @@ -715,7 +715,6 @@ impl Connection { non_retransmits: true, ..SentFrames::default() }, - &mut buf, ); self.stats.udp_tx.on_sent(1, buf.len()); return Some(Transmit { @@ -728,13 +727,11 @@ impl Connection { } } - let sent_frames = self.populate_packet( - now, - space_id, - &mut buf, - builder.max_size, - builder.exact_number, - ); + let sent_frames = { + let max_size = builder.max_size; + let pn = builder.exact_number; + self.populate_packet(now, space_id, &mut builder, max_size, pn) + }; // ACK-only packets should only be sent when explicitly allowed. 
If we write them due to // any other reason, there is a bug which leads to one component announcing write @@ -748,7 +745,8 @@ impl Connection { } else { self.packet_number_filter.peek(&self.spaces[SpaceId::Data]) }; - let frame_space_1rtt = buf + let frame_space_1rtt = builder + .buf .segment_size() .saturating_sub(self.predict_1rtt_overhead(Some(pn))); let can_send = self.space_can_send(space_id, frame_space_1rtt); @@ -756,7 +754,7 @@ impl Connection { !(sent_frames.is_ack_only(&self.streams) && !can_send.acks && can_send.other - && (buf.datagram_max_offset() - builder.datagram_start) + && (builder.buf.datagram_max_offset() - builder.datagram_start) == self.path.current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" @@ -773,10 +771,10 @@ impl Connection { // be coalescing the next packet into this one, or will be ending the datagram // as well. Because if this is the last packet in the datagram more padding // might be needed because of the packet type, or to fill the GSO segment size. - next_space_id = self.next_send_space(space_id, &buf, close); + next_space_id = self.next_send_space(space_id, builder.buf, close); if let Some(next_space_id) = next_space_id { // Can we append another packet into the current datagram? - let buf_end = buf.len().max(builder.min_size) + builder.tag_len; + let buf_end = builder.len().max(builder.min_size) + builder.tag_len; let tag_len = if let Some(ref crypto) = self.spaces[next_space_id].crypto { crypto.packet.local.tag_len() } else if next_space_id == SpaceId::Data { @@ -789,17 +787,19 @@ impl Connection { // Are we allowed to coalesce AND is there enough space for another *packet* // in this datagram? - if coalesce && buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len { + if coalesce + && builder.buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len + { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - if buf.num_datagrams() > 1 { + if builder.buf.num_datagrams() > 1 { // If too many padding bytes would be required to continue the // GSO batch after this packet, end the GSO batch here. Ensures // that fixed-size frames with heterogeneous sizes @@ -815,27 +815,27 @@ impl Connection { // are the only packets for which we might grow `buf_capacity` // by less than `segment_size`. 
const MAX_PADDING: usize = 16; - let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - buf.datagram_start_offset() + let packet_len_unpadded = cmp::max(builder.min_size, builder.buf.len()) + - builder.buf.datagram_start_offset() + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < buf.segment_size() - || buf.datagram_start_offset() + buf.segment_size() - > buf.datagram_max_offset() + if packet_len_unpadded + MAX_PADDING < builder.buf.segment_size() + || builder.buf.datagram_start_offset() + builder.buf.segment_size() + > builder.buf.datagram_max_offset() { trace!( "GSO truncated by demand for {} padding bytes or loss probe", - buf.segment_size() - packet_len_unpadded + builder.buf.segment_size() - packet_len_unpadded ); - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); break; } // Pad the current datagram to GSO segment size so it can be // included in the GSO batch. - builder.pad_to(buf.segment_size() as u16); + builder.pad_to(builder.buf.segment_size() as u16); } - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); if buf.num_datagrams() == 1 { buf.clip_datagram_size(); @@ -865,7 +865,7 @@ impl Connection { if pad_datagram { builder.pad_to(MIN_INITIAL_SIZE); } - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); break; } } @@ -896,12 +896,12 @@ impl Connection { PacketBuilder::new(now, space_id, self.rem_cids.active(), &mut buf, true, self)?; // We implement MTU probes as ping packets padded up to the probe size - buf.write(frame::FrameType::PING); + builder.write(frame::FrameType::PING); self.stats.frame_tx.ping += 1; // If supported by the peer, we want no delays to the probe's ACK if self.peer_supports_ack_frequency() { - buf.write(frame::FrameType::IMMEDIATE_ACK); + builder.write(frame::FrameType::IMMEDIATE_ACK); self.stats.frame_tx.immediate_ack += 1; } @@ -910,7 +910,7 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, sent_frames, &mut buf); + builder.finish_and_track(now, self, sent_frames); self.stats.path.sent_plpmtud_probes += 1; @@ -1008,8 +1008,8 @@ impl Connection { debug_assert_eq!(buf.datagram_start_offset(), 0); let mut builder = PacketBuilder::new(now, SpaceId::Data, *prev_cid, buf, false, self)?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); - buf.write(frame::FrameType::PATH_CHALLENGE); - buf.write(token); + builder.write(frame::FrameType::PATH_CHALLENGE); + builder.write(token); self.stats.frame_tx.path_challenge += 1; // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame @@ -1018,7 +1018,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, buf); + builder.finish(self); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index e87de2cc2e..fc3914b3bf 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -2,7 +2,7 @@ use bytes::{BufMut, Bytes}; use rand::Rng; use tracing::{trace, trace_span}; -use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket}; +use super::{BufLen, Connection, SentFrames, TransmitBuf, spaces::SentPacket}; use crate::{ ConnectionId, Instant, TransportError, TransportErrorCode, connection::ConnectionSide, @@ 
-10,7 +10,16 @@ use crate::{ packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId}, }; -pub(super) struct PacketBuilder { +/// QUIC packet builder +/// +/// This allows building QUIC packets: it takes care of writing the header, allows writing +/// frames and on [`PacketBuilder::finalize`] (or [`PacketBuilder::finalize_and_track`]) it +/// encrypts the packet so it is ready to be sent on the wire. +/// +/// The builder manages the write buffer into which the packet is written, and directly +/// implements [`BufMut`] to write frames into the packet. +pub(super) struct PacketBuilder<'a, 'b> { + pub(super) buf: &'a mut TransmitBuf<'b>, pub(super) datagram_start: usize, pub(super) space: SpaceId, pub(super) partial_encode: PartialEncode, @@ -27,7 +36,7 @@ pub(super) struct PacketBuilder { pub(super) _span: tracing::span::EnteredSpan, } -impl PacketBuilder { +impl<'a, 'b> PacketBuilder<'a, 'b> { /// Write a new packet header to `buffer` and determine the packet's properties /// /// Marks the connection drained and returns `None` if the confidentiality limit would be @@ -36,10 +45,13 @@ impl PacketBuilder { now: Instant, space_id: SpaceId, dst_cid: ConnectionId, - buffer: &mut TransmitBuf<'_>, + buffer: &'a mut TransmitBuf<'b>, ack_eliciting: bool, conn: &mut Connection, - ) -> Option { + ) -> Option + where + 'b: 'a, + { let version = conn.version; // Initiate key update if we're approaching the confidentiality limit let sent_with_keys = conn.spaces[space_id].sent_with_keys; @@ -152,8 +164,10 @@ impl PacketBuilder { let max_size = buffer.datagram_max_offset() - tag_len; debug_assert!(max_size >= min_size); + let datagram_start = buffer.datagram_start_offset(); Some(Self { - datagram_start: buffer.datagram_start_offset(), + buf: buffer, + datagram_start, space: space_id, partial_encode, exact_number, @@ -178,17 +192,11 @@ impl PacketBuilder { ); } - pub(super) fn finish_and_track( - self, - now: Instant, - conn: &mut Connection, - sent: SentFrames, - buffer: &mut TransmitBuf<'_>, - ) { + pub(super) fn finish_and_track(self, now: Instant, conn: &mut Connection, sent: SentFrames) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; let space_id = self.space; - let (size, padded) = self.finish(conn, buffer); + let (size, padded) = self.finish(conn); let size = match padded || ack_eliciting { true => size as u16, @@ -222,15 +230,11 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish( - self, - conn: &mut Connection, - buffer: &mut TransmitBuf<'_>, - ) -> (usize, bool) { - let pad = buffer.len() < self.min_size; + pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) { + let pad = self.buf.len() < self.min_size; if pad { - trace!("PADDING * {}", self.min_size - buffer.len()); - buffer.put_bytes(0, self.min_size - buffer.len()); + trace!("PADDING * {}", self.min_size - self.buf.len()); + self.buf.put_bytes(0, self.min_size - self.buf.len()); } let space = &conn.spaces[self.space]; @@ -249,15 +253,35 @@ impl PacketBuilder { "Mismatching crypto tag len" ); - buffer.put_bytes(0, packet_crypto.tag_len()); + self.buf.put_bytes(0, packet_crypto.tag_len()); let encode_start = self.partial_encode.start; - let packet_buf = &mut buffer.as_mut_slice()[encode_start..]; + let packet_buf = &mut self.buf.as_mut_slice()[encode_start..]; self.partial_encode.finish( packet_buf, header_crypto, Some((self.exact_number, packet_crypto)), ); - (buffer.len() - 
encode_start, pad) + (self.buf.len() - encode_start, pad) + } +} + +unsafe impl BufMut for PacketBuilder<'_, '_> { + fn remaining_mut(&self) -> usize { + self.buf.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.buf.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.buf.chunk_mut() + } +} + +impl BufLen for PacketBuilder<'_, '_> { + fn len(&self) -> usize { + self.buf.len() } } From c23c77228b744f9f720de06665cf2beb36a0ded0 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 1 Apr 2025 13:17:51 +0200 Subject: [PATCH 11/19] Make PacketBuilder manage available frame space This moves keeping track of the available frame space to the packet builder. The available space is now encoded into the BufMut returned by PacketBuilder::frame_space_mut(). This removes the need for the BufLen trait in many of the places writing frames. --- quinn-proto/src/connection/datagrams.rs | 6 +- quinn-proto/src/connection/mod.rs | 38 ++++++------ quinn-proto/src/connection/packet_builder.rs | 27 ++++++++- quinn-proto/src/connection/streams/state.rs | 61 ++++++++++---------- 4 files changed, 74 insertions(+), 58 deletions(-) diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index 8e9fea0bdc..1d38361e33 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -4,7 +4,7 @@ use bytes::{BufMut, Bytes}; use thiserror::Error; use tracing::{debug, trace}; -use super::{BufLen, Connection}; +use super::Connection; use crate::{ TransportError, frame::{Datagram, FrameStruct}, @@ -163,13 +163,13 @@ impl DatagramState { /// /// Returns whether a frame was written. At most `max_size` bytes will be written, including /// framing. - pub(super) fn write(&mut self, buf: &mut (impl BufMut + BufLen), max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, }; - if buf.len() + datagram.size(true) > max_size { + if buf.remaining_mut() < datagram.size(true) { // Future work: we could be more clever about cramming small datagrams into // mostly-full packets when a larger one is queued first self.outgoing.push_front(datagram); diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 022ec66fda..a8e53639d8 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -652,11 +652,11 @@ impl Connection { // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. 
debug_assert!( - builder.buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, + builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND, "ACKs should leave space for ConnectionClose" ); - if builder.buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { - let max_frame_size = builder.max_size - builder.buf.len(); + if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { + let max_frame_size = builder.frame_space_remaining(); match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { @@ -728,9 +728,8 @@ impl Connection { } let sent_frames = { - let max_size = builder.max_size; let pn = builder.exact_number; - self.populate_packet(now, space_id, &mut builder, max_size, pn) + self.populate_packet(now, space_id, &mut builder.frame_space_mut(), pn) }; // ACK-only packets should only be sent when explicitly allowed. If we write them due to @@ -3044,8 +3043,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut (impl BufMut + BufLen), - max_size: usize, + buf: &mut impl BufMut, pn: u64, ) -> SentFrames { let mut sent = SentFrames::default(); @@ -3121,7 +3119,7 @@ impl Connection { } // PATH_CHALLENGE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if buf.remaining_mut() > 9 && space_id == SpaceId::Data { // Transmit challenges with every outgoing frame on an unvalidated path if let Some(token) = self.path.challenge { // But only send a packet solely for that purpose at most once @@ -3136,7 +3134,7 @@ impl Connection { } // PATH_RESPONSE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if buf.remaining_mut() > 9 && space_id == SpaceId::Data { if let Some(token) = self.path_responses.pop_on_path(self.path.remote) { sent.non_retransmits = true; sent.requires_padding = true; @@ -3148,7 +3146,7 @@ impl Connection { } // CRYPTO - while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt { + while buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt { let mut frame = match space.pending.crypto.pop_front() { Some(x) => x, None => break, @@ -3158,8 +3156,7 @@ impl Connection { // Since the offset is known, we can reserve the exact size required to encode it. // For length we reserve 2bytes which allows to encode up to 2^14, // which is more than what fits into normally sized QUIC frames. 
- let max_crypto_data_size = max_size - - buf.len() + let max_crypto_data_size = buf.remaining_mut() - 1 // Frame Type - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) }) - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes @@ -3195,12 +3192,11 @@ impl Connection { &mut space.pending, &mut sent.retransmits, &mut self.stats.frame_tx, - max_size, ); } // NEW_CONNECTION_ID - while buf.len() + 44 < max_size { + while buf.remaining_mut() > 44 { let issued = match space.pending.new_cids.pop() { Some(x) => x, None => break, @@ -3222,7 +3218,7 @@ impl Connection { } // RETIRE_CONNECTION_ID - while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::RETIRE_CONNECTION_ID_SIZE_BOUND { let seq = match space.pending.retire_cids.pop() { Some(x) => x, None => break, @@ -3236,8 +3232,8 @@ impl Connection { // DATAGRAM let mut sent_datagrams = false; - while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data { - match self.datagrams.write(buf, max_size) { + while buf.remaining_mut() > Datagram::SIZE_BOUND && space_id == SpaceId::Data { + match self.datagrams.write(buf) { true => { sent_datagrams = true; sent.non_retransmits = true; @@ -3277,7 +3273,7 @@ impl Connection { token: token.encode(&*server_config.token_key).into(), }; - if buf.len() + new_token.size() >= max_size { + if buf.remaining_mut() < new_token.size() { space.pending.new_tokens.push(remote_addr); break; } @@ -3292,9 +3288,9 @@ impl Connection { // STREAM if space_id == SpaceId::Data { - sent.stream_frames = - self.streams - .write_stream_frames(buf, max_size, self.config.send_fairness); + sent.stream_frames = self + .streams + .write_stream_frames(buf, self.config.send_fairness); self.stats.frame_tx.stream += sent.stream_frames.len() as u64; } diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index fc3914b3bf..4bb274368d 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -29,8 +29,9 @@ pub(super) struct PacketBuilder<'a, 'b> { /// Smallest absolute position in the associated buffer that must be occupied by this packet's /// frames pub(super) min_size: usize, - /// Largest absolute position in the associated buffer that may be occupied by this packet's - /// frames + /// Largest absolute position in the buffer that may be occupied by this packet's frames + /// + /// This takes the size of the cryptographic tag into account. pub(super) max_size: usize, pub(super) tag_len: usize, pub(super) _span: tracing::span::EnteredSpan, @@ -192,6 +193,15 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { ); } + /// Returns a writable buffer limited to the remaining frame space + /// + /// The [`BufMut::remaining_mut`] call on the returned buffer indicates the amount of + /// space available to write QUIC frames into. 
+ // In rust 1.82 we can use `-> impl BufMut + use<'_, 'a, 'b>` + pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut Self> { + self.limit(self.frame_space_remaining()) + } + pub(super) fn finish_and_track(self, now: Instant, conn: &mut Connection, sent: SentFrames) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; @@ -231,6 +241,10 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { /// Encrypt packet, returning the length of the packet and whether padding was added pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) { + debug_assert!( + self.buf.len() <= self.max_size, + "packet exceeds maximum size" + ); let pad = self.buf.len() < self.min_size; if pad { trace!("PADDING * {}", self.min_size - self.buf.len()); @@ -264,6 +278,15 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { (self.buf.len() - encode_start, pad) } + + /// Returns the remaining space in the packet that can be taken up by QUIC frames + /// + /// This leaves space in the datagram for the cryptographic tag that needs to be written + /// when the packet is finished. + pub(super) fn frame_space_remaining(&self) -> usize { + debug_assert!(self.max_size >= self.buf.len(), "packet exceeds bounds"); + self.max_size.saturating_sub(self.buf.len()) + } } unsafe impl BufMut for PacketBuilder<'_, '_> { diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index d6be209236..fe30e1bd2b 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -15,7 +15,7 @@ use super::{ use crate::{ Dir, MAX_STREAM_COUNT, Side, StreamId, TransportError, VarInt, coding::BufMutExt, - connection::{BufLen, stats::FrameStats}, + connection::stats::FrameStats, frame::{self, FrameStruct, StreamMetaVec}, transport_parameters::TransportParameters, }; @@ -411,14 +411,13 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut (impl BufMut + BufLen), + buf: &mut impl BufMut, pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, - max_size: usize, ) { // RESET_STREAM - while buf.len() + frame::ResetStream::SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::ResetStream::SIZE_BOUND { let (id, error_code) = match pending.reset_stream.pop() { Some(x) => x, None => break, @@ -442,7 +441,7 @@ impl StreamsState { } // STOP_SENDING - while buf.len() + frame::StopSending::SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::StopSending::SIZE_BOUND { let frame = match pending.stop_sending.pop() { Some(x) => x, None => break, @@ -461,7 +460,7 @@ impl StreamsState { } // MAX_DATA - if pending.max_data && buf.len() + 9 < max_size { + if pending.max_data && buf.remaining_mut() > 9 { pending.max_data = false; // `local_max_data` can grow bigger than `VarInt`. 
@@ -484,7 +483,7 @@ impl StreamsState { } // MAX_STREAM_DATA - while buf.len() + 17 < max_size { + while buf.remaining_mut() > 17 { let id = match pending.max_stream_data.iter().next() { Some(x) => *x, None => break, @@ -516,7 +515,7 @@ impl StreamsState { // MAX_STREAMS for dir in Dir::iter() { - if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size { + if !pending.max_stream_id[dir as usize] || buf.remaining_mut() <= 9 { continue; } @@ -541,21 +540,14 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut (impl BufMut + BufLen), - max_buf_size: usize, + buf: &mut impl BufMut, fair: bool, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); - while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size { - if max_buf_size - .checked_sub(buf.len() + frame::Stream::SIZE_BOUND) - .is_none() - { - break; - } - - // Pop the stream of the highest priority that currently has pending data - // If the stream still has some pending data left after writing, it will be reinserted, otherwise not + while buf.remaining_mut() > frame::Stream::SIZE_BOUND { + // Pop the stream of the highest priority that currently has pending data. If + // the stream still has some pending data left after writing, it will be + // reinserted, otherwise not let Some(stream) = self.pending.pop() else { break; }; @@ -577,7 +569,7 @@ impl StreamsState { // Now that we know the `StreamId`, we can better account for how many bytes // are required to encode it. - let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into()); + let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into()); let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size); let fin = offsets.end == stream.pending.offset() && matches!(stream.state, SendState::DataSent { .. }); @@ -1380,7 +1372,7 @@ mod tests { high.write(b"high").unwrap(); let mut buf = Vec::with_capacity(40); - let meta = server.write_stream_frames(&mut buf, 40, true); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); assert_eq!(meta[2].id, id_low); @@ -1438,16 +1430,18 @@ mod tests { }; high.set_priority(-1).unwrap(); - let mut buf = Vec::with_capacity(1000); - let meta = server.write_stream_frames(&mut buf, 40, true); + let mut buf = Vec::with_capacity(1000).limit(40); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); // After requeuing we should end up with 2 priorities - not 3 assert_eq!(server.pending.len(), 2); + let mut buf = buf.into_inner(); + // Send the remaining data. 
The initial mid priority one should go first now - let meta = server.write_stream_frames(&mut buf, 1000, true); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta.len(), 2); assert_eq!(meta[0].id, id_mid); assert_eq!(meta[1].id, id_high); @@ -1507,12 +1501,13 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, fair); if meta.is_empty() { break; } metas.extend(meta); + buf = chunk_buf.into_inner(); } assert!(!server.can_send_stream_data()); @@ -1575,11 +1570,12 @@ mod tests { stream_b.write(&[b'b'; 100]).unwrap(); let mut metas = vec![]; - let mut buf = Vec::with_capacity(1024); + let buf = Vec::with_capacity(1024); // Write the first chunk of stream_a - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, false); + let mut buf = chunk_buf.into_inner(); assert!(!meta.is_empty()); metas.extend(meta); @@ -1595,8 +1591,9 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, false); + buf = chunk_buf.into_inner(); if meta.is_empty() { break; } From 30a3e4031f1b098cc9f2914596f348b9dc78e426 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Tue, 1 Apr 2025 14:52:35 +0200 Subject: [PATCH 12/19] Remove BufMut impl on PacketBuilder again Now that the PacketBuilder::frame_space_mut exists the direct BufMut impl on it can be removed. Nothing external needs to directly write into the packet buffer outside of the frame space. 
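For illustration, here is a minimal stand-alone sketch of the pattern this change settles on: frame writers receive a size-capped BufMut view produced with the bytes crate's `limit()` adapter instead of writing through a BufMut impl on the builder itself. The `Writer` type below is a simplified stand-in for PacketBuilder/TransmitBuf, not the real quinn-proto code:

    use bytes::BufMut;

    /// Simplified stand-in for the packet builder: it owns a write cursor and
    /// an upper bound, and only hands out a limited view for frame encoding.
    struct Writer {
        buf: Vec<u8>,
        max_size: usize,
    }

    impl Writer {
        /// Expose the remaining frame space as a `BufMut` capped via `limit()`,
        /// so callers cannot write past `max_size` by construction.
        fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut Vec<u8>> {
            let remaining = self.max_size.saturating_sub(self.buf.len());
            (&mut self.buf).limit(remaining)
        }
    }

    fn main() {
        let mut writer = Writer { buf: Vec::new(), max_size: 8 };
        let mut space = writer.frame_space_mut();
        assert_eq!(space.remaining_mut(), 8);
        space.put_u8(0x01); // e.g. a one-byte frame type
        assert_eq!(writer.buf.len(), 1);
    }

Writing frames only through the limited view keeps the question of how much space is left answered in a single place, which is what allows the direct BufMut impl on the builder to go away.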
--- quinn-proto/src/connection/mod.rs | 40 +++++++++----------- quinn-proto/src/connection/packet_builder.rs | 34 ++++++----------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index a8e53639d8..c813d5d00f 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -643,7 +643,7 @@ impl Connection { self.receiving_ecn, &mut sent_frames, &mut self.spaces[space_id], - &mut builder, + &mut builder.frame_space_mut(), &mut self.stats, ); } @@ -660,14 +660,14 @@ impl Connection { match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(&mut builder, max_frame_size) + reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(&mut builder, max_frame_size) + .encode(&mut builder.frame_space_mut(), max_frame_size) } } State::Draining => frame::ConnectionClose { @@ -675,7 +675,7 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(&mut builder, max_frame_size), + .encode(&mut builder.frame_space_mut(), max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), @@ -704,8 +704,10 @@ impl Connection { if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { trace!("PATH_RESPONSE {:08x} (off-path)", token); - builder.write(frame::FrameType::PATH_RESPONSE); - builder.write(token); + builder + .frame_space_mut() + .write(frame::FrameType::PATH_RESPONSE); + builder.frame_space_mut().write(token); self.stats.frame_tx.path_response += 1; builder.pad_to(MIN_INITIAL_SIZE); builder.finish_and_track( @@ -772,22 +774,10 @@ impl Connection { // might be needed because of the packet type, or to fill the GSO segment size. next_space_id = self.next_send_space(space_id, builder.buf, close); if let Some(next_space_id) = next_space_id { - // Can we append another packet into the current datagram? - let buf_end = builder.len().max(builder.min_size) + builder.tag_len; - let tag_len = if let Some(ref crypto) = self.spaces[next_space_id].crypto { - crypto.packet.local.tag_len() - } else if next_space_id == SpaceId::Data { - self.zero_rtt_crypto.as_ref().expect( - "sending packets in the application data space requires known 0-RTT or 1-RTT keys", - ).packet.tag_len() - } else { - unreachable!("tried to send {:?} packet without keys", next_space_id); - }; - // Are we allowed to coalesce AND is there enough space for another *packet* // in this datagram? if coalesce - && builder.buf.datagram_max_offset() - buf_end > MIN_PACKET_SPACE + tag_len + && builder.buf.segment_size() - builder.predict_packet_size() > MIN_PACKET_SPACE { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. 
@@ -895,12 +885,14 @@ impl Connection { PacketBuilder::new(now, space_id, self.rem_cids.active(), &mut buf, true, self)?; // We implement MTU probes as ping packets padded up to the probe size - builder.write(frame::FrameType::PING); + builder.frame_space_mut().write(frame::FrameType::PING); self.stats.frame_tx.ping += 1; // If supported by the peer, we want no delays to the probe's ACK if self.peer_supports_ack_frequency() { - builder.write(frame::FrameType::IMMEDIATE_ACK); + builder + .frame_space_mut() + .write(frame::FrameType::IMMEDIATE_ACK); self.stats.frame_tx.immediate_ack += 1; } @@ -1007,8 +999,10 @@ impl Connection { debug_assert_eq!(buf.datagram_start_offset(), 0); let mut builder = PacketBuilder::new(now, SpaceId::Data, *prev_cid, buf, false, self)?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); - builder.write(frame::FrameType::PATH_CHALLENGE); - builder.write(token); + builder + .frame_space_mut() + .write(frame::FrameType::PATH_CHALLENGE); + builder.frame_space_mut().write(token); self.stats.frame_tx.path_challenge += 1; // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 4bb274368d..ec072e65a3 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -2,7 +2,7 @@ use bytes::{BufMut, Bytes}; use rand::Rng; use tracing::{trace, trace_span}; -use super::{BufLen, Connection, SentFrames, TransmitBuf, spaces::SentPacket}; +use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket}; use crate::{ ConnectionId, Instant, TransportError, TransportErrorCode, connection::ConnectionSide, @@ -198,8 +198,8 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { /// The [`BufMut::remaining_mut`] call on the returned buffer indicates the amount of /// space available to write QUIC frames into. // In rust 1.82 we can use `-> impl BufMut + use<'_, 'a, 'b>` - pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut Self> { - self.limit(self.frame_space_remaining()) + pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut TransmitBuf<'b>> { + self.buf.limit(self.frame_space_remaining()) } pub(super) fn finish_and_track(self, now: Instant, conn: &mut Connection, sent: SentFrames) { @@ -279,6 +279,14 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { (self.buf.len() - encode_start, pad) } + /// Predicts the size of the packet if it were finished now without additional padding + /// + /// This will include any padding which is required to make the size large enough to be + /// encrypted correctly. 
+    pub(super) fn predict_packet_size(&self) -> usize {
+        self.buf.len().max(self.min_size) - self.buf.datagram_start_offset() + self.tag_len
+    }
+
     /// Returns the remaining space in the packet that can be taken up by QUIC frames
     ///
     /// This leaves space in the datagram for the cryptographic tag that needs to be written
     /// when the packet is finished.
     pub(super) fn frame_space_remaining(&self) -> usize {
         self.max_size.saturating_sub(self.buf.len())
     }
 }
-
-unsafe impl BufMut for PacketBuilder<'_, '_> {
-    fn remaining_mut(&self) -> usize {
-        self.buf.remaining_mut()
-    }
-
-    unsafe fn advance_mut(&mut self, cnt: usize) {
-        self.buf.advance_mut(cnt);
-    }
-
-    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
-        self.buf.chunk_mut()
-    }
-}
-
-impl BufLen for PacketBuilder<'_, '_> {
-    fn len(&self) -> usize {
-        self.buf.len()
-    }
-}

From 77cff2a033f593e816cf04ec61d3f060aa27ad69 Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Tue, 1 Apr 2025 15:04:10 +0200
Subject: [PATCH 13/19] Move BufLen trait to where it is used

This trait is still the simplest way for the PartialEncode to keep track of the
length of the header it just wrote.
---
 quinn-proto/src/connection/mod.rs          | 17 -----------------
 quinn-proto/src/connection/transmit_buf.rs |  2 +-
 quinn-proto/src/packet.rs                  | 18 +++++++++++++++++-
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index c813d5d00f..44348103d7 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -3970,23 +3970,6 @@ fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
-/// A buffer that can tell how much has been written to it already
-///
-/// This is commonly used when a buffer is passed and the user may not write past a
-/// given size. It allows the user of such a buffer to know the current cursor position in
-/// the buffer. The maximum write size is usually passed in the same unit as
-/// [`BufLen::len`]: bytes since the buffer start.
-pub(crate) trait BufLen {
-    /// Returns the number of bytes written into the buffer so far
-    fn len(&self) -> usize;
-}
-
-impl BufLen for Vec<u8> {
-    fn len(&self) -> usize {
-        self.len()
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs
index 61d3217c34..b017303eff 100644
--- a/quinn-proto/src/connection/transmit_buf.rs
+++ b/quinn-proto/src/connection/transmit_buf.rs
@@ -1,6 +1,6 @@
 use bytes::BufMut;

-use super::BufLen;
+use crate::packet::BufLen;

 /// The buffer in which to write datagrams for [`Connection::poll_transmit`]
 ///
diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs
index d5bfd38f6c..820668777d 100644
--- a/quinn-proto/src/packet.rs
+++ b/quinn-proto/src/packet.rs
@@ -6,7 +6,6 @@ use thiserror::Error;
 use crate::{
     ConnectionId,
     coding::{self, BufExt, BufMutExt},
-    connection::BufLen,
     crypto,
 };

@@ -220,6 +219,23 @@ impl PartialDecode {
     }
 }

+/// A buffer that can tell how much has been written to it already
+///
+/// This is commonly used when a buffer is passed and the user may not write past a
+/// given size. It allows the user of such a buffer to know the current cursor position in
+/// the buffer. The maximum write size is usually passed in the same unit as
+/// [`BufLen::len`]: bytes since the buffer start.
+pub(crate) trait BufLen {
+    /// Returns the number of bytes written into the buffer so far
+    fn len(&self) -> usize;
+}
+
+impl BufLen for Vec<u8> {
+    fn len(&self) -> usize {
+        self.len()
+    }
+}
+
 pub(crate) struct Packet {
     pub(crate) header: Header,
     pub(crate) header_data: Bytes,

From 5626e5f747bd9044bc49bfbb9acea463057dba34 Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Tue, 1 Apr 2025 15:11:57 +0200
Subject: [PATCH 14/19] Remove PacketBuilder::datagram_start

The PacketBuilder no longer needs to keep track of this field because the
TransmitBuf already does. Removing duplicate state is nice.
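As an illustrative sketch only (not part of the diff below): call sites that previously read the builder's own copy of the datagram start now ask the buffer, which already tracks it.

    // Hypothetical before/after, assuming a `builder: PacketBuilder` whose `buf`
    // field is the TransmitBuf introduced earlier in this series.
    // Before: the builder carried its own copy of the offset.
    // let start = builder.datagram_start;
    // After: the offset is read from the buffer on demand.
    let start = builder.buf.datagram_start_offset();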
---
 quinn-proto/src/connection/mod.rs            | 3 +--
 quinn-proto/src/connection/packet_builder.rs | 5 +----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index 44348103d7..e5189ba91f 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -755,8 +755,7 @@ impl Connection {
                     !(sent_frames.is_ack_only(&self.streams)
                         && !can_send.acks
                         && can_send.other
-                        && (builder.buf.datagram_max_offset() - builder.datagram_start)
-                            == self.path.current_mtu() as usize
+                        && builder.buf.segment_size() == self.path.current_mtu() as usize
                         && self.datagrams.outgoing.is_empty()),
                     "SendableFrames was {can_send:?}, but only ACKs have been written"
                 );
diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs
index ec072e65a3..664f6a58a6 100644
--- a/quinn-proto/src/connection/packet_builder.rs
+++ b/quinn-proto/src/connection/packet_builder.rs
@@ -20,7 +20,6 @@ use crate::{
 /// implements [`BufMut`] to write frames into the packet.
 pub(super) struct PacketBuilder<'a, 'b> {
     pub(super) buf: &'a mut TransmitBuf<'b>,
-    pub(super) datagram_start: usize,
     pub(super) space: SpaceId,
     pub(super) partial_encode: PartialEncode,
     pub(super) ack_eliciting: bool,
@@ -165,10 +164,8 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
         let max_size = buffer.datagram_max_offset() - tag_len;
         debug_assert!(max_size >= min_size);

-        let datagram_start = buffer.datagram_start_offset();
         Some(Self {
             buf: buffer,
-            datagram_start,
             space: space_id,
             partial_encode,
             exact_number,
@@ -189,7 +186,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
         // already.
         self.min_size = Ord::max(
             self.min_size,
-            self.datagram_start + (min_size as usize) - self.tag_len,
+            self.buf.datagram_start_offset() + (min_size as usize) - self.tag_len,
         );
     }

From 3661234f04ba526f2797850fe490f5aa70632630 Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Tue, 1 Apr 2025 15:28:07 +0200
Subject: [PATCH 15/19] Use encapsulated method instead of builder internals

This logic is internal business of the PacketBuilder. Use the provided
abstraction.
---
 quinn-proto/src/connection/mod.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index e5189ba91f..5d2a1ed47c 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -803,9 +803,7 @@ impl Connection {
                             // are the only packets for which we might grow `buf_capacity`
                             // by less than `segment_size`.
                             const MAX_PADDING: usize = 16;
-                            let packet_len_unpadded = cmp::max(builder.min_size, builder.buf.len())
-                                - builder.buf.datagram_start_offset()
-                                + builder.tag_len;
+                            let packet_len_unpadded = builder.predict_packet_size();
                             if packet_len_unpadded + MAX_PADDING < builder.buf.segment_size()
                                 || builder.buf.datagram_start_offset() + builder.buf.segment_size()
                                     > builder.buf.datagram_max_offset()

From 1744eb6c860d0cdaaa9ef0b14adbe8a7ccbfe03e Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Tue, 1 Apr 2025 18:48:23 +0200
Subject: [PATCH 16/19] Remove explicit state of PacketBuilder::max_size

This state duplicates information that is already available elsewhere in the
builder.
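Roughly, the equivalence this change relies on (a sketch, not taken from the diff below): the removed field can be recomputed on demand from state the builder already holds.

    // Sketch: `max_size` was the datagram's maximum write offset minus the room
    // reserved for the AEAD tag, so it can always be derived when needed.
    let max_size = builder.buf.datagram_max_offset() - builder.tag_len;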
---
 quinn-proto/src/connection/packet_builder.rs | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs
index 664f6a58a6..74827bda29 100644
--- a/quinn-proto/src/connection/packet_builder.rs
+++ b/quinn-proto/src/connection/packet_builder.rs
@@ -28,10 +28,6 @@ pub(super) struct PacketBuilder<'a, 'b> {
     /// Smallest absolute position in the associated buffer that must be occupied by this packet's
     /// frames
     pub(super) min_size: usize,
-    /// Largest absolute position in the buffer that may be occupied by this packet's frames
-    ///
-    /// This takes the size of the cryptographic tag into account.
-    pub(super) max_size: usize,
     pub(super) tag_len: usize,
     pub(super) _span: tracing::span::EnteredSpan,
 }
@@ -171,7 +167,6 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
             exact_number,
             short_header: header.is_short(),
             min_size,
-            max_size,
             tag_len,
             ack_eliciting,
             _span: span,
@@ -239,7 +234,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
     /// Encrypt packet, returning the length of the packet and whether padding was added
     pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) {
         debug_assert!(
-            self.buf.len() <= self.max_size,
+            self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len,
             "packet exceeds maximum size"
         );
         let pad = self.buf.len() < self.min_size;
@@ -289,7 +284,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
     /// This leaves space in the datagram for the cryptographic tag that needs to be written
     /// when the packet is finished.
     pub(super) fn frame_space_remaining(&self) -> usize {
-        debug_assert!(self.max_size >= self.buf.len(), "packet exceeds bounds");
-        self.max_size.saturating_sub(self.buf.len())
+        let max_offset = self.buf.datagram_max_offset() - self.tag_len;
+        max_offset.saturating_sub(self.buf.len())
     }
 }

From 2c60cfc3e8cbce60a65718e9bd64a1c32c428f1b Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Wed, 2 Apr 2025 13:07:12 +0200
Subject: [PATCH 17/19] Improve comments

---
 quinn-proto/src/connection/mod.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index 5d2a1ed47c..271c9f60eb 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -498,15 +498,25 @@ impl Connection {
                 && self.peer_supports_ack_frequency();
         }

+        // Whether this packet can be coalesced with another one in the same datagram.
         let mut coalesce = true;
+
+        // Whether the last packet in the datagram must be padded to at least
+        // MIN_INITIAL_SIZE.
         let mut pad_datagram = false;
+
+        // Whether congestion control stopped the next packet from being sent. Further
+        // packets could still be built, as e.g. tail-loss probes are not congestion
+        // limited.
        let mut congestion_blocked = false;
+
+        // The packet number of the last built packet.
         let mut last_packet_number = None;

         // Iterate over all spaces and find data to send
         //
-        // Each loop builds one packet. When packets are coalesced a datagram is filled
-        // over multiple loops.
+        // Each loop builds one packet, which is finished before the next iteration of the
+        // loop. When packets are coalesced a datagram is filled over multiple loops.
         let mut next_space_id = self.next_send_space(SpaceId::Initial, &buf, close);
         while let Some(space_id) = next_space_id {
             // Whether the next packet will contain ack-eliciting frames.
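For orientation, the loop shape that the comments added above describe looks roughly like this (simplified sketch of poll_transmit, not the actual implementation):

    let mut coalesce = true;            // may the next packet share the current datagram?
    let mut pad_datagram = false;       // pad the datagram's last packet to MIN_INITIAL_SIZE?
    let mut congestion_blocked = false; // did congestion control hold back a packet?
    let mut last_packet_number = None;  // packet number of the last packet that was built
    let mut next_space_id = self.next_send_space(SpaceId::Initial, &buf, close);
    while let Some(space_id) = next_space_id {
        // Build and finish exactly one packet per iteration; coalesced packets
        // fill the same datagram across several iterations.
    }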
From 4c628d45dd25cbdd88697eb005b5246ef0ee319d Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Thu, 3 Apr 2025 10:57:59 +0200
Subject: [PATCH 18/19] Reduce usage of absolute offsets in TransmitBuf

This starts removing the usage of absolute offsets in the TransmitBuf. It
simplifies the resulting logic in the poll_transmit function. The absolute
offsets are still available since the PacketBuilder still uses those.
---
 quinn-proto/src/connection/mod.rs            | 20 +++++++++++---------
 quinn-proto/src/connection/packet_builder.rs | 10 ++++++----
 quinn-proto/src/connection/transmit_buf.rs   | 18 ++++++++++++++++++
 3 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index 271c9f60eb..e040bd4ef7 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -532,7 +532,7 @@ impl Connection {
             }

             // If the datagram is full, we need to start a new one.
-            if buf.len() == buf.datagram_max_offset() {
+            if buf.datagram_remaining_mut() == 0 {
                 // Is 1 more datagram allowed?
                 if buf.num_datagrams() >= buf.max_datagrams() {
                     // No more datagrams allowed
@@ -607,7 +607,7 @@ impl Connection {
                 pad_datagram = false;
             }

-            debug_assert!(buf.datagram_max_offset() - buf.len() >= MIN_PACKET_SPACE);
+            debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE);

             //
             // From here on, we've determined that a packet will definitely be sent.
@@ -786,7 +786,11 @@ impl Connection {
                     // Are we allowed to coalesce AND is there enough space for another *packet*
                     // in this datagram?
                     if coalesce
-                        && builder.buf.segment_size() - builder.predict_packet_size() > MIN_PACKET_SPACE
+                        && builder
+                            .buf
+                            .datagram_remaining_mut()
+                            .saturating_sub(builder.predict_packet_end())
+                            > MIN_PACKET_SPACE
                     {
                         // We can append/coalesce the next packet into the current
                         // datagram. Finish the current packet without adding extra padding.
@@ -813,14 +817,12 @@ impl Connection {
                             // are the only packets for which we might grow `buf_capacity`
                             // by less than `segment_size`.
                             const MAX_PADDING: usize = 16;
-                            let packet_len_unpadded = builder.predict_packet_size();
-                            if packet_len_unpadded + MAX_PADDING < builder.buf.segment_size()
-                                || builder.buf.datagram_start_offset() + builder.buf.segment_size()
-                                    > builder.buf.datagram_max_offset()
+                            if builder.buf.datagram_remaining_mut()
+                                > builder.predict_packet_end() + MAX_PADDING
                             {
                                 trace!(
-                                    "GSO truncated by demand for {} padding bytes or loss probe",
-                                    builder.buf.segment_size() - packet_len_unpadded
+                                    "GSO truncated by demand for {} padding bytes",
+                                    builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
                                 );
                                 builder.finish_and_track(now, self, sent_frames);
                                 break;
diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs
index 74827bda29..cdb0baaa0b 100644
--- a/quinn-proto/src/connection/packet_builder.rs
+++ b/quinn-proto/src/connection/packet_builder.rs
@@ -268,15 +268,17 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
             Some((self.exact_number, packet_crypto)),
         );

-        (self.buf.len() - encode_start, pad)
+        let packet_len = self.buf.len() - encode_start;
+        trace!(size = %packet_len, short_header = %self.short_header, "wrote packet");
+        (packet_len, pad)
     }

-    /// Predicts the size of the packet if it were finished now without additional padding
+    /// The number of additional bytes the current packet would take up if it were finished now
     ///
     /// This will include any padding which is required to make the size large enough to be
     /// encrypted correctly.
-    pub(super) fn predict_packet_size(&self) -> usize {
-        self.buf.len().max(self.min_size) - self.buf.datagram_start_offset() + self.tag_len
+    pub(super) fn predict_packet_end(&self) -> usize {
+        self.buf.len().max(self.min_size) + self.tag_len - self.buf.len()
     }

     /// Returns the remaining space in the packet that can be taken up by QUIC frames
diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs
index b017303eff..df59a89ba4 100644
--- a/quinn-proto/src/connection/transmit_buf.rs
+++ b/quinn-proto/src/connection/transmit_buf.rs
@@ -1,4 +1,5 @@
 use bytes::BufMut;
+use tracing::trace;

 use crate::packet::BufLen;

 /// The buffer in which to write datagrams for [`Connection::poll_transmit`]
 ///
@@ -138,6 +139,13 @@ impl<'a> TransmitBuf<'a> {
     /// the last datagram in a batch.
     pub(super) fn clip_datagram_size(&mut self) {
         debug_assert_eq!(self.num_datagrams, 1);
+        if self.buf.len() < self.segment_size {
+            trace!(
+                segment_size = self.buf.len(),
+                prev_segment_size = self.segment_size,
+                "clipped datagram size"
+            );
+        }
         self.segment_size = self.buf.len();
         self.buf_capacity = self.buf.len();
     }
@@ -147,6 +155,11 @@ impl<'a> TransmitBuf<'a> {
     /// This is also the maximum size datagrams are allowed to be. The first and last
     /// datagram in a batch are allowed to be smaller however. After the first datagram the
     /// segment size is clipped to the size of the first datagram.
+    ///
+    /// If the last datagram was created using [`TransmitBuf::start_new_datagram_with_size`],
+    /// the segment size will be greater than the current datagram is allowed to be.
+    /// Thus [`TransmitBuf::datagram_remaining_mut`] should be used if you need to know the
+    /// amount of data that can be written into the datagram.
     pub(super) fn segment_size(&self) -> usize {
         self.segment_size
     }
@@ -178,6 +191,11 @@ impl<'a> TransmitBuf<'a> {
         self.buf_capacity
     }

+    /// Returns the number of bytes that may still be written into this datagram
+    pub(super) fn datagram_remaining_mut(&self) -> usize {
+        self.buf_capacity.saturating_sub(self.buf.len())
+    }
+
     /// Returns `true` if the buffer did not have anything written into it
     pub(super) fn is_empty(&self) -> bool {
         self.len() == 0
     }

From d00b7a0bca749f5f151cd7b25992d50a4667d206 Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Thu, 3 Apr 2025 12:01:12 +0200
Subject: [PATCH 19/19] Move pad_datagram to PacketBuilder::finish_and_track

Callers always had to remember to apply pad_datagram at the right moment, and in
practice this always happened immediately before the call to finish_and_track.
Doing the padding inside finish_and_track instead requires no logic change and
helps avoid mistakes.
---
 quinn-proto/src/connection/mod.rs            | 22 +++++++-------------
 quinn-proto/src/connection/packet_builder.rs | 13 ++++++++++--
 2 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index e040bd4ef7..7aa0ce68dd 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -691,10 +691,7 @@ impl Connection {
                         ),
                     }
                 }
-                if pad_datagram {
-                    builder.pad_to(MIN_INITIAL_SIZE);
-                }
-                builder.finish_and_track(now, self, sent_frames);
+                builder.finish_and_track(now, self, sent_frames, pad_datagram);
                 if space_id == self.highest_space {
                     // Don't send another close packet
                     self.close = false;
@@ -727,6 +724,7 @@ impl Connection {
                     non_retransmits: true,
                     ..SentFrames::default()
                 },
+                false,
             );
             self.stats.udp_tx.on_sent(1, buf.len());
             return Some(Transmit {
@@ -794,13 +792,10 @@ impl Connection {
                     {
                         // We can append/coalesce the next packet into the current
                         // datagram. Finish the current packet without adding extra padding.
-                        builder.finish_and_track(now, self, sent_frames);
+                        builder.finish_and_track(now, self, sent_frames, false);
                     } else {
                         // We need a new datagram for the next packet. Finish the current
                         // packet with padding.
-                        if pad_datagram {
-                            builder.pad_to(MIN_INITIAL_SIZE);
-                        }
                         if builder.buf.num_datagrams() > 1 {
                             // If too many padding bytes would be required to continue the
                             // GSO batch after this packet, end the GSO batch here. Ensures
@@ -824,7 +819,7 @@ impl Connection {
                                     "GSO truncated by demand for {} padding bytes",
                                     builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
                                 );
-                                builder.finish_and_track(now, self, sent_frames);
+                                builder.finish_and_track(now, self, sent_frames, pad_datagram);
                                 break;
                             }

@@ -833,7 +828,7 @@ impl Connection {
                                 builder.pad_to(builder.buf.segment_size() as u16);
                             }

-                            builder.finish_and_track(now, self, sent_frames);
+                            builder.finish_and_track(now, self, sent_frames, pad_datagram);

                             if buf.num_datagrams() == 1 {
                                 buf.clip_datagram_size();
@@ -860,10 +855,7 @@ impl Connection {
                         }
                     } else {
                         // Nothing more to send. This was the last packet.
-                        if pad_datagram {
-                            builder.pad_to(MIN_INITIAL_SIZE);
-                        }
-                        builder.finish_and_track(now, self, sent_frames);
+                        builder.finish_and_track(now, self, sent_frames, pad_datagram);
                         break;
                     }
                 }
@@ -910,7 +902,7 @@ impl Connection {
             non_retransmits: true,
             ..Default::default()
         };
-        builder.finish_and_track(now, self, sent_frames);
+        builder.finish_and_track(now, self, sent_frames, false);

        self.stats.path.sent_plpmtud_probes += 1;

diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs
index cdb0baaa0b..ad5ffd8567 100644
--- a/quinn-proto/src/connection/packet_builder.rs
+++ b/quinn-proto/src/connection/packet_builder.rs
@@ -4,7 +4,7 @@ use tracing::{trace, trace_span};

 use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket};
 use crate::{
-    ConnectionId, Instant, TransportError, TransportErrorCode,
+    ConnectionId, Instant, MIN_INITIAL_SIZE, TransportError, TransportErrorCode,
     connection::ConnectionSide,
     frame::{self, Close},
     packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId},
@@ -194,7 +194,16 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
         self.buf.limit(self.frame_space_remaining())
     }

-    pub(super) fn finish_and_track(self, now: Instant, conn: &mut Connection, sent: SentFrames) {
+    pub(super) fn finish_and_track(
+        mut self,
+        now: Instant,
+        conn: &mut Connection,
+        sent: SentFrames,
+        pad_datagram: bool,
+    ) {
+        if pad_datagram {
+            self.pad_to(MIN_INITIAL_SIZE);
+        }
         let ack_eliciting = self.ack_eliciting;
         let exact_number = self.exact_number;
         let space_id = self.space;
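Taken together, the call-site change in this final patch amounts to the following (a sketch assembled from the hunks above, not additional code from the series):

    // Before: every caller had to remember the padding step itself.
    if pad_datagram {
        builder.pad_to(MIN_INITIAL_SIZE);
    }
    builder.finish_and_track(now, self, sent_frames);

    // After: the builder applies the padding as part of finishing the packet.
    builder.finish_and_track(now, self, sent_frames, pad_datagram);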