Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use bytes::Bytes;
use bytes::{BufMut, Bytes};
use thiserror::Error;
use tracing::{debug, trace};

Expand Down Expand Up @@ -163,13 +163,13 @@ impl DatagramState {
///
/// Returns whether a frame was written. At most `max_size` bytes will be written, including
/// framing.
pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool {
let datagram = match self.outgoing.pop_front() {
Some(x) => x,
None => return false,
};

if buf.len() + datagram.size(true) > max_size {
if buf.remaining_mut() < datagram.size(true) {
// Future work: we could be more clever about cramming small datagrams into
// mostly-full packets when a larger one is queued first
self.outgoing.push_front(datagram);
Expand Down
548 changes: 264 additions & 284 deletions quinn-proto/src/connection/mod.rs

Large diffs are not rendered by default.

102 changes: 69 additions & 33 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use bytes::Bytes;
use bytes::{BufMut, Bytes};
use rand::Rng;
use tracing::{trace, trace_span};

use super::{Connection, SentFrames, spaces::SentPacket};
use super::{Connection, SentFrames, TransmitBuf, spaces::SentPacket};
use crate::{
ConnectionId, Instant, TransportError, TransportErrorCode,
ConnectionId, Instant, MIN_INITIAL_SIZE, TransportError, TransportErrorCode,
connection::ConnectionSide,
frame::{self, Close},
packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId},
};

pub(super) struct PacketBuilder {
pub(super) datagram_start: usize,
/// QUIC packet builder
///
/// This allows building QUIC packets: it takes care of writing the header, allows writing
/// frames and on [`PacketBuilder::finalize`] (or [`PacketBuilder::finalize_and_track`]) it
/// encrypts the packet so it is ready to be sent on the wire.
///
/// The builder manages the write buffer into which the packet is written, and directly
/// implements [`BufMut`] to write frames into the packet.
pub(super) struct PacketBuilder<'a, 'b> {
pub(super) buf: &'a mut TransmitBuf<'b>,
pub(super) space: SpaceId,
pub(super) partial_encode: PartialEncode,
pub(super) ack_eliciting: bool,
Expand All @@ -20,14 +28,11 @@ pub(super) struct PacketBuilder {
/// Smallest absolute position in the associated buffer that must be occupied by this packet's
/// frames
pub(super) min_size: usize,
/// Largest absolute position in the associated buffer that may be occupied by this packet's
/// frames
pub(super) max_size: usize,
pub(super) tag_len: usize,
pub(super) _span: tracing::span::EnteredSpan,
}

impl PacketBuilder {
impl<'a, 'b> PacketBuilder<'a, 'b> {
/// Write a new packet header to `buffer` and determine the packet's properties
///
/// Marks the connection drained and returns `None` if the confidentiality limit would be
Expand All @@ -36,12 +41,13 @@ impl PacketBuilder {
now: Instant,
space_id: SpaceId,
dst_cid: ConnectionId,
buffer: &mut Vec<u8>,
buffer_capacity: usize,
datagram_start: usize,
buffer: &'a mut TransmitBuf<'b>,
ack_eliciting: bool,
conn: &mut Connection,
) -> Option<Self> {
) -> Option<Self>
where
'b: 'a,
{
let version = conn.version;
// Initiate key update if we're approaching the confidentiality limit
let sent_with_keys = conn.spaces[space_id].sent_with_keys;
Expand Down Expand Up @@ -124,7 +130,7 @@ impl PacketBuilder {
};
let partial_encode = header.encode(buffer);
if conn.peer_params.grease_quic_bit && conn.rng.random() {
buffer[partial_encode.start] ^= FIXED_BIT;
buffer.as_mut_slice()[partial_encode.start] ^= FIXED_BIT;
}

let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto {
Expand All @@ -151,17 +157,16 @@ impl PacketBuilder {
buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len),
partial_encode.start + dst_cid.len() + 6,
);
let max_size = buffer_capacity - tag_len;
let max_size = buffer.datagram_max_offset() - tag_len;
debug_assert!(max_size >= min_size);

Some(Self {
datagram_start,
buf: buffer,
space: space_id,
partial_encode,
exact_number,
short_header: header.is_short(),
min_size,
max_size,
tag_len,
ack_eliciting,
_span: span,
Expand All @@ -176,25 +181,33 @@ impl PacketBuilder {
// already.
self.min_size = Ord::max(
self.min_size,
self.datagram_start + (min_size as usize) - self.tag_len,
self.buf.datagram_start_offset() + (min_size as usize) - self.tag_len,
);
}

/// Returns a writable buffer limited to the remaining frame space
///
/// The [`BufMut::remaining_mut`] call on the returned buffer indicates the amount of
/// space available to write QUIC frames into.
// In rust 1.82 we can use `-> impl BufMut + use<'_, 'a, 'b>`
pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut TransmitBuf<'b>> {
self.buf.limit(self.frame_space_remaining())
}

pub(super) fn finish_and_track(
self,
mut self,
now: Instant,
conn: &mut Connection,
sent: Option<SentFrames>,
buffer: &mut Vec<u8>,
sent: SentFrames,
pad_datagram: bool,
) {
if pad_datagram {
self.pad_to(MIN_INITIAL_SIZE);
}
let ack_eliciting = self.ack_eliciting;
let exact_number = self.exact_number;
let space_id = self.space;
let (size, padded) = self.finish(conn, buffer);
let sent = match sent {
Some(sent) => sent,
None => return,
};
let (size, padded) = self.finish(conn);

let size = match padded || ack_eliciting {
true => size as u16,
Expand Down Expand Up @@ -228,11 +241,15 @@ impl PacketBuilder {
}

/// Encrypt packet, returning the length of the packet and whether padding was added
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
let pad = buffer.len() < self.min_size;
pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) {
debug_assert!(
self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len,
"packet exceeds maximum size"
);
let pad = self.buf.len() < self.min_size;
if pad {
trace!("PADDING * {}", self.min_size - buffer.len());
buffer.resize(self.min_size, 0);
trace!("PADDING * {}", self.min_size - self.buf.len());
self.buf.put_bytes(0, self.min_size - self.buf.len());
}

let space = &conn.spaces[self.space];
Expand All @@ -251,15 +268,34 @@ impl PacketBuilder {
"Mismatching crypto tag len"
);

buffer.resize(buffer.len() + packet_crypto.tag_len(), 0);
self.buf.put_bytes(0, packet_crypto.tag_len());
let encode_start = self.partial_encode.start;
let packet_buf = &mut buffer[encode_start..];
let packet_buf = &mut self.buf.as_mut_slice()[encode_start..];
self.partial_encode.finish(
packet_buf,
header_crypto,
Some((self.exact_number, packet_crypto)),
);

(buffer.len() - encode_start, pad)
let packet_len = self.buf.len() - encode_start;
trace!(size = %packet_len, short_header = %self.short_header, "wrote packet");
(packet_len, pad)
}

/// The number of additional bytes the current packet would take up if it was finished now
///
/// This will include any padding which is required to make the size large enough to be
/// encrypted correctly.
pub(super) fn predict_packet_end(&self) -> usize {
self.buf.len().max(self.min_size) + self.tag_len - self.buf.len()
}

/// Returns the remaining space in the packet that can be taken up by QUIC frames
///
/// This leaves space in the datagram for the cryptographic tag that needs to be written
/// when the packet is finished.
pub(super) fn frame_space_remaining(&self) -> usize {
let max_offset = self.buf.datagram_max_offset() - self.tag_len;
max_offset.saturating_sub(self.buf.len())
}
}
59 changes: 28 additions & 31 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,13 @@ impl StreamsState {

pub(in crate::connection) fn write_control_frames(
&mut self,
buf: &mut Vec<u8>,
buf: &mut impl BufMut,
pending: &mut Retransmits,
retransmits: &mut ThinRetransmits,
stats: &mut FrameStats,
max_size: usize,
) {
// RESET_STREAM
while buf.len() + frame::ResetStream::SIZE_BOUND < max_size {
while buf.remaining_mut() > frame::ResetStream::SIZE_BOUND {
let (id, error_code) = match pending.reset_stream.pop() {
Some(x) => x,
None => break,
Expand All @@ -442,7 +441,7 @@ impl StreamsState {
}

// STOP_SENDING
while buf.len() + frame::StopSending::SIZE_BOUND < max_size {
while buf.remaining_mut() > frame::StopSending::SIZE_BOUND {
let frame = match pending.stop_sending.pop() {
Some(x) => x,
None => break,
Expand All @@ -461,7 +460,7 @@ impl StreamsState {
}

// MAX_DATA
if pending.max_data && buf.len() + 9 < max_size {
if pending.max_data && buf.remaining_mut() > 9 {
pending.max_data = false;

// `local_max_data` can grow bigger than `VarInt`.
Expand All @@ -484,7 +483,7 @@ impl StreamsState {
}

// MAX_STREAM_DATA
while buf.len() + 17 < max_size {
while buf.remaining_mut() > 17 {
let id = match pending.max_stream_data.iter().next() {
Some(x) => *x,
None => break,
Expand Down Expand Up @@ -516,7 +515,7 @@ impl StreamsState {

// MAX_STREAMS
for dir in Dir::iter() {
if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size {
if !pending.max_stream_id[dir as usize] || buf.remaining_mut() <= 9 {
continue;
}

Expand All @@ -541,21 +540,14 @@ impl StreamsState {

pub(crate) fn write_stream_frames(
&mut self,
buf: &mut Vec<u8>,
max_buf_size: usize,
buf: &mut impl BufMut,
fair: bool,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
if max_buf_size
.checked_sub(buf.len() + frame::Stream::SIZE_BOUND)
.is_none()
{
break;
}

// Pop the stream of the highest priority that currently has pending data
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
while buf.remaining_mut() > frame::Stream::SIZE_BOUND {
// Pop the stream of the highest priority that currently has pending data. If
// the stream still has some pending data left after writing, it will be
// reinserted, otherwise not
let Some(stream) = self.pending.pop() else {
break;
};
Expand All @@ -577,7 +569,7 @@ impl StreamsState {

// Now that we know the `StreamId`, we can better account for how many bytes
// are required to encode it.
let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());
let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into());
let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);
let fin = offsets.end == stream.pending.offset()
&& matches!(stream.state, SendState::DataSent { .. });
Expand Down Expand Up @@ -1380,7 +1372,7 @@ mod tests {
high.write(b"high").unwrap();

let mut buf = Vec::with_capacity(40);
let meta = server.write_stream_frames(&mut buf, 40, true);
let meta = server.write_stream_frames(&mut buf, true);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
assert_eq!(meta[2].id, id_low);
Expand Down Expand Up @@ -1438,16 +1430,18 @@ mod tests {
};
high.set_priority(-1).unwrap();

let mut buf = Vec::with_capacity(1000);
let meta = server.write_stream_frames(&mut buf, 40, true);
let mut buf = Vec::with_capacity(1000).limit(40);
let meta = server.write_stream_frames(&mut buf, true);
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);

let mut buf = buf.into_inner();

// Send the remaining data. The initial mid priority one should go first now
let meta = server.write_stream_frames(&mut buf, 1000, true);
let meta = server.write_stream_frames(&mut buf, true);
assert_eq!(meta.len(), 2);
assert_eq!(meta[0].id, id_mid);
assert_eq!(meta[1].id, id_high);
Expand Down Expand Up @@ -1507,12 +1501,13 @@ mod tests {

// loop until all the streams are written
loop {
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair);
let mut chunk_buf = buf.limit(40);
let meta = server.write_stream_frames(&mut chunk_buf, fair);
if meta.is_empty() {
break;
}
metas.extend(meta);
buf = chunk_buf.into_inner();
}

assert!(!server.can_send_stream_data());
Expand Down Expand Up @@ -1575,11 +1570,12 @@ mod tests {
stream_b.write(&[b'b'; 100]).unwrap();

let mut metas = vec![];
let mut buf = Vec::with_capacity(1024);
let buf = Vec::with_capacity(1024);

// Write the first chunk of stream_a
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
let mut chunk_buf = buf.limit(40);
let meta = server.write_stream_frames(&mut chunk_buf, false);
let mut buf = chunk_buf.into_inner();
assert!(!meta.is_empty());
metas.extend(meta);

Expand All @@ -1595,8 +1591,9 @@ mod tests {

// loop until all the streams are written
loop {
let buf_len = buf.len();
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
let mut chunk_buf = buf.limit(40);
let meta = server.write_stream_frames(&mut chunk_buf, false);
buf = chunk_buf.into_inner();
if meta.is_empty() {
break;
}
Expand Down
Loading
Loading