diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index c22e8d7155..01ee2a631d 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use thiserror::Error; use tracing::{debug, trace}; @@ -163,13 +163,13 @@ impl DatagramState { /// /// Returns whether a frame was written. At most `max_size` bytes will be written, including /// framing. - pub(super) fn write(&mut self, buf: &mut Vec, max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, }; - if buf.len() + datagram.size(true) > max_size { + if datagram.size(true) > buf.remaining_mut() { // Future work: we could be more clever about cramming small datagrams into // mostly-full packets when a larger one is queued first self.outgoing.push_front(datagram); diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 10a4fc0efd..a0a9fedda8 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use frame::StreamMetaVec; use rand::{Rng, SeedableRng, rngs::StdRng}; use thiserror::Error; @@ -21,6 +21,7 @@ use crate::{ cid_queue::CidQueue, coding::BufMutExt, config::{ServerConfig, TransportConfig}, + congestion::Controller, crypto::{self, KeyPair, Keys, PacketKey}, frame::{self, Close, Datagram, FrameStruct, NewToken}, packet::{ @@ -85,9 +86,12 @@ pub use streams::{ }; mod timer; -use crate::congestion::Controller; use timer::{Timer, TimerTable}; +mod transmit_builder; +pub(crate) use transmit_builder::DatagramBuffer; +use transmit_builder::TransmitBuilder; + /// Protocol state and logic for a single QUIC connection /// /// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application @@ -455,14 +459,9 @@ impl Connection { false => 1, true => max_datagrams, }; + let mut transmit = TransmitBuilder::new(buf, max_datagrams, self.path.current_mtu().into()); - let mut num_datagrams = 0; - // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC - // packets, this can be earlier than the start of the current QUIC packet. - let mut datagram_start = 0; - let mut segment_size = usize::from(self.path.current_mtu()); - - if let Some(challenge) = self.send_path_challenge(now, buf) { + if let Some(challenge) = self.send_path_challenge(now, &mut transmit) { return Some(challenge); } @@ -500,11 +499,6 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // Reserving capacity can provide more capacity than we asked for. However, we are not - // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked - // separately. - let mut buf_capacity = 0; - let mut coalesce = true; let mut builder_storage: Option = None; let mut sent_frames = None; @@ -525,8 +519,9 @@ impl Connection { // don't account for coalesced packets potentially occupying space because frames can // always spill into the next datagram. 
let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]); - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); + let frame_space_1rtt = transmit + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); // Is there data or a close message to send in this space? let can_send = self.space_can_send(space_id, frame_space_1rtt); @@ -542,13 +537,13 @@ impl Connection { ack_eliciting |= self.can_send_1rtt(frame_space_1rtt); } - // Can we append more data into the current buffer? - // It is not safe to assume that `buf.len()` is the end of the data, - // since the last packet might not have been finished. + // Can we append more data into the current datagram? + // It is not safe to assume that `transmit.datagram().len()` is the end of the + // data, since the last packet might not have been finished. let buf_end = if let Some(builder) = &builder_storage { - buf.len().max(builder.min_size) + builder.tag_len + transmit.datagram().len().max(builder.min_size) + builder.tag_len } else { - buf.len() + transmit.datagram().len() }; let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto { @@ -560,11 +555,17 @@ impl Connection { } else { unreachable!("tried to send {:?} packet without keys", space_id) }; - if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len { + + // We are NOT coalescing (coalescing is the default, so it was turned off in an + // earlier iteration) OR there is not enough space for another *packet* in this + // datagram (datagram capacity - buf_end == unused space in the datagram). + if !coalesce + || transmit.datagram_mut().capacity() - buf_end < MIN_PACKET_SPACE + tag_len + { // We need to send 1 more datagram and extend the buffer for that. // Is 1 more datagram allowed? - if num_datagrams >= max_datagrams { + if !transmit.has_datagram_capacity() { // No more datagrams allowed break; } @@ -577,7 +578,7 @@ impl Connection { // (see https://github.com/quinn-rs/quinn/issues/1082) if self .path - .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1) + .anti_amplification_blocked(transmit.capacity() as u64 + 1) { trace!("blocked by anti-amplification"); break; } @@ -588,13 +589,13 @@ impl Connection { if ack_eliciting && self.spaces[space_id].loss_probes == 0 { // Assume the current packet will get padded to fill the segment let untracked_bytes = if let Some(builder) = &builder_storage { - buf_capacity - builder.partial_encode.start + transmit.datagram().len() - builder.partial_encode.start } else { 0 } as u64; - debug_assert!(untracked_bytes <= segment_size as u64); + debug_assert!(untracked_bytes <= transmit.segment_size() as u64); - let bytes_to_send = segment_size as u64 + untracked_bytes; + let bytes_to_send = transmit.segment_size() as u64 + untracked_bytes; if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() { space_idx += 1; congestion_blocked = true; @@ -628,7 +629,7 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } - if num_datagrams > 1 { + if !transmit.is_first_datagram() { // If too many padding bytes would be required to continue the GSO batch // after this packet, end the GSO batch here. Ensures that fixed-size frames // with heterogeneous sizes (e.g. application datagrams) won't inadvertently @@ -642,15 +643,16 @@ impl Connection { // MTU. Loss probes are the only packets for which we might grow // `buf_capacity` by less than `segment_size`.
const MAX_PADDING: usize = 16; - let packet_len_unpadded = cmp::max(builder.min_size, buf.len()) - - datagram_start - + builder.tag_len; - if packet_len_unpadded + MAX_PADDING < segment_size - || datagram_start + segment_size > buf_capacity + let packet_len_unpadded = + cmp::max(builder.min_size, transmit.datagram().len()) + - builder.partial_encode.start + + builder.tag_len; + if packet_len_unpadded + MAX_PADDING < transmit.segment_size() + || transmit.datagram_mut().capacity() < transmit.segment_size() { trace!( "GSO truncated by demand for {} padding bytes or loss probe", - segment_size - packet_len_unpadded + transmit.segment_size() - packet_len_unpadded ); builder_storage = Some(builder); break; @@ -658,32 +660,31 @@ impl Connection { // Pad the current datagram to GSO segment size so it can be included in the // GSO batch. - builder.pad_to(segment_size as u16); + builder.pad_to(transmit.segment_size() as u16); } - builder.finish_and_track(now, self, sent_frames.take(), buf); - - if num_datagrams == 1 { - // Set the segment size for this GSO batch to the size of the first UDP - // datagram in the batch. Larger data that cannot be fragmented - // (e.g. application datagrams) will be included in a future batch. When - // sending large enough volumes of data for GSO to be useful, we expect - // packet sizes to usually be consistent, e.g. populated by max-size STREAM - // frames or uniformly sized datagrams. - segment_size = buf.len(); - // Clip the unused capacity out of the buffer so future packets don't - // overrun - buf_capacity = buf.len(); - - // Check whether the data we planned to send will fit in the reduced segment - // size. If not, bail out and leave it for the next GSO batch so we don't - // end up trying to send an empty packet. We can't easily compute the right - // segment size before the original call to `space_can_send`, because at - // that time we haven't determined whether we're going to coalesce with the - // first datagram or potentially pad it to `MIN_INITIAL_SIZE`. + builder.finish_and_track( + now, + self, + sent_frames.take(), + &mut transmit.datagram_mut(), + ); + + if transmit.is_first_datagram() { + transmit.clip_datagram_size(); if space_id == SpaceId::Data { - let frame_space_1rtt = - segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn))); + // Now that we know the size of the first datagram, check + // whether the data we planned to send will fit in the next + // segment. If not, bails out and leave it for the next GSO + // batch. We can't easily compute the right segment size before + // the original call to `space_can_send`, because at that time + // we haven't determined whether we're going to coalesce with + // the first datagram or potentially pad it to + // `MIN_INITIAL_SIZE`. + + let frame_space_1rtt = transmit + .segment_size() + .saturating_sub(self.predict_1rtt_overhead(Some(pn))); if self.space_can_send(space_id, frame_space_1rtt).is_empty() { break; } @@ -691,49 +692,37 @@ impl Connection { } } - // Allocate space for another datagram - let next_datagram_size_limit = match self.spaces[space_id].loss_probes { - 0 => segment_size, + // Start the next datagram + match self.spaces[space_id].loss_probes { + 0 => transmit.start_new_datagram(), _ => { self.spaces[space_id].loss_probes -= 1; // Clamp the datagram to at most the minimum MTU to ensure that loss probes // can get through and enable recovery even if the path MTU has shrank // unexpectedly. 
- std::cmp::min(segment_size, usize::from(INITIAL_MTU)) + transmit.start_new_datagram_with_size(std::cmp::min( + usize::from(INITIAL_MTU), + transmit.segment_size(), + )); } }; - buf_capacity += next_datagram_size_limit; - if buf.capacity() < buf_capacity { - // We reserve the maximum space for sending `max_datagrams` upfront - // to avoid any reallocations if more datagrams have to be appended later on. - // Benchmarks have shown shown a 5-10% throughput improvement - // compared to continuously resizing the datagram buffer. - // While this will lead to over-allocation for small transmits - // (e.g. purely containing ACKs), modern memory allocators - // (e.g. mimalloc and jemalloc) will pool certain allocation sizes - // and therefore this is still rather efficient. - buf.reserve(max_datagrams * segment_size); - } - num_datagrams += 1; coalesce = true; pad_datagram = false; - datagram_start = buf.len(); - - debug_assert_eq!( - datagram_start % segment_size, - 0, - "datagrams in a GSO batch must be aligned to the segment size" - ); } else { // We can append/coalesce the next packet into the current // datagram. // Finish current packet without adding extra padding if let Some(builder) = builder_storage.take() { - builder.finish_and_track(now, self, sent_frames.take(), buf); + builder.finish_and_track( + now, + self, + sent_frames.take(), + &mut transmit.datagram_mut(), + ); } } - debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE); + debug_assert!(transmit.datagram_mut().remaining_mut() >= MIN_PACKET_SPACE); // // From here on, we've determined that a packet will definitely be sent. @@ -760,9 +749,7 @@ impl Connection { now, space_id, self.rem_cids.active(), - buf, - buf_capacity, - datagram_start, + &mut transmit.datagram_mut(), ack_eliciting, self, )?); @@ -774,6 +761,8 @@ impl Connection { if close { trace!("sending CONNECTION_CLOSE"); + let mut datagram = transmit.datagram_mut(); + // Encode ACKs before the ConnectionClose message, to give the receiver // a better approximate on what data has been processed. This is // especially important with ack delay, since the peer might not @@ -784,7 +773,7 @@ impl Connection { self.receiving_ecn, &mut SentFrames::default(), &mut self.spaces[space_id], - buf, + &mut datagram, &mut self.stats, ); } @@ -793,22 +782,22 @@ impl Connection { // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. 
debug_assert!( - buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, - "ACKs should leave space for ConnectionClose" + datagram.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size, + "ACKS should leave space for ConnectionClose" ); - if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { - let max_frame_size = builder.max_size - buf.len(); + if datagram.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size { + let max_frame_size = builder.max_size - datagram.len(); match self.state { State::Closed(state::Closed { ref reason }) => { if space_id == SpaceId::Data || reason.is_transport_layer() { - reason.encode(buf, max_frame_size) + reason.encode(&mut datagram, max_frame_size) } else { frame::ConnectionClose { error_code: TransportErrorCode::APPLICATION_ERROR, frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size) + .encode(&mut datagram, max_frame_size) } } State::Draining => frame::ConnectionClose { @@ -816,7 +805,7 @@ impl Connection { frame_type: None, reason: Bytes::new(), } - .encode(buf, max_frame_size), + .encode(&mut datagram, max_frame_size), _ => unreachable!( "tried to make a close packet when the connection wasn't closed" ), @@ -838,14 +827,15 @@ impl Connection { // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path // validation can occur while the link is saturated. - if space_id == SpaceId::Data && num_datagrams == 1 { + if space_id == SpaceId::Data && transmit.is_first_datagram() { + let mut datagram = transmit.datagram_mut(); if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) { // `unwrap` guaranteed to succeed because `builder_storage` was populated just // above. let mut builder = builder_storage.take().unwrap(); trace!("PATH_RESPONSE {:08x} (off-path)", token); - buf.write(frame::FrameType::PATH_RESPONSE); - buf.write(token); + datagram.write(frame::FrameType::PATH_RESPONSE); + datagram.write(token); self.stats.frame_tx.path_response += 1; builder.pad_to(MIN_INITIAL_SIZE); builder.finish_and_track( @@ -855,12 +845,12 @@ impl Connection { non_retransmits: true, ..SentFrames::default() }), - buf, + &mut datagram, ); - self.stats.udp_tx.on_sent(1, buf.len()); + self.stats.udp_tx.on_sent(1, transmit.len()); return Some(Transmit { destination: remote, - size: buf.len(), + size: transmit.len(), ecn: None, segment_size: None, src_ip: self.local_ip, @@ -868,8 +858,12 @@ impl Connection { } } - let sent = - self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number); + let sent = { + let datagram = transmit.datagram_mut(); + let frame_space = builder.max_size - datagram.len(); + let mut buf = datagram.limit(frame_space); + self.populate_packet(now, space_id, &mut buf, builder.exact_number) + }; // ACK-only packets should only be sent when explicitly allowed. 
If we write them due to // any other reason, there is a bug which leads to one component announcing write @@ -881,7 +875,7 @@ impl Connection { !(sent.is_ack_only(&self.streams) && !can_send.acks && can_send.other - && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize + && transmit.datagram_mut().capacity() == self.path.current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" ); @@ -905,43 +899,41 @@ impl Connection { builder.pad_to(MIN_INITIAL_SIZE); } let last_packet_number = builder.exact_number; - builder.finish_and_track(now, self, sent_frames, buf); + builder.finish_and_track(now, self, sent_frames, &mut transmit.datagram_mut()); self.path .congestion - .on_sent(now, buf.len() as u64, last_packet_number); + .on_sent(now, transmit.len() as u64, last_packet_number); } - self.app_limited = buf.is_empty() && !congestion_blocked; + self.app_limited = transmit.is_empty() && !congestion_blocked; // Send MTU probe if necessary - if buf.is_empty() && self.state.is_established() { + if transmit.is_empty() && self.state.is_established() { let space_id = SpaceId::Data; let probe_size = self .path .mtud .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?; - let buf_capacity = probe_size as usize; - buf.reserve(buf_capacity); - + debug_assert_eq!(transmit.num_datagrams(), 0); + transmit.start_new_datagram_with_size(probe_size as usize); + let mut datagram = transmit.datagram_mut(); let mut builder = PacketBuilder::new( now, space_id, self.rem_cids.active(), - buf, - buf_capacity, - 0, + &mut datagram, true, self, )?; // We implement MTU probes as ping packets padded up to the probe size - buf.write(frame::FrameType::PING); + datagram.write(frame::FrameType::PING); self.stats.frame_tx.ping += 1; // If supported by the peer, we want no delays to the probe's ACK if self.peer_supports_ack_frequency() { - buf.write(frame::FrameType::IMMEDIATE_ACK); + datagram.write(frame::FrameType::IMMEDIATE_ACK); self.stats.frame_tx.immediate_ack += 1; } @@ -950,41 +942,42 @@ impl Connection { non_retransmits: true, ..Default::default() }; - builder.finish_and_track(now, self, Some(sent_frames), buf); + builder.finish_and_track(now, self, Some(sent_frames), &mut datagram); self.stats.path.sent_plpmtud_probes += 1; - num_datagrams = 1; trace!(?probe_size, "writing MTUD probe"); } - if buf.is_empty() { + if transmit.is_empty() { return None; } - trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams); - self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64); + trace!( + "sending {} bytes in {} datagrams", + transmit.len(), + transmit.num_datagrams() + ); + self.path.total_sent = self.path.total_sent.saturating_add(transmit.len() as u64); - self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len()); + self.stats + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); - Some(Transmit { - destination: self.path.remote, - size: buf.len(), - ecn: if self.path.sending_ecn { - Some(EcnCodepoint::Ect0) - } else { - None - }, - segment_size: match num_datagrams { - 1 => None, - _ => Some(segment_size), - }, - src_ip: self.local_ip, - }) + let ecn = if self.path.sending_ecn { + Some(EcnCodepoint::Ect0) + } else { + None + }; + Some(transmit.build(self.path.remote, ecn, self.local_ip)) } /// Send PATH_CHALLENGE for a previous path if necessary - fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec) -> Option { + fn send_path_challenge( + &mut self, + 
now: Instant, + buf: &mut TransmitBuilder<'_>, + ) -> Option { let (prev_cid, prev_path) = self.prev_path.as_mut()?; if !prev_path.challenge_pending { return None; @@ -999,28 +992,25 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - buf.reserve(MIN_INITIAL_SIZE as usize); - - let buf_capacity = buf.capacity(); + buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize); // Use the previous CID to avoid linking the new path with the previous path. We // don't bother accounting for possible retirement of that prev_cid because this is // sent once, immediately after migration, when the CID is known to be valid. Even // if a post-migration packet caused the CID to be retired, it's fair to pretend // this is sent first. + debug_assert!(buf.datagram().is_empty()); let mut builder = PacketBuilder::new( now, SpaceId::Data, *prev_cid, - buf, - buf_capacity, - 0, + &mut buf.datagram_mut(), false, self, )?; trace!("validating previous path with PATH_CHALLENGE {:08x}", token); - buf.write(frame::FrameType::PATH_CHALLENGE); - buf.write(token); + buf.datagram_mut().write(frame::FrameType::PATH_CHALLENGE); + buf.datagram_mut().write(token); self.stats.frame_tx.path_challenge += 1; // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame @@ -1029,7 +1019,7 @@ impl Connection { // sending a datagram of this size builder.pad_to(MIN_INITIAL_SIZE); - builder.finish(self, buf); + builder.finish(self, &mut buf.datagram_mut()); self.stats.udp_tx.on_sent(1, buf.len()); Some(Transmit { @@ -3055,8 +3045,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut Vec, - max_size: usize, + buf: &mut impl BufMut, pn: u64, ) -> SentFrames { let mut sent = SentFrames::default(); @@ -3132,7 +3121,7 @@ impl Connection { } // PATH_CHALLENGE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if 9 < buf.remaining_mut() && space_id == SpaceId::Data { // Transmit challenges with every outgoing frame on an unvalidated path if let Some(token) = self.path.challenge { // But only send a packet solely for that purpose at most once @@ -3147,7 +3136,7 @@ impl Connection { } // PATH_RESPONSE - if buf.len() + 9 < max_size && space_id == SpaceId::Data { + if 9 < buf.remaining_mut() && space_id == SpaceId::Data { if let Some(token) = self.path_responses.pop_on_path(self.path.remote) { sent.non_retransmits = true; sent.requires_padding = true; @@ -3159,7 +3148,7 @@ impl Connection { } // CRYPTO - while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt { + while frame::Crypto::SIZE_BOUND < buf.remaining_mut() && !is_0rtt { let mut frame = match space.pending.crypto.pop_front() { Some(x) => x, None => break, @@ -3169,8 +3158,7 @@ impl Connection { // Since the offset is known, we can reserve the exact size required to encode it. // For length we reserve 2bytes which allows to encode up to 2^14, // which is more than what fits into normally sized QUIC frames. 
- let max_crypto_data_size = max_size - - buf.len() + let max_crypto_data_size = buf.remaining_mut() - 1 // Frame Type - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) }) - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes @@ -3206,12 +3194,11 @@ impl Connection { &mut space.pending, &mut sent.retransmits, &mut self.stats.frame_tx, - max_size, ); } // NEW_CONNECTION_ID - while buf.len() + 44 < max_size { + while 44 < buf.remaining_mut() { let issued = match space.pending.new_cids.pop() { Some(x) => x, None => break, @@ -3233,7 +3220,7 @@ impl Connection { } // RETIRE_CONNECTION_ID - while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size { + while frame::RETIRE_CONNECTION_ID_SIZE_BOUND < buf.remaining_mut() { let seq = match space.pending.retire_cids.pop() { Some(x) => x, None => break, @@ -3247,8 +3234,8 @@ impl Connection { // DATAGRAM let mut sent_datagrams = false; - while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data { - match self.datagrams.write(buf, max_size) { + while Datagram::SIZE_BOUND < buf.remaining_mut() && space_id == SpaceId::Data { + match self.datagrams.write(buf) { true => { sent_datagrams = true; sent.non_retransmits = true; @@ -3288,7 +3275,7 @@ impl Connection { token: token.encode(&*server_config.token_key).into(), }; - if buf.len() + new_token.size() >= max_size { + if new_token.size() >= buf.remaining_mut() { space.pending.new_tokens.push(remote_addr); break; } @@ -3303,9 +3290,9 @@ impl Connection { // STREAM if space_id == SpaceId::Data { - sent.stream_frames = - self.streams - .write_stream_frames(buf, max_size, self.config.send_fairness); + sent.stream_frames = self + .streams + .write_stream_frames(buf, self.config.send_fairness); self.stats.frame_tx.stream += sent.stream_frames.len() as u64; } @@ -3321,7 +3308,7 @@ impl Connection { receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, - buf: &mut Vec, + buf: &mut impl BufMut, stats: &mut ConnectionStats, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index d99f012b1a..780a643435 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,8 +1,8 @@ -use bytes::Bytes; +use bytes::{BufMut, Bytes}; use rand::Rng; use tracing::{trace, trace_span}; -use super::{Connection, SentFrames, spaces::SentPacket}; +use super::{Connection, DatagramBuffer, SentFrames, spaces::SentPacket}; use crate::{ ConnectionId, Instant, TransportError, TransportErrorCode, connection::ConnectionSide, @@ -11,17 +11,17 @@ use crate::{ }; pub(super) struct PacketBuilder { - pub(super) datagram_start: usize, pub(super) space: SpaceId, pub(super) partial_encode: PartialEncode, pub(super) ack_eliciting: bool, pub(super) exact_number: u64, pub(super) short_header: bool, - /// Smallest absolute position in the associated buffer that must be occupied by this packet's - /// frames + /// The smallest datagram offset that must be occupied by this packet's frames + /// + /// This is the smallest offset into the datagram this packet is being written into, + /// that must contain frames for this packet. 
pub(super) min_size: usize, - /// Largest absolute position in the associated buffer that may be occupied by this packet's - /// frames + /// The largest datagram offset that may be occupied by this packet's frames pub(super) max_size: usize, pub(super) tag_len: usize, pub(super) _span: tracing::span::EnteredSpan, @@ -36,9 +36,7 @@ impl PacketBuilder { now: Instant, space_id: SpaceId, dst_cid: ConnectionId, - buffer: &mut Vec, - buffer_capacity: usize, - datagram_start: usize, + datagram: &mut DatagramBuffer<'_>, ack_eliciting: bool, conn: &mut Connection, ) -> Option { @@ -122,9 +120,9 @@ impl PacketBuilder { version, }), }; - let partial_encode = header.encode(buffer); + let partial_encode = header.encode(datagram); if conn.peer_params.grease_quic_bit && conn.rng.random() { - buffer[partial_encode.start] ^= FIXED_BIT; + datagram[partial_encode.start] ^= FIXED_BIT; } let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto { @@ -148,14 +146,13 @@ impl PacketBuilder { // pn_len + payload_len + tag_len >= sample_size + 4 // payload_len >= sample_size + 4 - pn_len - tag_len let min_size = Ord::max( - buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len), + datagram.len() + (sample_size + 4).saturating_sub(number.len() + tag_len), partial_encode.start + dst_cid.len() + 6, ); - let max_size = buffer_capacity - tag_len; + let max_size = datagram.capacity() - tag_len; debug_assert!(max_size >= min_size); Some(Self { - datagram_start, space: space_id, partial_encode, exact_number, @@ -174,10 +171,7 @@ impl PacketBuilder { // The datagram might already have a larger minimum size than the caller is requesting, if // e.g. we're coalescing packets and have populated more than `min_size` bytes with packets // already. - self.min_size = Ord::max( - self.min_size, - self.datagram_start + (min_size as usize) - self.tag_len, - ); + self.min_size = Ord::max(self.min_size, (min_size as usize) - self.tag_len); } pub(super) fn finish_and_track( @@ -185,12 +179,12 @@ impl PacketBuilder { now: Instant, conn: &mut Connection, sent: Option, - buffer: &mut Vec, + datagram: &mut DatagramBuffer<'_>, ) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; let space_id = self.space; - let (size, padded) = self.finish(conn, buffer); + let (size, padded) = self.finish(conn, datagram); let sent = match sent { Some(sent) => sent, None => return, @@ -228,11 +222,16 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec) -> (usize, bool) { - let pad = buffer.len() < self.min_size; + pub(super) fn finish( + self, + conn: &mut Connection, + datagram: &mut DatagramBuffer<'_>, + ) -> (usize, bool) { + let pad = self.min_size > datagram.len(); if pad { - trace!("PADDING * {}", self.min_size - buffer.len()); - buffer.resize(self.min_size, 0); + let padding_bytes = self.min_size - datagram.len(); + trace!("PADDING * {padding_bytes}"); + datagram.put_bytes(0, padding_bytes); } let space = &conn.spaces[self.space]; @@ -251,15 +250,15 @@ impl PacketBuilder { "Mismatching crypto tag len" ); - buffer.resize(buffer.len() + packet_crypto.tag_len(), 0); + datagram.put_bytes(0, packet_crypto.tag_len()); let encode_start = self.partial_encode.start; - let packet_buf = &mut buffer[encode_start..]; + let packet_buf = &mut datagram[encode_start..]; self.partial_encode.finish( packet_buf, header_crypto, Some((self.exact_number, packet_crypto)), ); - (buffer.len() - 
encode_start, pad) + (datagram.len() - encode_start, pad) } } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e05311423d..c4424823b7 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -411,14 +411,13 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut Vec, + buf: &mut impl BufMut, pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, - max_size: usize, ) { // RESET_STREAM - while buf.len() + frame::ResetStream::SIZE_BOUND < max_size { + while frame::ResetStream::SIZE_BOUND < buf.remaining_mut() { let (id, error_code) = match pending.reset_stream.pop() { Some(x) => x, None => break, @@ -442,7 +441,7 @@ impl StreamsState { } // STOP_SENDING - while buf.len() + frame::StopSending::SIZE_BOUND < max_size { + while frame::StopSending::SIZE_BOUND < buf.remaining_mut() { let frame = match pending.stop_sending.pop() { Some(x) => x, None => break, @@ -461,7 +460,7 @@ impl StreamsState { } // MAX_DATA - if pending.max_data && buf.len() + 9 < max_size { + if pending.max_data && 9 < buf.remaining_mut() { pending.max_data = false; // `local_max_data` can grow bigger than `VarInt`. @@ -484,7 +483,7 @@ impl StreamsState { } // MAX_STREAM_DATA - while buf.len() + 17 < max_size { + while 17 < buf.remaining_mut() { let id = match pending.max_stream_data.iter().next() { Some(x) => *x, None => break, @@ -516,7 +515,7 @@ impl StreamsState { // MAX_STREAMS for dir in Dir::iter() { - if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size { + if !pending.max_stream_id[dir as usize] || 9 >= buf.remaining_mut() { continue; } @@ -541,19 +540,11 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut Vec, - max_buf_size: usize, + buf: &mut impl BufMut, fair: bool, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); - while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size { - if max_buf_size - .checked_sub(buf.len() + frame::Stream::SIZE_BOUND) - .is_none() - { - break; - } - + while frame::Stream::SIZE_BOUND < buf.remaining_mut() { // Pop the stream of the highest priority that currently has pending data // If the stream still has some pending data left after writing, it will be reinserted, otherwise not let Some(stream) = self.pending.pop() else { @@ -577,7 +568,7 @@ impl StreamsState { // Now that we know the `StreamId`, we can better account for how many bytes // are required to encode it. - let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into()); + let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into()); let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size); let fin = offsets.end == stream.pending.offset() && matches!(stream.state, SendState::DataSent { .. 
}); @@ -1379,8 +1370,8 @@ mod tests { high.set_priority(1).unwrap(); high.write(b"high").unwrap(); - let mut buf = Vec::with_capacity(40); - let meta = server.write_stream_frames(&mut buf, 40, true); + let buf = Vec::with_capacity(40); + let meta = server.write_stream_frames(&mut buf.limit(40), true); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); assert_eq!(meta[2].id, id_low); @@ -1438,8 +1429,10 @@ mod tests { }; high.set_priority(-1).unwrap(); - let mut buf = Vec::with_capacity(1000); - let meta = server.write_stream_frames(&mut buf, 40, true); + let buf = Vec::with_capacity(1000); + let mut buf = buf.limit(40); + let meta = server.write_stream_frames(&mut buf, true); + let buf = buf.into_inner(); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); @@ -1447,7 +1440,7 @@ mod tests { assert_eq!(server.pending.len(), 2); // Send the remaining data. The initial mid priority one should go first now - let meta = server.write_stream_frames(&mut buf, 1000, true); + let meta = server.write_stream_frames(&mut buf.limit(1000), true); assert_eq!(meta.len(), 2); assert_eq!(meta[0].id, id_mid); assert_eq!(meta[1].id, id_high); @@ -1507,8 +1500,9 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair); + let mut lbuf = buf.limit(40); + let meta = server.write_stream_frames(&mut lbuf, fair); + buf = lbuf.into_inner(); if meta.is_empty() { break; } @@ -1575,11 +1569,12 @@ mod tests { stream_b.write(&[b'b'; 100]).unwrap(); let mut metas = vec![]; - let mut buf = Vec::with_capacity(1024); + let buf = Vec::with_capacity(1024); // Write the first chunk of stream_a - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut buf = buf.limit(40); + let meta = server.write_stream_frames(&mut buf, false); + let mut buf = buf.into_inner(); assert!(!meta.is_empty()); metas.extend(meta); @@ -1595,8 +1590,9 @@ mod tests { // loop until all the streams are written loop { - let buf_len = buf.len(); - let meta = server.write_stream_frames(&mut buf, buf_len + 40, false); + let mut lbuf = buf.limit(40); + let meta = server.write_stream_frames(&mut lbuf, false); + buf = lbuf.into_inner(); if meta.is_empty() { break; } diff --git a/quinn-proto/src/connection/transmit_builder.rs b/quinn-proto/src/connection/transmit_builder.rs new file mode 100644 index 0000000000..349abc1f1b --- /dev/null +++ b/quinn-proto/src/connection/transmit_builder.rs @@ -0,0 +1,291 @@ +use std::net::{IpAddr, SocketAddr}; +use std::ops::{Deref, DerefMut}; + +use bytes::BufMut; + +use crate::{EcnCodepoint, Transmit}; + +/// The buffer in which to write datagrams for [`Connection::poll_transmit`] +/// +/// The `poll_transmit` function writes zero or more datagrams to a buffer. Multiple +/// datagrams are possible when GSO (Generic Segmentation Offload) is supported. +/// +/// This buffer tracks datagrams being written to it. There is always a "current" datagram, +/// which is started by calling [`TransmitBuilder::start_new_datagram`]. Writing to the buffer +/// is done through the [`BufMut`] interface. +/// +/// Usually a datagram contains one QUIC packet, though QUIC-TRANSPORT 12.2 Coalescing +/// Packets allows for placing multiple packets into a single datagram provided all but the +/// last packet use long headers.
This is normally used during connection setup where often +/// the initial, handshake and sometimes even a 1-RTT packet can be coalesced into a single +/// datagram. +/// +/// Inside a single packet multiple QUIC frames are written. +/// +/// The buffer managed here is passed straight to the OS' `sendmsg` call (or variant) once +/// `poll_transmit` returns. So it needs to contain the datagrams as they are sent on the +/// wire. +/// +/// [`Connection::poll_transmit`]: super::Connection::poll_transmit +#[derive(Debug)] +pub(super) struct TransmitBuilder<'a> { + /// The buffer itself; packets are written to this buffer + buf: &'a mut Vec<u8>, + /// Offset into the buffer at which the current datagram starts + /// + /// Note that when coalescing packets this might be before the start of the current + /// packet. + datagram_start: usize, + /// The maximum offset allowed to be used for the current datagram in the buffer + /// + /// The first and last datagram in a batch are allowed to be smaller than the maximum + /// size. All datagrams in between need to be exactly this size. + buf_capacity: usize, + /// The maximum number of datagrams allowed to write into [`TransmitBuilder::buf`] + max_datagrams: usize, + /// The number of datagrams already (partially) written into the buffer + /// + /// Incremented by a call to [`TransmitBuilder::start_new_datagram`]. + pub(super) num_datagrams: usize, + /// The segment size of this GSO batch + /// + /// The segment size is the size of each datagram in the GSO batch, only the last + /// datagram in the batch may be smaller. + /// + /// For the first datagram this is set to the maximum size a datagram is allowed to be: + /// the current path MTU. After the first datagram is finished this is reduced to the + /// size of the first datagram and can no longer change. + segment_size: usize, +} + +impl<'a> TransmitBuilder<'a> { + pub(super) fn new(buf: &'a mut Vec<u8>, max_datagrams: usize, mtu: usize) -> Self { + Self { + buf, + datagram_start: 0, + buf_capacity: 0, + max_datagrams, + num_datagrams: 0, + segment_size: mtu, + } + } + + /// Starts a datagram with a custom datagram size + /// + /// This is a specialized version of [`TransmitBuilder::start_new_datagram`] which sets the + /// datagram size. Useful for e.g. PATH_CHALLENGE, tail-loss probes or MTU probes. + /// + /// After the first datagram you can never increase the segment size. If you decrease + /// the size of a datagram in a batch, it must be the last datagram of the batch. + pub(super) fn start_new_datagram_with_size(&mut self, datagram_size: usize) { + // Only reserve space for this datagram, usually it is the last one in the batch. + let max_capacity_hint = datagram_size; + self.new_datagram_inner(datagram_size, max_capacity_hint) + } + + /// Starts a new datagram in the transmit buffer + /// + /// If this starts the second datagram the segment size will be set to the size of the + /// first datagram. + /// + /// If the underlying buffer does not have enough capacity yet this will allocate enough + /// capacity for all the datagrams allowed in a single batch. Use + /// [`TransmitBuilder::start_new_datagram_with_size`] if you know you will need less. + pub(super) fn start_new_datagram(&mut self) { + // We reserve the maximum space for sending `max_datagrams` upfront to avoid any + // reallocations if more datagrams have to be appended later on. Benchmarks have + // shown a 5-10% throughput improvement compared to continuously resizing the + // datagram buffer.
While this will lead to over-allocation for small transmits + // (e.g. purely containing ACKs), modern memory allocators (e.g. mimalloc and + // jemalloc) will pool certain allocation sizes and therefore this is still rather + // efficient. + let max_capacity_hint = self.max_datagrams * self.segment_size; + self.new_datagram_inner(self.segment_size, max_capacity_hint) + } + + fn new_datagram_inner(&mut self, datagram_size: usize, max_capacity_hint: usize) { + debug_assert!(self.num_datagrams < self.max_datagrams); + if self.num_datagrams == 1 { + // Set the segment size to the size of the first datagram. + self.segment_size = self.buf.len(); + } + if self.num_datagrams >= 1 { + debug_assert!(datagram_size <= self.segment_size); + if datagram_size < self.segment_size { + // If this is a GSO batch and this datagram is smaller than the segment + // size, this must be the last datagram in the batch. + self.max_datagrams = self.num_datagrams + 1; + } + } + self.datagram_start = self.buf.len(); + debug_assert_eq!( + self.datagram_start % self.segment_size, + 0, + "datagrams in a GSO batch must be aligned to the segment size" + ); + self.buf_capacity = self.datagram_start + datagram_size; + if self.buf_capacity > self.buf.capacity() { + self.buf + .reserve_exact(max_capacity_hint.saturating_sub(self.buf.capacity())); + } + self.num_datagrams += 1; + } + + /// Clips the datagram size to the current size + /// + /// Only valid for the first datagram, when the datagram might be smaller than the + /// segment size. Needed before estimating the available space in the next datagram + /// based on [`TransmitBuilder::segment_size`]. + /// + /// Use [`TransmitBuilder::start_new_datagram_with_size`] if you need to reduce the size of + /// the last datagram in a batch. + pub(super) fn clip_datagram_size(&mut self) { + debug_assert_eq!(self.num_datagrams, 1); + self.segment_size = self.buf.len(); + self.buf_capacity = self.buf.len(); + } + + /// Returns a mutable buffer for the current datagram + /// + /// The returned [`DatagramBuffer`] allows writing into the buffer using + /// [`BufMut`] and both reading and modifying the already written data in the buffer + /// using [`Deref`] and [`DerefMut`]. The buffer also enforces a maximum size. + pub(super) fn datagram_mut(&mut self) -> DatagramBuffer<'_> { + DatagramBuffer::new( + self.buf, + self.datagram_start, + self.buf_capacity - self.datagram_start, + ) + } + + pub(super) fn build( + self, + destination: SocketAddr, + ecn: Option<EcnCodepoint>, + src_ip: Option<IpAddr>, + ) -> Transmit { + Transmit { + destination, + ecn, + size: self.len(), + segment_size: match self.num_datagrams { + 1 => None, + _ => Some(self.segment_size), + }, + src_ip, + } + } + + /// Returns the bytes written into the current datagram + pub(super) fn datagram(&self) -> &[u8] { + &self.buf[self.datagram_start..] + } + + /// Whether there is space for another datagram in this transmit + pub(super) fn has_datagram_capacity(&self) -> bool { + self.num_datagrams < self.max_datagrams + } + + /// The sum of the capacity of all started datagrams in the transmit + /// + /// This might be more than [`len`] when the current datagram is not yet fully + /// written. In other words: this is the length of the transmit when the current + /// datagram would fill the entire segment size.
+ /// + /// [`len`]: TransmitBuilder::len + pub(super) fn capacity(&self) -> usize { + self.segment_size * self.num_datagrams + } + + /// Whether the current datagram is the first in the transmit buffer + pub(super) fn is_first_datagram(&self) -> bool { + self.num_datagrams == 1 + } + + /// Returns the GSO segment size + /// + /// This is also the maximum size datagrams are allowed to be. The first and last + /// datagram in a batch are allowed to be smaller however. After the first datagram the + /// segment size is clipped to the size of the first datagram. + pub(super) fn segment_size(&self) -> usize { + self.segment_size + } + + /// Returns the number of datagrams written into the buffer + /// + /// The last datagram is not necessarily finished yet. + pub(super) fn num_datagrams(&self) -> usize { + self.num_datagrams + } + + /// Returns `true` if there are no datagrams in this transmit + pub(super) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the sum of the bytes for all datagrams in the builder + pub(super) fn len(&self) -> usize { + self.buf.len() + } +} + +/// A [`BufMut`] implementation for a single datagram +#[derive(Debug)] +pub(crate) struct DatagramBuffer<'a> { + /// The underlying storage the datagram buffer exists in + buf: &'a mut Vec<u8>, + /// The start offset of the datagram in the underlying buffer + start_offset: usize, + /// The maximum write offset in the underlying buffer for this datagram + max_offset: usize, +} + +impl<'a> DatagramBuffer<'a> { + pub(crate) fn new(buf: &'a mut Vec<u8>, start_offset: usize, max_size: usize) -> Self { + // Make sure that at least this datagram is allocated. Does nothing if, as for a + // transmit, more has already been allocated. + buf.reserve(max_size); + + let max_offset = start_offset + max_size; + DatagramBuffer { + buf, + start_offset, + max_offset, + } + } +} + +unsafe impl BufMut for DatagramBuffer<'_> { + fn remaining_mut(&self) -> usize { + self.max_offset - self.buf.len() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.buf.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.buf.chunk_mut() + } +} + +impl Deref for DatagramBuffer<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.buf[self.start_offset..] + } +} + +impl DerefMut for DatagramBuffer<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.buf[self.start_offset..]
+ } +} + +impl DatagramBuffer<'_> { + /// Returns the maximum size of the buffer + pub(crate) fn capacity(&self) -> usize { + self.max_offset - self.start_offset + } +} diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index 59d94bbc90..1d864ff9db 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -20,7 +20,7 @@ use crate::{ cid_generator::ConnectionIdGenerator, coding::BufMutExt, config::{ClientConfig, EndpointConfig, ServerConfig}, - connection::{Connection, ConnectionError, SideArgs}, + connection::{Connection, ConnectionError, DatagramBuffer, SideArgs}, crypto::{self, Keys, UnsupportedVersion}, frame, packet::{ @@ -146,6 +146,8 @@ impl Endpoint { data: BytesMut, buf: &mut Vec, ) -> Option { + let start_pos = buf.len(); + let buf = &mut DatagramBuffer::new(buf, start_pos, MIN_INITIAL_SIZE.into()); // Partially decode packet or short-circuit if unable let datagram_len = data.len(); let event = match PartialDecode::new( @@ -264,7 +266,7 @@ impl Endpoint { inciting_dgram_len: usize, addresses: FourTuple, dst_cid: ConnectionId, - buf: &mut Vec, + buf: &mut DatagramBuffer<'_>, ) -> Option { if self .last_stateless_reset @@ -303,11 +305,10 @@ impl Endpoint { self.rng .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len) }; - buf.reserve(padding_len + RESET_TOKEN_SIZE); - buf.resize(padding_len, 0); + buf.put_bytes(0, padding_len); self.rng.fill_bytes(&mut buf[0..padding_len]); buf[0] = 0b0100_0000 | (buf[0] >> 2); - buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid)); + buf.put_slice(&ResetToken::new(&*self.config.reset_key, dst_cid)); debug_assert!(buf.len() < inciting_dgram_len); @@ -419,7 +420,7 @@ impl Endpoint { datagram_len: usize, event: DatagramConnectionEvent, addresses: FourTuple, - buf: &mut Vec, + buf: &mut DatagramBuffer<'_>, ) -> Option { let dst_cid = event.first_decode.dst_cid(); let header = event.first_decode.initial_header().unwrap(); @@ -525,6 +526,8 @@ impl Endpoint { buf: &mut Vec, server_config: Option>, ) -> Result<(ConnectionHandle, Connection), AcceptError> { + let buf_start = buf.len(); + let mut buf = DatagramBuffer::new(buf, buf_start, MIN_INITIAL_SIZE.into()); let remote_address_validated = incoming.remote_address_validated(); incoming.improper_drop_warner.dismiss(); let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx); @@ -566,7 +569,7 @@ impl Endpoint { &incoming.crypto, &src_cid, TransportError::CONNECTION_REFUSED(""), - buf, + &mut buf, )), }); } @@ -664,7 +667,7 @@ impl Endpoint { &incoming.crypto, &src_cid, e.clone(), - buf, + &mut buf, )), _ => None, }; @@ -708,13 +711,15 @@ impl Endpoint { self.clean_up_incoming(&incoming); incoming.improper_drop_warner.dismiss(); + let start_pos = buf.len(); + let mut buf = DatagramBuffer::new(buf, start_pos, MIN_INITIAL_SIZE.into()); self.initial_close( incoming.packet.header.version, incoming.addresses, &incoming.crypto, &incoming.packet.header.src_cid, TransportError::CONNECTION_REFUSED(""), - buf, + &mut buf, ) } @@ -752,14 +757,16 @@ impl Endpoint { version: incoming.packet.header.version, }; - let encode = header.encode(buf); + let start_pos = buf.len(); + let mut buf = DatagramBuffer::new(buf, start_pos, MIN_INITIAL_SIZE.into()); + let encode = header.encode(&mut buf); buf.put_slice(&token); - buf.extend_from_slice(&server_config.crypto.retry_tag( + buf.put_slice(&server_config.crypto.retry_tag( incoming.packet.header.version, &incoming.packet.header.dst_cid, - buf, + &buf, )); - encode.finish(buf, 
&*incoming.crypto.header.local, None); + encode.finish(&mut buf, &*incoming.crypto.header.local, None); Ok(Transmit { destination: incoming.addresses.remote, @@ -854,7 +861,7 @@ impl Endpoint { crypto: &Keys, remote_id: &ConnectionId, reason: TransportError, - buf: &mut Vec, + buf: &mut DatagramBuffer<'_>, ) -> Transmit { // We don't need to worry about CID collisions in initial closes because the peer // shouldn't respond, and if it does, and the CID collides, we'll just drop the @@ -873,7 +880,7 @@ impl Endpoint { let max_len = INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len(); frame::Close::from(reason).encode(buf, max_len); - buf.resize(buf.len() + crypto.packet.local.tag_len(), 0); + buf.put_bytes(0, crypto.packet.local.tag_len()); partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local))); Transmit { destination: addresses.remote, diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 21fe0340c2..9c2ca47ca3 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -899,13 +899,13 @@ impl FrameStruct for Datagram { } impl Datagram { - pub(crate) fn encode(&self, length: bool, out: &mut Vec) { + pub(crate) fn encode(&self, length: bool, out: &mut impl BufMut) { out.write(FrameType(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte if length { // Safe to unwrap because we check length sanity before queueing datagrams out.write(VarInt::from_u64(self.data.len() as u64).unwrap()); // <= 8 bytes } - out.extend_from_slice(&self.data); + out.put_slice(&self.data); } pub(crate) fn size(&self, length: bool) -> usize { diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index b5ef0c4026..b4d6a25ef7 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -6,6 +6,7 @@ use thiserror::Error; use crate::{ ConnectionId, coding::{self, BufExt, BufMutExt}, + connection::DatagramBuffer, crypto, }; @@ -281,7 +282,11 @@ pub(crate) enum Header { } impl Header { - pub(crate) fn encode(&self, w: &mut Vec) -> PartialEncode { + /// Encodes the QUIC packet header into the buffer + /// + /// The current position of the buffer is stored in the [`PartialEncode`] as the start + /// of the packet in the buffer. 
+ pub(crate) fn encode(&self, w: &mut DatagramBuffer<'_>) -> PartialEncode { use Header::*; let start = w.len(); match *self { @@ -938,8 +943,10 @@ mod tests { #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))] #[test] fn header_encoding() { - use crate::Side; + use std::ops::Deref; + use crate::crypto::rustls::{initial_keys, initial_suite_from_provider}; + use crate::{MIN_INITIAL_SIZE, Side}; #[cfg(all(feature = "rustls-aws-lc-rs", not(feature = "rustls-ring")))] use rustls::crypto::aws_lc_rs::default_provider; #[cfg(feature = "rustls-ring")] @@ -952,6 +959,7 @@ mod tests { let suite = initial_suite_from_provider(&std::sync::Arc::new(provider)).unwrap(); let client = initial_keys(Version::V1, dcid, Side::Client, &suite); let mut buf = Vec::new(); + let mut buf = DatagramBuffer::new(&mut buf, 0, MIN_INITIAL_SIZE.into()); let header = Header::Initial(InitialHeader { number: PacketNumber::U8(0), src_cid: ConnectionId::new(&[]), @@ -960,15 +968,14 @@ mod tests { version: crate::DEFAULT_SUPPORTED_VERSIONS[0], }); let encode = header.encode(&mut buf); - let header_len = buf.len(); - buf.resize(header_len + 16 + client.packet.local.tag_len(), 0); + buf.put_bytes(0, 16 + client.packet.local.tag_len()); encode.finish( &mut buf, &*client.header.local, Some((0, &*client.packet.local)), ); - for byte in &buf { + for byte in buf.deref() { print!("{byte:02x}"); } println!(); @@ -983,7 +990,7 @@ mod tests { let server = initial_keys(Version::V1, dcid, Side::Server, &suite); let supported_versions = crate::DEFAULT_SUPPORTED_VERSIONS.to_vec(); let decode = PartialDecode::new( - buf.as_slice().into(), + buf.deref().into(), &FixedLengthConnectionIdParser::new(0), &supported_versions, false,
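For illustration, a minimal standalone sketch of the datagram-scoped `BufMut` pattern this change introduces (the `DatagramBuffer` view): writes go to the tail of a `Vec<u8>` but never past a fixed offset, so frame writers bound themselves with `remaining_mut()` instead of taking a separate `max_size` argument. The names `BoundedTail` and `write_ping_frames` are invented for illustration and are not part of this patch.

```rust
use bytes::BufMut;

/// Simplified stand-in for `DatagramBuffer`: writes append to the tail of `buf`,
/// but `remaining_mut()` is capped so the datagram cannot grow past `max_offset`.
struct BoundedTail<'a> {
    buf: &'a mut Vec<u8>,
    max_offset: usize,
}

unsafe impl BufMut for BoundedTail<'_> {
    fn remaining_mut(&self) -> usize {
        self.max_offset - self.buf.len()
    }

    unsafe fn advance_mut(&mut self, cnt: usize) {
        unsafe { self.buf.advance_mut(cnt) }
    }

    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        self.buf.chunk_mut()
    }
}

/// A frame writer in the new style: no `max_size` parameter, just a
/// `remaining_mut()` check before each frame.
fn write_ping_frames(buf: &mut impl BufMut, mut count: usize) -> usize {
    const PING_SIZE_BOUND: usize = 1; // a PING frame is a single type byte
    let mut written = 0;
    while count > 0 && PING_SIZE_BOUND <= buf.remaining_mut() {
        buf.put_u8(0x01); // PING frame type
        written += 1;
        count -= 1;
    }
    written
}

fn main() {
    let mut storage: Vec<u8> = Vec::with_capacity(1200);
    storage.extend_from_slice(&[0; 4]); // pretend 4 bytes of packet header are already written
    let mut datagram = BoundedTail { buf: &mut storage, max_offset: 16 };
    // Only 12 bytes remain in this datagram, so at most 12 PINGs fit.
    assert_eq!(write_ping_frames(&mut datagram, 100), 12);
    assert_eq!(storage.len(), 16);
}
```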
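The segment-size bookkeeping that the new builder centralizes (the first finished datagram fixes the GSO segment size, later datagrams must start on a segment boundary, and a shorter datagram ends the batch) can be modeled with a small standalone sketch; `GsoBatch` below is an invented name and leaves out the actual packet writing.

```rust
/// Simplified model of the transmit builder's datagram bookkeeping.
struct GsoBatch {
    buf: Vec<u8>,
    segment_size: usize, // path MTU until the first datagram is finished
    max_datagrams: usize,
    num_datagrams: usize,
    datagram_start: usize,
}

impl GsoBatch {
    fn new(mtu: usize, max_datagrams: usize) -> Self {
        Self { buf: Vec::new(), segment_size: mtu, max_datagrams, num_datagrams: 0, datagram_start: 0 }
    }

    fn start_new_datagram(&mut self, size: usize) {
        assert!(self.num_datagrams < self.max_datagrams);
        if self.num_datagrams == 1 {
            // The size of the first datagram becomes the segment size for the whole batch.
            self.segment_size = self.buf.len();
        }
        if self.num_datagrams >= 1 {
            assert!(size <= self.segment_size);
            if size < self.segment_size {
                // A datagram smaller than the segment size must be the last one in the batch.
                self.max_datagrams = self.num_datagrams + 1;
            }
        }
        self.datagram_start = self.buf.len();
        // Datagrams in a GSO batch must be aligned to the segment size.
        assert_eq!(self.datagram_start % self.segment_size, 0);
        self.num_datagrams += 1;
    }

    /// Segment size reported alongside the transmit: `None` for a single datagram.
    fn segment_size_for_transmit(&self) -> Option<usize> {
        (self.num_datagrams > 1).then_some(self.segment_size)
    }
}

fn main() {
    let mut batch = GsoBatch::new(1200, 4);
    batch.start_new_datagram(1200);
    batch.buf.resize(1200, 0); // the first datagram happens to fill a whole segment
    batch.start_new_datagram(1200); // the segment size is now locked at 1200
    batch.buf.resize(2400, 0);
    batch.start_new_datagram(600); // a short datagram (e.g. a loss probe) ends the batch
    batch.buf.resize(3000, 0);
    assert_eq!(batch.num_datagrams, 3);
    assert_eq!(batch.max_datagrams, 3); // no fourth datagram may be started
    assert_eq!(batch.segment_size_for_transmit(), Some(1200));
}
```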