diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index c6961558e..d0a2cafad 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -229,7 +229,7 @@ pub struct Connection { // Queued non-retransmittable 1-RTT data // /// If the CONNECTION_CLOSE frame needs to be sent - close: bool, + connection_close_pending: bool, // // ACK frequency @@ -325,6 +325,57 @@ pub struct Connection { qlog: QlogSink, } +/// Return value for [`Connection::poll_transmit_path`]. +#[derive(Debug)] +enum PollPathStatus { + /// Nothing to send on the path, nothing was written into the [`TransmitBuf`]. + NothingToSend { + /// If true there was data to send but congestion control did not allow it. + congestion_blocked: bool, + }, + /// One or more packets have been written into the [`TransmitBuf`] and should be sent. + Send, +} + +/// Return value for [`Connection::poll_transmit_path_space`]. +#[derive(Debug)] +enum PollPathSpaceStatus { + /// Nothing to send in the space, nothing was written into the [`TransmitBuf`]. + NothingToSend { + /// If true there was data to send but congestion control did not allow it. + congestion_blocked: bool, + }, + /// One or more packets have been written into the [`TransmitBuf`]. + WrotePacket { + /// The highest packet number. + last_packet_number: u64, + /// Whether to pad an already started datagram in the next packet. + /// + /// When packets in the Initial, 0-RTT or Handshake spaces do not fill the entire + /// datagram they may decide to coalesce with the next packet from a higher + /// encryption level on the same path. But the earlier packet may have specific + /// size requirements for the datagram it is sent in. + /// + /// If a space did not complete the datagram, they use this to request the correct + /// padding in the final packet of the datagram so that the final datagram will have + /// the correct size. 
+ /// + /// If a space did fill an entire datagram, it leaves this to the default of + /// [`PadDatagram::No`]. + pad_datagram: PadDatagram, + }, + /// Send the contents of the transmit immediately. + /// + /// Packets were written and the GSO batch must end now, regardless of whether higher + /// spaces still have frames to write. This is used when the last datagram written would + /// require too much padding to continue a GSO batch, which would waste space on the + /// wire. + Send { + /// The highest packet number written into the transmit. + last_packet_number: u64, + }, +} + impl Connection { pub(crate) fn new( endpoint_config: Arc, @@ -426,7 +477,7 @@ impl Connection { }, timers: TimerTable::default(), authentication_failures: 0, - close: false, + connection_close_pending: false, ack_frequency: AckFrequencyState::new(get_max_ack_delay( &TransportParameters::default(), @@ -941,20 +992,7 @@ impl Connection { // Each call to poll_transmit can only send datagrams to one destination, because // all datagrams in a GSO batch are for the same destination. Therefore only - // datagrams for one Path ID are produced for each poll_transmit call. - - // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it - // should be: - - // First, if we have to send a close, select a path for that. - // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. - - // For all, open, validated and AVAILABLE paths: - // - Is the path congestion blocked or pacing blocked? - // - call maybe_queue_ to ensure a tail-loss probe would be sent? - // - do we need to send a close message? - // - call can_send - // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths + // datagrams for one destination address are produced for each poll_transmit call. 
// Check whether we need to send a close message let close = match self.state.as_type() { @@ -965,7 +1003,7 @@ impl Connection { StateType::Draining | StateType::Closed => { // self.close is only reset once the associated packet had been // encoded successfully - if !self.close { + if !self.connection_close_pending { self.app_limited = true; return None; } @@ -974,7 +1012,7 @@ impl Connection { _ => false, }; - // Check whether we need to send an ACK_FREQUENCY frame + // Schedule an ACK_FREQUENCY frame if a new one needs to be sent. if let Some(config) = &self.config.ack_frequency_config { let rtt = self .paths @@ -989,119 +1027,330 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // Whether this packet can be coalesced with another one in the same datagram. - let mut coalesce = true; + // TODO(flub): path scheduling logic might be buggy if there are only un-validated + // paths and PATH_STATUS_BACKUP paths. + + // Path scheduling logic is currently as such: + // + // - For any un-validated paths we only send frames that *must* be sent on that + // path. E.g. PATH_CHALLENGE, PATH_RESPONSE. + // + // - If there are any validated paths with CIDs and PathStatus::Available: + // - Frames that can be sent on any path, e.g. STREAM, DATAGRAM, are only sent on + // these available paths. + // - All other paths only send frames that *must* be sent on those paths, + // e.g. PATH_CHALLENGE, PATH_RESPONSE, tail-loss probes, keep alive PING. + // + // - If there are no validated paths with CIDs and PathStatus::Available all frames + // are sent on the earlierst possible path. + // + // For all this we use the *path_exclusive_only* boolean: If set to true, only + // frames that must be sent on the path will be built into the packet. + + // Is there any open, validated and status available path with dst CIDs? If so we'll + // want to set path_exclusive_only for any other paths. 
+ let have_available_path = self.paths.iter().any(|(id, path)| { + path.data.validated + && path.data.local_status() == PathStatus::Available + && self.rem_cids.contains_key(id) + }); - // Whether the last packet in the datagram must be padded so the datagram takes up - // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller. - let mut pad_datagram = PadDatagram::No; + // TODO: how to avoid the allocation? Cannot use a for loop because of + // borrowing. Maybe SmallVec or similar. + let path_ids: Vec<_> = self.paths.keys().copied().collect(); - // Whether congestion control stopped the next packet from being sent. Further - // packets could still be built, as e.g. tail-loss probes are not congestion - // limited. + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. let mut congestion_blocked = false; - // The packet number of the last built packet. - let mut last_packet_number = None; + for &path_id in &path_ids { + if let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id) { + return Some(transmit); + } - let mut path_id = *self.paths.first_key_value().expect("one path must exist").0; + // // Set the segment size to this path's MTU for on-path data. + // let pmtu = self.path_data(path_id).current_mtu().into(); + let mut transmit = TransmitBuf::new(buf, max_datagrams); - // If there is any open, validated and available path we only want to send frames to - // any backup path that must be sent on that backup path exclusively. - let have_available_path = self.paths.iter().any(|(id, path)| { - path.data.validated - && path.data.local_status() == PathStatus::Available - && self.rem_cids.contains_key(id) - }); + // Poll for on-path transmits. 
+ match self.poll_transmit_on_path( + now, + &mut transmit, + path_id, + have_available_path, + close, + ) { + PollPathStatus::Send => { + let transmit = self.build_transmit(path_id, transmit); + return Some(transmit); + } + PollPathStatus::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked |= cb; + // Continue checking other paths, tail-loss probes may need to be sent + // in all spaces. + debug_assert!( + transmit.is_empty(), + "nothing to send on path but transmit not empty" + ); + } + } + } + // We didn't produce any application data packet + debug_assert!( + buf.is_empty(), + "there was data in the buffer, but it was not sent" + ); + + self.app_limited = !congestion_blocked; + + if self.state.is_established() { + // Try MTU probing now + for path_id in path_ids { + // The MTU parameter here is really a dummy one, poll_transmit_mtu_probe + // explicitly sets the size. + let mut transmit = TransmitBuf::new(buf, max_datagrams); + self.poll_transmit_mtu_probe(now, &mut transmit, path_id); + if !transmit.is_empty() { + let transmit = self.build_transmit(path_id, transmit); + return Some(transmit); + } + } + } + + None + } + + fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit { + debug_assert!( + !transmit.is_empty(), + "must not be called with an empty transmit buffer" + ); + + let network_path = self.path_data(path_id).network_path; + let last_datagram_len = transmit + .segment_size() + .map(|segment_size| transmit.len() % segment_size) + .unwrap_or_else(|| transmit.len()); + trace!( + segment_size = transmit.segment_size(), + last_datagram_len, + %network_path, + "sending {} bytes in {} datagrams", + transmit.len(), + transmit.num_datagrams() + ); + self.path_data_mut(path_id) + .inc_total_sent(transmit.len() as u64); + + self.stats + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); + self.path_stats + .entry(path_id) + .or_default() + .udp_tx + .on_sent(transmit.num_datagrams() as u64, 
transmit.len()); + + Transmit { + destination: network_path.remote, + size: transmit.len(), + ecn: if self.path_data(path_id).sending_ecn { + Some(EcnCodepoint::Ect0) + } else { + None + }, + segment_size: transmit.segment_size(), + src_ip: network_path.local_ip, + } + } + + /// poll_transmit logic for off-path data. + fn poll_transmit_off_path( + &mut self, + now: Instant, + buf: &mut Vec, + path_id: PathId, + ) -> Option { if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { return Some(challenge); } - - // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that - // path validation can occur while the link is saturated. - // TODO(@divma): I have no idea what this comment is supposed to mean if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { return Some(response); } + None + } - let mut space_id = match path_id { - PathId::ZERO => SpaceId::Initial, - _ => SpaceId::Data, + /// poll_transmit logic for on-path data. + /// + /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`] + /// has 3 packet spaces, which this handles. + /// + /// See [`Self::poll_transmit_off_path`] for off-path data. 
+ #[must_use] + fn poll_transmit_on_path( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + have_available_path: bool, + close: bool, + ) -> PollPathStatus { + // Check if there is at least one active CID to use for sending + let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { + if self.abandoned_paths.contains(&path_id) { + trace!(%path_id, "remote CIDs retired for abandoned path"); + } else { + debug!(%path_id, "no remote CIDs for path"); + } + return PollPathStatus::NothingToSend { + congestion_blocked: false, + }; }; - // Setup for the first path_id - let mut transmit = TransmitBuf::new( - buf, - max_datagrams, - self.path_data(path_id).current_mtu().into(), - ); + // Whether the last packet in the datagram must be padded so the datagram takes up + // an exact size. An earlier space can decide to not fill an entire datagram and + // require the next space to fill it further. But may need a specific size of the + // datagram containing the packet. The final packet built in the datagram must pad + // to this size. + let mut pad_datagram = PadDatagram::No; - loop { - // check if there is at least one active CID to use for sending - let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else { - let err = PathError::RemoteCidsExhausted; - if !self.abandoned_paths.contains(&path_id) { - debug!(?err, %path_id, "no active CID for path"); - self.events.push_back(Event::Path(PathEvent::LocallyClosed { - id: path_id, - error: err, - })); - // Locally we should have refused to open this path, the remote should - // have given us CIDs for this path before opening it. So we can always - // abandon this here. 
- self.close_path( - now, - path_id, - TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(), - ) - .ok(); - self.spaces[SpaceId::Data] - .pending - .path_cids_blocked - .insert(path_id); - } else { - trace!(%path_id, "remote CIDs retired for abandoned path"); - } + // The packet number of the last built packet. This is kept across spaces. + // QUIC is supposed to have a single congestion controller for the Initial, + // Handshake and Data(PathId::ZERO) spaces. + let mut last_packet_number = None; - let Some(next_path_id) = self.paths.keys().find(|&&next| next > path_id) else { - // Nothing more to send. - trace!(?space_id, %path_id, "no CIDs to send on path, no more paths"); - break; - }; - // See if this next path can send anything. - path_id = *next_path_id; - space_id = SpaceId::Data; - - transmit = { - let (buf, _) = transmit.finish(); - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } else if let Some(response) = - self.send_off_path_path_response(now, buf, path_id) - { - return Some(response); - } - { - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) - } - }; + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. + let mut congestion_blocked = false; + // Iterate over the available spaces. + for space_id in SpaceId::iter() { + // Only PathId::ZERO uses non-Data space ids. + if path_id != PathId::ZERO && space_id != SpaceId::Data { continue; - }; + } + match self.poll_transmit_path_space( + now, + transmit, + path_id, + space_id, + remote_cid, + have_available_path, + close, + pad_datagram, + ) { + PollPathSpaceStatus::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked |= cb; + // Continue checking other spaces, tail-loss probes may need to be sent + // in all spaces. 
+ } + PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram: pad, + } => { + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(pn); + pad_datagram = pad; + // Always check higher spaces. If the transmit is full or they have + // nothing to send they will not write packets. But if they can, they + // must always be allowed to add to this transmit because coalescing may + // be required. + continue; + } + PollPathSpaceStatus::Send { + last_packet_number: pn, + } => { + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(pn); + break; + } + } + } + + if last_packet_number.is_some() || congestion_blocked { + self.qlog.emit_recovery_metrics( + path_id, + &mut self.paths.get_mut(&path_id).unwrap().data, + now, + ); + } + + match last_packet_number { + Some(last_packet_number) => { + // Note that when sending in multiple spaces the last packet number will be + // the one from the highest space. + self.path_data_mut(path_id).congestion.on_sent( + now, + transmit.len() as u64, + last_packet_number, + ); + + PollPathStatus::Send + } + None => PollPathStatus::NothingToSend { congestion_blocked }, + } + } + + /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId). + #[must_use] + fn poll_transmit_path_space( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + space_id: SpaceId, + remote_cid: ConnectionId, + // If any other packet space has a usable path with PathStatus::Available. + have_available_path: bool, + // If we need to send a CONNECTION_CLOSE frame. + connection_close_pending: bool, + // Whether the current datagram needs to be padded to a certain size. + mut pad_datagram: PadDatagram, + ) -> PollPathSpaceStatus { + // Keep track of the last packet number we wrote. If None we did not write any + // packets. 
+ let mut last_packet_number = None; - // Determine if anything can be sent in this packet number space (SpaceId + - // PathId). + // Each loop of this may build one packet. It works logically as follows: + // + // - Check if something *needs* to be sent in this space and *can* be sent. + // - If not, return to the caller who will call us again for the next space. + // - Start a new datagram. + // - Unless coalescing the packet into an existing datagram. + // - Write the packet header and payload. + // - Check if coalescing a next packet into the datagram is possible. + // - If coalescing, finish packet without padding to leave space in the datagram. + // - If not coalescing, complete the datagram: + // - Finish packet with padding. + // - Set the transmit segment size if this is the first datagram. + // - Loop: next iteration will exit the loop if nothing more to send in this + // space. The TransmitBuf will contain a started datagram with space if + // coalescing, or completely filled datagram if not coalescing. + loop { + // Determine if anything can be sent in this packet number space (SpaceId + PathId). let max_packet_size = if transmit.datagram_remaining_mut() > 0 { - // We are trying to coalesce another packet into this datagram. + // A datagram is started already, we are coalescing another packet into it. transmit.datagram_remaining_mut() } else { - // A new datagram needs to be started. - transmit.segment_size() + // A new datagram needs to be started. The first datagram is sized to the + // MTU. All other datagrams can be at most the size of the previous + // datagram. 
+ match transmit.num_datagrams { + 0 => self.path_data(path_id).current_mtu().into(), + _ => transmit.max_datagram_size(), + } }; - let can_send = self.space_can_send(space_id, path_id, max_packet_size, close); - let path_should_send = { + let can_send = + self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending); + + // Whether we would like to send any frames on this packet space. See the packet + // scheduling described in poll_transmit. + let space_should_send = { let path_exclusive_only = space_id == SpaceId::Data && have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; @@ -1114,102 +1363,94 @@ impl Connection { path_should_send || needs_loss_probe }; - if !path_should_send && space_id < SpaceId::Data { - if self.spaces[space_id].crypto.is_some() { - trace!(?space_id, %path_id, "nothing to send in space"); - } - space_id = space_id.next(); - continue; - } - - let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 { - // Only check congestion control if a new datagram is needed. - self.path_congestion_check(space_id, path_id, &transmit, &can_send, now) - } else { - PathBlocked::No - }; - if send_blocked != PathBlocked::No { - trace!(?space_id, %path_id, ?send_blocked, "congestion blocked"); - congestion_blocked = true; - } - if send_blocked != PathBlocked::No && space_id < SpaceId::Data { - // Higher spaces might still have tail-loss probes to send, which are not - // congestion blocked. - space_id = space_id.next(); - continue; - } - if !path_should_send || send_blocked != PathBlocked::No { - // Nothing more to send on this path, check the next path if possible. - - // If there are any datagrams in the transmit, packets for another path can - // not be built. 
- if transmit.num_datagrams() > 0 { - break; - } - - let Some(next_path_id) = self.paths.keys().find(|&&next| next > path_id) else { - // Nothing more to send and no more paths to check - trace!(?space_id, %path_id, next_path_id=?None::, "nothing to send on path"); - break; - }; - - // See if this next path can send anything. - trace!(?space_id, %path_id, %next_path_id, "nothing to send on path"); - path_id = *next_path_id; - space_id = SpaceId::Data; - - transmit = { - let (buf, _) = transmit.finish(); - // Check off path data for the new path_id - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } else if let Some(response) = - self.send_off_path_path_response(now, buf, path_id) - { - return Some(response); - } else { - // Continue with on path data - // New path_id => fresh transmit - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) + if !space_should_send { + // Nothing more to send. Previous iterations of this loop may have built + // packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + // Only log for spaces which have crypto. + if self.spaces[space_id].crypto.is_some() + || (space_id == SpaceId::Data && self.zero_rtt_crypto.is_some()) + { + trace!(?space_id, %path_id, "nothing to send in space"); + } + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; } }; + } - continue; + // We want to send on this space, check congestion control if we can. But only + // if we will need to start a new datagram. If we are coalescing into an already + // started datagram we do not need to check congestion control again. 
+ if transmit.datagram_remaining_mut() == 0 { + let congestion_blocked = + self.path_congestion_check(space_id, path_id, transmit, &can_send, now); + if congestion_blocked != PathBlocked::No { + // Previous iterations of this loop may have built packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: true, + }; + } + }; + } } - // If the datagram is full, we need to start a new one. + // If the datagram is full (or there never was one started), we need to start a + // new one. if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { - // No more datagrams allowed - break; + // No more datagrams allowed. + // Previous iterations of this loop may have built packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; + } + }; } - match self.spaces[space_id].for_path(path_id).loss_probes { - 0 => transmit.start_new_datagram(), - _ => { - // We need something to send for a tail-loss probe. - let request_immediate_ack = - space_id == SpaceId::Data && self.peer_supports_ack_frequency(); - self.spaces[space_id].maybe_queue_probe( - path_id, - request_immediate_ack, - &self.streams, - ); + if transmit.num_datagrams() == 0 { + let mtu = match self.spaces[space_id].for_path(path_id).loss_probes { + 0 => self.path_data(path_id).current_mtu(), + _ => INITIAL_MTU, + }; + transmit.start_first_datagram(mtu.into()); + } else { + transmit.start_datagram(); + } + trace!(count = transmit.num_datagrams(), "new datagram started"); - self.spaces[space_id].for_path(path_id).loss_probes -= 1; + // We need something to send for a tail-loss probe. 
+ if self.spaces[space_id].for_path(path_id).loss_probes > 0 { + let request_immediate_ack = + space_id == SpaceId::Data && self.peer_supports_ack_frequency(); + self.spaces[space_id].maybe_queue_probe( + path_id, + request_immediate_ack, + &self.streams, + ); - // Clamp the datagram to at most the minimum MTU to ensure that loss - // probes can get through and enable recovery even if the path MTU - // has shrank unexpectedly. - transmit.start_new_datagram_with_size(std::cmp::min( - usize::from(INITIAL_MTU), - transmit.segment_size(), - )); - } + self.spaces[space_id].for_path(path_id).loss_probes -= 1; } - trace!(count = transmit.num_datagrams(), "new datagram started"); - coalesce = true; + + // We started a new datagram, we decide later if it needs padding. pad_datagram = PadDatagram::No; } @@ -1235,24 +1476,33 @@ impl Connection { prev.update_unacked = false; } - let mut builder = PacketBuilder::new( + let Some(mut builder) = PacketBuilder::new( now, space_id, path_id, remote_cid, - &mut transmit, + transmit, can_send.other, self, - )?; - last_packet_number = Some(builder.exact_number); - coalesce = coalesce && builder.can_coalesce; + ) else { + // Confidentiality limit is exceeded and the connection has been killed. We + // should not send any other packets. This works in a roundabout way: We + // have started a datagram but not written anything into it. So even if we + // get called again for another space we will see an already started + // datagram and try and start another packet here. Then be stopped by the + // same confidentiality limit. 
+ return PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; + }; + last_packet_number = Some(builder.packet_number); if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 pad_datagram |= PadDatagram::ToMinMtu; } if space_id == SpaceId::Data && self.config.pad_to_mtu { - pad_datagram |= PadDatagram::ToSegmentSize; + pad_datagram |= PadDatagram::ToMaxSize; } if can_send.close { @@ -1284,6 +1534,7 @@ impl Connection { // Since there only 64 ACK frames there will always be enough space // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. + // TODO(flub): This needs fixing for multipath. debug_assert!( builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND, "ACKs should leave space for ConnectionClose" @@ -1308,24 +1559,36 @@ impl Connection { }; builder.write_frame(close.encoder(max_frame_size), stats); } + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, pad_datagram); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. - self.close = false; - // `CONNECTION_CLOSE` is the final packet - break; - } else { - // Send a close frame in every possible space for robustness, per - // RFC9000 "Immediate Close during the Handshake". Don't bother trying - // to send anything else. - space_id = space_id.next(); - continue; + self.connection_close_pending = false; } + // Send a close frame in every possible space for robustness, per + // RFC9000 "Immediate Close during the Handshake". Don't bother trying + // to send anything else. 
+ // TODO(flub): This breaks during the handshake if we can not coalesce + // packets due to space reasons: the next space would either fail a + // debug_assert checking for enough packet space or produce an invalid + // packet. We need to keep track of per-space pending CONNECTION_CLOSE to + // be able to send these across multiple calls to poll_transmit. Then + // check for coalescing space here because initial packets need to be in + // padded datagrams. And also add space checks for CONNECTION_CLOSE in + // space_can_send so it would stop a GSO batch if the datagram is too + // small for another CONNECTION_CLOSE packet. + return PollPathSpaceStatus::WrotePacket { + last_packet_number: last_pn, + pad_datagram, + }; } + // If this boolean is true we only want to send frames which can not be sent on + // any other path. See the path scheduling notes in Self::poll_transmit. let path_exclusive_only = have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; + self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder); // ACK-only packets should only be sent when explicitly allowed. If we write them due to @@ -1338,7 +1601,7 @@ impl Connection { !(builder.sent_frames().is_ack_only(&self.streams) && !can_send.acks && can_send.other - && builder.buf.segment_size() + && builder.buf.max_datagram_size() == self.path_data(path_id).current_mtu() as usize && self.datagrams.outgoing.is_empty()), "SendableFrames was {can_send:?}, but only ACKs have been written" @@ -1364,23 +1627,25 @@ impl Connection { // might be needed because of the packet type, or to fill the GSO segment size. // Are we allowed to coalesce AND is there enough space for another *packet* in - // this datagram AND is there another packet to send in this or the next space? - if coalesce - && builder + // this datagram AND will we definitely send another packet? 
+ if builder.can_coalesce && path_id == PathId::ZERO && { + let max_packet_size = builder .buf .datagram_remaining_mut() - .saturating_sub(builder.predict_packet_end()) - > MIN_PACKET_SPACE - && self - .next_send_space(space_id, path_id, builder.buf, close) - .is_some() - { + .saturating_sub(builder.predict_packet_end()); + max_packet_size > MIN_PACKET_SPACE + && self.has_pending_packet(space_id, max_packet_size, connection_close_pending) + } { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. + trace!("will coalesce with next packet"); builder.finish_and_track(now, self, path_id, PadDatagram::No); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. + // TODO(flub): if there isn't any more data to be sent, this will still pad + // to the segment size and only discover there is nothing to send before + // starting the next packet. That is wasting up to 32 bytes. if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) { // If too many padding bytes would be required to continue the // GSO batch after this packet, end the GSO batch here. Ensures @@ -1397,182 +1662,137 @@ impl Connection { "GSO truncated by demand for {} padding bytes", builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, PadDatagram::No); - break; + return PollPathSpaceStatus::Send { + last_packet_number: last_pn, + }; } // Pad the current datagram to GSO segment size so it can be // included in the GSO batch. - builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize); + builder.finish_and_track(now, self, path_id, PadDatagram::ToMaxSize); } else { builder.finish_and_track(now, self, path_id, pad_datagram); } + + // If this is the first datagram we set its maximum size to this finished + // packet. 
if transmit.num_datagrams() == 1 { - transmit.clip_datagram_size(); + transmit.end_first_datagram(); } } } + } - if let Some(last_packet_number) = last_packet_number { - // Note that when sending in multiple packet spaces the last packet number will - // be the one from the highest packet space. - self.path_data_mut(path_id).congestion.on_sent( - now, - transmit.len() as u64, - last_packet_number, - ); - } - - self.qlog.emit_recovery_metrics( - path_id, - &mut self.paths.get_mut(&path_id).unwrap().data, - now, - ); - - self.app_limited = transmit.is_empty() && !congestion_blocked; - - // Send MTU probe if necessary - if transmit.is_empty() && self.state.is_established() { - // MTU probing happens only in Data space. - let space_id = SpaceId::Data; - path_id = *self.paths.first_key_value().expect("one path must exist").0; - let probe_data = loop { - // We MTU probe all paths for which all of the following is true: - // - We have an active destination CID for the path. - // - The remote address *and* path are validated. - // - The path is not abandoned. - // - The MTU Discovery subsystem wants to probe the path. - let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active); - let eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - let probe_size = eligible - .then(|| { - let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number(); - self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn) - }) - .flatten(); - match (active_cid, probe_size) { - (Some(active_cid), Some(probe_size)) => { - // Let's send an MTUD probe! - break Some((active_cid, probe_size)); - } - _ => { - // Find the next path to check if it needs an MTUD probe. 
- match self.paths.keys().find(|&&next| next > path_id) { - Some(next) => { - path_id = *next; - continue; - } - None => break None, - } - } - } - }; - if let Some((active_cid, probe_size)) = probe_data { - // We are definitely sending a DPLPMTUD probe. - debug_assert_eq!(transmit.num_datagrams(), 0); - transmit.start_new_datagram_with_size(probe_size as usize); - - let mut builder = PacketBuilder::new( - now, - space_id, - path_id, - active_cid, - &mut transmit, - true, - self, - )?; + fn poll_transmit_mtu_probe( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + ) { + let Some((active_cid, probe_size)) = self.get_mtu_probe_data(now, path_id) else { + return; + }; - // We implement MTU probes as ping packets padded up to the probe size - trace!(?probe_size, "writing MTUD probe"); - builder.write_frame(frame::Ping, &mut self.stats.frame_tx); + // We are definitely sending a DPLPMTUD probe. + transmit.start_first_datagram(probe_size.into()); - // If supported by the peer, we want no delays to the probe's ACK - if self.peer_supports_ack_frequency() { - builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); - } + let Some(mut builder) = PacketBuilder::new( + now, + SpaceId::Data, + path_id, + active_cid, + transmit, + true, + self, + ) else { + return; + }; - builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); + // We implement MTU probes as ping packets padded up to the probe size + trace!(?probe_size, "writing MTUD probe"); + builder.write_frame(frame::Ping, &mut self.stats.frame_tx); - self.path_stats - .entry(path_id) - .or_default() - .sent_plpmtud_probes += 1; - } + // If supported by the peer, we want no delays to the probe's ACK + if self.peer_supports_ack_frequency() { + builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); } - if transmit.is_empty() { - return None; - } + builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); - let network_path = 
self.path_data(path_id).network_path; - trace!( - segment_size = transmit.segment_size(), - last_datagram_len = transmit.len() % transmit.segment_size(), - %network_path, - "sending {} bytes in {} datagrams", - transmit.len(), - transmit.num_datagrams() - ); - self.path_data_mut(path_id) - .inc_total_sent(transmit.len() as u64); - - self.stats - .udp_tx - .on_sent(transmit.num_datagrams() as u64, transmit.len()); self.path_stats .entry(path_id) .or_default() - .udp_tx - .on_sent(transmit.num_datagrams() as u64, transmit.len()); + .sent_plpmtud_probes += 1; + } - Some(Transmit { - destination: network_path.remote, - size: transmit.len(), - ecn: if self.path_data(path_id).sending_ecn { - Some(EcnCodepoint::Ect0) - } else { - None - }, - segment_size: match transmit.num_datagrams() { - 1 => None, - _ => Some(transmit.segment_size()), - }, - src_ip: network_path.local_ip, - }) + /// Returns the CID and probe size if a DPLPMTUD probe is needed. + /// + /// We MTU probe all paths for which all of the following is true: + /// - We have an active destination CID for the path. + /// - The remote address *and* path are validated. + /// - The path is not abandoned. + /// - The MTU Discovery subsystem wants to probe the path. + fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { + let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active)?; + let is_eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); + + if !is_eligible { + return None; + } + let next_pn = self.spaces[SpaceId::Data] + .for_path(path_id) + .peek_tx_number(); + let probe_size = self + .path_data_mut(path_id) + .mtud + .poll_transmit(now, next_pn)?; + + Some((active_cid, probe_size)) } - /// Returns the [`SpaceId`] of the next packet space which has data to send + /// Returns true if there is a further packet to send on [`PathId::ZERO`]. 
+    ///
+    /// In other words this is predicting whether the next call to
+    /// [`Connection::space_can_send`] issued will return some frames to be sent. Including
+    /// having to predict which packet number space it will be invoked with. This depends on
+    /// how both [`Connection::poll_transmit_path`] and
+    /// [`Connection::poll_transmit_path_space`] behave.
     ///
-    /// This takes into account the space available to frames in the next datagram.
-    // TODO(flub): This duplication is not nice.
-    fn next_send_space(
+    /// This is needed to determine if packet coalescing can happen. Because the last packet
+    /// in a datagram may need to be padded and thus we must know if another packet will
+    /// follow or not.
+    ///
+    /// The next packet can be either in the same space, or in one of the following spaces
+    /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and
+    /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because
+    /// packets are built from Initial to Handshake to Data spaces.
+    fn has_pending_packet(
         &mut self,
         current_space_id: SpaceId,
-        path_id: PathId,
-        buf: &TransmitBuf<'_>,
-        close: bool,
-    ) -> Option<SpaceId> {
-        // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed
-        // to be able to send an individual frame at least this large in the next 1-RTT
-        // packet. This could be generalized to support every space, but it's only needed to
-        // handle large fixed-size frames, which only exist in 1-RTT (application
-        // datagrams). We don't account for coalesced packets potentially occupying space
-        // because frames can always spill into the next datagram.
+ max_packet_size: usize, + connection_close_pending: bool, + ) -> bool { let mut space_id = current_space_id; loop { - let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close); - if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { - return Some(space_id); + let can_send = self.space_can_send( + space_id, + PathId::ZERO, + max_packet_size, + connection_close_pending, + ); + if !can_send.is_empty() { + return true; } - space_id = match space_id { - SpaceId::Initial => SpaceId::Handshake, - SpaceId::Handshake => SpaceId::Data, - SpaceId::Data => break, + match space_id.next() { + Some(next_space_id) => space_id = next_space_id, + None => break, } } - None + false } /// Checks if creating a new datagram would be blocked by congestion control @@ -1600,7 +1820,10 @@ impl Connection { // Congestion control check. // Tail loss probes must not be blocked by congestion, or a deadlock could arise. - let bytes_to_send = transmit.segment_size() as u64; + let bytes_to_send = match transmit.num_datagrams { + 0 => self.path_data(path_id).current_mtu().into(), + _ => transmit.max_datagram_size(), + } as u64; let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0; if can_send.other && !need_loss_probe && !can_send.close { @@ -1656,8 +1879,8 @@ impl Connection { SpaceId::Data, "PATH_CHALLENGE queued without 1-RTT keys" ); - let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into()); - buf.start_new_datagram(); + let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN); + buf.start_first_datagram(MIN_INITIAL_SIZE.into()); // Use the previous CID to avoid linking the new path with the previous path. 
We // don't bother accounting for possible retirement of that prev_cid because this is @@ -1711,8 +1934,8 @@ impl Connection { let frame = frame::PathResponse(token); - let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into()); - buf.start_new_datagram(); + let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN); + buf.start_first_datagram(MIN_INITIAL_SIZE.into()); let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, false, self)?; let stats = &mut self.stats.frame_tx; @@ -1739,13 +1962,14 @@ impl Connection { /// Indicate what types of frames are ready to send for the given space /// /// *packet_size* is the number of bytes available to build the next packet. - /// *close* indicates whether a CONNECTION_CLOSE frame needs to be sent. + /// *connection_close_pending* indicates whether a CONNECTION_CLOSE frame needs to be + /// sent. fn space_can_send( &mut self, space_id: SpaceId, path_id: PathId, packet_size: usize, - close: bool, + connection_close_pending: bool, ) -> SendableFrames { let space = &mut self.spaces[space_id]; let space_has_crypto = space.crypto.is_some(); @@ -1761,15 +1985,20 @@ impl Connection { let mut can_send = space.can_send(path_id, &self.streams); + // Check for 1RTT space. if space_id == SpaceId::Data { - // Check for 1RTT space. let pn = space.for_path(path_id).peek_tx_number(); + // Number of bytes available for frames if this is a 1-RTT packet. We're + // guaranteed to be able to send an individual frame at least this large in the + // next 1-RTT packet. This could be generalized to support every space, but it's + // only needed to handle large fixed-size frames, which only exist in 1-RTT + // (application datagrams). 
let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id)); can_send |= self.can_send_1rtt(path_id, frame_space_1rtt); } - can_send.close = close && space_has_crypto; + can_send.close = connection_close_pending && space_has_crypto; can_send } @@ -2104,7 +2333,7 @@ impl Connection { if !was_closed { self.close_common(); self.set_close_timer(now); - self.close = true; + self.connection_close_pending = true; self.state.move_to_closed_local(reason); } } @@ -3165,7 +3394,7 @@ impl Connection { )); self.close_common(); self.set_close_timer(now); - self.close = true; + self.connection_close_pending = true; } return; } @@ -3812,7 +4041,7 @@ impl Connection { .get(&path_id) .map(|p| p.data.network_path) .unwrap_or(network_path); - self.close = network_path == path_remote; + self.connection_close_pending = network_path == path_remote; } } @@ -4885,7 +5114,7 @@ impl Connection { if let Some(reason) = close { self.state.move_to_draining(Some(reason.into())); - self.close = true; + self.connection_close_pending = true; } if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet @@ -5079,7 +5308,7 @@ impl Connection { path_exclusive_only: bool, builder: &mut PacketBuilder<'a, 'b>, ) { - let pn = builder.exact_number; + let pn = builder.packet_number; let is_multipath_negotiated = self.is_multipath_negotiated(); let stats = &mut self.stats.frame_tx; let space = &mut self.spaces[space_id]; diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 80f0d990d..f3f766ddb 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -25,7 +25,7 @@ pub(super) struct PacketBuilder<'a, 'b> { path: PathId, pub(super) partial_encode: PartialEncode, pub(super) ack_eliciting: bool, - pub(super) exact_number: u64, + pub(super) packet_number: u64, /// Is this packet allowed to be coalesced? 
pub(super) can_coalesce: bool, /// Smallest absolute position in the associated buffer that must be occupied by this packet's @@ -93,11 +93,11 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { } let space = &mut conn.spaces[space_id]; - let exact_number = space.for_path(path_id).get_tx_number(&mut conn.rng); - let span = trace_span!("send", space = ?space_id, pn = exact_number, %path_id).entered(); + let packet_number = space.for_path(path_id).get_tx_number(&mut conn.rng); + let span = trace_span!("send", space = ?space_id, pn = packet_number, %path_id).entered(); let number = PacketNumber::new( - exact_number, + packet_number, space.for_path(path_id).largest_acked_packet.unwrap_or(0), ); let header = match space_id { @@ -171,7 +171,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { qlog.header( &header, - Some(exact_number), + Some(packet_number), space_id, space_id == SpaceId::Data && conn.spaces[SpaceId::Data].crypto.is_none(), path_id, @@ -182,7 +182,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { space: space_id, path: path_id, partial_encode, - exact_number, + packet_number, can_coalesce: header.can_coalesce(), min_size, tag_len, @@ -201,7 +201,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { path: PathId::ZERO, partial_encode: PartialEncode::no_header(), ack_eliciting: true, - exact_number: 0, + packet_number: 0, can_coalesce: true, min_size: 0, tag_len: 0, @@ -287,11 +287,11 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { match pad_datagram { PadDatagram::No => (), PadDatagram::ToSize(size) => self.pad_to(size), - PadDatagram::ToSegmentSize => self.pad_to(self.buf.segment_size() as u16), + PadDatagram::ToMaxSize => self.pad_to(self.buf.max_datagram_size() as u16), PadDatagram::ToMinMtu => self.pad_to(MIN_INITIAL_SIZE), } let ack_eliciting = self.ack_eliciting; - let exact_number = self.exact_number; + let packet_number = self.packet_number; let space_id = self.space; let (size, padded, sent) = self.finish(conn, now); @@ -311,7 +311,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { }; 
conn.paths.get_mut(&path_id).unwrap().data.sent( - exact_number, + packet_number, packet, conn.spaces[space_id].for_path(path_id), ); @@ -372,7 +372,7 @@ impl<'a, 'b> PacketBuilder<'a, 'b> { self.partial_encode.finish( packet_buf, header_crypto, - Some((self.exact_number, self.path, packet_crypto)), + Some((self.packet_number, self.path, packet_crypto)), ); let packet_len = self.buf.len() - encode_start; @@ -414,11 +414,11 @@ pub(super) enum PadDatagram { No, /// To a specific size ToSize(u16), - /// Pad to the current MTU/segment size + /// Pad to the maximum allowed size for this datagram. /// - /// For the first datagram in a transmit the MTU is the same as the - /// [`TransmitBuf::segment_size`]. - ToSegmentSize, + /// Usually this will be the path MTU as initialised by + /// [`TransmitBuf::start_first_datagram`]. + ToMaxSize, /// Pad to [`MIN_INITIAL_SIZE`], the minimal QUIC MTU of 1200 bytes ToMinMtu, } @@ -437,12 +437,12 @@ impl std::ops::BitOr for PadDatagram { (Self::No, rhs) => rhs, (Self::ToSize(size), Self::No) => Self::ToSize(size), (Self::ToSize(a), Self::ToSize(b)) => Self::ToSize(a.max(b)), - (Self::ToSize(_), Self::ToSegmentSize) => Self::ToSegmentSize, + (Self::ToSize(_), Self::ToMaxSize) => Self::ToMaxSize, (Self::ToSize(_), Self::ToMinMtu) => Self::ToMinMtu, - (Self::ToSegmentSize, Self::No) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToSize(_)) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToSegmentSize) => Self::ToSegmentSize, - (Self::ToSegmentSize, Self::ToMinMtu) => Self::ToMinMtu, + (Self::ToMaxSize, Self::No) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToSize(_)) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToMaxSize) => Self::ToMaxSize, + (Self::ToMaxSize, Self::ToMinMtu) => Self::ToMinMtu, (Self::ToMinMtu, _) => Self::ToMinMtu, } } diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 5ed48991a..bfb5d59fb 100644 --- a/quinn-proto/src/connection/streams/state.rs 
+++ b/quinn-proto/src/connection/streams/state.rs @@ -565,8 +565,8 @@ impl StreamsState { #[cfg(test)] fn write_frames_for_test(&mut self, capacity: usize, fair: bool) -> frame::StreamMetaVec { let buf = &mut Vec::with_capacity(capacity); - let mut tbuf = crate::connection::TransmitBuf::new(buf, std::num::NonZeroUsize::MIN, 1_200); - tbuf.start_new_datagram_with_size(capacity); + let mut tbuf = crate::connection::TransmitBuf::new(buf, std::num::NonZeroUsize::MIN); + tbuf.start_first_datagram(capacity); let builder = &mut PacketBuilder::simple_data_buf(&mut tbuf); let stats = &mut FrameStats::default(); self.write_stream_frames(builder, fair, stats); diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index a62b969f6..18fba768e 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -35,138 +35,125 @@ pub(super) struct TransmitBuf<'a> { /// /// Note that when coalescing packets this might be before the start of the current /// packet. - datagram_start: usize, + datagram_start_offset: usize, /// The maximum offset allowed to be used for the current datagram in the buffer /// /// The first and last datagram in a batch are allowed to be smaller then the maximum /// size. All datagrams in between need to be exactly this size. - buf_capacity: usize, + datagram_max_offset: usize, /// The maximum number of datagrams allowed to write into [`TransmitBuf::buf`] max_datagrams: NonZeroUsize, /// The number of datagrams already (partially) written into the buffer /// /// Incremented by a call to [`TransmitBuf::start_new_datagram`]. pub(super) num_datagrams: usize, - /// The segment size of this GSO batch + /// The segment size of this GSO batch, set once the second datagram is started. /// /// The segment size is the size of each datagram in the GSO batch, only the last /// datagram in the batch may be smaller. 
 ///
-    /// For the first datagram this is set to the maximum size a datagram is allowed to be:
-    /// the current path MTU. After the first datagram is finished this is reduced to the
-    /// size of the first datagram and can no longer change.
-    segment_size: usize,
+    /// Only set once there is more than one datagram.
+    segment_size: Option<usize>,
 }
 
 impl<'a> TransmitBuf<'a> {
-    pub(super) fn new(buf: &'a mut Vec<u8>, max_datagrams: NonZeroUsize, mtu: usize) -> Self {
+    pub(super) fn new(buf: &'a mut Vec<u8>, max_datagrams: NonZeroUsize) -> Self {
         buf.clear();
         Self {
             buf,
-            datagram_start: 0,
-            buf_capacity: 0,
+            datagram_start_offset: 0,
+            datagram_max_offset: 0,
             max_datagrams,
             num_datagrams: 0,
-            segment_size: mtu,
+            segment_size: None,
         }
     }
 
-    /// Starts a datagram with a custom datagram size
+    /// Starts the first datagram in the GSO batch.
     ///
-    /// This is a specialized version of [`TransmitBuf::start_new_datagram`] which sets the
-    /// datagram size. Useful for e.g. PATH_CHALLENGE, tail-loss probes or MTU probes.
-    ///
-    /// After the first datagram you can never increase the segment size. If you decrease
-    /// the size of a datagram in a batch, it must be the last datagram of the batch.
-    pub(super) fn start_new_datagram_with_size(&mut self, datagram_size: usize) {
-        // Only reserve space for this datagram, usually it is the last one in the batch.
-        let max_capacity_hint = datagram_size;
-        self.new_datagram_inner(datagram_size, max_capacity_hint)
+    /// The size of the first datagram sets the segment size of the GSO batch.
+    pub(super) fn start_first_datagram(&mut self, max_size: usize) {
+        debug_assert_eq!(self.num_datagrams, 0, "No datagram can be started yet");
+        debug_assert!(
+            self.buf.is_empty(),
+            "Buffer must be empty for first datagram"
+        );
+        self.datagram_max_offset = max_size;
+        self.num_datagrams = 1;
+        if self.datagram_max_offset > self.buf.capacity() {
+            // Reserve all remaining capacity right away.
+ let max_batch_capacity = max_size * self.max_datagrams.get(); + self.buf + .reserve_exact(max_batch_capacity.saturating_sub(self.buf.capacity())); + } } - /// Starts a new datagram in the transmit buffer - /// - /// If this starts the second datagram the segment size will be set to the size of the - /// first datagram. - /// - /// If the underlying buffer does not have enough capacity yet this will allocate enough - /// capacity for all the datagrams allowed in a single batch. Use - /// [`TransmitBuf::start_new_datagram_with_size`] if you know you will need less. - pub(super) fn start_new_datagram(&mut self) { - // We reserve the maximum space for sending `max_datagrams` upfront to avoid any - // reallocations if more datagrams have to be appended later on. Benchmarks have - // shown a 5-10% throughput improvement compared to continuously resizing the - // datagram buffer. While this will lead to over-allocation for small transmits - // (e.g. purely containing ACKs), modern memory allocators (e.g. mimalloc and - // jemalloc) will pool certain allocation sizes and therefore this is still rather - // efficient. - let max_capacity_hint = self.max_datagrams.get() * self.segment_size; - self.new_datagram_inner(self.segment_size, max_capacity_hint) - } - - fn new_datagram_inner(&mut self, datagram_size: usize, max_capacity_hint: usize) { - debug_assert!(self.num_datagrams < self.max_datagrams.into()); - if self.num_datagrams == 1 { - // Set the segment size to the size of the first datagram. - self.segment_size = self.buf.len(); - } - if self.num_datagrams >= 1 { - debug_assert!(datagram_size <= self.segment_size); - if datagram_size < self.segment_size { - // If this is a GSO batch and this datagram is smaller than the segment - // size, this must be the last datagram in the batch. 
- self.max_datagrams = NonZeroUsize::MIN.saturating_add(self.num_datagrams); - } - } - self.datagram_start = self.buf.len(); - debug_assert_eq!( - self.datagram_start % self.segment_size, - 0, - "datagrams in a GSO batch must be aligned to the segment size" + /// Starts a subsequent datagram in the GSO batch. + pub(super) fn start_datagram(&mut self) { + // Could be enforced with typestate, but that's probably also meh. + debug_assert!( + self.num_datagrams >= 1, + "Use start_first_datagram for first datagram" ); - self.buf_capacity = self.datagram_start + datagram_size; - if self.buf_capacity > self.buf.capacity() { - self.buf - .reserve_exact(max_capacity_hint.saturating_sub(self.buf.capacity())); + debug_assert!( + self.buf.len() <= self.datagram_max_offset, + "Datagram exceeded max offset" + ); + let segment_size = self + .segment_size + .get_or_insert_with(|| self.buf.len() - self.datagram_start_offset); + let segment_size = *segment_size; + if self.num_datagrams > 1 { + debug_assert_eq!( + self.buf.len(), + self.datagram_max_offset, + "Subsequent datagrams must be exactly the segment size" + ); } + self.num_datagrams += 1; + self.datagram_start_offset = self.buf.len(); + self.datagram_max_offset = self.buf.len() + segment_size; + if self.datagram_max_offset > self.buf.capacity() { + // Reserve all remaining capacity right away. + let max_batch_capacity = segment_size * self.max_datagrams.get(); + self.buf + .reserve_exact(max_batch_capacity.saturating_sub(self.buf.capacity())); + } } - /// Clips the datagram size to the current size + /// Mark the first datagram as completely written, setting its size. /// /// Only valid for the first datagram, when the datagram might be smaller than the - /// segment size. Needed before estimating the available space in the next datagram - /// based on [`TransmitBuf::segment_size`]. + /// maximum size it was allowed to be. 
Needed before estimating the available space in
+    /// the next datagram based on [`TransmitBuf::segment_size`].
     ///
     /// Use [`TransmitBuf::start_new_datagram_with_size`] if you need to reduce the size of
     /// the last datagram in a batch.
-    pub(super) fn clip_datagram_size(&mut self) {
+    pub(super) fn end_first_datagram(&mut self) {
         debug_assert_eq!(self.num_datagrams, 1);
-        if self.buf.len() < self.segment_size {
+        if self.buf.len() < self.datagram_max_offset {
             trace!(
-                segment_size = self.buf.len(),
-                prev_segment_size = self.segment_size,
+                size = self.buf.len(),
+                max_size = self.datagram_max_offset,
                 "clipped datagram size"
             );
         }
-        self.segment_size = self.buf.len();
-        self.buf_capacity = self.buf.len();
+        self.datagram_max_offset = self.buf.len();
     }
 
-    /// Returns the GSO segment size
-    ///
-    /// This is also the maximum size datagrams are allowed to be. The first and last
-    /// datagram in a batch are allowed to be smaller however. After the first datagram the
-    /// segment size is clipped to the size of the first datagram.
-    ///
-    /// If the last datagram was created using [`TransmitBuf::start_new_datagram_with_size`]
-    /// the the segment size will be greater than the current datagram is allowed to be.
-    /// Thus [`TransmitBuf::datagram_remaining_mut`] should be used if you need to know the
-    /// amount of data that can be written into the datagram.
-    pub(super) fn segment_size(&self) -> usize {
+    /// Returns the GSO segment size.
+    pub(super) fn segment_size(&self) -> Option<usize> {
         self.segment_size
     }
 
+    /// Returns the maximum size this datagram is allowed to be.
+    ///
+    /// Once a second datagram is started this is equivalent to the segment size.
+    pub(super) fn max_datagram_size(&self) -> usize {
+        self.datagram_max_offset - self.datagram_start_offset
+    }
+
     /// Returns the number of datagrams written into the buffer
     ///
     /// The last datagram is not necessarily finished yet.
@@ -183,7 +170,7 @@ impl<'a> TransmitBuf<'a> { /// /// In other words, this offset contains the first byte of the current datagram. pub(super) fn datagram_start_offset(&self) -> usize { - self.datagram_start + self.datagram_start_offset } /// Returns the maximum offset in the buffer allowed for the current datagram @@ -191,12 +178,12 @@ impl<'a> TransmitBuf<'a> { /// The first and last datagram in a batch are allowed to be smaller then the maximum /// size. All datagrams in between need to be exactly this size. pub(super) fn datagram_max_offset(&self) -> usize { - self.buf_capacity + self.datagram_max_offset } /// Returns the number of bytes that may still be written into this datagram pub(super) fn datagram_remaining_mut(&self) -> usize { - self.buf_capacity.saturating_sub(self.buf.len()) + self.datagram_max_offset.saturating_sub(self.buf.len()) } /// Returns `true` if the buffer did not have anything written into it @@ -213,18 +200,6 @@ impl<'a> TransmitBuf<'a> { pub(super) fn as_mut_slice(&mut self) -> &mut [u8] { self.buf.as_mut_slice() } - - /// Returns the underlying buffer and the GSO segment size, if any. - /// - /// Note that the GSO segment size is only defined if there is more than one segment. - pub(super) fn finish(self) -> (&'a mut Vec, Option) { - let gso_segment_size = if self.num_datagrams() > 1 { - Some(self.segment_size) - } else { - None - }; - (self.buf, gso_segment_size) - } } unsafe impl BufMut for TransmitBuf<'_> { diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index 8e05c86cc..1c53ecc16 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -951,12 +951,12 @@ impl SpaceId { /// Returns the next higher packet space. /// - /// Keeps returning [`SpaceId::Data`] as the highest space. - pub(crate) fn next(&self) -> Self { + /// Returns `None` if at [`SpaceId::Data`]. 
+    pub(crate) fn next(&self) -> Option<Self> {
         match self {
-            Self::Initial => Self::Handshake,
-            Self::Handshake => Self::Data,
-            Self::Data => Self::Data,
+            Self::Initial => Some(Self::Handshake),
+            Self::Handshake => Some(Self::Data),
+            Self::Data => None,
         }
     }
 }
diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs
index dd3b3fe11..909a9d28d 100644
--- a/quinn-proto/src/tests/mod.rs
+++ b/quinn-proto/src/tests/mod.rs
@@ -3485,6 +3485,7 @@ fn large_datagram_with_acks() {
         pair.drive_server();
     }
 
+    info!("sending datagrams");
     let max_size = pair.client_datagrams(client_ch).max_size().unwrap();
     let msg = Bytes::from(vec![0; max_size]);
     pair.client_datagrams(client_ch)
diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs
index d9eb56782..adff80efb 100644
--- a/quinn-proto/src/tests/util.rs
+++ b/quinn-proto/src/tests/util.rs
@@ -318,6 +318,7 @@ impl Pair {
             self.client_conn_mut(client_ch).poll(),
             Some(Event::HandshakeConfirmed)
         );
+        info!("connected");
     }
 
     pub(super) fn client_conn_mut(&mut self, ch: ConnectionHandle) -> &mut Connection {