diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 6c064062937..a67a570c3dd 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -11,18 +11,15 @@ use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; -use self::{ - best_addr::ClearReason, - node_state::{NodeState, Options, PingHandled}, -}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{ActorMessage, NodeIdMappedAddr, metrics::Metrics, transports}; use crate::disco::{CallMeMaybe, Pong, SendAddr}; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; -mod best_addr; mod node_state; mod path_state; +mod path_validity; mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}; @@ -196,7 +193,7 @@ impl NodeMap { .expect("poisoned") .get_mut(NodeStateKey::Idx(id)) { - ep.ping_timeout(tx_id); + ep.ping_timeout(tx_id, Instant::now()); } } @@ -266,9 +263,10 @@ impl NodeMap { } pub(super) fn reset_node_states(&self) { + let now = Instant::now(); let mut inner = self.inner.lock().expect("poisoned"); for (_, ep) in inner.node_states_mut() { - ep.note_connectivity_change(); + ep.note_connectivity_change(now); } } @@ -317,7 +315,7 @@ impl NodeMap { self.inner .lock() .expect("poisoned") - .on_direct_addr_discovered(discovered); + .on_direct_addr_discovered(discovered, Instant::now()); } } @@ -378,24 +376,28 @@ impl NodeMapInner { } /// Prunes direct addresses from nodes that claim to share an address we know points to us. - pub(super) fn on_direct_addr_discovered(&mut self, discovered: BTreeSet) { + pub(super) fn on_direct_addr_discovered( + &mut self, + discovered: BTreeSet, + now: Instant, + ) { for addr in discovered { - self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr) + self.remove_by_ipp(addr.into(), now, "matches our local addr") } } /// Removes a direct address from a node. - fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason) { + fn remove_by_ipp(&mut self, ipp: IpPort, now: Instant, why: &'static str) { if let Some(id) = self.by_ip_port.remove(&ipp) { if let Entry::Occupied(mut entry) = self.by_id.entry(id) { let node = entry.get_mut(); - node.remove_direct_addr(&ipp, reason); + node.remove_direct_addr(&ipp, now, why); if node.direct_addresses().count() == 0 { let node_id = node.public_key(); let mapped_addr = node.quic_mapped_addr(); self.by_node_key.remove(node_id); self.by_quic_mapped_addr.remove(mapped_addr); - debug!(node_id=%node_id.fmt_short(), ?reason, "removing node"); + debug!(node_id=%node_id.fmt_short(), why, "removing node"); entry.remove(); } } @@ -840,7 +842,7 @@ mod tests { } info!("Pruning addresses"); - endpoint.prune_direct_addresses(); + endpoint.prune_direct_addresses(Instant::now()); // Half the offline addresses should have been pruned. All the active and alive // addresses should have been kept. diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs deleted file mode 100644 index 48866e27813..00000000000 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! The [`BestAddr`] is the currently active best address for UDP sends. - -use std::net::SocketAddr; - -use n0_future::time::{Duration, Instant}; -use tracing::{debug, info}; - -/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); - -#[derive(Debug, Default)] -pub(super) struct BestAddr(Option); - -#[derive(Debug)] -struct BestAddrInner { - addr: AddrLatency, - trust_until: Option, - confirmed_at: Instant, -} - -impl BestAddrInner { - fn is_trusted(&self, now: Instant) -> bool { - self.trust_until - .map(|trust_until| trust_until >= now) - .unwrap_or(false) - } - - fn addr(&self) -> SocketAddr { - self.addr.addr - } -} - -#[derive(Debug)] -pub(super) enum Source { - ReceivedPong, - BestCandidate, - Udp, -} - -impl Source { - fn trust_until(&self, from: Instant) -> Instant { - match self { - Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, - // TODO: Fix time - Source::BestCandidate => from + Duration::from_secs(60 * 60), - Source::Udp => from + TRUST_UDP_ADDR_DURATION, - } - } -} - -#[derive(Debug)] -pub(super) enum State<'a> { - Valid(&'a AddrLatency), - Outdated(&'a AddrLatency), - Empty, -} - -#[derive(Debug, Clone, Copy)] -pub enum ClearReason { - Inactive, - PongTimeout, - MatchesOurLocalAddr, -} - -impl BestAddr { - #[cfg(test)] - pub fn from_parts( - addr: SocketAddr, - latency: Duration, - confirmed_at: Instant, - trust_until: Instant, - ) -> Self { - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - confirmed_at, - trust_until: Some(trust_until), - }; - Self(Some(inner)) - } - - pub fn is_empty(&self) -> bool { - self.0.is_none() - } - - /// Unconditionally clears the best address. - pub fn clear(&mut self, reason: ClearReason, has_relay: bool) { - let old = self.0.take(); - if let Some(old_addr) = old.as_ref().map(BestAddrInner::addr) { - info!(?reason, ?has_relay, %old_addr, "clearing best_addr"); - } - } - - /// Clears the best address if equal to `addr`. - pub fn clear_if_equals(&mut self, addr: SocketAddr, reason: ClearReason, has_relay: bool) { - if self.addr() == Some(addr) { - self.clear(reason, has_relay) - } - } - - pub fn clear_trust(&mut self, why: &'static str) { - if let Some(state) = self.0.as_mut() { - info!( - %why, - prev_trust_until = ?state.trust_until, - "clearing best_addr trust", - ); - state.trust_until = None; - } - } - - pub fn insert_if_better_or_reconfirm( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - ) { - match self.0.as_mut() { - None => { - self.insert(addr, latency, source, confirmed_at); - } - Some(state) => { - let candidate = AddrLatency { addr, latency }; - if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { - self.insert(addr, latency, source, confirmed_at); - } else if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); - } - } - } - } - - /// Reset the expiry, if the passed in addr matches the currently used one. - #[cfg(not(wasm_browser))] - pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) { - if let Some(state) = self.0.as_mut() { - if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); - } - } - } - - fn insert( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - ) { - let trust_until = source.trust_until(confirmed_at); - - if self - .0 - .as_ref() - .map(|prev| prev.addr.addr == addr) - .unwrap_or_default() - { - debug!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "re-selecting direct path for node" - ); - } else { - info!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "selecting new direct path for node" - ); - } - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - trust_until: Some(trust_until), - confirmed_at, - }; - self.0 = Some(inner); - } - - pub fn state(&self, now: Instant) -> State<'_> { - match &self.0 { - None => State::Empty, - Some(state) => match state.trust_until { - Some(expiry) if now < expiry => State::Valid(&state.addr), - Some(_) | None => State::Outdated(&state.addr), - }, - } - } - - pub fn addr(&self) -> Option { - self.0.as_ref().map(BestAddrInner::addr) - } -} - -/// A `SocketAddr` with an associated latency. -#[derive(Debug, Clone)] -pub struct AddrLatency { - pub addr: SocketAddr, - pub latency: Duration, -} - -impl AddrLatency { - /// Reports whether `self` is a better addr to use than `other`. - fn is_better_than(&self, other: &Self) -> bool { - if self.addr == other.addr { - return false; - } - if self.addr.is_ipv6() && other.addr.is_ipv4() { - // Prefer IPv6 for being a bit more robust, as long as - // the latencies are roughly equivalent. - if self.latency / 10 * 9 < other.latency { - return true; - } - } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { - return false; - } - self.latency < other.latency - } -} diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 53e28a1b21f..13278c33673 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -2,6 +2,7 @@ use std::{ collections::{BTreeSet, HashMap, btree_map::Entry}, hash::Hash, net::{IpAddr, SocketAddr}, + sync::atomic::AtomicBool, }; use data_encoding::HEXLOWER; @@ -17,7 +18,6 @@ use tracing::{Level, debug, event, info, instrument, trace, warn}; use super::{ IpPort, Source, - best_addr::{self, ClearReason, Source as BestAddrSource}, path_state::{PathState, summarize_node_paths}, udp_paths::{NodeUdpPaths, UdpSendAddr}, }; @@ -25,7 +25,10 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, HEARTBEAT_INTERVAL, MagicsockMetrics, NodeIdMappedAddr}, + magicsock::{ + ActorMessage, HEARTBEAT_INTERVAL, MagicsockMetrics, NodeIdMappedAddr, + node_map::path_validity::PathValidity, + }, }; /// Number of addresses that are not active that we keep around per node. @@ -136,7 +139,7 @@ pub(super) struct NodeState { /// Whether the conn_type was ever observed to be `Direct` at some point. /// /// Used for metric reporting. - has_been_direct: bool, + has_been_direct: AtomicBool, /// Configuration for what path selection to use #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, @@ -183,7 +186,7 @@ impl NodeState { last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: options.path_selection, } @@ -241,7 +244,7 @@ impl NodeState { .iter() .map(|(addr, path_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), - latency: path_state.recent_pong.as_ref().map(|pong| pong.latency), + latency: path_state.validity.latency(), last_control: path_state.last_control_msg(now), last_payload: path_state .last_payload_msg @@ -277,8 +280,7 @@ impl NodeState { /// /// This may return to send on one, both or no paths. fn addr_for_send( - &mut self, - now: &Instant, + &self, have_ipv6: bool, metrics: &MagicsockMetrics, ) -> (Option, Option) { @@ -289,22 +291,22 @@ impl NodeState { ); return (None, self.relay_url()); } - let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { + let (best_addr, relay_url) = match self.udp_paths.send_addr(have_ipv6) { UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. trace!(%addr, "UdpSendAddr is valid, use it"); - (Some(addr), None) + (Some(*addr), None) } UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::Unconfirmed(addr) => { trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::None => { trace!("No UdpSendAddr, use relay"); @@ -317,9 +319,13 @@ impl NodeState { (None, Some(relay_url)) => ConnectionType::Relay(relay_url), (None, None) => ConnectionType::None, }; - if !self.has_been_direct && matches!(&typ, ConnectionType::Direct(_)) { - self.has_been_direct = true; - metrics.nodes_contacted_directly.inc(); + if matches!(&typ, ConnectionType::Direct(_)) { + let before = self + .has_been_direct + .swap(true, std::sync::atomic::Ordering::Relaxed); + if !before { + metrics.nodes_contacted_directly.inc(); + } } if let Ok(prev_typ) = self.conn_type.set(typ.clone()) { // The connection type has changed. @@ -366,21 +372,17 @@ impl NodeState { /// Removes a direct address for this node. /// /// If this is also the best address, it will be cleared as well. - pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, reason: ClearReason) { + pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, now: Instant, why: &'static str) { let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; match state.last_alive().map(|instant| instant.elapsed()) { - Some(last_alive) => debug!(%ip_port, ?last_alive, ?reason, "pruning address"), - None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), + Some(last_alive) => debug!(%ip_port, ?last_alive, why, "pruning address"), + None => debug!(%ip_port, last_seen=%"never", why, "pruning address"), } - self.udp_paths.best_addr.clear_if_equals( - (*ip_port).into(), - reason, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } /// Whether we need to send another call-me-maybe to the endpoint. @@ -398,20 +400,27 @@ impl NodeState { debug!("no previous full ping: need full ping"); return true; }; - match self.udp_paths.best_addr.state(*now) { - best_addr::State::Empty => { + match &self.udp_paths.best { + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) => { debug!("best addr not set: need full ping"); true } - best_addr::State::Outdated(_) => { + UdpSendAddr::Outdated(_) => { debug!("best addr expired: need full ping"); true } - best_addr::State::Valid(addr) => { - if addr.latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { + UdpSendAddr::Valid(addr) => { + let latency = self + .udp_paths + .paths + .get(&(*addr).into()) + .expect("send path not tracked?") + .latency() + .expect("send_addr marked valid incorrectly"); + if latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { debug!( "full ping interval expired and latency is only {}ms: need full ping", - addr.latency.as_millis() + latency.as_millis() ); true } else { @@ -430,7 +439,7 @@ impl NodeState { /// Cleanup the expired ping for the passed in txid. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn ping_timeout(&mut self, txid: stun_rs::TransactionId) { + pub(super) fn ping_timeout(&mut self, txid: stun_rs::TransactionId, now: Instant) { if let Some(sp) = self.sent_pings.remove(&txid) { debug!(tx = %HEXLOWER.encode(&txid), addr = %sp.to, "pong not received in timeout"); match sp.to { @@ -446,21 +455,13 @@ impl NodeState { // which we should have received the pong, clear best addr and // pong. Both are used to select this path again, but we know // it's not a usable path now. - path_state.recent_pong = None; - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url().is_some(), - ) + path_state.validity = PathValidity::empty(); + self.udp_paths.update_to_best_addr(now); } } else { // If we have no state for the best addr it should have been cleared // anyway. - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } } SendAddr::Relay(ref url) => { @@ -635,7 +636,7 @@ impl NodeState { return ping_msgs; } - self.prune_direct_addresses(); + self.prune_direct_addresses(now); let mut ping_dsts = String::from("["); self.udp_paths .paths @@ -667,7 +668,10 @@ impl NodeState { source: super::Source, metrics: &MagicsockMetrics, ) { - if self.udp_paths.best_addr.is_empty() { + if matches!( + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) + ) { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && new_relay_url.is_some() { @@ -786,14 +790,14 @@ impl NodeState { ); if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { - self.prune_direct_addresses(); + self.prune_direct_addresses(now); } - // if the endpoint does not yet have a best_addrr + // if the endpoint does not yet have a best_addr let needs_ping_back = if matches!(path, SendAddr::Udp(_)) && matches!( - self.udp_paths.best_addr.state(now), - best_addr::State::Empty | best_addr::State::Outdated(_) + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) | UdpSendAddr::Outdated(_) ) { // We also need to send a ping to make this path available to us as well. This // is always sent together with a pong. So in the worst case the pong gets lost @@ -821,7 +825,7 @@ impl NodeState { /// /// This trims the list of inactive paths for an endpoint. At most /// [`MAX_INACTIVE_DIRECT_ADDRESSES`] are kept. - pub(super) fn prune_direct_addresses(&mut self) { + pub(super) fn prune_direct_addresses(&mut self, now: Instant) { // prune candidates are addresses that are not active let mut prune_candidates: Vec<_> = self .udp_paths @@ -851,7 +855,7 @@ impl NodeState { prune_candidates.sort_unstable_by_key(|(_ip_port, last_alive)| *last_alive); prune_candidates.truncate(prune_count); for (ip_port, _last_alive) in prune_candidates.into_iter() { - self.remove_direct_addr(&ip_port, ClearReason::Inactive) + self.remove_direct_addr(&ip_port, now, "inactive"); } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), @@ -862,11 +866,11 @@ impl NodeState { /// Called when connectivity changes enough that we should question our earlier /// assumptions about which paths work. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn note_connectivity_change(&mut self) { - self.udp_paths.best_addr.clear_trust("connectivity changed"); + pub(super) fn note_connectivity_change(&mut self, now: Instant) { for es in self.udp_paths.paths.values_mut() { es.clear(); } + self.udp_paths.update_to_best_addr(now); } /// Handles a Pong message (a reply to an earlier ping). @@ -881,7 +885,7 @@ impl NodeState { event!( target: "iroh::_events::pong::recv", Level::DEBUG, - remote_node = self.node_id.fmt_short(), + remote_node = %self.node_id.fmt_short(), ?src, txn = ?m.tx_id, ); @@ -958,14 +962,9 @@ impl NodeState { // Promote this pong response to our current best address if it's lower latency. // TODO(bradfitz): decide how latency vs. preference order affects decision - if let SendAddr::Udp(to) = sp.to { + if let SendAddr::Udp(_to) = sp.to { debug_assert!(!is_relay, "mismatching relay & udp"); - self.udp_paths.best_addr.insert_if_better_or_reconfirm( - to, - latency, - best_addr::Source::ReceivedPong, - now, - ); + self.udp_paths.update_to_best_addr(now); } node_map_insert @@ -1020,23 +1019,16 @@ impl NodeState { if !call_me_maybe_ipps.contains(ipp) { // TODO: This seems like a weird way to signal that the endpoint no longer // thinks it has this IpPort as an available path. - if st.recent_pong.is_some() { + if !st.validity.is_empty() { debug!(path=?ipp ,"clearing recent pong"); - st.recent_pong = None; + st.validity = PathValidity::empty(); } } } // Clear trust on our best_addr if it is not included in the updated set. Also // clear the last call-me-maybe send time so we will send one again. - if let Some(addr) = self.udp_paths.best_addr.addr() { - let ipp: IpPort = addr.into(); - if !call_me_maybe_ipps.contains(&ipp) { - self.udp_paths - .best_addr - .clear_trust("best_addr not in new call-me-maybe"); - self.last_call_me_maybe = None; - } - } + self.udp_paths.update_to_best_addr(now); + self.last_call_me_maybe = None; debug!( paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", @@ -1051,18 +1043,16 @@ impl NodeState { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; - state.last_payload_msg = Some(now); + state.receive_payload(now); self.last_used = Some(now); - self.udp_paths - .best_addr - .reconfirm_if_used(addr.into(), BestAddrSource::Udp, now); + self.udp_paths.update_to_best_addr(now); } pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) { match self.relay_url.as_mut() { Some((current_home, state)) if current_home == url => { // We received on the expected url. update state. - state.last_payload_msg = Some(now); + state.receive_payload(now); } Some((_current_home, _state)) => { // we have a different url. we only update on ping, not on receive_relay. @@ -1123,7 +1113,7 @@ impl NodeState { } // Send heartbeat ping to keep the current addr going as long as we need it. - if let Some(udp_addr) = self.udp_paths.best_addr.addr() { + if let Some(udp_addr) = self.udp_paths.best.get_addr() { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. let needs_ping = match elapsed { @@ -1151,6 +1141,8 @@ impl NodeState { /// Returns the addresses on which a payload should be sent right now. /// /// This is in the hot path of `.poll_send()`. + // TODO(matheus23): Make this take &self. That's not quite possible yet due to `send_call_me_maybe` + // eventually calling `prune_direct_addresses` (which needs &mut self) #[instrument("get_send_addrs", skip_all, fields(node = %self.node_id.fmt_short()))] pub(crate) fn get_send_addrs( &mut self, @@ -1163,7 +1155,7 @@ impl NodeState { // this is the first time we are trying to connect to this node metrics.nodes_contacted.inc(); } - let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6, metrics); + let (udp_addr, relay_url) = self.addr_for_send(have_ipv6, metrics); let ping_msgs = if self.want_call_me_maybe(&now) { self.send_call_me_maybe(now, SendCallMeMaybe::IfNoRecent) @@ -1443,7 +1435,6 @@ pub enum ConnectionType { mod tests { use std::{collections::BTreeMap, net::Ipv4Addr}; - use best_addr::BestAddr; use iroh_base::SecretKey; use super::*; @@ -1500,18 +1491,13 @@ mod tests { relay_url: None, udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), - ), + UdpSendAddr::Valid(ip_port.into()), ), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Direct(ip_port.into())), - has_been_direct: true, + has_been_direct: AtomicBool::new(true), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, @@ -1533,7 +1519,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1562,7 +1548,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1571,7 +1557,6 @@ mod tests { // endpoint w/ expired best addr and relay w/ latency let (d_endpoint, d_socket_addr) = { let socket_addr: SocketAddr = "0.0.0.0:7".parse().unwrap(); - let expired = now.checked_sub(Duration::from_secs(100)).unwrap(); let key = SecretKey::generate(rand::thread_rng()); let node_id = key.public(); let endpoint_state = BTreeMap::from([( @@ -1595,7 +1580,7 @@ mod tests { relay_url: relay_and_state(key.public(), send_addr.clone()), udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), + UdpSendAddr::Outdated(socket_addr), ), sent_pings: HashMap::new(), last_used: Some(now), @@ -1604,7 +1589,7 @@ mod tests { socket_addr, send_addr.clone(), )), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 21fbd6e7019..b5047129278 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -1,9 +1,6 @@ //! The state kept for each network path to a remote node. -use std::{ - collections::{BTreeMap, HashMap}, - net::SocketAddr, -}; +use std::collections::{BTreeMap, HashMap}; use iroh_base::NodeId; use n0_future::time::{Duration, Instant}; @@ -13,7 +10,13 @@ use super::{ IpPort, PingRole, Source, node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT}, }; -use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL}; +use crate::{ + disco::SendAddr, + magicsock::{ + HEARTBEAT_INTERVAL, + node_map::path_validity::{self, PathValidity}, + }, +}; /// The minimum time between pings to an endpoint. /// @@ -26,7 +29,7 @@ const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); /// This state is used for both the relay path and any direct UDP paths. /// /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(super) struct PathState { /// The node for which this path exists. node_id: NodeId, @@ -43,11 +46,12 @@ pub(super) struct PathState { /// The time this endpoint was last advertised via a call-me-maybe DISCO message. pub(super) call_me_maybe_time: Option, - /// The most recent [`PongReply`]. + /// Tracks whether this path is valid. + /// + /// Also stores the latest [`PongReply`], if there is one. /// - /// Previous replies are cleared when they are no longer relevant to determine whether - /// this path can still be used to reach the remote node. - pub(super) recent_pong: Option, + /// See [`PathValidity`] docs. + pub(super) validity: PathValidity, /// When the last payload data was **received** via this path. /// /// This excludes DISCO messages. @@ -70,19 +74,12 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: None, sources, } } - pub(super) fn udp_addr(&self) -> Option { - match self.path { - SendAddr::Udp(addr) => Some(addr), - SendAddr::Relay(_) => None, - } - } - pub(super) fn with_last_payload( node_id: NodeId, path: SendAddr, @@ -97,7 +94,7 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: Some(now), sources, } @@ -117,7 +114,7 @@ impl PathState { pub(super) fn add_pong_reply(&mut self, r: PongReply) { if let SendAddr::Udp(ref path) = self.path { - if self.recent_pong.is_none() { + if self.validity.is_empty() { event!( target: "iroh::_events::holepunched", Level::DEBUG, @@ -127,7 +124,14 @@ impl PathState { ); } } - self.recent_pong = Some(r); + + self.validity = PathValidity::new(r.pong_at, r.latency); + } + + pub(super) fn receive_payload(&mut self, now: Instant) { + self.last_payload_msg = Some(now); + self.validity + .receive_payload(now, path_validity::Source::QuicPayload); } #[cfg(test)] @@ -138,7 +142,7 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: Some(r), + validity: PathValidity::new(r.pong_at, r.latency), last_payload_msg: None, sources: HashMap::new(), } @@ -175,15 +179,13 @@ impl PathState { /// - When the last payload transmission occurred. /// - when the last ping from them was received. pub(super) fn last_alive(&self) -> Option { - self.recent_pong - .as_ref() - .map(|pong| &pong.pong_at) + self.validity + .latest_pong() .into_iter() - .chain(self.last_payload_msg.as_ref()) - .chain(self.call_me_maybe_time.as_ref()) - .chain(self.last_incoming_ping()) + .chain(self.last_payload_msg) + .chain(self.call_me_maybe_time) + .chain(self.last_incoming_ping().cloned()) .max() - .copied() } /// The last control or DISCO message **about** this path. @@ -197,9 +199,9 @@ impl PathState { pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { // get every control message and assign it its kind let last_pong = self - .recent_pong - .as_ref() - .map(|pong| (pong.pong_at, ControlMsg::Pong)); + .validity + .latest_pong() + .map(|pong_at| (pong_at, ControlMsg::Pong)); let last_call_me_maybe = self .call_me_maybe_time .as_ref() @@ -218,7 +220,7 @@ impl PathState { /// Returns the latency from the most recent pong, if available. pub(super) fn latency(&self) -> Option { - self.recent_pong.as_ref().map(|p| p.latency) + self.validity.latency() } pub(super) fn needs_ping(&self, now: &Instant) -> bool { @@ -279,7 +281,7 @@ impl PathState { self.last_ping = None; self.last_got_ping = None; self.call_me_maybe_time = None; - self.recent_pong = None; + self.validity = PathValidity::empty(); } fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { @@ -287,8 +289,8 @@ impl PathState { if self.is_active() { write!(w, "active ")?; } - if let Some(ref pong) = self.recent_pong { - write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; + if let Some(pong_at) = self.validity.latest_pong() { + write!(w, "pong-received({:?} ago) ", pong_at.elapsed())?; } if let Some(when) = self.last_incoming_ping() { write!(w, "ping-received({:?} ago) ", when.elapsed())?; diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs new file mode 100644 index 00000000000..cd8a5f442b3 --- /dev/null +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -0,0 +1,154 @@ +use n0_future::time::{Duration, Instant}; + +/// How long we trust a UDP address as the exclusive path (i.e. without also sending via the relay). +/// +/// Trust for a UDP address begins when we receive a DISCO UDP pong on that address. +/// It is then further extended by this duration every time we receive QUIC payload data while it's +/// currently trusted. +/// +/// If trust goes away, it can be brought back with another valid DISCO UDP pong. +const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + +/// Tracks a path's validity. +/// +/// A path is valid: +/// - For [`Source::trust_duration`] after a successful [`PongReply`]. +/// - For [`Source::trust_duration`] longer starting at the most recent +/// received application payload *while the path was valid*. +/// +/// [`PongReply`]: super::node_state::PongReply +#[derive(Debug, Clone, Default)] +pub(super) struct PathValidity(Option); + +#[derive(Debug, Clone)] +struct Inner { + latest_pong: Instant, + latency: Duration, + trust_until: Instant, +} + +#[derive(Debug)] +pub(super) enum Source { + ReceivedPong, + QuicPayload, +} + +impl Source { + fn trust_duration(&self) -> Duration { + match self { + Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, + Source::QuicPayload => TRUST_UDP_ADDR_DURATION, + } + } +} + +impl PathValidity { + pub(super) fn new(pong_at: Instant, latency: Duration) -> Self { + Self(Some(Inner { + trust_until: pong_at + Source::ReceivedPong.trust_duration(), + latest_pong: pong_at, + latency, + })) + } + + pub(super) fn empty() -> Self { + Self(None) + } + + pub(super) fn is_empty(&self) -> bool { + self.0.is_none() + } + + pub(super) fn is_valid(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + state.is_valid(now) + } + + pub(super) fn latency_if_valid(&self, now: Instant) -> Option { + let state = self.0.as_ref()?; + state.is_valid(now).then_some(state.latency) + } + + pub(super) fn is_outdated(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + // We *used* to be valid, but are now outdated. + // This happens when we had a DISCO pong but didn't receive + // any payload data or further pongs for at least TRUST_UDP_ADDR_DURATION + state.is_outdated(now) + } + + pub(super) fn latency_if_outdated(&self, now: Instant) -> Option { + let state = self.0.as_ref()?; + state.is_outdated(now).then_some(state.latency) + } + + /// Reconfirms path validity, if a payload was received while the + /// path was valid. + pub(super) fn receive_payload(&mut self, now: Instant, source: Source) { + let Some(state) = self.0.as_mut() else { + return; + }; + + if state.is_valid(now) { + state.trust_until = now + source.trust_duration(); + } + } + + pub(super) fn latency(&self) -> Option { + Some(self.0.as_ref()?.latency) + } + + pub(super) fn latest_pong(&self) -> Option { + Some(self.0.as_ref()?.latest_pong) + } +} + +impl Inner { + fn is_valid(&self, now: Instant) -> bool { + self.latest_pong <= now && now < self.trust_until + } + + fn is_outdated(&self, now: Instant) -> bool { + self.latest_pong <= now && self.trust_until <= now + } +} + +#[cfg(test)] +mod tests { + use n0_future::time::{Duration, Instant}; + + use super::{PathValidity, Source, TRUST_UDP_ADDR_DURATION}; + + #[tokio::test(start_paused = true)] + async fn test_basic_path_validity_lifetime() { + let mut validity = PathValidity(None); + assert!(!validity.is_valid(Instant::now())); + assert!(!validity.is_outdated(Instant::now())); + + validity = PathValidity::new(Instant::now(), Duration::from_millis(20)); + assert!(validity.is_valid(Instant::now())); + assert!(!validity.is_outdated(Instant::now())); + + tokio::time::advance(TRUST_UDP_ADDR_DURATION / 2).await; + assert!(validity.is_valid(Instant::now())); + assert!(!validity.is_outdated(Instant::now())); + + validity.receive_payload(Instant::now(), Source::QuicPayload); + assert!(validity.is_valid(Instant::now())); + assert!(!validity.is_outdated(Instant::now())); + + tokio::time::advance(TRUST_UDP_ADDR_DURATION / 2).await; + assert!(validity.is_valid(Instant::now())); + assert!(!validity.is_outdated(Instant::now())); + + tokio::time::advance(TRUST_UDP_ADDR_DURATION / 2).await; + assert!(!validity.is_valid(Instant::now())); + assert!(validity.is_outdated(Instant::now())); + } +} diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 88a00037799..51d7d5850e3 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -7,17 +7,9 @@ //! [`NodeState`]: super::node_state::NodeState use std::{collections::BTreeMap, net::SocketAddr}; -use n0_future::time::{Duration, Instant}; -use rand::seq::IteratorRandom; -use tracing::warn; +use n0_future::time::Instant; -use super::{ - IpPort, - best_addr::{self, BestAddr}, - node_state::PongReply, - path_state::PathState, -}; -use crate::disco::SendAddr; +use super::{IpPort, path_state::PathState}; /// The address on which to send datagrams over UDP. /// @@ -32,7 +24,7 @@ use crate::disco::SendAddr; /// /// [`MagicSock`]: crate::magicsock::MagicSock /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug)] +#[derive(Debug, Default, Clone, Copy)] pub(super) enum UdpSendAddr { /// The UDP address can be relied on to deliver data to the remote node. /// @@ -54,9 +46,21 @@ pub(super) enum UdpSendAddr { /// establish a connection. Unconfirmed(SocketAddr), /// No known UDP path exists to the remote node. + #[default] None, } +impl UdpSendAddr { + pub fn get_addr(&self) -> Option { + match self { + UdpSendAddr::Valid(addr) + | UdpSendAddr::Outdated(addr) + | UdpSendAddr::Unconfirmed(addr) => Some(*addr), + UdpSendAddr::None => None, + } + } +} + /// The UDP paths for a single node. /// /// Paths are identified by the [`IpPort`] of their UDP address. @@ -72,10 +76,19 @@ pub(super) enum UdpSendAddr { pub(super) struct NodeUdpPaths { /// The state for each of this node's direct paths. pub(super) paths: BTreeMap, - /// Best UDP path currently selected. - pub(super) best_addr: BestAddr, - /// If we had to choose a path because we had no `best_addr` it is stored here. - chosen_candidate: Option, + /// The current address we use to send on. + /// + /// This is *almost* the same as going through `paths` and finding + /// the best one, except that this is + /// 1. Not updated in `send_addr`, but instead when there's changes to `paths`, so that `send_addr` can take `&self`. + /// 2. Slightly sticky: It only changes when + /// - the current send addr is not a validated path anymore or + /// - we received a pong with lower latency. + pub(super) best: UdpSendAddr, + /// The current best address to send on from all IPv4 addresses we have available. + /// + /// Follows the same logic as `best` above, but doesn't include any IPv6 addresses. + pub(super) best_ipv4: UdpSendAddr, } impl NodeUdpPaths { @@ -84,97 +97,90 @@ impl NodeUdpPaths { } #[cfg(test)] - pub(super) fn from_parts(paths: BTreeMap, best_addr: BestAddr) -> Self { + pub(super) fn from_parts(paths: BTreeMap, best: UdpSendAddr) -> Self { Self { paths, - best_addr, - chosen_candidate: None, + best_ipv4: best, // we only use ipv4 addrs in tests + best, } } /// Returns the current UDP address to send on. - /// - /// TODO: The goal here is for this to simply return the already known send address, so - /// it should be `&self` and not `&mut self`. This is only possible once the state from - /// [`NodeUdpPaths`] is no longer modified from outside. - pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { - self.assign_best_addr_from_candidates_if_empty(); - match self.best_addr.state(now) { - best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), - best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), - best_addr::State::Empty => { - // No direct connection has been used before. If we know of any possible - // candidate addresses, randomly try to use one. This path is most - // effective when folks use a NodeAddr with exactly one direct address which - // they know to work, effectively like using a traditional socket or QUIC - // endpoint. - let addr = self - .chosen_candidate - .and_then(|ipp| self.paths.get(&ipp)) - .and_then(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .or_else(|| { - // Look for a new candidate in all the known paths. This may look - // like a RNG use on the hot-path but this is normally invoked at - // most most once at startup. - let addr = self - .paths - .values() - .filter_map(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .choose(&mut rand::thread_rng()); - self.chosen_candidate = addr.map(IpPort::from); - addr - }); - match addr { - Some(addr) => UdpSendAddr::Unconfirmed(addr), - None => UdpSendAddr::None, - } - } + pub(super) fn send_addr(&self, have_ipv6: bool) -> &UdpSendAddr { + if !have_ipv6 { + return &self.best_ipv4; } + &self.best } - /// Fixup best_addr from candidates. + /// Change the current best address(es) to ones chosen as described in [`Self::best_addr`] docs. /// - /// If somehow we end up in a state where we failed to set a best_addr, while we do have - /// valid candidates, this will chose a candidate and set best_addr again. Most likely - /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self) { - if !self.best_addr.is_empty() { - return; - } + /// This should be called any time that `paths` is modified. + pub(super) fn update_to_best_addr(&mut self, now: Instant) { + self.best_ipv4 = self.best_addr(false, now); + self.best = self.best_addr(true, now); + } - // The highest acceptable latency for an endpoint path. If the latency is higher - // then this the path will be ignored. - const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.recent_pong { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(ref pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) + /// Returns the current best address of all available paths, ignoring + /// the currently chosen best address. + /// + /// We try to find the lowest latency [`UdpSendAddr::Valid`], if one exists, otherwise + /// we try to find the lowest latency [`UdpSendAddr::Outdated`], if one exists, otherwise + /// we return essentially an arbitrary [`UdpSendAddr::Unconfirmed`]. + /// + /// If we don't have any addresses, returns [`UdpSendAddr::None`]. + /// + /// If `have_ipv6` is false, we only search among ipv4 candidates. + fn best_addr(&self, have_ipv6: bool, now: Instant) -> UdpSendAddr { + let Some((ipp, path)) = self + .paths + .iter() + .filter(|(ipp, _)| have_ipv6 || ipp.ip.is_ipv4()) + .max_by_key(|(ipp, path)| { + // We find the best by sorting on a key of type (Option>, Option>, bool) + // where the first is set to Some(ReverseOrd(latency)) iff path.is_valid(now) and + // the second is set to Some(ReverseOrd(latency)) if path.is_outdated(now) and + // the third is set to whether the ipp is ipv6. + // This makes max_by_key sort for the lowest valid latency first, then sort for + // the lowest outdated latency second, and if latencies are equal, it'll sort IPv6 paths first. + let is_ipv6 = ipp.ip.is_ipv6(); + if let Some(latency) = path.validity.latency_if_valid(now) { + (Some(ReverseOrd(latency)), None, is_ipv6) + } else if let Some(latency) = path.validity.latency_if_outdated(now) { + (None, Some(ReverseOrd(latency)), is_ipv6) + } else { + (None, None, is_ipv6) } - _ => best_pong, - } - }); + }) + else { + return UdpSendAddr::None; + }; - // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - ) - } + if path.validity.is_valid(now) { + UdpSendAddr::Valid((*ipp).into()) + } else if path.validity.is_outdated(now) { + UdpSendAddr::Outdated((*ipp).into()) + } else { + UdpSendAddr::Unconfirmed((*ipp).into()) } } } + +/// Implements the reverse [`Ord`] implementation for the wrapped type. +/// +/// Literally calls [`std::cmp::Ordering::reverse`] on the inner value's +/// ordering. +#[derive(PartialEq, Eq)] +struct ReverseOrd(N); + +impl Ord for ReverseOrd { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0).reverse() + } +} + +impl PartialOrd for ReverseOrd { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +}