Skip to content

Commit 0cb9867

Browse files
authored
Flub/poll transmit ref (#54)
This applies the refactors from quinn-rs#2168 and quinn-rs#2195 onto our multipath branch!
2 parents c2f364e + a0ba958 commit 0cb9867

File tree

7 files changed

+716
-448
lines changed

7 files changed

+716
-448
lines changed

quinn-proto/src/connection/datagrams.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::VecDeque;
22

3-
use bytes::Bytes;
3+
use bytes::{BufMut, Bytes};
44
use thiserror::Error;
55
use tracing::{debug, trace};
66

@@ -164,13 +164,13 @@ impl DatagramState {
164164
///
165165
/// Returns whether a frame was written. At most `max_size` bytes will be written, including
166166
/// framing.
167-
pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
167+
pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool {
168168
let datagram = match self.outgoing.pop_front() {
169169
Some(x) => x,
170170
None => return false,
171171
};
172172

173-
if buf.len() + datagram.size(true) > max_size {
173+
if buf.remaining_mut() < datagram.size(true) {
174174
// Future work: we could be more clever about cramming small datagrams into
175175
// mostly-full packets when a larger one is queued first
176176
self.outgoing.push_front(datagram);

quinn-proto/src/connection/mod.rs

Lines changed: 352 additions & 378 deletions
Large diffs are not rendered by default.

quinn-proto/src/connection/packet_builder.rs

Lines changed: 69 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
1-
use bytes::Bytes;
1+
use bytes::{BufMut, Bytes};
22
use rand::Rng;
33
use tracing::{trace, trace_span};
44

5-
use super::{Connection, PathId, SentFrames, spaces::SentPacket};
5+
use super::{Connection, PathId, SentFrames, TransmitBuf, spaces::SentPacket};
66
use crate::{
7-
ConnectionId, Instant, TransportError, TransportErrorCode,
7+
ConnectionId, Instant, MIN_INITIAL_SIZE, TransportError, TransportErrorCode,
88
connection::ConnectionSide,
99
frame::{self, Close},
1010
packet::{FIXED_BIT, Header, InitialHeader, LongType, PacketNumber, PartialEncode, SpaceId},
1111
};
1212

13-
pub(super) struct PacketBuilder {
14-
pub(super) datagram_start: usize,
13+
/// QUIC packet builder
14+
///
15+
/// This allows building QUIC packets: it takes care of writing the header, allows writing
16+
/// frames and on [`PacketBuilder::finish`] (or [`PacketBuilder::finish_and_track`]) it
17+
/// encrypts the packet so it is ready to be sent on the wire.
18+
///
19+
/// The builder manages the write buffer into which the packet is written, and directly
20+
/// implements [`BufMut`] to write frames into the packet.
21+
pub(super) struct PacketBuilder<'a, 'b> {
22+
pub(super) buf: &'a mut TransmitBuf<'b>,
1523
pub(super) space: SpaceId,
1624
path: PathId,
1725
pub(super) partial_encode: PartialEncode,
@@ -21,14 +29,11 @@ pub(super) struct PacketBuilder {
2129
/// Smallest absolute position in the associated buffer that must be occupied by this packet's
2230
/// frames
2331
pub(super) min_size: usize,
24-
/// Largest absolute position in the associated buffer that may be occupied by this packet's
25-
/// frames
26-
pub(super) max_size: usize,
2732
pub(super) tag_len: usize,
2833
pub(super) _span: tracing::span::EnteredSpan,
2934
}
3035

31-
impl PacketBuilder {
36+
impl<'a, 'b> PacketBuilder<'a, 'b> {
3237
/// Write a new packet header to `buffer` and determine the packet's properties
3338
///
3439
/// Marks the connection drained and returns `None` if the confidentiality limit would be
@@ -38,12 +43,13 @@ impl PacketBuilder {
3843
space_id: SpaceId,
3944
path_id: PathId,
4045
dst_cid: ConnectionId,
41-
buffer: &mut Vec<u8>,
42-
buffer_capacity: usize,
43-
datagram_start: usize,
46+
buffer: &'a mut TransmitBuf<'b>,
4447
ack_eliciting: bool,
4548
conn: &mut Connection,
46-
) -> Option<Self> {
49+
) -> Option<Self>
50+
where
51+
'b: 'a,
52+
{
4753
let version = conn.version;
4854
// Initiate key update if we're approaching the confidentiality limit
4955
let sent_with_keys = conn.spaces[space_id].sent_with_keys();
@@ -125,7 +131,7 @@ impl PacketBuilder {
125131
};
126132
let partial_encode = header.encode(buffer);
127133
if conn.peer_params.grease_quic_bit && conn.rng.random() {
128-
buffer[partial_encode.start] ^= FIXED_BIT;
134+
buffer.as_mut_slice()[partial_encode.start] ^= FIXED_BIT;
129135
}
130136

131137
let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto {
@@ -152,18 +158,17 @@ impl PacketBuilder {
152158
buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len),
153159
partial_encode.start + dst_cid.len() + 6,
154160
);
155-
let max_size = buffer_capacity - tag_len;
161+
let max_size = buffer.datagram_max_offset() - tag_len;
156162
debug_assert!(max_size >= min_size);
157163

158164
Some(Self {
159-
datagram_start,
165+
buf: buffer,
160166
space: space_id,
161167
path: path_id,
162168
partial_encode,
163169
exact_number,
164170
short_header: header.is_short(),
165171
min_size,
166-
max_size,
167172
tag_len,
168173
ack_eliciting,
169174
_span: span,
@@ -178,26 +183,34 @@ impl PacketBuilder {
178183
// already.
179184
self.min_size = Ord::max(
180185
self.min_size,
181-
self.datagram_start + (min_size as usize) - self.tag_len,
186+
self.buf.datagram_start_offset() + (min_size as usize) - self.tag_len,
182187
);
183188
}
184189

190+
/// Returns a writable buffer limited to the remaining frame space
191+
///
192+
/// The [`BufMut::remaining_mut`] call on the returned buffer indicates the amount of
193+
/// space available to write QUIC frames into.
194+
// In rust 1.82 we can use `-> impl BufMut + use<'_, 'a, 'b>`
195+
pub(super) fn frame_space_mut(&mut self) -> bytes::buf::Limit<&mut TransmitBuf<'b>> {
196+
self.buf.limit(self.frame_space_remaining())
197+
}
198+
185199
pub(super) fn finish_and_track(
186-
self,
200+
mut self,
187201
now: Instant,
188202
conn: &mut Connection,
189203
path_id: PathId,
190-
sent: Option<SentFrames>,
191-
buffer: &mut Vec<u8>,
204+
sent: SentFrames,
205+
pad_datagram: bool,
192206
) {
207+
if pad_datagram {
208+
self.pad_to(MIN_INITIAL_SIZE);
209+
}
193210
let ack_eliciting = self.ack_eliciting;
194211
let exact_number = self.exact_number;
195212
let space_id = self.space;
196-
let (size, padded) = self.finish(conn, buffer);
197-
let sent = match sent {
198-
Some(sent) => sent,
199-
None => return,
200-
};
213+
let (size, padded) = self.finish(conn);
201214

202215
let size = match padded || ack_eliciting {
203216
true => size as u16,
@@ -237,11 +250,15 @@ impl PacketBuilder {
237250
}
238251

239252
/// Encrypt packet, returning the length of the packet and whether padding was added
240-
pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
241-
let pad = buffer.len() < self.min_size;
253+
pub(super) fn finish(self, conn: &mut Connection) -> (usize, bool) {
254+
debug_assert!(
255+
self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len,
256+
"packet exceeds maximum size"
257+
);
258+
let pad = self.buf.len() < self.min_size;
242259
if pad {
243-
trace!("PADDING * {}", self.min_size - buffer.len());
244-
buffer.resize(self.min_size, 0);
260+
trace!("PADDING * {}", self.min_size - self.buf.len());
261+
self.buf.put_bytes(0, self.min_size - self.buf.len());
245262
}
246263

247264
let space = &conn.spaces[self.space];
@@ -260,16 +277,35 @@ impl PacketBuilder {
260277
"Mismatching crypto tag len"
261278
);
262279

263-
buffer.resize(buffer.len() + packet_crypto.tag_len(), 0);
280+
self.buf.put_bytes(0, packet_crypto.tag_len());
264281
let encode_start = self.partial_encode.start;
265-
let packet_buf = &mut buffer[encode_start..];
282+
let packet_buf = &mut self.buf.as_mut_slice()[encode_start..];
266283
// for packet protection, PathId(0) and no path are equivalent.
267284
self.partial_encode.finish(
268285
packet_buf,
269286
header_crypto,
270287
Some((self.exact_number, self.path, packet_crypto)),
271288
);
272289

273-
(buffer.len() - encode_start, pad)
290+
let packet_len = self.buf.len() - encode_start;
291+
trace!(size = %packet_len, short_header = %self.short_header, "wrote packet");
292+
(packet_len, pad)
293+
}
294+
295+
/// The number of additional bytes the current packet would take up if it was finished now
296+
///
297+
/// This will include any padding which is required to make the size large enough to be
298+
/// encrypted correctly.
299+
pub(super) fn predict_packet_end(&self) -> usize {
300+
self.buf.len().max(self.min_size) + self.tag_len - self.buf.len()
301+
}
302+
303+
/// Returns the remaining space in the packet that can be taken up by QUIC frames
304+
///
305+
/// This leaves space in the datagram for the cryptographic tag that needs to be written
306+
/// when the packet is finished.
307+
pub(super) fn frame_space_remaining(&self) -> usize {
308+
let max_offset = self.buf.datagram_max_offset() - self.tag_len;
309+
max_offset.saturating_sub(self.buf.len())
274310
}
275311
}

quinn-proto/src/connection/streams/state.rs

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -411,14 +411,13 @@ impl StreamsState {
411411

412412
pub(in crate::connection) fn write_control_frames(
413413
&mut self,
414-
buf: &mut Vec<u8>,
414+
buf: &mut impl BufMut,
415415
pending: &mut Retransmits,
416416
retransmits: &mut ThinRetransmits,
417417
stats: &mut FrameStats,
418-
max_size: usize,
419418
) {
420419
// RESET_STREAM
421-
while buf.len() + frame::ResetStream::SIZE_BOUND < max_size {
420+
while buf.remaining_mut() > frame::ResetStream::SIZE_BOUND {
422421
let (id, error_code) = match pending.reset_stream.pop() {
423422
Some(x) => x,
424423
None => break,
@@ -442,7 +441,7 @@ impl StreamsState {
442441
}
443442

444443
// STOP_SENDING
445-
while buf.len() + frame::StopSending::SIZE_BOUND < max_size {
444+
while buf.remaining_mut() > frame::StopSending::SIZE_BOUND {
446445
let frame = match pending.stop_sending.pop() {
447446
Some(x) => x,
448447
None => break,
@@ -461,7 +460,7 @@ impl StreamsState {
461460
}
462461

463462
// MAX_DATA
464-
if pending.max_data && buf.len() + 9 < max_size {
463+
if pending.max_data && buf.remaining_mut() > 9 {
465464
pending.max_data = false;
466465

467466
// `local_max_data` can grow bigger than `VarInt`.
@@ -484,7 +483,7 @@ impl StreamsState {
484483
}
485484

486485
// MAX_STREAM_DATA
487-
while buf.len() + 17 < max_size {
486+
while buf.remaining_mut() > 17 {
488487
let id = match pending.max_stream_data.iter().next() {
489488
Some(x) => *x,
490489
None => break,
@@ -516,7 +515,7 @@ impl StreamsState {
516515

517516
// MAX_STREAMS
518517
for dir in Dir::iter() {
519-
if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size {
518+
if !pending.max_stream_id[dir as usize] || buf.remaining_mut() <= 9 {
520519
continue;
521520
}
522521

@@ -541,21 +540,14 @@ impl StreamsState {
541540

542541
pub(crate) fn write_stream_frames(
543542
&mut self,
544-
buf: &mut Vec<u8>,
545-
max_buf_size: usize,
543+
buf: &mut impl BufMut,
546544
fair: bool,
547545
) -> StreamMetaVec {
548546
let mut stream_frames = StreamMetaVec::new();
549-
while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
550-
if max_buf_size
551-
.checked_sub(buf.len() + frame::Stream::SIZE_BOUND)
552-
.is_none()
553-
{
554-
break;
555-
}
556-
557-
// Pop the stream of the highest priority that currently has pending data
558-
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
547+
while buf.remaining_mut() > frame::Stream::SIZE_BOUND {
548+
// Pop the stream of the highest priority that currently has pending data. If
549+
// the stream still has some pending data left after writing, it will be
550+
// reinserted, otherwise not
559551
let Some(stream) = self.pending.pop() else {
560552
break;
561553
};
@@ -577,7 +569,7 @@ impl StreamsState {
577569

578570
// Now that we know the `StreamId`, we can better account for how many bytes
579571
// are required to encode it.
580-
let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());
572+
let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into());
581573
let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);
582574
let fin = offsets.end == stream.pending.offset()
583575
&& matches!(stream.state, SendState::DataSent { .. });
@@ -1380,7 +1372,7 @@ mod tests {
13801372
high.write(b"high").unwrap();
13811373

13821374
let mut buf = Vec::with_capacity(40);
1383-
let meta = server.write_stream_frames(&mut buf, 40, true);
1375+
let meta = server.write_stream_frames(&mut buf, true);
13841376
assert_eq!(meta[0].id, id_high);
13851377
assert_eq!(meta[1].id, id_mid);
13861378
assert_eq!(meta[2].id, id_low);
@@ -1438,16 +1430,18 @@ mod tests {
14381430
};
14391431
high.set_priority(-1).unwrap();
14401432

1441-
let mut buf = Vec::with_capacity(1000);
1442-
let meta = server.write_stream_frames(&mut buf, 40, true);
1433+
let mut buf = Vec::with_capacity(1000).limit(40);
1434+
let meta = server.write_stream_frames(&mut buf, true);
14431435
assert_eq!(meta.len(), 1);
14441436
assert_eq!(meta[0].id, id_high);
14451437

14461438
// After requeuing we should end up with 2 priorities - not 3
14471439
assert_eq!(server.pending.len(), 2);
14481440

1441+
let mut buf = buf.into_inner();
1442+
14491443
// Send the remaining data. The initial mid priority one should go first now
1450-
let meta = server.write_stream_frames(&mut buf, 1000, true);
1444+
let meta = server.write_stream_frames(&mut buf, true);
14511445
assert_eq!(meta.len(), 2);
14521446
assert_eq!(meta[0].id, id_mid);
14531447
assert_eq!(meta[1].id, id_high);
@@ -1507,12 +1501,13 @@ mod tests {
15071501

15081502
// loop until all the streams are written
15091503
loop {
1510-
let buf_len = buf.len();
1511-
let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair);
1504+
let mut chunk_buf = buf.limit(40);
1505+
let meta = server.write_stream_frames(&mut chunk_buf, fair);
15121506
if meta.is_empty() {
15131507
break;
15141508
}
15151509
metas.extend(meta);
1510+
buf = chunk_buf.into_inner();
15161511
}
15171512

15181513
assert!(!server.can_send_stream_data());
@@ -1575,11 +1570,12 @@ mod tests {
15751570
stream_b.write(&[b'b'; 100]).unwrap();
15761571

15771572
let mut metas = vec![];
1578-
let mut buf = Vec::with_capacity(1024);
1573+
let buf = Vec::with_capacity(1024);
15791574

15801575
// Write the first chunk of stream_a
1581-
let buf_len = buf.len();
1582-
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
1576+
let mut chunk_buf = buf.limit(40);
1577+
let meta = server.write_stream_frames(&mut chunk_buf, false);
1578+
let mut buf = chunk_buf.into_inner();
15831579
assert!(!meta.is_empty());
15841580
metas.extend(meta);
15851581

@@ -1595,8 +1591,9 @@ mod tests {
15951591

15961592
// loop until all the streams are written
15971593
loop {
1598-
let buf_len = buf.len();
1599-
let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
1594+
let mut chunk_buf = buf.limit(40);
1595+
let meta = server.write_stream_frames(&mut chunk_buf, false);
1596+
buf = chunk_buf.into_inner();
16001597
if meta.is_empty() {
16011598
break;
16021599
}

0 commit comments

Comments
 (0)