diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 10552695edf..b778f8e19c8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ env: RUSTDOCFLAGS: -Dwarnings MSRV: "1.81" SCCACHE_CACHE_SIZE: "10G" - IROH_FORCE_STAGING_RELAYS: "1" + # IROH_FORCE_STAGING_RELAYS: "1" jobs: tests: diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index af0634a13bb..0597b30ea3b 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -11,10 +11,7 @@ use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; -use self::{ - best_addr::ClearReason, - node_state::{NodeState, Options, PingHandled}, -}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{metrics::Metrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr}; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; @@ -23,9 +20,9 @@ use crate::{ watchable::Watcher, }; -mod best_addr; mod node_state; mod path_state; +mod path_validity; mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}; @@ -199,7 +196,7 @@ impl NodeMap { .expect("poisoned") .get_mut(NodeStateKey::Idx(id)) { - ep.ping_timeout(tx_id); + ep.ping_timeout(tx_id, Instant::now()); } } @@ -276,9 +273,10 @@ impl NodeMap { } pub(super) fn reset_node_states(&self) { + let now = Instant::now(); let mut inner = self.inner.lock().expect("poisoned"); for (_, ep) in inner.node_states_mut() { - ep.note_connectivity_change(); + ep.note_connectivity_change(now); } } @@ -327,7 +325,7 @@ impl NodeMap { self.inner .lock() .expect("poisoned") - .on_direct_addr_discovered(discovered); + .on_direct_addr_discovered(discovered, Instant::now()); } } @@ -388,24 +386,28 @@ impl NodeMapInner { } /// Prunes direct addresses from nodes that claim to share an address we know points to us. - pub(super) fn on_direct_addr_discovered(&mut self, discovered: BTreeSet) { + pub(super) fn on_direct_addr_discovered( + &mut self, + discovered: BTreeSet, + now: Instant, + ) { for addr in discovered { - self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr) + self.remove_by_ipp(addr.into(), now, "matches our local addr") } } /// Removes a direct address from a node. - fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason) { + fn remove_by_ipp(&mut self, ipp: IpPort, now: Instant, why: &'static str) { if let Some(id) = self.by_ip_port.remove(&ipp) { if let Entry::Occupied(mut entry) = self.by_id.entry(id) { let node = entry.get_mut(); - node.remove_direct_addr(&ipp, reason); + node.remove_direct_addr(&ipp, now, why); if node.direct_addresses().count() == 0 { let node_id = node.public_key(); let mapped_addr = node.quic_mapped_addr(); self.by_node_key.remove(node_id); self.by_quic_mapped_addr.remove(mapped_addr); - debug!(node_id=%node_id.fmt_short(), ?reason, "removing node"); + debug!(node_id=%node_id.fmt_short(), why, "removing node"); entry.remove(); } } @@ -852,7 +854,7 @@ mod tests { } info!("Pruning addresses"); - endpoint.prune_direct_addresses(); + endpoint.prune_direct_addresses(Instant::now()); // Half the offline addresses should have been pruned. All the active and alive // addresses should have been kept. diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs deleted file mode 100644 index 18d9ef960be..00000000000 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ /dev/null @@ -1,222 +0,0 @@ -//! The [`BestAddr`] is the currently active best address for UDP sends. - -use std::net::SocketAddr; - -use n0_future::time::{Duration, Instant}; -use tracing::{debug, info}; - -/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); - -#[derive(Debug, Default)] -pub(super) struct BestAddr(Option); - -#[derive(Debug)] -struct BestAddrInner { - addr: AddrLatency, - trust_until: Option, - confirmed_at: Instant, -} - -impl BestAddrInner { - fn is_trusted(&self, now: Instant) -> bool { - self.trust_until - .map(|trust_until| trust_until >= now) - .unwrap_or(false) - } - - fn addr(&self) -> SocketAddr { - self.addr.addr - } -} - -#[derive(Debug)] -pub(super) enum Source { - ReceivedPong, - BestCandidate, - Udp, -} - -impl Source { - fn trust_until(&self, from: Instant) -> Instant { - match self { - Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, - // TODO: Fix time - Source::BestCandidate => from + Duration::from_secs(60 * 60), - Source::Udp => from + TRUST_UDP_ADDR_DURATION, - } - } -} - -#[derive(Debug)] -pub(super) enum State<'a> { - Valid(&'a AddrLatency), - Outdated(&'a AddrLatency), - Empty, -} - -#[derive(Debug, Clone, Copy)] -pub enum ClearReason { - Reset, - Inactive, - PongTimeout, - MatchesOurLocalAddr, -} - -impl BestAddr { - #[cfg(test)] - pub fn from_parts( - addr: SocketAddr, - latency: Duration, - confirmed_at: Instant, - trust_until: Instant, - ) -> Self { - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - confirmed_at, - trust_until: Some(trust_until), - }; - Self(Some(inner)) - } - - pub fn is_empty(&self) -> bool { - self.0.is_none() - } - - /// Unconditionally clears the best address. - pub fn clear(&mut self, reason: ClearReason, has_relay: bool) { - let old = self.0.take(); - if let Some(old_addr) = old.as_ref().map(BestAddrInner::addr) { - info!(?reason, ?has_relay, %old_addr, "clearing best_addr"); - } - } - - /// Clears the best address if equal to `addr`. - pub fn clear_if_equals(&mut self, addr: SocketAddr, reason: ClearReason, has_relay: bool) { - if self.addr() == Some(addr) { - self.clear(reason, has_relay) - } - } - - pub fn clear_trust(&mut self, why: &'static str) { - if let Some(state) = self.0.as_mut() { - info!( - %why, - prev_trust_until = ?state.trust_until, - "clearing best_addr trust", - ); - state.trust_until = None; - } - } - - pub fn insert_if_better_or_reconfirm( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - ) { - match self.0.as_mut() { - None => { - self.insert(addr, latency, source, confirmed_at); - } - Some(state) => { - let candidate = AddrLatency { addr, latency }; - if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { - self.insert(addr, latency, source, confirmed_at); - } else if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); - } - } - } - } - - /// Reset the expiry, if the passed in addr matches the currently used one. - #[cfg(not(wasm_browser))] - pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) { - if let Some(state) = self.0.as_mut() { - if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); - } - } - } - - fn insert( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - ) { - let trust_until = source.trust_until(confirmed_at); - - if self - .0 - .as_ref() - .map(|prev| prev.addr.addr == addr) - .unwrap_or_default() - { - debug!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "re-selecting direct path for node" - ); - } else { - info!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "selecting new direct path for node" - ); - } - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - trust_until: Some(trust_until), - confirmed_at, - }; - self.0 = Some(inner); - } - - pub fn state(&self, now: Instant) -> State { - match &self.0 { - None => State::Empty, - Some(state) => match state.trust_until { - Some(expiry) if now < expiry => State::Valid(&state.addr), - Some(_) | None => State::Outdated(&state.addr), - }, - } - } - - pub fn addr(&self) -> Option { - self.0.as_ref().map(BestAddrInner::addr) - } -} - -/// A `SocketAddr` with an associated latency. -#[derive(Debug, Clone)] -pub struct AddrLatency { - pub addr: SocketAddr, - pub latency: Duration, -} - -impl AddrLatency { - /// Reports whether `self` is a better addr to use than `other`. - fn is_better_than(&self, other: &Self) -> bool { - if self.addr == other.addr { - return false; - } - if self.addr.is_ipv6() && other.addr.is_ipv4() { - // Prefer IPv6 for being a bit more robust, as long as - // the latencies are roughly equivalent. - if self.latency / 10 * 9 < other.latency { - return true; - } - } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { - return false; - } - self.latency < other.latency - } -} diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index adbd155a791..b96bfe33fd7 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -2,6 +2,7 @@ use std::{ collections::{btree_map::Entry, BTreeSet, HashMap}, hash::Hash, net::{IpAddr, SocketAddr}, + sync::atomic::AtomicBool, }; use data_encoding::HEXLOWER; @@ -16,7 +17,6 @@ use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ - best_addr::{self, ClearReason, Source as BestAddrSource}, path_state::{summarize_node_paths, PathState}, udp_paths::{NodeUdpPaths, UdpSendAddr}, IpPort, Source, @@ -25,7 +25,10 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, NodeIdMappedAddr, HEARTBEAT_INTERVAL}, + magicsock::{ + node_map::path_validity::PathValidity, ActorMessage, MagicsockMetrics, NodeIdMappedAddr, + HEARTBEAT_INTERVAL, + }, watchable::{Watchable, Watcher}, }; @@ -137,7 +140,7 @@ pub(super) struct NodeState { /// Whether the conn_type was ever observed to be `Direct` at some point. /// /// Used for metric reporting. - has_been_direct: bool, + has_been_direct: AtomicBool, /// Configuration for what path selection to use #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, @@ -184,7 +187,7 @@ impl NodeState { last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: options.path_selection, } @@ -242,7 +245,7 @@ impl NodeState { .iter() .map(|(addr, path_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), - latency: path_state.recent_pong.as_ref().map(|pong| pong.latency), + latency: path_state.validity.get_pong().map(|pong| pong.latency), last_control: path_state.last_control_msg(now), last_payload: path_state .last_payload_msg @@ -278,8 +281,7 @@ impl NodeState { /// /// This may return to send on one, both or no paths. fn addr_for_send( - &mut self, - now: &Instant, + &self, have_ipv6: bool, metrics: &MagicsockMetrics, ) -> (Option, Option) { @@ -288,22 +290,22 @@ impl NodeState { debug!("in `RelayOnly` mode, giving the relay address as the only viable address for this endpoint"); return (None, self.relay_url()); } - let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { + let (best_addr, relay_url) = match self.udp_paths.send_addr(have_ipv6) { UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. trace!(%addr, "UdpSendAddr is valid, use it"); - (Some(addr), None) + (Some(*addr), None) } UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::Unconfirmed(addr) => { trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::None => { trace!("No UdpSendAddr, use relay"); @@ -316,9 +318,13 @@ impl NodeState { (None, Some(relay_url)) => ConnectionType::Relay(relay_url), (None, None) => ConnectionType::None, }; - if !self.has_been_direct && matches!(&typ, ConnectionType::Direct(_)) { - self.has_been_direct = true; - metrics.nodes_contacted_directly.inc(); + if matches!(&typ, ConnectionType::Direct(_)) { + let before = self + .has_been_direct + .swap(true, std::sync::atomic::Ordering::Relaxed); + if !before { + metrics.nodes_contacted_directly.inc(); + } } if let Ok(prev_typ) = self.conn_type.set(typ.clone()) { // The connection type has changed. @@ -365,21 +371,17 @@ impl NodeState { /// Removes a direct address for this node. /// /// If this is also the best address, it will be cleared as well. - pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, reason: ClearReason) { + pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, now: Instant, why: &'static str) { let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; match state.last_alive().map(|instant| instant.elapsed()) { - Some(last_alive) => debug!(%ip_port, ?last_alive, ?reason, "pruning address"), - None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), + Some(last_alive) => debug!(%ip_port, ?last_alive, why, "pruning address"), + None => debug!(%ip_port, last_seen=%"never", why, "pruning address"), } - self.udp_paths.best_addr.clear_if_equals( - (*ip_port).into(), - reason, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } /// Whether we need to send another call-me-maybe to the endpoint. @@ -397,20 +399,27 @@ impl NodeState { debug!("no previous full ping: need full ping"); return true; }; - match self.udp_paths.best_addr.state(*now) { - best_addr::State::Empty => { + match &self.udp_paths.best { + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) => { debug!("best addr not set: need full ping"); true } - best_addr::State::Outdated(_) => { + UdpSendAddr::Outdated(_) => { debug!("best addr expired: need full ping"); true } - best_addr::State::Valid(addr) => { - if addr.latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { + UdpSendAddr::Valid(addr) => { + let latency = self + .udp_paths + .paths + .get(&(*addr).into()) + .expect("send path not tracked?") + .latency() + .expect("send_addr marked valid incorrectly"); + if latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { debug!( "full ping interval expired and latency is only {}ms: need full ping", - addr.latency.as_millis() + latency.as_millis() ); true } else { @@ -429,7 +438,7 @@ impl NodeState { /// Cleanup the expired ping for the passed in txid. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn ping_timeout(&mut self, txid: stun::TransactionId) { + pub(super) fn ping_timeout(&mut self, txid: stun::TransactionId, now: Instant) { if let Some(sp) = self.sent_pings.remove(&txid) { debug!(tx = %HEXLOWER.encode(&txid), addr = %sp.to, "pong not received in timeout"); match sp.to { @@ -445,21 +454,13 @@ impl NodeState { // which we should have received the pong, clear best addr and // pong. Both are used to select this path again, but we know // it's not a usable path now. - path_state.recent_pong = None; - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url().is_some(), - ) + path_state.validity = PathValidity::empty(); + self.udp_paths.update_to_best_addr(now); } } else { // If we have no state for the best addr it should have been cleared // anyway. - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } } SendAddr::Relay(ref url) => { @@ -635,7 +636,7 @@ impl NodeState { return ping_msgs; } - self.prune_direct_addresses(); + self.prune_direct_addresses(now); let mut ping_dsts = String::from("["); self.udp_paths .paths @@ -667,7 +668,10 @@ impl NodeState { source: super::Source, metrics: &MagicsockMetrics, ) { - if self.udp_paths.best_addr.is_empty() { + if matches!( + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) + ) { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && new_relay_url.is_some() { @@ -705,6 +709,7 @@ impl NodeState { PathState::new(self.node_id, SendAddr::from(addr), source.clone(), now) }); } + self.udp_paths.update_to_best_addr(now); let paths = summarize_node_paths(&self.udp_paths.paths); debug!(new = ?new_addrs , %paths, "added new direct paths for endpoint"); } @@ -713,13 +718,12 @@ impl NodeState { #[instrument(skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn reset(&mut self) { self.last_full_ping = None; - self.udp_paths - .best_addr - .clear(ClearReason::Reset, self.relay_url.is_some()); for es in self.udp_paths.paths.values_mut() { es.last_ping = None; } + + self.udp_paths.update_to_best_addr(Instant::now()); } /// Handle a received Disco Ping. @@ -799,14 +803,14 @@ impl NodeState { ); if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { - self.prune_direct_addresses(); + self.prune_direct_addresses(now); } - // if the endpoint does not yet have a best_addrr + // if the endpoint does not yet have a best_addr let needs_ping_back = if matches!(path, SendAddr::Udp(_)) && matches!( - self.udp_paths.best_addr.state(now), - best_addr::State::Empty | best_addr::State::Outdated(_) + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) | UdpSendAddr::Outdated(_) ) { // We also need to send a ping to make this path available to us as well. This // is always sent together with a pong. So in the worst case the pong gets lost @@ -834,7 +838,7 @@ impl NodeState { /// /// This trims the list of inactive paths for an endpoint. At most /// [`MAX_INACTIVE_DIRECT_ADDRESSES`] are kept. - pub(super) fn prune_direct_addresses(&mut self) { + pub(super) fn prune_direct_addresses(&mut self, now: Instant) { // prune candidates are addresses that are not active let mut prune_candidates: Vec<_> = self .udp_paths @@ -864,7 +868,7 @@ impl NodeState { prune_candidates.sort_unstable_by_key(|(_ip_port, last_alive)| *last_alive); prune_candidates.truncate(prune_count); for (ip_port, _last_alive) in prune_candidates.into_iter() { - self.remove_direct_addr(&ip_port, ClearReason::Inactive) + self.remove_direct_addr(&ip_port, now, "inactive"); } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), @@ -875,11 +879,11 @@ impl NodeState { /// Called when connectivity changes enough that we should question our earlier /// assumptions about which paths work. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn note_connectivity_change(&mut self) { - self.udp_paths.best_addr.clear_trust("connectivity changed"); + pub(super) fn note_connectivity_change(&mut self, now: Instant) { for es in self.udp_paths.paths.values_mut() { es.clear(); } + self.udp_paths.update_to_best_addr(now); } /// Handles a Pong message (a reply to an earlier ping). @@ -894,7 +898,7 @@ impl NodeState { event!( target: "iroh::_events::pong::recv", Level::DEBUG, - remote_node = self.node_id.fmt_short(), + remote_node = %self.node_id.fmt_short(), ?src, txn = ?m.tx_id, ); @@ -971,14 +975,9 @@ impl NodeState { // Promote this pong response to our current best address if it's lower latency. // TODO(bradfitz): decide how latency vs. preference order affects decision - if let SendAddr::Udp(to) = sp.to { + if let SendAddr::Udp(_to) = sp.to { debug_assert!(!is_relay, "mismatching relay & udp"); - self.udp_paths.best_addr.insert_if_better_or_reconfirm( - to, - latency, - best_addr::Source::ReceivedPong, - now, - ); + self.udp_paths.update_to_best_addr(now); } node_map_insert @@ -1033,22 +1032,17 @@ impl NodeState { if !call_me_maybe_ipps.contains(ipp) { // TODO: This seems like a weird way to signal that the endpoint no longer // thinks it has this IpPort as an available path. - if st.recent_pong.is_some() { + if !st.validity.is_empty() { debug!(path=?ipp ,"clearing recent pong"); - st.recent_pong = None; + st.validity = PathValidity::empty(); } } } - // Clear trust on our best_addr if it is not included in the updated set. Also - // clear the last call-me-maybe send time so we will send one again. - if let Some(addr) = self.udp_paths.best_addr.addr() { - let ipp: IpPort = addr.into(); - if !call_me_maybe_ipps.contains(&ipp) { - self.udp_paths - .best_addr - .clear_trust("best_addr not in new call-me-maybe"); - self.last_call_me_maybe = None; - } + // Clear trust on our best_addr if it is not included in the updated set. + let changed = self.udp_paths.update_to_best_addr(now); + if changed { + // Clear the last call-me-maybe send time so we will send one again. + self.last_call_me_maybe = None; } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), @@ -1064,18 +1058,16 @@ impl NodeState { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; - state.last_payload_msg = Some(now); + state.receive_payload(now); self.last_used = Some(now); - self.udp_paths - .best_addr - .reconfirm_if_used(addr.into(), BestAddrSource::Udp, now); + self.udp_paths.update_to_best_addr(now); } pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) { match self.relay_url.as_mut() { Some((current_home, state)) if current_home == url => { // We received on the expected url. update state. - state.last_payload_msg = Some(now); + state.receive_payload(now); } Some((_current_home, _state)) => { // we have a different url. we only update on ping, not on receive_relay. @@ -1136,7 +1128,7 @@ impl NodeState { } // Send heartbeat ping to keep the current addr going as long as we need it. - if let Some(udp_addr) = self.udp_paths.best_addr.addr() { + if let Some(udp_addr) = self.udp_paths.best.get_addr() { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. let needs_ping = match elapsed { @@ -1164,6 +1156,8 @@ impl NodeState { /// Returns the addresses on which a payload should be sent right now. /// /// This is in the hot path of `.poll_send()`. + // TODO(matheus23): Make this take &self. That's not quite possible yet due to `send_call_me_maybe` + // eventually calling `prune_direct_addresses` (which needs &mut self) #[instrument("get_send_addrs", skip_all, fields(node = %self.node_id.fmt_short()))] pub(crate) fn get_send_addrs( &mut self, @@ -1176,7 +1170,7 @@ impl NodeState { // this is the first time we are trying to connect to this node metrics.nodes_contacted.inc(); } - let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6, metrics); + let (udp_addr, relay_url) = self.addr_for_send(have_ipv6, metrics); let mut ping_msgs = Vec::new(); if self.want_call_me_maybe(&now) { @@ -1457,7 +1451,6 @@ pub enum ConnectionType { mod tests { use std::{collections::BTreeMap, net::Ipv4Addr}; - use best_addr::BestAddr; use iroh_base::SecretKey; use super::*; @@ -1514,18 +1507,13 @@ mod tests { relay_url: None, udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), - ), + UdpSendAddr::Valid(ip_port.into()), ), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Direct(ip_port.into())), - has_been_direct: true, + has_been_direct: AtomicBool::new(true), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, @@ -1547,7 +1535,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1576,7 +1564,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1585,7 +1573,6 @@ mod tests { // endpoint w/ expired best addr and relay w/ latency let (d_endpoint, d_socket_addr) = { let socket_addr: SocketAddr = "0.0.0.0:7".parse().unwrap(); - let expired = now.checked_sub(Duration::from_secs(100)).unwrap(); let key = SecretKey::generate(rand::thread_rng()); let node_id = key.public(); let endpoint_state = BTreeMap::from([( @@ -1609,7 +1596,7 @@ mod tests { relay_url: relay_and_state(key.public(), send_addr.clone()), udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), + UdpSendAddr::Outdated(socket_addr), ), sent_pings: HashMap::new(), last_used: Some(now), @@ -1618,7 +1605,7 @@ mod tests { socket_addr, send_addr.clone(), )), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 7241121722a..025658a7c04 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -1,9 +1,6 @@ //! The state kept for each network path to a remote node. -use std::{ - collections::{BTreeMap, HashMap}, - net::SocketAddr, -}; +use std::collections::{BTreeMap, HashMap}; use iroh_base::NodeId; use iroh_relay::protos::stun; @@ -14,7 +11,13 @@ use super::{ node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT}, IpPort, PingRole, Source, }; -use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL}; +use crate::{ + disco::SendAddr, + magicsock::{ + node_map::path_validity::{self, PathValidity}, + HEARTBEAT_INTERVAL, + }, +}; /// The minimum time between pings to an endpoint. /// @@ -27,7 +30,7 @@ const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); /// This state is used for both the relay path and any direct UDP paths. /// /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(super) struct PathState { /// The node for which this path exists. node_id: NodeId, @@ -44,11 +47,12 @@ pub(super) struct PathState { /// The time this endpoint was last advertised via a call-me-maybe DISCO message. pub(super) call_me_maybe_time: Option, - /// The most recent [`PongReply`]. + /// Tracks whether this path is valid. + /// + /// Also stores the latest [`PongReply`], if there is one. /// - /// Previous replies are cleared when they are no longer relevant to determine whether - /// this path can still be used to reach the remote node. - pub(super) recent_pong: Option, + /// See [`PathValidity`] docs. + pub(super) validity: PathValidity, /// When the last payload data was **received** via this path. /// /// This excludes DISCO messages. @@ -71,19 +75,12 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: None, sources, } } - pub(super) fn udp_addr(&self) -> Option { - match self.path { - SendAddr::Udp(addr) => Some(addr), - SendAddr::Relay(_) => None, - } - } - pub(super) fn with_last_payload( node_id: NodeId, path: SendAddr, @@ -98,7 +95,7 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: Some(now), sources, } @@ -118,7 +115,7 @@ impl PathState { pub(super) fn add_pong_reply(&mut self, r: PongReply) { if let SendAddr::Udp(ref path) = self.path { - if self.recent_pong.is_none() { + if self.validity.is_empty() { event!( target: "iroh::_events::holepunched", Level::DEBUG, @@ -128,7 +125,14 @@ impl PathState { ); } } - self.recent_pong = Some(r); + + self.validity = PathValidity::new(r); + } + + pub(super) fn receive_payload(&mut self, now: Instant) { + self.last_payload_msg = Some(now); + self.validity + .receive_payload(now, path_validity::Source::QuicPayload); } #[cfg(test)] @@ -139,7 +143,7 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: Some(r), + validity: PathValidity::new(r), last_payload_msg: None, sources: HashMap::new(), } @@ -176,8 +180,8 @@ impl PathState { /// - When the last payload transmission occurred. /// - when the last ping from them was received. pub(super) fn last_alive(&self) -> Option { - self.recent_pong - .as_ref() + self.validity + .get_pong() .map(|pong| &pong.pong_at) .into_iter() .chain(self.last_payload_msg.as_ref()) @@ -198,8 +202,8 @@ impl PathState { pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { // get every control message and assign it its kind let last_pong = self - .recent_pong - .as_ref() + .validity + .get_pong() .map(|pong| (pong.pong_at, ControlMsg::Pong)); let last_call_me_maybe = self .call_me_maybe_time @@ -219,7 +223,7 @@ impl PathState { /// Returns the latency from the most recent pong, if available. pub(super) fn latency(&self) -> Option { - self.recent_pong.as_ref().map(|p| p.latency) + self.validity.get_pong().map(|p| p.latency) } pub(super) fn needs_ping(&self, now: &Instant) -> bool { @@ -280,7 +284,7 @@ impl PathState { self.last_ping = None; self.last_got_ping = None; self.call_me_maybe_time = None; - self.recent_pong = None; + self.validity = PathValidity::empty(); } fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { @@ -288,7 +292,7 @@ impl PathState { if self.is_active() { write!(w, "active ")?; } - if let Some(ref pong) = self.recent_pong { + if let Some(pong) = self.validity.get_pong() { write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; } if let Some(when) = self.last_incoming_ping() { diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs new file mode 100644 index 00000000000..ed91f266aae --- /dev/null +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -0,0 +1,128 @@ +use n0_future::time::{Duration, Instant}; + +use crate::magicsock::node_map::node_state::PongReply; + +/// How long we trust a UDP address as the exclusive path (i.e. without also sending via the relay). +/// +/// Trust for a UDP address begins when we receive a DISCO UDP pong on that address. +/// It is then further extended by this duration every time we receive QUIC payload data while it's +/// currently trusted. +/// +/// If trust goes away, it can be brought back with another valid DISCO UDP pong. +const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + +/// Tracks a path's validity. +/// +/// A path is valid: +/// - For [`Source::trust_duration`] after a successful [`PongReply`]. +/// - For [`Source::trust_duration`] longer starting at the most recent +/// received application payload *while the path was valid*. +#[derive(Debug, Clone, Default)] +pub(super) struct PathValidity(Option); + +#[derive(Debug, Clone)] +struct Inner { + recent_pong: PongReply, + confirmed_at: Instant, + trust_until: Instant, +} + +#[derive(Debug)] +pub(super) enum Source { + ReceivedPong, + QuicPayload, +} + +impl Source { + fn trust_duration(&self) -> Duration { + match self { + Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, + Source::QuicPayload => TRUST_UDP_ADDR_DURATION, + } + } +} + +impl PathValidity { + pub(super) fn new(recent_pong: PongReply) -> Self { + Self(Some(Inner { + confirmed_at: recent_pong.pong_at, + trust_until: recent_pong.pong_at + Source::ReceivedPong.trust_duration(), + recent_pong, + })) + } + + pub(super) fn empty() -> Self { + Self(None) + } + + pub(super) fn is_empty(&self) -> bool { + self.0.is_none() + } + + pub(super) fn is_valid(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + state.is_valid(now) + } + + pub(super) fn latency_if_valid(&self, now: Instant) -> Option { + let Some(state) = self.0.as_ref() else { + return None; + }; + + state.is_valid(now).then_some(state.recent_pong.latency) + } + + pub(super) fn is_outdated(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + // We *used* to be valid, but are now outdated. + // This happens when we had a DISCO pong but didn't receive + // any payload data or further pongs for at least TRUST_UDP_ADDR_DURATION + state.is_outdated(now) + } + + pub(super) fn latency_if_outdated(&self, now: Instant) -> Option { + let Some(state) = self.0.as_ref() else { + return None; + }; + + state.is_outdated(now).then_some(state.recent_pong.latency) + } + + /// Reconfirms path validity, if a payload was received while the + /// path was valid. + pub(super) fn receive_payload(&mut self, now: Instant, source: Source) { + let Some(state) = self.0.as_mut() else { + return; + }; + + if state.is_valid(now) { + state.confirmed_at = now; + state.trust_until = now + source.trust_duration(); + } + } + + pub(super) fn get_pong(&self) -> Option<&PongReply> { + self.0.as_ref().map(|inner| &inner.recent_pong) + } + + // TODO(matheus23): Use this to bias the choice of best outdated addr maybe? + // pub(super) fn confirmed_at(&self) -> Option { + // self.0.as_ref().map(|inner| inner.confirmed_at) + // } +} + +impl Inner { + fn is_valid(&self, now: Instant) -> bool { + self.confirmed_at <= now && now < self.trust_until + } + + fn is_outdated(&self, now: Instant) -> bool { + self.confirmed_at <= now && self.trust_until <= now + } +} diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 0d96c3bba0b..12cd7ee8e04 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -7,17 +7,10 @@ //! [`NodeState`]: super::node_state::NodeState use std::{collections::BTreeMap, net::SocketAddr}; -use n0_future::time::{Duration, Instant}; -use rand::seq::IteratorRandom; -use tracing::warn; +use n0_future::time::Instant; +use tracing::{event, Level}; -use super::{ - best_addr::{self, BestAddr}, - node_state::PongReply, - path_state::PathState, - IpPort, -}; -use crate::disco::SendAddr; +use super::{path_state::PathState, IpPort}; /// The address on which to send datagrams over UDP. /// @@ -32,7 +25,7 @@ use crate::disco::SendAddr; /// /// [`MagicSock`]: crate::magicsock::MagicSock /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub(super) enum UdpSendAddr { /// The UDP address can be relied on to deliver data to the remote node. /// @@ -54,9 +47,21 @@ pub(super) enum UdpSendAddr { /// establish a connection. Unconfirmed(SocketAddr), /// No known UDP path exists to the remote node. + #[default] None, } +impl UdpSendAddr { + pub fn get_addr(&self) -> Option { + match self { + UdpSendAddr::Valid(addr) + | UdpSendAddr::Outdated(addr) + | UdpSendAddr::Unconfirmed(addr) => Some(*addr), + UdpSendAddr::None => None, + } + } +} + /// The UDP paths for a single node. /// /// Paths are identified by the [`IpPort`] of their UDP address. @@ -72,10 +77,19 @@ pub(super) enum UdpSendAddr { pub(super) struct NodeUdpPaths { /// The state for each of this node's direct paths. pub(super) paths: BTreeMap, - /// Best UDP path currently selected. - pub(super) best_addr: BestAddr, - /// If we had to choose a path because we had no `best_addr` it is stored here. - chosen_candidate: Option, + /// The current address we use to send on. + /// + /// This is *almost* the same as going through `paths` and finding + /// the best one, except that this is + /// 1. Not updated in `send_addr`, but instead when there's changes to `paths`, so that `send_addr` can take `&self`. + /// 2. Slightly sticky: It only changes when + /// - the current send addr is not a validated path anymore or + /// - we received a pong with lower latency. + pub(super) best: UdpSendAddr, + /// The current best address to send on from all IPv4 addresses we have available. + /// + /// Follows the same logic as `best` above, but doesn't include any IPv6 addresses. + pub(super) best_ipv4: UdpSendAddr, } impl NodeUdpPaths { @@ -84,97 +98,100 @@ impl NodeUdpPaths { } #[cfg(test)] - pub(super) fn from_parts(paths: BTreeMap, best_addr: BestAddr) -> Self { + pub(super) fn from_parts(paths: BTreeMap, best: UdpSendAddr) -> Self { Self { paths, - best_addr, - chosen_candidate: None, + best_ipv4: best, // we only use ipv4 addrs in tests + best, } } /// Returns the current UDP address to send on. - /// - /// TODO: The goal here is for this to simply return the already known send address, so - /// it should be `&self` and not `&mut self`. This is only possible once the state from - /// [`NodeUdpPaths`] is no longer modified from outside. - pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { - self.assign_best_addr_from_candidates_if_empty(); - match self.best_addr.state(now) { - best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), - best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), - best_addr::State::Empty => { - // No direct connection has been used before. If we know of any possible - // candidate addresses, randomly try to use one. This path is most - // effective when folks use a NodeAddr with exactly one direct address which - // they know to work, effectively like using a traditional socket or QUIC - // endpoint. - let addr = self - .chosen_candidate - .and_then(|ipp| self.paths.get(&ipp)) - .and_then(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .or_else(|| { - // Look for a new candidate in all the known paths. This may look - // like a RNG use on the hot-path but this is normally invoked at - // most most once at startup. - let addr = self - .paths - .values() - .filter_map(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .choose(&mut rand::thread_rng()); - self.chosen_candidate = addr.map(IpPort::from); - addr - }); - match addr { - Some(addr) => UdpSendAddr::Unconfirmed(addr), - None => UdpSendAddr::None, - } - } + pub(super) fn send_addr(&self, have_ipv6: bool) -> &UdpSendAddr { + if !have_ipv6 { + return &self.best_ipv4; } + &self.best } - /// Fixup best_addr from candidates. + /// Changes the current best address(es) to ones chosen as described in [`Self::best_addr`] docs. + /// + /// Returns whether one of the best addresses had to change. /// - /// If somehow we end up in a state where we failed to set a best_addr, while we do have - /// valid candidates, this will chose a candidate and set best_addr again. Most likely - /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self) { - if !self.best_addr.is_empty() { - return; + /// This should be called any time that `paths` is modified. + pub(super) fn update_to_best_addr(&mut self, now: Instant) -> bool { + let best_ipv4 = self.best_addr(false, now); + let best = self.best_addr(true, now); + let mut changed = false; + if best_ipv4 != self.best_ipv4 { + event!( + target: "iroh::_events::udp::best_ipv4", + Level::DEBUG, + ?best_ipv4, + ); + changed = true; } + if best != self.best { + event!( + target: "iroh::_events::udp::best", + Level::DEBUG, + ?best, + ); + changed = true; + } + self.best_ipv4 = best_ipv4; + self.best = best; + changed + } - // The highest acceptable latency for an endpoint path. If the latency is higher - // then this the path will be ignored. - const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.recent_pong { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(ref pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) + pub(super) fn best_addr(&self, have_ipv6: bool, now: Instant) -> UdpSendAddr { + let Some((ipp, path)) = self + .paths + .iter() + .filter(|(ipp, _)| have_ipv6 || ipp.ip.is_ipv4()) + .max_by_key(|(ipp, path)| { + // We find the best by sorting on a key of type (Option>, Option>, bool) + // where the first is set to Some(ReverseOrd(latency)) iff path.is_valid(now) and + // the second is set to Some(ReverseOrd(latency)) if path.is_outdated(now) and + // the third is set to whether the ipp is ipv6. + // This makes max_by_key sort for the lowest valid latency first, then sort for + // the lowest outdated latency second, and if latencies are equal, it'll sort IPv6 paths first. + let is_ipv6 = ipp.ip.is_ipv6(); + if let Some(latency) = path.validity.latency_if_valid(now) { + (Some(ReverseOrd(latency)), None, is_ipv6) + } else if let Some(latency) = path.validity.latency_if_outdated(now) { + (None, Some(ReverseOrd(latency)), is_ipv6) + } else { + (None, None, is_ipv6) } - _ => best_pong, - } - }); + }) + else { + return UdpSendAddr::None; + }; - // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - ) - } + if path.validity.is_valid(now) { + UdpSendAddr::Valid((*ipp).into()) + } else if path.validity.is_outdated(now) { + UdpSendAddr::Outdated((*ipp).into()) + } else { + UdpSendAddr::Unconfirmed((*ipp).into()) } } } + +#[derive(PartialEq, Eq)] +struct ReverseOrd(N); + +impl Ord for ReverseOrd { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0).reverse() + } +} + +impl PartialOrd for ReverseOrd { + fn partial_cmp(&self, other: &Self) -> Option { + self.0 + .partial_cmp(&other.0) + .map(std::cmp::Ordering::reverse) + } +} diff --git a/iroh/tests/integration.rs b/iroh/tests/integration.rs index 0ccac01cb2f..a0ed467ba71 100644 --- a/iroh/tests/integration.rs +++ b/iroh/tests/integration.rs @@ -36,8 +36,15 @@ const ECHO_ALPN: &[u8] = b"echo"; async fn simple_node_id_based_connection_transfer() -> TestResult { setup_logging(); - let client = Endpoint::builder().discovery_n0().bind().await?; + let relay_map = iroh::defaults::prod::default_relay_map(); + + let client = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map.clone())) + .discovery_n0() + .bind() + .await?; let server = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map)) .discovery_n0() .alpns(vec![ECHO_ALPN.to_vec()]) .bind()