diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index b57faf3f8..134d2c582 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -325,6 +325,57 @@ pub struct Connection { qlog: QlogSink, } +/// Return value for [`Connection::poll_transmit_on_path`]. +#[derive(Debug)] +enum PollPathStatus { + /// Nothing to send on the path, nothing was written into the [`TransmitBuf`]. + NothingToSend { + /// If true there was data to send but congestion control did not allow it. + congestion_blocked: bool, + }, + /// The transmit is ready to be sent. + Send(Transmit), +} + +/// Return value for [`Connection::poll_transmit_path_space`]. +#[derive(Debug)] +enum PollPathSpaceStatus { + /// Nothing to send in the space, nothing was written into the [`TransmitBuf`]. + NothingToSend { + /// If true there was data to send but congestion control did not allow it. + congestion_blocked: bool, + }, + /// One or more packets have been written into the [`TransmitBuf`]. + WrotePacket { + /// The highest packet number. + last_packet_number: u64, + /// Whether to pad an already started datagram in the next packet. + /// + /// When Initial, 0-RTT or Handshake packets do not fill the entire + /// datagram they may decide to coalesce with the next packet from a higher + /// encryption level on the same path. But the earlier packet may impose specific + /// size requirements on the datagram it is sent in. + /// + /// If a space did not complete the datagram, it uses this to request the correct + /// padding in the final packet of the datagram so that the final datagram will have + /// the correct size. + /// + /// If a space did fill an entire datagram, it leaves this at the default of + /// [`PadDatagram::No`]. + pad_datagram: PadDatagram, + }, + /// Send the contents of the transmit immediately. 
+ /// + /// Packets were written and the GSO batch must end now, regardless of whether higher + /// spaces still have frames to write. This is used when the last datagram written would + /// require too much padding to continue a GSO batch, which would waste space on the + /// wire. + Send { + /// The highest packet number written into the transmit. + last_packet_number: u64, + }, +} + impl Connection { pub(crate) fn new( endpoint_config: Arc, @@ -968,20 +1019,7 @@ impl Connection { // Each call to poll_transmit can only send datagrams to one destination, because // all datagrams in a GSO batch are for the same destination. Therefore only - // datagrams for one Path ID are produced for each poll_transmit call. - - // TODO(flub): this is wishful thinking and not actually implemented, but perhaps it - // should be: - - // First, if we have to send a close, select a path for that. - // Next, all paths that have a PATH_CHALLENGE or PATH_RESPONSE pending. - - // For all, open, validated and AVAILABLE paths: - // - Is the path congestion blocked or pacing blocked? - // - call maybe_queue_ to ensure a tail-loss probe would be sent? - // - do we need to send a close message? - // - call can_send - // Once there's nothing more to send on the AVAILABLE paths, do the same for BACKUP paths + // datagrams for one destination address are produced for each poll_transmit call. // Check whether we need to send a close message let close = match self.state.as_type() { @@ -1016,92 +1054,317 @@ impl Connection { && self.peer_supports_ack_frequency(); } - // Whether this packet can be coalesced with another one in the same datagram. - let mut coalesce = true; + // TODO(flub): path scheduling logic might be buggy if there are only un-validated + // paths and PATH_STATUS_BACKUP paths. + + // Path scheduling logic currently works as follows: + // + // - For any un-validated paths we only send frames that *must* be sent on that + // path. E.g. PATH_CHALLENGE, PATH_RESPONSE. 
+ // + // - If there are any validated paths with CIDs and PathStatus::Available: + // - Frames that can be sent on any path, e.g. STREAM, DATAGRAM, are only sent on + // these available paths. + // - All other paths only send frames that *must* be sent on those paths, + // e.g. PATH_CHALLENGE, PATH_RESPONSE, tail-loss probes, keep alive PING. + // + // - If there are no validated paths with CIDs and PathStatus::Available all frames + // are sent on the earliest possible path. + // + // For all this we use the *path_exclusive_only* boolean: If set to true, only + // frames that must be sent on the path will be built into the packet. + + // Is there any open, validated and status available path with dst CIDs? If so we'll + // want to set path_exclusive_only for any other paths. + let have_available_path = self.paths.iter().any(|(id, path)| { + path.data.validated + && path.data.local_status() == PathStatus::Available + && self.remote_cids.contains_key(id) + }); - // Whether the last packet in the datagram must be padded so the datagram takes up - // to at least MIN_INITIAL_SIZE, or to the maximum segment size if this is smaller. - let mut pad_datagram = PadDatagram::No; + // TODO: how to avoid the allocation? Cannot use a for loop because of + // borrowing. Maybe SmallVec or similar. + let path_ids: Vec<_> = self.paths.keys().copied().collect(); - // Whether congestion control stopped the next packet from being sent. Further - // packets could still be built, as e.g. tail-loss probes are not congestion - // limited. + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. let mut congestion_blocked = false; - // The packet number of the last built packet. - let mut last_packet_number = None; + for &path_id in &path_ids { + if let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id) { + return Some(transmit); + } + + // Poll for on-path transmits. 
+ match self.poll_transmit_on_path( + now, + buf, + path_id, + max_datagrams, + have_available_path, + close, + ) { + PollPathStatus::Send(transmit) => { + return Some(transmit); + } + PollPathStatus::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked |= cb; + // Continue checking other paths, tail-loss probes may need to be sent + // in all spaces. + debug_assert!( + buf.is_empty(), + "nothing to send on path but buffer not empty" + ); + } + } + } - let mut path_id = *self.paths.first_key_value().expect("one path must exist").0; + // We didn't produce any application data packet + debug_assert!( + buf.is_empty(), + "there was data in the buffer, but it was not sent" + ); - // If there is any open, validated and available path we only want to send frames to - // any backup path that must be sent on that backup path exclusively. - let have_available_path = self.paths.iter().any(|(id, path)| { - path.data.validated - && path.data.local_status() == PathStatus::Available - && self.remote_cids.contains_key(id) - }); + self.app_limited = !congestion_blocked; - if let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id) { - return Some(transmit); + if self.state.is_established() { + // Try MTU probing now + for path_id in path_ids { + if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) { + return Some(transmit); + } + } } - let mut space_id = match path_id { - PathId::ZERO => SpaceId::Initial, - _ => SpaceId::Data, - }; + None + } - // Setup for the first path_id - let mut transmit = TransmitBuf::new( - buf, - max_datagrams, - self.path_data(path_id).current_mtu().into(), + fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit { + debug_assert!( + !transmit.is_empty(), + "must not be called with an empty transmit buffer" ); - loop { - // check if there is at least one active CID to use for sending - let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else { - if 
!self.abandoned_paths.contains(&path_id) { - debug!(%path_id, "no remote CIDs for path"); - } + let network_path = self.path_data(path_id).network_path; + trace!( + segment_size = transmit.segment_size(), + last_datagram_len = transmit.len() % transmit.segment_size(), + %network_path, + "sending {} bytes in {} datagrams", + transmit.len(), + transmit.num_datagrams() + ); + self.path_data_mut(path_id) + .inc_total_sent(transmit.len() as u64); - let Some(next_path_id) = self.paths.keys().find(|&&next| next > path_id) else { - // Nothing more to send. - trace!(?space_id, %path_id, "no CIDs to send on path, no more paths"); - break; - }; - // See if this next path can send anything. - path_id = *next_path_id; - space_id = SpaceId::Data; - - transmit = { - let (buf, _) = transmit.finish(); - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } else if let Some(response) = - self.send_off_path_path_response(now, buf, path_id) - { - return Some(response); - } - { - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) - } - }; + self.stats + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); + self.path_stats + .entry(path_id) + .or_default() + .udp_tx + .on_sent(transmit.num_datagrams() as u64, transmit.len()); - continue; + Transmit { + destination: network_path.remote, + size: transmit.len(), + ecn: if self.path_data(path_id).sending_ecn { + Some(EcnCodepoint::Ect0) + } else { + None + }, + segment_size: match transmit.num_datagrams() { + 1 => None, + _ => Some(transmit.segment_size()), + }, + src_ip: network_path.local_ip, + } + } + + /// poll_transmit logic for off-path data. 
+ fn poll_transmit_off_path( + &mut self, + now: Instant, + buf: &mut Vec, + path_id: PathId, + ) -> Option { + if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { + return Some(challenge); + } + if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { + return Some(response); + } + None + } + + /// poll_transmit logic for on-path data. + /// + /// This is not quite the same as for a multipath packet space, since [`PathId::ZERO`] + /// has 3 packet spaces, which this handles. + /// + /// See [`Self::poll_transmit_off_path`] for off-path data. + #[must_use] + fn poll_transmit_on_path( + &mut self, + now: Instant, + buf: &mut Vec, + path_id: PathId, + max_datagrams: NonZeroUsize, + have_available_path: bool, + close: bool, + ) -> PollPathStatus { + // Check if there is at least one active CID to use for sending + let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else { + if !self.abandoned_paths.contains(&path_id) { + debug!(%path_id, "no remote CIDs for path"); + } + return PollPathStatus::NothingToSend { + congestion_blocked: false, }; + }; + + // Whether the last packet in the datagram must be padded so the datagram takes up + // an exact size. An earlier space can decide to not fill an entire datagram and + // require the next space to fill it further. But may need a specific size of the + // datagram containing the packet. The final packet built in the datagram must pad + // to this size. + let mut pad_datagram = PadDatagram::No; - // Determine if anything can be sent in this packet number space (SpaceId + - // PathId). + // The packet number of the last built packet. This is kept across spaces. + // QUIC is supposed to have a single congestion controller for the Initial, + // Handshake and Data(PathId::ZERO) spaces. 
+ let mut last_packet_number = None; + + // If we end up not sending anything, we need to know if that was because there was + // nothing to send or because we were congestion blocked. + let mut congestion_blocked = false; + + // Set the segment size to this path's MTU for on-path data. + let pmtu = self.path_data(path_id).current_mtu().into(); + let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu); + + // Iterate over the available spaces. + for space_id in SpaceId::iter() { + // Only PathId::ZERO uses non Data space ids. + if path_id != PathId::ZERO && space_id != SpaceId::Data { + continue; + } + match self.poll_transmit_path_space( + now, + &mut transmit, + path_id, + space_id, + remote_cid, + have_available_path, + close, + pad_datagram, + ) { + PollPathSpaceStatus::NothingToSend { + congestion_blocked: cb, + } => { + congestion_blocked |= cb; + // Continue checking other spaces, tail-loss probes may need to be sent + // in all spaces. + } + PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram: pad, + } => { + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(pn); + pad_datagram = pad; + // Always check higher spaces. If the transmit is full or they have + // nothing to send they will not write packets. But if they can, they + // must always be allowed to add to this transmit because coalescing may + // be required. + continue; + } + PollPathSpaceStatus::Send { + last_packet_number: pn, + } => { + debug_assert!(!transmit.is_empty(), "transmit must contain packets"); + last_packet_number = Some(pn); + break; + } + } + } + + if last_packet_number.is_some() || congestion_blocked { + self.qlog.emit_recovery_metrics( + path_id, + &mut self.paths.get_mut(&path_id).unwrap().data, + now, + ); + } + + match last_packet_number { + Some(last_packet_number) => { + // Note that when sending in multiple spaces the last packet number will be + // the one from the highest space. 
+ self.path_data_mut(path_id).congestion.on_sent( + now, + transmit.len() as u64, + last_packet_number, + ); + PollPathStatus::Send(self.build_transmit(path_id, transmit)) + } + None => PollPathStatus::NothingToSend { congestion_blocked }, + } + } + + /// poll_transmit logic for a QUIC-MULTIPATH packet number space (PathID + SpaceId). + #[must_use] + fn poll_transmit_path_space( + &mut self, + now: Instant, + transmit: &mut TransmitBuf<'_>, + path_id: PathId, + space_id: SpaceId, + remote_cid: ConnectionId, + // If any other packet space has a usable path with PathStatus::Available. + have_available_path: bool, + // If we need to send a CONNECTION_CLOSE frame. + connection_close_pending: bool, + // Whether the current datagram needs to be padded to a certain size. + mut pad_datagram: PadDatagram, + ) -> PollPathSpaceStatus { + // Keep track of the last packet number we wrote. If None we did not write any + // packets. + let mut last_packet_number = None; + + // Each loop of this may build one packet. It works logically as follows: + // + // - Check if something *needs* to be sent in this space and *can* be sent. + // - If not, return to the caller who will call us again for the next space. + // - Start a new datagram. + // - Unless coalescing the packet into an existing datagram. + // - Write the packet header and payload. + // - Check if coalescing a next packet into the datagram is possible. + // - If coalescing, finish packet without padding to leave space in the datagram. + // - If not coalescing, complete the datagram: + // - Finish packet with padding. + // - Set the transmit segment size if this is the first datagram. + // - Loop: next iteration will exit the loop if nothing more to send in this + // space. The TransmitBuf will contain a started datagram with space if + // coalescing, or completely filled datagram if not coalescing. + loop { + // Determine if anything can be sent in this packet number space (SpaceId + PathId). 
let max_packet_size = if transmit.datagram_remaining_mut() > 0 { - // We are trying to coalesce another packet into this datagram. + // A datagram is started already, we are coalescing another packet into it. transmit.datagram_remaining_mut() } else { // A new datagram needs to be started. transmit.segment_size() }; - let can_send = self.space_can_send(space_id, path_id, max_packet_size, close); - let path_should_send = { + let can_send = + self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending); + + // Whether we would like to send any frames on this packet space. See the packet + // scheduling described in poll_transmit. + let space_should_send = { let path_exclusive_only = space_id == SpaceId::Data && have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; @@ -1114,75 +1377,67 @@ impl Connection { path_should_send || needs_loss_probe }; - if !path_should_send && space_id < SpaceId::Data { - if self.spaces[space_id].crypto.is_some() { - trace!(?space_id, %path_id, "nothing to send in space"); - } - space_id = space_id.next(); - continue; - } - - let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 { - // Only check congestion control if a new datagram is needed. - self.path_congestion_check(space_id, path_id, &transmit, &can_send, now) - } else { - PathBlocked::No - }; - if send_blocked != PathBlocked::No { - trace!(?space_id, %path_id, ?send_blocked, "congestion blocked"); - congestion_blocked = true; - } - if send_blocked != PathBlocked::No && space_id < SpaceId::Data { - // Higher spaces might still have tail-loss probes to send, which are not - // congestion blocked. - space_id = space_id.next(); - continue; - } - if !path_should_send || send_blocked != PathBlocked::No { - // Nothing more to send on this path, check the next path if possible. - - // If there are any datagrams in the transmit, packets for another path can - // not be built. 
- if transmit.num_datagrams() > 0 { - break; - } - - let Some(next_path_id) = self.paths.keys().find(|&&next| next > path_id) else { - // Nothing more to send and no more paths to check - trace!(?space_id, %path_id, next_path_id=?None::, "nothing to send on path"); - break; - }; - - // See if this next path can send anything. - trace!(?space_id, %path_id, %next_path_id, "nothing to send on path"); - path_id = *next_path_id; - space_id = SpaceId::Data; - - transmit = { - let (buf, _) = transmit.finish(); - // Check off path data for the new path_id - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } else if let Some(response) = - self.send_off_path_path_response(now, buf, path_id) - { - return Some(response); - } else { - // Continue with on path data - // New path_id => fresh transmit - let pmtu = self.path_data(path_id).current_mtu().into(); - TransmitBuf::new(buf, max_datagrams, pmtu) + if !space_should_send { + // Nothing more to send. Previous iterations of this loop may have built + // packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + // Only log for spaces which have crypto. + if self.spaces[space_id].crypto.is_some() + || (space_id == SpaceId::Data && self.zero_rtt_crypto.is_some()) + { + trace!(?space_id, %path_id, "nothing to send in space"); + } + PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + } } }; + } - continue; + // We want to send on this space, check congestion control if we can. But only + // if we will need to start a new datagram. If we are coalescing into an already + // started datagram we do not need to check congestion control again. 
+ if transmit.datagram_remaining_mut() == 0 { + let congestion_blocked = + self.path_congestion_check(space_id, path_id, transmit, &can_send, now); + if congestion_blocked != PathBlocked::No { + // Previous iterations of this loop may have built packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: true, + }; + } + }; + } } - // If the datagram is full, we need to start a new one. + // If the datagram is full (or there never was one started), we need to start a + // new one. if transmit.datagram_remaining_mut() == 0 { if transmit.num_datagrams() >= transmit.max_datagrams().get() { - // No more datagrams allowed - break; + // No more datagrams allowed. + // Previous iterations of this loop may have built packets already. + return match last_packet_number { + Some(pn) => PollPathSpaceStatus::WrotePacket { + last_packet_number: pn, + pad_datagram, + }, + None => { + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; + } + }; } match self.spaces[space_id].for_path(path_id).loss_probes { @@ -1209,7 +1464,8 @@ impl Connection { } } trace!(count = transmit.num_datagrams(), "new datagram started"); - coalesce = true; + + // We started a new datagram, we decide later if it needs padding. pad_datagram = PadDatagram::No; } @@ -1235,17 +1491,26 @@ impl Connection { prev.update_unacked = false; } - let mut builder = PacketBuilder::new( + let Some(mut builder) = PacketBuilder::new( now, space_id, path_id, remote_cid, - &mut transmit, + transmit, can_send.other, self, - )?; + ) else { + // Confidentiality limit is exceeded and the connection has been killed. We + // should not send any other packets. This works in a roundabout way: We + // have started a datagram but not written anything into it. 
So even if we + // get called again for another space we will see an already started + // datagram and try and start another packet here. Then be stopped by the + // same confidentiality limit. + return PollPathSpaceStatus::NothingToSend { + congestion_blocked: false, + }; + }; last_packet_number = Some(builder.packet_number); - coalesce = coalesce && builder.can_coalesce; if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) { // https://www.rfc-editor.org/rfc/rfc9000.html#section-14.1 @@ -1284,6 +1549,10 @@ impl Connection { // Since there only 64 ACK frames there will always be enough space // to encode the ConnectionClose frame too. However we still have the // check here to prevent crashes if something changes. + + // TODO(flub): This needs fixing for multipath, to ensure we can always + // write the CONNECTION_CLOSE even if we have many PATH_ACKs to send: + // https://github.com/n0-computer/quinn/issues/367. debug_assert!( builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND, "ACKs should leave space for ConnectionClose" @@ -1308,24 +1577,36 @@ impl Connection { }; builder.write_frame(close.encoder(max_frame_size), stats); } + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, pad_datagram); if space_id == self.highest_space { // Don't send another close packet. Even with multipath we only send // CONNECTION_CLOSE on a single path since we expect our paths to work. self.connection_close_pending = false; - // `CONNECTION_CLOSE` is the final packet - break; - } else { - // Send a close frame in every possible space for robustness, per - // RFC9000 "Immediate Close during the Handshake". Don't bother trying - // to send anything else. - space_id = space_id.next(); - continue; } + // Send a close frame in every possible space for robustness, per + // RFC9000 "Immediate Close during the Handshake". Don't bother trying + // to send anything else. 
+ // TODO(flub): This breaks during the handshake if we can not coalesce + // packets due to space reasons: the next space would either fail a + // debug_assert checking for enough packet space or produce an invalid + // packet. We need to keep track of per-space pending CONNECTION_CLOSE to + // be able to send these across multiple calls to poll_transmit. Then + // check for coalescing space here because initial packets need to be in + // padded datagrams. And also add space checks for CONNECTION_CLOSE in + // space_can_send so it would stop a GSO batch if the datagram is too + // small for another CONNECTION_CLOSE packet. + return PollPathSpaceStatus::WrotePacket { + last_packet_number: last_pn, + pad_datagram, + }; } + // If this boolean is true we only want to send frames which can not be sent on + // any other path. See the path scheduling notes in Self::poll_transmit. let path_exclusive_only = have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup; + self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder); // ACK-only packets should only be sent when explicitly allowed. If we write them due to @@ -1364,23 +1645,25 @@ impl Connection { // might be needed because of the packet type, or to fill the GSO segment size. // Are we allowed to coalesce AND is there enough space for another *packet* in - // this datagram AND is there another packet to send in this or the next space? - if coalesce - && builder + // this datagram AND will we definitely send another packet? 
+ if builder.can_coalesce && path_id == PathId::ZERO && { + let max_packet_size = builder .buf .datagram_remaining_mut() - .saturating_sub(builder.predict_packet_end()) - > MIN_PACKET_SPACE - && self - .next_send_space(space_id, path_id, builder.buf, close) - .is_some() - { + .saturating_sub(builder.predict_packet_end()); + max_packet_size > MIN_PACKET_SPACE + && self.has_pending_packet(space_id, max_packet_size, connection_close_pending) + } { // We can append/coalesce the next packet into the current // datagram. Finish the current packet without adding extra padding. + trace!("will coalesce with next packet"); builder.finish_and_track(now, self, path_id, PadDatagram::No); } else { // We need a new datagram for the next packet. Finish the current // packet with padding. + // TODO(flub): if there isn't any more data to be sent, this will still pad + // to the segment size and only discover there is nothing to send before + // starting the next packet. That is wasting up to 32 bytes. if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) { // If too many padding bytes would be required to continue the // GSO batch after this packet, end the GSO batch here. Ensures @@ -1397,8 +1680,11 @@ impl Connection { "GSO truncated by demand for {} padding bytes", builder.buf.datagram_remaining_mut() - builder.predict_packet_end() ); + let last_pn = builder.packet_number; builder.finish_and_track(now, self, path_id, PadDatagram::No); - break; + return PollPathSpaceStatus::Send { + last_packet_number: last_pn, + }; } // Pad the current datagram to GSO segment size so it can be @@ -1415,192 +1701,115 @@ impl Connection { } } } + } - if let Some(last_packet_number) = last_packet_number { - // Note that when sending in multiple packet spaces the last packet number will - // be the one from the highest packet space. 
- self.path_data_mut(path_id).congestion.on_sent( - now, - transmit.len() as u64, - last_packet_number, - ); - } - - self.qlog.emit_recovery_metrics( - path_id, - &mut self.paths.get_mut(&path_id).unwrap().data, - now, - ); - - self.app_limited = transmit.is_empty() && !congestion_blocked; - - // Send MTU probe if necessary - if transmit.is_empty() && self.state.is_established() { - // MTU probing happens only in Data space. - let space_id = SpaceId::Data; - path_id = *self.paths.first_key_value().expect("one path must exist").0; - let probe_data = loop { - // We MTU probe all paths for which all of the following is true: - // - We have an active destination CID for the path. - // - The remote address *and* path are validated. - // - The path is not abandoned. - // - The MTU Discovery subsystem wants to probe the path. - let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active); - let eligible = self.path_data(path_id).validated - && !self.path_data(path_id).is_validating_path() - && !self.abandoned_paths.contains(&path_id); - let probe_size = eligible - .then(|| { - let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number(); - self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn) - }) - .flatten(); - match (active_cid, probe_size) { - (Some(active_cid), Some(probe_size)) => { - // Let's send an MTUD probe! - break Some((active_cid, probe_size)); - } - _ => { - // Find the next path to check if it needs an MTUD probe. - match self.paths.keys().find(|&&next| next > path_id) { - Some(next) => { - path_id = *next; - continue; - } - None => break None, - } - } - } - }; - if let Some((active_cid, probe_size)) = probe_data { - // We are definitely sending a DPLPMTUD probe. 
- debug_assert_eq!(transmit.num_datagrams(), 0); - transmit.start_new_datagram_with_size(probe_size as usize); - - let mut builder = PacketBuilder::new( - now, - space_id, - path_id, - active_cid, - &mut transmit, - true, - self, - )?; - - // We implement MTU probes as ping packets padded up to the probe size - trace!(?probe_size, "writing MTUD probe"); - builder.write_frame(frame::Ping, &mut self.stats.frame_tx); + fn poll_transmit_mtu_probe( + &mut self, + now: Instant, + buf: &mut Vec, + path_id: PathId, + ) -> Option { + let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?; - // If supported by the peer, we want no delays to the probe's ACK - if self.peer_supports_ack_frequency() { - builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); - } + // We are definitely sending a DPLPMTUD probe. + let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize); + transmit.start_new_datagram_with_size(probe_size as usize); - builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size)); + let mut builder = PacketBuilder::new( + now, + SpaceId::Data, + path_id, + active_cid, + &mut transmit, + true, + self, + )?; - self.path_stats - .entry(path_id) - .or_default() - .sent_plpmtud_probes += 1; - } - } + // We implement MTU probes as ping packets padded up to the probe size + trace!(?probe_size, "writing MTUD probe"); + builder.write_frame(frame::Ping, &mut self.stats.frame_tx); - if transmit.is_empty() { - return None; + // If supported by the peer, we want no delays to the probe's ACK + if self.peer_supports_ack_frequency() { + builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx); } - Some(self.build_transmit(path_id, transmit)) - } - - fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit { - debug_assert!( - !transmit.is_empty(), - "must not be called with an empty transmit buffer" - ); + builder.finish_and_track(now, self, path_id, 
PadDatagram::ToSize(probe_size)); - let network_path = self.path_data(path_id).network_path; - trace!( - segment_size = transmit.segment_size(), - last_datagram_len = transmit.len() % transmit.segment_size(), - %network_path, - "sending {} bytes in {} datagrams", - transmit.len(), - transmit.num_datagrams() - ); - self.path_data_mut(path_id) - .inc_total_sent(transmit.len() as u64); - - self.stats - .udp_tx - .on_sent(transmit.num_datagrams() as u64, transmit.len()); self.path_stats .entry(path_id) .or_default() - .udp_tx - .on_sent(transmit.num_datagrams() as u64, transmit.len()); + .sent_plpmtud_probes += 1; - Transmit { - destination: network_path.remote, - size: transmit.len(), - ecn: if self.path_data(path_id).sending_ecn { - Some(EcnCodepoint::Ect0) - } else { - None - }, - segment_size: match transmit.num_datagrams() { - 1 => None, - _ => Some(transmit.segment_size()), - }, - src_ip: network_path.local_ip, - } + Some(self.build_transmit(path_id, transmit)) } - /// poll_transmit logic for off-path data. - fn poll_transmit_off_path( - &mut self, - now: Instant, - buf: &mut Vec, - path_id: PathId, - ) -> Option { - if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) { - return Some(challenge); - } - if let Some(response) = self.send_off_path_path_response(now, buf, path_id) { - return Some(response); + /// Returns the CID and probe size if a DPLPMTUD probe is needed. + /// + /// We MTU probe all paths for which all of the following is true: + /// - We have an active destination CID for the path. + /// - The remote address *and* path are validated. + /// - The path is not abandoned. + /// - The MTU Discovery subsystem wants to probe the path. 
+ fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> { + let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?; + let is_eligible = self.path_data(path_id).validated + && !self.path_data(path_id).is_validating_path() + && !self.abandoned_paths.contains(&path_id); + + if !is_eligible { + return None; } - None + let next_pn = self.spaces[SpaceId::Data] + .for_path(path_id) + .peek_tx_number(); + let probe_size = self + .path_data_mut(path_id) + .mtud + .poll_transmit(now, next_pn)?; + + Some((active_cid, probe_size)) } - /// Returns the [`SpaceId`] of the next packet space which has data to send + /// Returns true if there is a further packet to send on [`PathId::ZERO`]. + /// + /// In other words this is predicting whether the next call to + /// [`Connection::space_can_send`] issued will return some frames to be sent. Including + /// having to predict which packet number space it will be invoked with. This depends on + /// how both [`Connection::poll_transmit_on_path`] and + /// [`Connection::poll_transmit_path_space`] behave. + /// + /// This is needed to determine if packet coalescing can happen. Because the last packet + /// in a datagram may need to be padded and thus we must know if another packet will + /// follow or not. /// - /// This takes into account the space available to frames in the next datagram. - // TODO(flub): This duplication is not nice. - fn next_send_space( + /// The next packet can be either in the same space, or in one of the following spaces + /// on the same path. Because a 0-RTT packet can be coalesced with a 1-RTT packet and + /// both are in the Data(PathId::ZERO) space. Previous spaces are not checked, because + /// packets are built from Initial to Handshake to Data spaces. + fn has_pending_packet( &mut self, current_space_id: SpaceId, - path_id: PathId, - buf: &TransmitBuf<'_>, - close: bool, - ) -> Option { - // Number of bytes available for frames if this is a 1-RTT packet. 
We're guaranteed - // to be able to send an individual frame at least this large in the next 1-RTT - // packet. This could be generalized to support every space, but it's only needed to - // handle large fixed-size frames, which only exist in 1-RTT (application - // datagrams). We don't account for coalesced packets potentially occupying space - // because frames can always spill into the next datagram. + max_packet_size: usize, + connection_close_pending: bool, + ) -> bool { let mut space_id = current_space_id; loop { - let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close); - if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) { - return Some(space_id); + let can_send = self.space_can_send( + space_id, + PathId::ZERO, + max_packet_size, + connection_close_pending, + ); + if !can_send.is_empty() { + return true; } - space_id = match space_id { - SpaceId::Initial => SpaceId::Handshake, - SpaceId::Handshake => SpaceId::Data, - SpaceId::Data => break, + match space_id.next() { + Some(next_space_id) => space_id = next_space_id, + None => break, } } - None + false } /// Checks if creating a new datagram would be blocked by congestion control @@ -1790,9 +1999,14 @@ impl Connection { let mut can_send = space.can_send(path_id, &self.streams); + // Check for 1RTT space. if space_id == SpaceId::Data { - // Check for 1RTT space. let pn = space.for_path(path_id).peek_tx_number(); + // Number of bytes available for frames if this is a 1-RTT packet. We're + // guaranteed to be able to send an individual frame at least this large in the + // next 1-RTT packet. This could be generalized to support every space, but it's + // only needed to handle large fixed-size frames, which only exist in 1-RTT + // (application datagrams). 
let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id)); can_send |= self.can_send_1rtt(path_id, frame_space_1rtt); diff --git a/quinn-proto/src/connection/transmit_buf.rs b/quinn-proto/src/connection/transmit_buf.rs index 9908fe3a5..0e63cfdbc 100644 --- a/quinn-proto/src/connection/transmit_buf.rs +++ b/quinn-proto/src/connection/transmit_buf.rs @@ -213,18 +213,6 @@ impl<'a> TransmitBuf<'a> { pub(super) fn as_mut_slice(&mut self) -> &mut [u8] { self.buf.as_mut_slice() } - - /// Returns the underlying buffer and the GSO segment size, if any. - /// - /// Note that the GSO segment size is only defined if there is more than one segment. - pub(super) fn finish(self) -> (&'a mut Vec, Option) { - let gso_segment_size = if self.num_datagrams() > 1 { - Some(self.segment_size) - } else { - None - }; - (self.buf, gso_segment_size) - } } unsafe impl BufMut for TransmitBuf<'_> { diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index 8e05c86cc..1c53ecc16 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -951,12 +951,12 @@ impl SpaceId { /// Returns the next higher packet space. /// - /// Keeps returning [`SpaceId::Data`] as the highest space. - pub(crate) fn next(&self) -> Self { + /// Returns `None` if at [`SpaceId::Data`]. + pub(crate) fn next(&self) -> Option { match self { - Self::Initial => Self::Handshake, - Self::Handshake => Self::Data, - Self::Data => Self::Data, + Self::Initial => Some(Self::Handshake), + Self::Handshake => Some(Self::Data), + Self::Data => None, } } }