Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions quinn-proto/src/cid_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub(crate) struct CidQueue {
///
/// When [`Self::cursor_reserved`] and [`Self::cursor`] are equal, no CID is considered
/// reserved.
///
/// The reserved CIDs section of the buffer, if non-empty will always be ahead of the active
/// CID.
cursor_reserved: usize,
}

Expand Down Expand Up @@ -132,6 +135,14 @@ impl CidQueue {
Some(cid_data.0)
}

/// Returns the number of unused CIDs (neither active nor reserved).
pub(crate) fn remaining(&self) -> usize {
self.iter_from_reserved()
.count()
.checked_sub(1)
.expect("iterator is non empty")
}

/// Iterate CIDs in CidQueue that are not `None`, including the active CID
fn iter_from_active(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(0..Self::LEN).filter_map(move |step| {
Expand All @@ -140,9 +151,13 @@ impl CidQueue {
})
}

/// Iterate CIDs in CidQueue that are not `None`, including the active CID.
/// Iterate CIDs in CidQueue that are not `None`, from [`Self::cursor_reserved`].
///
/// The iterator will always have at least one item, as it will include the active CID when no
/// CID has been reserved, or the last reserved CID otherwise.
///
/// Along with the CID, it returns the offset counted from [`Self::cursor_reserved`] where the CID is stored.
/// Along with the CID, it returns the offset counted from [`Self::cursor_reserved`] where the
/// CID is stored.
fn iter_from_reserved(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(0..(Self::LEN - self.reserved_len())).filter_map(move |step| {
let index = (self.cursor_reserved + step) % Self::LEN;
Expand Down
97 changes: 76 additions & 21 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
sync::Arc,
};

use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use frame::StreamMetaVec;

use rand::{Rng, SeedableRng, rngs::StdRng};
Expand Down Expand Up @@ -992,26 +992,6 @@ impl Connection {
max_datagrams: NonZeroUsize,
buf: &mut Vec<u8>,
) -> Option<Transmit> {
if let Some(probing) = self
.iroh_hp
.server_side_mut()
.ok()
.and_then(iroh_hp::ServerState::next_probe)
{
let destination = probing.remote();
trace!(%destination, "RAND_DATA packet");
let token: u64 = self.rng.random();
buf.put_u64(token);
probing.finish(token);
return Some(Transmit {
destination,
ecn: None,
size: 8,
segment_size: None,
src_ip: None,
});
}

let max_datagrams = match self.config.enable_segmentation_offload {
false => NonZeroUsize::MIN,
true => max_datagrams,
Expand Down Expand Up @@ -1199,6 +1179,9 @@ impl Connection {
if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
return Some(response);
}
if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
return Some(challenge);
}
None
}

Expand Down Expand Up @@ -1975,6 +1958,78 @@ impl Connection {
})
}

/// Send a nat traversal challenge (off-path) on this path if possible.
///
/// This will ensure the path still has a remaining CID to use if the active one should be
/// retired.
fn send_nat_traversal_path_challenge(
&mut self,
now: Instant,
buf: &mut Vec<u8>,
path_id: PathId,
) -> Option<Transmit> {
let server_side = self.iroh_hp.server_side_mut().ok()?;
let probe = server_side.next_probe()?;
if !self.paths.get(&path_id)?.data.validated {
// Path is not usable for probing
return None;
}

let remote_cids = self.remote_cids.get_mut(&path_id)?;

// Check if this path has enough CIDs to send a probe. One to be reserved, one in case the
// active CID needs to be retired.
if remote_cids.remaining() < 2 {
return None;
}

let cid = remote_cids.next_reserved()?;
let remote = probe.remote();
let token = self.rng.random();
probe.mark_as_sent();

let frame = frame::PathChallenge(token);

let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
buf.start_new_datagram();

let mut builder =
PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, false, self)?;
let stats = &mut self.stats.frame_tx;
builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);

let path = &mut self.paths.get_mut(&path_id).expect("checked").data;

path.challenges_sent.insert(
token,
paths::SentChallengeInfo {
sent_instant: now,
network_path: FourTuple {
remote,
local_ip: None,
},
},
);

let size = buf.len();

self.stats.udp_tx.on_sent(1, size);
self.path_stats
.entry(path_id)
.or_default()
.udp_tx
.on_sent(1, size);

Some(Transmit {
destination: remote,
size,
ecn: None,
segment_size: None,
src_ip: None,
})
}

/// Indicate what types of frames are ready to send for the given space
///
/// *packet_size* is the number of bytes available to build the next packet.
Expand Down
23 changes: 3 additions & 20 deletions quinn-proto/src/iroh_hp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
net::{IpAddr, SocketAddr},
};

use identity_hash::IntMap;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::trace;

Expand Down Expand Up @@ -304,10 +303,6 @@ pub(crate) struct ServerState {
round: VarInt,
/// Addresses to which PATH_CHALLENGES need to be sent.
pending_probes: FxHashSet<IpPort>,
/// Sent PATH_CHALLENGES for this round.
///
/// This is used to validate the remotes assigned to each token.
active_probes: IntMap<u64, IpPort>,
}

impl ServerState {
Expand All @@ -319,7 +314,6 @@ impl ServerState {
next_local_addr_id: Default::default(),
round: Default::default(),
pending_probes: Default::default(),
active_probes: Default::default(),
}
}

Expand Down Expand Up @@ -364,12 +358,6 @@ impl ServerState {
if round > self.round {
self.round = round;
self.pending_probes.clear();
// TODO(@divma): This log is here because I'm not sure if dropping the challenges
// without further interaction with the connection is going to cause issues.
for (token, remote) in self.active_probes.drain() {
let remote: SocketAddr = remote.into();
trace!(token=format!("{:08x}", token), %remote, "dropping nat traversal challenge pending response");
}
} else if self.pending_probes.len() >= self.max_remote_addresses {
return Err(Error::TooManyAddresses);
}
Expand All @@ -385,21 +373,18 @@ impl ServerState {
.map(|remote| ServerProbing {
remote,
pending_probes: &mut self.pending_probes,
active_probes: &mut self.active_probes,
})
}
}

pub(crate) struct ServerProbing<'a> {
remote: IpPort,
pending_probes: &'a mut FxHashSet<IpPort>,
active_probes: &'a mut IntMap<u64, IpPort>,
}

impl<'a> ServerProbing<'a> {
pub(crate) fn finish(self, token: u64) {
pub(crate) fn mark_as_sent(self) {
self.pending_probes.remove(&self.remote);
self.active_probes.insert(token, self.remote);
}

pub(crate) fn remote(&self) -> SocketAddr {
Expand Down Expand Up @@ -555,16 +540,14 @@ mod tests {

dbg!(&state);
assert_eq!(state.pending_probes.len(), 2);
assert_eq!(state.active_probes.len(), 0);

let probe = state.next_probe().unwrap();
probe.finish(1);
probe.mark_as_sent();
let probe = state.next_probe().unwrap();
probe.finish(2);
probe.mark_as_sent();

assert!(state.next_probe().is_none());
assert_eq!(state.pending_probes.len(), 0);
assert_eq!(state.active_probes.len(), 2);
}

#[test]
Expand Down
Loading