diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index c22e8d7155..1d38361e33 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use thiserror::Error; use tracing::{debug, trace}; @@ -163,13 +163,13 @@ impl DatagramState { /// /// Returns whether a frame was written. At most `max_size` bytes will be written, including /// framing. - pub(super) fn write(&mut self, buf: &mut Vec, max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, }; - if buf.len() + datagram.size(true) > max_size { + if buf.remaining_mut() < datagram.size(true) { // Future work: we could be more clever about cramming small datagrams into // mostly-full packets when a larger one is queued first self.outgoing.push_front(datagram); diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 9f10ec103c..7aa0ce68dd 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use frame::StreamMetaVec; use rand::{Rng, SeedableRng, rngs::StdRng}; use thiserror::Error; @@ -21,6 +21,7 @@ use crate::{ cid_queue::CidQueue, coding::BufMutExt, config::{ServerConfig, TransportConfig}, + congestion::Controller, crypto::{self, KeyPair, Keys, PacketKey}, frame::{self, Close, Datagram, FrameStruct, NewToken}, packet::{ @@ -85,9 +86,11 @@ pub use streams::{ }; mod timer; -use crate::congestion::Controller; use timer::{Timer, TimerTable}; +mod transmit_buf; +use transmit_buf::TransmitBuf; + /// Protocol state and logic for a single QUIC connection /// /// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application 
@@ -455,14 +458,9 @@ impl Connection { false => 1, true => max_datagrams, }; + let mut buf = TransmitBuf::new(buf, max_datagrams, self.path.current_mtu().into()); - let mut num_datagrams = 0; - // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC - // packets, this can be earlier than the start of the current QUIC packet. - let mut datagram_start = 0; - let mut segment_size = usize::from(self.path.current_mtu()); - - if let Some(challenge) = self.send_path_challenge(now, buf) { + if let Some(challenge) = self.send_path_challenge(now, &mut buf) { return Some(challenge); } @@ -500,71 +498,43 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // Reserving capacity can provide more capacity than we asked for. However, we are not - // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked - // separately. - let mut buf_capacity = 0; - + // Whether this packet can be coalesced with another one in the same datagram. let mut coalesce = true; - let mut builder_storage: Option = None; - let mut sent_frames = None; + + // Whether the last packet in the datagram must be padded to at least + // MIN_INITIAL_SIZE. let mut pad_datagram = false; + + // Whether congestion control stopped the next packet from being sent. Further + // packets could still be built, as e.g. tail-loss probes are not congestion + // limited. let mut congestion_blocked = false; - // Iterate over all spaces and find data to send - let mut space_idx = 0; - let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data]; - // This loop will potentially spend multiple iterations in the same `SpaceId`, - // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`. - while space_idx < spaces.len() { - let space_id = spaces[space_idx]; - // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to - // be able to send an individual frame at least this large in the next 1-RTT - // packet. 
This could be generalized to support every space, but it's only needed to - // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We - // don't account for coalesced packets potentially occupying space because frames can - // always spill into the next datagram. - let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); - - // Is there data or a close message to send in this space? - let can_send = self.space_can_send(space_id, frame_space_1rtt); - if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) { - space_idx += 1; - continue; - } + // The packet number of the last built packet. + let mut last_packet_number = None; + // Iterate over all spaces and find data to send + // + // Each loop builds one packet, which is finished before the next iteration of the + // loop. When packets are coalesced a datagram is filled over multiple loops. + let mut next_space_id = self.next_send_space(SpaceId::Initial, &buf, close); + while let Some(space_id) = next_space_id { + // Whether the next packet will contain ack-eliciting frames. let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams) || self.spaces[space_id].ping_pending || self.spaces[space_id].immediate_ack_pending; if space_id == SpaceId::Data { + let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); ack_eliciting |= self.can_send_1rtt(frame_space_1rtt); } - // Can we append more data into the current buffer? - // It is not safe to assume that `buf.len()` is the end of the data, - // since the last packet might not have been finished. 
- let buf_end = if let Some(builder) = &builder_storage { - buf.len().max(builder.min_size) + builder.tag_len - } else { - buf.len() - }; - - let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto { - crypto.packet.local.tag_len() - } else if space_id == SpaceId::Data { - self.zero_rtt_crypto.as_ref().expect( - "sending packets in the application data space requires known 0-RTT or 1-RTT keys", - ).packet.tag_len() - } else { - unreachable!("tried to send {:?} packet without keys", space_id) - }; - if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len { - // We need to send 1 more datagram and extend the buffer for that. - + // If the datagram is full, we need to start a new one. + if buf.datagram_remaining_mut() == 0 { // Is 1 more datagram allowed? - if num_datagrams >= max_datagrams { + if buf.num_datagrams() >= buf.max_datagrams() { // No more datagrams allowed break; } @@ -575,10 +545,9 @@ impl Connection { // for starting another datagram. If there is any anti-amplification // budget left, we always allow a full MTU to be sent // (see https://github.com/quinn-rs/quinn/issues/1082) - if self - .path - .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1) - { + if self.path.anti_amplification_blocked( + (buf.segment_size() * buf.num_datagrams()) as u64 + 1, + ) { trace!("blocked by anti-amplification"); break; } @@ -586,22 +555,20 @@ impl Connection { // Congestion control and pacing checks // Tail loss probes must not be blocked by congestion, or a deadlock could arise if ack_eliciting && self.spaces[space_id].loss_probes == 0 { - // Assume the current packet will get padded to fill the segment - let untracked_bytes = if let Some(builder) = &builder_storage { - buf_capacity - builder.partial_encode.start - } else { - 0 - } as u64; - debug_assert!(untracked_bytes <= segment_size as u64); - - let bytes_to_send = segment_size as u64 + untracked_bytes; + let bytes_to_send = buf.segment_size() as u64; if 
self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { - space_idx += 1; + next_space_id = self.next_send_space(space_id.next(), &buf, close); congestion_blocked = true; - // We continue instead of breaking here in order to avoid - // blocking loss probes queued for higher spaces. trace!("blocked by congestion control"); - continue; + if next_space_id == Some(space_id) { + // We are in the highest space, nothing more to do. + break; + } else { + // We continue looking for packets in higher spaces because we + // might still have to send loss probes in them, which are not + // congestion controlled. + continue; + } } // Check whether the next datagram is blocked by pacing @@ -622,118 +589,25 @@ impl Connection { } } - // Finish current packet - if let Some(mut builder) = builder_storage.take() { - if pad_datagram { - builder.pad_to(MIN_INITIAL_SIZE); - } - - if num_datagrams > 1 { - // If too many padding bytes would be required to continue the GSO batch - // after this packet, end the GSO batch here. Ensures that fixed-size frames - // with heterogeneous sizes (e.g. application datagrams) won't inadvertently - // waste large amounts of bandwidth. The exact threshold is a bit arbitrary - // and might benefit from further tuning, though there's no universally - // optimal value. - // - // Additionally, if this datagram is a loss probe and `segment_size` is - // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue - // the GSO batch would risk failure to recover from a reduction in path - // MTU. Loss probes are the only packets for which we might grow - // `buf_capacity` by less than `segment_size`. 
- const MAX_PADDING: usize = 16; - let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - datagram_start - + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < segment_size - || datagram_start + segment_size > buf_capacity - { - trace!( - "GSO truncated by demand for {} padding bytes or loss probe", - segment_size - packet_len_unpadded - ); - builder_storage = Some(builder); - break; - } - - // Pad the current datagram to GSO segment size so it can be included in the - // GSO batch. - builder.pad_to(segment_size as u16); - } - - builder.finish_and_track(now, self, sent_frames.take(), buf); - - if num_datagrams == 1 { - // Set the segment size for this GSO batch to the size of the first UDP - // datagram in the batch. Larger data that cannot be fragmented - // (e.g. application datagrams) will be included in a future batch. When - // sending large enough volumes of data for GSO to be useful, we expect - // packet sizes to usually be consistent, e.g. populated by max-size STREAM - // frames or uniformly sized datagrams. - segment_size = buf.len(); - // Clip the unused capacity out of the buffer so future packets don't - // overrun - buf_capacity = buf.len(); - - // Check whether the data we planned to send will fit in the reduced segment - // size. If not, bail out and leave it for the next GSO batch so we don't - // end up trying to send an empty packet. We can't easily compute the right - // segment size before the original call to `space_can_send`, because at - // that time we haven't determined whether we're going to coalesce with the - // first datagram or potentially pad it to `MIN_INITIAL_SIZE`. 
- if space_id == SpaceId::Data { - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); - if self.space_can_send(space_id, frame_space_1rtt).is_empty() { - break; - } - } - } - } - - // Allocate space for another datagram - let next_datagram_size_limit = match self.spaces[space_id].loss_probes { - 0 => segment_size, + // Start the next datagram + match self.spaces[space_id].loss_probes { + 0 => buf.start_new_datagram(), _ => { self.spaces[space_id].loss_probes -= 1; // Clamp the datagram to at most the minimum MTU to ensure that loss probes // can get through and enable recovery even if the path MTU has shrank // unexpectedly. - std::cmp::min(segment_size, usize::from(INITIAL_MTU)) + buf.start_new_datagram_with_size(std::cmp::min( + usize::from(INITIAL_MTU), + buf.segment_size(), + )); } }; - buf_capacity += next_datagram_size_limit; - if buf.capacity() < buf_capacity { - // We reserve the maximum space for sending `max_datagrams` upfront - // to avoid any reallocations if more datagrams have to be appended later on. - // Benchmarks have shown shown a 5-10% throughput improvement - // compared to continuously resizing the datagram buffer. - // While this will lead to over-allocation for small transmits - // (e.g. purely containing ACKs), modern memory allocators - // (e.g. mimalloc and jemalloc) will pool certain allocation sizes - // and therefore this is still rather efficient. - buf.reserve(max_datagrams * segment_size); - } - num_datagrams += 1; coalesce = true; pad_datagram = false; - datagram_start = buf.len(); - - debug_assert_eq!( - datagram_start % segment_size, - 0, - "datagrams in a GSO batch must be aligned to the segment size" - ); - } else { - // We can append/coalesce the next packet into the current - // datagram. 
- // Finish current packet without adding extra padding - if let Some(builder) = builder_storage.take() { - builder.finish_and_track(now, self, sent_frames.take(), buf); - } } - debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE); + debug_assert!(buf.datagram_remaining_mut() >= MIN_PACKET_SPACE); // // From here on, we've determined that a packet will definitely be sent. @@ -751,21 +625,15 @@ impl Connection { prev.update_unacked = false; } - debug_assert!( - builder_storage.is_none() && sent_frames.is_none(), - "Previous packet must have been finished" - ); - - let builder = builder_storage.insert(PacketBuilder::new( + let mut builder = PacketBuilder::new( now, space_id, self.rem_cids.active(), - buf, - buf_capacity, - datagram_start, + &mut buf, ack_eliciting, self, - )?); + )?; + last_packet_number = Some(builder.exact_number); coalesce = coalesce && !builder.short_header; // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1 @@ -778,13 +646,14 @@ impl Connection { // a better approximate on what data has been processed. This is // especially important with ack delay, since the peer might not // have gotten any other ACK for the data earlier on. + let mut sent_frames = SentFrames::default(); if !self.spaces[space_id].pending_acks.ranges().is_empty() { Self::populate_acks( now, self.receiving_ecn, - &mut SentFrames::default(), + &mut sent_frames, &mut self.spaces[space_id], - buf, + &mut builder.frame_space_mut(), &mut self.stats, ); } @@ -793,22 +662,22 @@ impl Connection { // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. 
debug_assert!( - buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, + builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND, "ACKs should leave space for ConnectionClose" ); - if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { - let max_frame_size = builder.max_size - buf.len(); + if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() { + let max_frame_size = builder.frame_space_remaining(); match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(buf, max_frame_size) + reason.encode(&mut builder.frame_space_mut(), max_frame_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size) + .encode(&mut builder.frame_space_mut(), max_frame_size) } } State::Draining => frame::ConnectionClose { @@ -816,12 +685,13 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size), + .encode(&mut builder.frame_space_mut(), max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), } } + builder.finish_and_track(now, self, sent_frames, pad_datagram); if space_id == self.highest_space { // Don't send another close packet self.close = false; @@ -831,31 +701,30 @@ impl Connection { // Send a close frame in every possible space for robustness, per RFC9000 // "Immediate Close during the Handshake". Don't bother trying to send anything // else. - space_idx += 1; + next_space_id = self.next_send_space(space_id.next(), &buf, close); continue; } } // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. 
- if space_id == SpaceId::Data && num_datagrams == 1 { + if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 { if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { - // `unwrap` guaranteed to succeed because `builder_storage` was populated just - // above. - let mut builder = builder_storage.take().unwrap(); trace!("PATH_RESPONSE {:08x} (off-path)", token); - buf.write(frame::FrameType::PATH_RESPONSE); - buf.write(token); + builder + .frame_space_mut() + .write(frame::FrameType::PATH_RESPONSE); + builder.frame_space_mut().write(token); self.stats.frame_tx.path_response += 1; builder.pad_to(MIN_INITIAL_SIZE); builder.finish_and_track( now, self, - Some(SentFrames { + SentFrames { non_retransmits: true, ..SentFrames::default() - }), - buf, + }, + false, ); self.stats.udp_tx.on_sent(1, buf.len()); return Some(Transmit { @@ -868,8 +737,10 @@ impl Connection { } } - let sent = - self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number); + let sent_frames = { + let pn = builder.exact_number; + self.populate_packet(now, space_id, &mut builder.frame_space_mut(), pn) + }; // ACK-only packets should only be sent when explicitly allowed. If we write them due to // any other reason, there is a bug which leads to one component announcing write @@ -877,35 +748,121 @@ impl Connection { // only checked if the full MTU is available and when potentially large fixed-size // frames aren't queued, so that lack of space in the datagram isn't the reason for just // writing ACKs. 
- debug_assert!( - !(sent.is_ack_only(&self.streams) - && !can_send.acks - && can_send.other - && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize - && self.datagrams.outgoing.is_empty()), - "SendableFrames was {can_send:?}, but only ACKs have been written" - ); - pad_datagram |= sent.requires_padding; + { + let pn = if builder.space == SpaceId::Data { + builder.exact_number + } else { + self.packet_number_filter.peek(&self.spaces[SpaceId::Data]) + }; + let frame_space_1rtt = builder + .buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let can_send = self.space_can_send(space_id, frame_space_1rtt); + debug_assert!( + !(sent_frames.is_ack_only(&self.streams) + && !can_send.acks + && can_send.other + && builder.buf.segment_size() == self.path.current_mtu() as usize + && self.datagrams.outgoing.is_empty()), + "SendableFrames was {can_send:?}, but only ACKs have been written" + ); + } + pad_datagram |= sent_frames.requires_padding; - if sent.largest_acked.is_some() { + if sent_frames.largest_acked.is_some() { self.spaces[space_id].pending_acks.acks_sent(); self.timers.stop(Timer::MaxAckDelay); } - // Keep information about the packet around until it gets finalized - sent_frames = Some(sent); + // Now we need to finish the packet. Before we do so we need to know if we will + // be coalescing the next packet into this one, or will be ending the datagram + // as well. Because if this is the last packet in the datagram more padding + // might be needed because of the packet type, or to fill the GSO segment size. + next_space_id = self.next_send_space(space_id, builder.buf, close); + if let Some(next_space_id) = next_space_id { + // Are we allowed to coalesce AND is there enough space for another *packet* + // in this datagram? 
+ if coalesce + && builder + .buf + .datagram_remaining_mut() + .saturating_sub(builder.predict_packet_end()) + > MIN_PACKET_SPACE + { + // We can append/coalesce the next packet into the current + // datagram. Finish the current packet without adding extra padding. + builder.finish_and_track(now, self, sent_frames, false); + } else { + // We need a new datagram for the next packet. Finish the current + // packet with padding. + if builder.buf.num_datagrams() > 1 { + // If too many padding bytes would be required to continue the + // GSO batch after this packet, end the GSO batch here. Ensures + // that fixed-size frames with heterogeneous sizes + // (e.g. application datagrams) won't inadvertently waste large + // amounts of bandwidth. The exact threshold is a bit arbitrary + // and might benefit from further tuning, though there's no + // universally optimal value. + // + // Additionally, if this datagram is a loss probe and + // `segment_size` is larger than `INITIAL_MTU`, then padding it + // to `segment_size` to continue the GSO batch would risk + // failure to recover from a reduction in path MTU. Loss probes + // are the only packets for which we might grow `buf_capacity` + // by less than `segment_size`. + const MAX_PADDING: usize = 16; + if builder.buf.datagram_remaining_mut() + > builder.predict_packet_end() + MAX_PADDING + { + trace!( + "GSO truncated by demand for {} padding bytes", + builder.buf.datagram_remaining_mut() - builder.predict_packet_end() + ); + builder.finish_and_track(now, self, sent_frames, pad_datagram); + break; + } - // Don't increment space_idx. - // We stay in the current space and check if there is more data to send. - } + // Pad the current datagram to GSO segment size so it can be + // included in the GSO batch. 
+ builder.pad_to(builder.buf.segment_size() as u16); + } - // Finish the last packet - if let Some(mut builder) = builder_storage { - if pad_datagram { - builder.pad_to(MIN_INITIAL_SIZE); + builder.finish_and_track(now, self, sent_frames, pad_datagram); + + if buf.num_datagrams() == 1 { + buf.clip_datagram_size(); + if next_space_id == SpaceId::Data { + // Now that we know the size of the first datagram, check whether + // the data we planned to send will fit in the next segment. If + // not, bail out and leave it for the next GSO batch. We can't + // easily compute the right segment size before the original call to + // `space_can_send`, because at that time we haven't determined + // whether we're going to coalesce with the first datagram or + // potentially pad it to `MIN_INITIAL_SIZE`. + let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + if self + .space_can_send(next_space_id, frame_space_1rtt) + .is_empty() + { + break; + } + } + } + } + } else { + // Nothing more to send. This was the last packet. + builder.finish_and_track(now, self, sent_frames, pad_datagram); + break; } - let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, buf); + } + + if let Some(last_packet_number) = last_packet_number { + // Note that when sending in multiple packet spaces the last packet number will + // be the one from the highest packet space. 
self.path .congestion .on_sent(now, buf.len() as u64, last_packet_number); @@ -921,27 +878,22 @@ impl Connection { .mtud .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?; - let buf_capacity = probe_size as usize; - buf.reserve(buf_capacity); + debug_assert_eq!(buf.num_datagrams(), 0); + buf.start_new_datagram_with_size(probe_size as usize); - let mut builder = PacketBuilder::new( - now, - space_id, - self.rem_cids.active(), - buf, - buf_capacity, - 0, - true, - self, - )?; + debug_assert_eq!(buf.datagram_start_offset(), 0); + let mut builder = + PacketBuilder::new(now, space_id, self.rem_cids.active(), &mut buf, true, self)?; // We implement MTU probes as ping packets padded up to the probe size - buf.write(frame::FrameType::PING); + builder.frame_space_mut().write(frame::FrameType::PING); self.stats.frame_tx.ping += 1; // If supported by the peer, we want no delays to the probe's ACK if self.peer_supports_ack_frequency() { - buf.write(frame::FrameType::IMMEDIATE_ACK); + builder + .frame_space_mut() + .write(frame::FrameType::IMMEDIATE_ACK); self.stats.frame_tx.immediate_ack += 1; } @@ -950,10 +902,9 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), buf); + builder.finish_and_track(now, self, sent_frames, false); self.stats.path.sent_plpmtud_probes += 1; - num_datagrams = 1; trace!(?probe_size, "writing MTUD probe"); } @@ -962,10 +913,16 @@ impl Connection { return None; } - trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams); + trace!( + "sending {} bytes in {} datagrams", + buf.len(), + buf.num_datagrams() + ); self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64); - self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len()); + self.stats + .udp_tx + .on_sent(buf.num_datagrams() as u64, buf.len()); Some(Transmit { destination: self.path.remote, @@ -975,16 +932,50 @@ impl Connection { } else { None }, - segment_size: match 
num_datagrams { + segment_size: match buf.num_datagrams() { 1 => None, - _ => Some(segment_size), + _ => Some(buf.segment_size()), }, src_ip: self.local_ip, }) } + /// Returns the [`SpaceId`] of the next packet space which has data to send + /// + /// This takes into account the space available to frames in the next datagram. + fn next_send_space( + &self, + current_space_id: SpaceId, + buf: &TransmitBuf<'_>, + close: bool, + ) -> Option { + // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed + // to be able to send an individual frame at least this large in the next 1-RTT + // packet. This could be generalized to support every space, but it's only needed to + // handle large fixed-size frames, which only exist in 1-RTT (application + // datagrams). We don't account for coalesced packets potentially occupying space + // because frames can always spill into the next datagram. + let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); + let frame_space_1rtt = buf + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let mut space_id = current_space_id; + loop { + let can_send = self.space_can_send(space_id, frame_space_1rtt); + if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { + return Some(space_id); + } + space_id = match space_id { + SpaceId::Initial => SpaceId::Handshake, + SpaceId::Handshake => SpaceId::Data, + SpaceId::Data => break, + } + } + None + } + /// Send PATH_CHALLENGE for a previous path if necessary - fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec) -> Option { + fn send_path_challenge(&mut self, now: Instant, buf: &mut TransmitBuf<'_>) -> Option { let (prev_cid, prev_path) = self.prev_path.as_mut()?; if !prev_path.challenge_pending { return None; @@ -999,28 +990,20 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - buf.reserve(MIN_INITIAL_SIZE as usize); - - let buf_capacity = buf.capacity(); + 
buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is // sent once, immediately after migration, when the CID is known to be valid. Even // if a post-migration packet caused the CID to be retired, it's fair to pretend // this is sent first. - let mut builder = PacketBuilder::new( - now, - SpaceId::Data, - *prev_cid, - buf, - buf_capacity, - 0, - false, - self, - )?; + debug_assert_eq!(buf.datagram_start_offset(), 0); + let mut builder = PacketBuilder::new(now, SpaceId::Data, *prev_cid, buf, false, self)?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); - buf.write(frame::FrameType::PATH_CHALLENGE); - buf.write(token); + builder + .frame_space_mut() + .write(frame::FrameType::PATH_CHALLENGE); + builder.frame_space_mut().write(token); self.stats.frame_tx.path_challenge += 1; // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame @@ -1029,7 +1012,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, buf); + builder.finish(self); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { @@ -3055,8 +3038,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut Vec, - max_size: usize, + buf: &mut impl BufMut, pn: u64, ) -> SentFrames { let mut sent = SentFrames::default(); @@ -3132,7 +3114,7 @@ impl Connection { } // PATH_CHALLENGE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if buf.remaining_mut() > 9 && space_id == SpaceId::Data { // Transmit challenges with every outgoing frame on an unvalidated path if let Some(token) = self.path.challenge { // But only send a packet solely for that purpose at most once @@ -3147,7 +3129,7 @@ impl Connection { } // PATH_RESPONSE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if buf.remaining_mut() > 9 && space_id 
== SpaceId::Data { if let Some(token) = self.path_responses.pop_on_path(self.path.remote) { sent.non_retransmits = true; sent.requires_padding = true; @@ -3159,7 +3141,7 @@ impl Connection { } // CRYPTO - while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt { + while buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt { let mut frame = match space.pending.crypto.pop_front() { Some(x) => x, None => break, @@ -3169,8 +3151,7 @@ impl Connection { // Since the offset is known, we can reserve the exact size required to encode it. // For length we reserve 2bytes which allows to encode up to 2^14, // which is more than what fits into normally sized QUIC frames. - let max_crypto_data_size = max_size - - buf.len() + let max_crypto_data_size = buf.remaining_mut() - 1 // Frame Type - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) }) - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes @@ -3206,12 +3187,11 @@ impl Connection { &mut space.pending, &mut sent.retransmits, &mut self.stats.frame_tx, - max_size, ); } // NEW_CONNECTION_ID - while buf.len() + 44 < max_size { + while buf.remaining_mut() > 44 { let issued = match space.pending.new_cids.pop() { Some(x) => x, None => break, @@ -3233,7 +3213,7 @@ impl Connection { } // RETIRE_CONNECTION_ID - while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::RETIRE_CONNECTION_ID_SIZE_BOUND { let seq = match space.pending.retire_cids.pop() { Some(x) => x, None => break, @@ -3247,8 +3227,8 @@ impl Connection { // DATAGRAM let mut sent_datagrams = false; - while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data { - match self.datagrams.write(buf, max_size) { + while buf.remaining_mut() > Datagram::SIZE_BOUND && space_id == SpaceId::Data { + match self.datagrams.write(buf) { true => { sent_datagrams = true; sent.non_retransmits = true; @@ -3288,7 +3268,7 @@ impl Connection { token: 
token.encode(&*server_config.token_key).into(), }; - if buf.len() + new_token.size() >= max_size { + if buf.remaining_mut() < new_token.size() { space.pending.new_tokens.push(remote_addr); break; } @@ -3303,9 +3283,9 @@ impl Connection { // STREAM if space_id == SpaceId::Data { - sent.stream_frames = - self.streams - .write_stream_frames(buf, max_size, self.config.send_fairness); + sent.stream_frames = self + .streams + .write_stream_frames(buf, self.config.send_fairness); self.stats.frame_tx.stream += sent.stream_frames.len() as u64; } @@ -3321,7 +3301,7 @@ impl Connection { receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, - buf: &mut Vec, + buf: &mut impl BufMut, stats: &mut ConnectionStats, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index d99f012b1a..ad5ffd8567 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,17 +1,25 @@ -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use rand::Rng; use tracing::{trace, trace_span}; -use super::{Connection, SentFrames, spaces::SentPacket}; +use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket}; use crate::{ - ConnectionId, Instant, TransportError, TransportErrorCode, + ConnectionId, Instant, MIN_INITIAL_SIZE, TransportError, TransportErrorCode, connection::ConnectionSide, frame::{self, Close}, packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId}, }; -pub(super) struct PacketBuilder { - pub(super) datagram_start: usize, +/// QUIC packet builder +/// +/// This allows building QUIC packets: it takes care of writing the header, allows writing +/// frames and on [`PacketBuilder::finalize`] (or [`PacketBuilder::finalize_and_track`]) it +/// encrypts the packet so it is ready to be sent on the wire. 
+/// +/// The builder manages the write buffer into which the packet is written, and directly +/// implements [`BufMut`] to write frames into the packet. +pub(super) struct PacketBuilder<'a, 'b> { + pub(super) buf: &'a mut TransmitBuf<'b>, pub(super) space: SpaceId, pub(super) partial_encode: PartialEncode, pub(super) ack_eliciting: bool, @@ -20,14 +28,11 @@ pub(super) struct PacketBuilder { /// Smallest absolute position in the associated buffer that must be occupied by this packet's /// frames pub(super) min_size: usize, - /// Largest absolute position in the associated buffer that may be occupied by this packet's - /// frames - pub(super) max_size: usize, pub(super) tag_len: usize, pub(super) _span: tracing::span::EnteredSpan, } -impl PacketBuilder { +impl<'a, 'b> PacketBuilder<'a, 'b> { /// Write a new packet header to `buffer` and determine the packet's properties /// /// Marks the connection drained and returns `None` if the confidentiality limit would be @@ -36,12 +41,13 @@ impl PacketBuilder { now: Instant, space_id: SpaceId, dst_cid: ConnectionId, - buffer: &mut Vec, - buffer_capacity: usize, - datagram_start: usize, + buffer: &'a mut TransmitBuf<'b>, ack_eliciting: bool, conn: &mut Connection, - ) -> Option { + ) -> Option + where + 'b: 'a, + { let version = conn.version; // Initiate key update if we're approaching the confidentiality limit let sent_with_keys = conn.spaces[space_id].sent_with_keys; @@ -124,7 +130,7 @@ impl PacketBuilder { }; let partial_encode = header.encode(buffer); if conn.peer_params.grease_quic_bit && conn.rng.random() { - buffer[partial_encode.start] ^= FIXED_BIT; + buffer.as_mut_slice()[partial_encode.start] ^= FIXED_BIT; } let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto { @@ -151,17 +157,16 @@ impl PacketBuilder { buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len), partial_encode.start + dst_cid.len() + 6, ); - let max_size = buffer_capacity - tag_len; + let max_size = 
buffer.datagram_max_offset() - tag_len; debug_assert!(max_size >= min_size); Some(Self { - datagram_start, + buf: buffer, space: space_id, partial_encode, exact_number, short_header: header.is_short(), min_size, - max_size, tag_len, ack_eliciting, _span: span, @@ -176,25 +181,33 @@ impl PacketBuilder { // already. self.min_size = Ord::max( self.min_size, - self.datagram_start + (min_size as usize) - self.tag_len, + self.buf.datagram_start_offset() + (min_size as usize) - self.tag_len, ); } + /// Returns a writable buffer limited to the remaining frame space + /// + /// The [`BufMut::remaining_mut`] call on the returned buffer indicates the amount of + /// space available to write QUIC frames into. + // In rust 1.82 we can use `-> impl BufMut + use<'_, 'a, 'b>` + pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut TransmitBuf<'b>> { + self.buf.limit(self.frame_space_remaining()) + } + pub(super) fn finish_and_track( - self, + mut self, now: Instant, conn: &mut Connection, - sent: Option, - buffer: &mut Vec, + sent: SentFrames, + pad_datagram: bool, ) { + if pad_datagram { + self.pad_to(MIN_INITIAL_SIZE); + } let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; let space_id = self.space; - let (size, padded) = self.finish(conn, buffer); - let sent = match sent { - Some(sent) => sent, - None => return, - }; + let (size, padded) = self.finish(conn); let size = match padded || ack_eliciting { true => size as u16, @@ -228,11 +241,15 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec) -> (usize, bool) { - let pad = buffer.len() < self.min_size; + pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) { + debug_assert!( + self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len, + "packet exceeds maximum size" + ); + let pad = self.buf.len() < self.min_size; if pad { - trace!("PADDING * 
{}", self.min_size - buffer.len()); - buffer.resize(self.min_size, 0); + trace!("PADDING * {}", self.min_size - self.buf.len()); + self.buf.put_bytes(0, self.min_size - self.buf.len()); } let space = &conn.spaces[self.space]; @@ -251,15 +268,34 @@ impl PacketBuilder { "Mismatching crypto tag len" ); - buffer.resize(buffer.len() + packet_crypto.tag_len(), 0); + self.buf.put_bytes(0, packet_crypto.tag_len()); let encode_start = self.partial_encode.start; - let packet_buf = &mut buffer[encode_start..]; + let packet_buf = &mut self.buf.as_mut_slice()[encode_start..]; self.partial_encode.finish( packet_buf, header_crypto, Some((self.exact_number, packet_crypto)), ); - (buffer.len() - encode_start, pad) + let packet_len = self.buf.len() - encode_start; + trace!(size = %packet_len, short_header = %self.short_header, "wrote packet"); + (packet_len, pad) + } + + /// The number of additional bytes the current packet would take up if it was finished now + /// + /// This will include any padding which is required to make the size large enough to be + /// encrypted correctly. + pub(super) fn predict_packet_end(&self) -> usize { + self.buf.len().max(self.min_size) + self.tag_len - self.buf.len() + } + + /// Returns the remaining space in the packet that can be taken up by QUIC frames + /// + /// This leaves space in the datagram for the cryptographic tag that needs to be written + /// when the packet is finished. 
+ pub(super) fn frame_space_remaining(&self) -> usize { + let max_offset = self.buf.datagram_max_offset() - self.tag_len; + max_offset.saturating_sub(self.buf.len()) } } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e05311423d..fe30e1bd2b 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -411,14 +411,13 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut Vec, + buf: &mut impl BufMut, pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, - max_size: usize, ) { // RESET_STREAM - while buf.len() + frame::ResetStream::SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::ResetStream::SIZE_BOUND { let (id, error_code) = match pending.reset_stream.pop() { Some(x) => x, None => break, @@ -442,7 +441,7 @@ impl StreamsState { } // STOP_SENDING - while buf.len() + frame::StopSending::SIZE_BOUND < max_size { + while buf.remaining_mut() > frame::StopSending::SIZE_BOUND { let frame = match pending.stop_sending.pop() { Some(x) => x, None => break, @@ -461,7 +460,7 @@ impl StreamsState { } // MAX_DATA - if pending.max_data && buf.len() + 9 < max_size { + if pending.max_data && buf.remaining_mut() > 9 { pending.max_data = false; // `local_max_data` can grow bigger than `VarInt`. 
@@ -484,7 +483,7 @@ impl StreamsState { } // MAX_STREAM_DATA - while buf.len() + 17 < max_size { + while buf.remaining_mut() > 17 { let id = match pending.max_stream_data.iter().next() { Some(x) => *x, None => break, @@ -516,7 +515,7 @@ impl StreamsState { // MAX_STREAMS for dir in Dir::iter() { - if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size { + if !pending.max_stream_id[dir as usize] || buf.remaining_mut() <= 9 { continue; } @@ -541,21 +540,14 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut Vec, - max_buf_size: usize, + buf: &mut impl BufMut, fair: bool, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); - while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size { - if max_buf_size - .checked_sub(buf.len() + frame::Stream::SIZE_BOUND) - .is_none() - { - break; - } - - // Pop the stream of the highest priority that currently has pending data - // If the stream still has some pending data left after writing, it will be reinserted, otherwise not + while buf.remaining_mut() > frame::Stream::SIZE_BOUND { + // Pop the stream of the highest priority that currently has pending data. If + // the stream still has some pending data left after writing, it will be + // reinserted, otherwise not let Some(stream) = self.pending.pop() else { break; }; @@ -577,7 +569,7 @@ impl StreamsState { // Now that we know the `StreamId`, we can better account for how many bytes // are required to encode it. - let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into()); + let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into()); let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size); let fin = offsets.end == stream.pending.offset() && matches!(stream.state, SendState::DataSent { .. 
}); @@ -1380,7 +1372,7 @@ mod tests { high.write(b"high").unwrap(); let mut buf = Vec::with_capacity(40); - let meta = server.write_stream_frames(&mut buf, 40, true); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); assert_eq!(meta[2].id, id_low); @@ -1438,16 +1430,18 @@ mod tests { }; high.set_priority(-1).unwrap(); - let mut buf = Vec::with_capacity(1000); - let meta = server.write_stream_frames(&mut buf, 40, true); + let mut buf = Vec::with_capacity(1000).limit(40); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); // After requeuing we should end up with 2 priorities - not 3 assert_eq!(server.pending.len(), 2); + let mut buf = buf.into_inner(); + // Send the remaining data. The initial mid priority one should go first now - let meta = server.write_stream_frames(&mut buf, 1000, true); + let meta = server.write_stream_frames(&mut buf, true); assert_eq!(meta.len(), 2); assert_eq!(meta[0].id, id_mid); assert_eq!(meta[1].id, id_high); @@ -1507,12 +1501,13 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, fair); if meta.is_empty() { break; } metas.extend(meta); + buf = chunk_buf.into_inner(); } assert!(!server.can_send_stream_data()); @@ -1575,11 +1570,12 @@ mod tests { stream_b.write(&[b'b'; 100]).unwrap(); let mut metas = vec![]; - let mut buf = Vec::with_capacity(1024); + let buf = Vec::with_capacity(1024); // Write the first chunk of stream_a - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, false); + let mut buf = chunk_buf.into_inner(); assert!(!meta.is_empty()); metas.extend(meta); 
@@ -1595,8 +1591,9 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut chunk_buf = buf.limit(40); + let meta = server.write_stream_frames(&mut chunk_buf, false); + buf = chunk_buf.into_inner(); if meta.is_empty() { break; } diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs new file mode 100644 index 0000000000..df59a89ba4 --- /dev/null +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -0,0 +1,233 @@ +use bytes::BufMut; +use tracing::trace; + +use crate::packet::BufLen; + +/// The buffer in which to write datagrams for [`Connection::poll_transmit`] +/// +/// The `poll_transmit` function writes zero or more datagrams to a buffer. Multiple +/// datagrams are possible in case GSO (Generic Segmentation Offload) is supported. +/// +/// This buffer tracks datagrams being written to it. There is always a "current" datagram, +/// which is started by calling [`TransmitBuf::start_new_datagram`]. Writing to the buffer +/// is done through the [`BufMut`] interface. +/// +/// Usually a datagram contains one QUIC packet, though QUIC-TRANSPORT 12.2 Coalescing +/// Packets allows for placing multiple packets into a single datagram provided all but the +/// last packet uses long headers. This is normally used during connection setup where often +/// the initial, handshake and sometimes even a 1-RTT packet can be coalesced into a single +/// datagram. +/// +/// Inside a single packet multiple QUIC frames are written. +/// +/// The buffer managed here is passed straight to the OS' `sendmsg` call (or variant) once +/// `poll_transmit` returns. So needs to contain the datagrams as they are sent on the +/// wire. 
+/// +/// [`Connection::poll_transmit`]: super::Connection::poll_transmit +#[derive(Debug)] +pub(super) struct TransmitBuf<'a> { + /// The buffer itself, packets are written to this buffer + buf: &'a mut Vec, + /// Offset into the buffer at which the current datagram starts + /// + /// Note that when coalescing packets this might be before the start of the current + /// packet. + datagram_start: usize, + /// The maximum offset allowed to be used for the current datagram in the buffer + /// + /// The first and last datagram in a batch are allowed to be smaller than the maximum + /// size. All datagrams in between need to be exactly this size. + buf_capacity: usize, + /// The maximum number of datagrams allowed to write into [`TransmitBuf::buf`] + max_datagrams: usize, + /// The number of datagrams already (partially) written into the buffer + /// + /// Incremented by a call to [`TransmitBuf::start_new_datagram`]. + pub(super) num_datagrams: usize, + /// The segment size of this GSO batch + /// + /// The segment size is the size of each datagram in the GSO batch, only the last + /// datagram in the batch may be smaller. + /// + /// For the first datagram this is set to the maximum size a datagram is allowed to be: + /// the current path MTU. After the first datagram is finished this is reduced to the + /// size of the first datagram and can no longer change. + segment_size: usize, +} + +impl<'a> TransmitBuf<'a> { + pub(super) fn new(buf: &'a mut Vec, max_datagrams: usize, mtu: usize) -> Self { + Self { + buf, + datagram_start: 0, + buf_capacity: 0, + max_datagrams, + num_datagrams: 0, + segment_size: mtu, + } + } + + /// Starts a datagram with a custom datagram size + /// + /// This is a specialized version of [`TransmitBuf::start_new_datagram`] which sets the + /// datagram size. Useful for e.g. PATH_CHALLENGE, tail-loss probes or MTU probes. + /// + /// After the first datagram you can never increase the segment size.
If you decrease + /// the size of a datagram in a batch, it must be the last datagram of the batch. + pub(super) fn start_new_datagram_with_size(&mut self, datagram_size: usize) { + // Only reserve space for this datagram, usually it is the last one in the batch. + let max_capacity_hint = datagram_size; + self.new_datagram_inner(datagram_size, max_capacity_hint) + } + + /// Starts a new datagram in the transmit buffer + /// + /// If this starts the second datagram the segment size will be set to the size of the + /// first datagram. + /// + /// If the underlying buffer does not have enough capacity yet this will allocate enough + /// capacity for all the datagrams allowed in a single batch. Use + /// [`TransmitBuf::start_new_datagram_with_size`] if you know you will need less. + pub(super) fn start_new_datagram(&mut self) { + // We reserve the maximum space for sending `max_datagrams` upfront to avoid any + // reallocations if more datagrams have to be appended later on. Benchmarks have + // shown a 5-10% throughput improvement compared to continuously resizing the + // datagram buffer. While this will lead to over-allocation for small transmits + // (e.g. purely containing ACKs), modern memory allocators (e.g. mimalloc and + // jemalloc) will pool certain allocation sizes and therefore this is still rather + // efficient. + let max_capacity_hint = self.max_datagrams * self.segment_size; + self.new_datagram_inner(self.segment_size, max_capacity_hint) + } + + fn new_datagram_inner(&mut self, datagram_size: usize, max_capacity_hint: usize) { + debug_assert!(self.num_datagrams < self.max_datagrams); + if self.num_datagrams == 1 { + // Set the segment size to the size of the first datagram. 
+ self.segment_size = self.buf.len(); + } + if self.num_datagrams >= 1 { + debug_assert!(datagram_size <= self.segment_size); + if datagram_size < self.segment_size { + // If this is a GSO batch and this datagram is smaller than the segment + // size, this must be the last datagram in the batch. + self.max_datagrams = self.num_datagrams + 1; + } + } + self.datagram_start = self.buf.len(); + debug_assert_eq!( + self.datagram_start % self.segment_size, + 0, + "datagrams in a GSO batch must be aligned to the segment size" + ); + self.buf_capacity = self.datagram_start + datagram_size; + if self.buf_capacity > self.buf.capacity() { + self.buf + .reserve_exact(max_capacity_hint.saturating_sub(self.buf.capacity())); + } + self.num_datagrams += 1; + } + + /// Clips the datagram size to the current size + /// + /// Only valid for the first datagram, when the datagram might be smaller than the + /// segment size. Needed before estimating the available space in the next datagram + /// based on [`TransmitBuf::segment_size`]. + /// + /// Use [`TransmitBuf::start_new_datagram_with_size`] if you need to reduce the size of + /// the last datagram in a batch. + pub(super) fn clip_datagram_size(&mut self) { + debug_assert_eq!(self.num_datagrams, 1); + if self.buf.len() < self.segment_size { + trace!( + segment_size = self.buf.len(), + prev_segment_size = self.segment_size, + "clipped datagram size" + ); + } + self.segment_size = self.buf.len(); + self.buf_capacity = self.buf.len(); + } + + /// Returns the GSO segment size + /// + /// This is also the maximum size datagrams are allowed to be. The first and last + /// datagram in a batch are allowed to be smaller however. After the first datagram the + /// segment size is clipped to the size of the first datagram. + /// + /// If the last datagram was created using [`TransmitBuf::start_new_datagram_with_size`] + /// then the segment size will be greater than the current datagram is allowed to be.
+ /// Thus [`TransmitBuf::datagram_remaining_mut`] should be used if you need to know the + /// amount of data that can be written into the datagram. + pub(super) fn segment_size(&self) -> usize { + self.segment_size + } + + /// Returns the number of datagrams written into the buffer + /// + /// The last datagram is not necessarily finished yet. + pub(super) fn num_datagrams(&self) -> usize { + self.num_datagrams + } + + /// Returns the maximum number of datagrams allowed to be written into the buffer + pub(super) fn max_datagrams(&self) -> usize { + self.max_datagrams + } + + /// Returns the start offset of the current datagram in the buffer + /// + /// In other words, this offset contains the first byte of the current datagram. + pub(super) fn datagram_start_offset(&self) -> usize { + self.datagram_start + } + + /// Returns the maximum offset in the buffer allowed for the current datagram + /// + /// The first and last datagram in a batch are allowed to be smaller than the maximum + /// size. All datagrams in between need to be exactly this size.
+ pub(super) fn datagram_max_offset(&self) -> usize { + self.buf_capacity + } + + /// Returns the number of bytes that may still be written into this datagram + pub(super) fn datagram_remaining_mut(&self) -> usize { + self.buf_capacity.saturating_sub(self.buf.len()) + } + + /// Returns `true` if the buffer did not have anything written into it + pub(super) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// The number of bytes written into the buffer so far + pub(super) fn len(&self) -> usize { + self.buf.len() + } + + /// Returns the already written bytes in the buffer + pub(super) fn as_mut_slice(&mut self) -> &mut [u8] { + self.buf.as_mut_slice() + } +} + +unsafe impl BufMut for TransmitBuf<'_> { + fn remaining_mut(&self) -> usize { + self.buf.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.buf.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.buf.chunk_mut() + } +} + +impl BufLen for TransmitBuf<'_> { + fn len(&self) -> usize { + self.len() + } +} diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 21fe0340c2..9c2ca47ca3 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -899,13 +899,13 @@ impl FrameStruct for Datagram { } impl Datagram { - pub(crate) fn encode(&self, length: bool, out: &mut Vec) { + pub(crate) fn encode(&self, length: bool, out: &mut impl BufMut) { out.write(FrameType(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte if length { // Safe to unwrap because we check length sanity before queueing datagrams out.write(VarInt::from_u64(self.data.len() as u64).unwrap()); // <= 8 bytes } - out.extend_from_slice(&self.data); + out.put_slice(&self.data); } pub(crate) fn size(&self, length: bool) -> usize { diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index b5ef0c4026..820668777d 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -219,6 +219,23 @@ impl PartialDecode { } } +/// A buffer that 
can tell how much has been written to it already +/// +/// This is commonly used for when a buffer is passed and the user may not write past a +/// given size. It allows the user of such a buffer to know the current cursor position in +/// the buffer. The maximum write size is usually passed in the same unit as +/// [`BufLen::len`]: bytes since the buffer start. +pub(crate) trait BufLen { + /// Returns the number of bytes written into the buffer so far + fn len(&self) -> usize; +} + +impl BufLen for Vec { + fn len(&self) -> usize { + self.len() + } +} + pub(crate) struct Packet { pub(crate) header: Header, pub(crate) header_data: Bytes, @@ -281,7 +298,7 @@ pub(crate) enum Header { } impl Header { - pub(crate) fn encode(&self, w: &mut Vec) -> PartialEncode { + pub(crate) fn encode(&self, w: &mut (impl BufMut + BufLen)) -> PartialEncode { use Header::*; let start = w.len(); match *self { @@ -894,6 +911,17 @@ impl SpaceId { pub fn iter() -> impl Iterator { [Self::Initial, Self::Handshake, Self::Data].iter().cloned() } + + /// Returns the next higher packet space. + /// + /// Keeps returning [`SpaceId::Data`] as the highest space. + pub fn next(&self) -> Self { + match self { + Self::Initial => Self::Handshake, + Self::Handshake => Self::Data, + Self::Data => Self::Data, + } + } } #[cfg(test)]