From 203b55974f337329560605ad1103244c0905a84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 18 Jul 2025 18:28:50 +0200 Subject: [PATCH 01/11] fix(iroh): Keep track of non-best-addr path validation and switch to them if best addr is soon outdated --- iroh/src/magicsock/node_map/best_addr.rs | 91 +++++++++++++++++++---- iroh/src/magicsock/node_map/node_state.rs | 15 +++- iroh/src/magicsock/node_map/path_state.rs | 58 ++++++++++++++- iroh/src/magicsock/node_map/udp_paths.rs | 5 +- 4 files changed, 148 insertions(+), 21 deletions(-) diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index 18d9ef960be..7e846bf1676 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -6,7 +6,51 @@ use n0_future::time::{Duration, Instant}; use tracing::{debug, info}; /// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); +pub(super) const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + +/// The grace period at which we consider switching away from our best addr +/// to another address that we've received data on. +/// +/// The trusted address lifecycle goes as follows: +/// - A UDP DICSO pong is received, this validates that the path is for sure valid. +/// - The disco path that seems to have the lowest latency is the path we use to send on. +/// - We trust this path as a path to send on for at least TRUST_UDP_ADDR_DURATION. +/// - This time is extended every time we receive a UDP DISCO pong on the address. +/// - This time is *also* extended every time we receive *application* payloads on this +/// address (i.e. QUIC datagrams). +/// - If our best address becomes outdated (TRUST_UDP_ADDR_DURATION expires) without +/// another pong or payload data, then we'll start sending over the relay, too! +/// (we switch to ConnectionType::Mixed) +/// +/// However, we might not get any UDP DISCO pongs because they're UDP packets and get +/// lost under e.g. high load. +/// +/// This is usually fine, because we also receive on the best addr, and that extends its +/// "trust period" just as well. +/// +/// However, if *additionally* we send on a different address than the one we receive on, +/// then this extension doesn't happen. +/// +/// To fix this, we also apply the same path validation logic to non-best addresses. +/// I.e. we keep track of when they last received a pong, at which point we consider them +/// validated, and then extend this validation period when we receive application data +/// while they're valid (or when we receive another pong). +/// +/// Now, when our best address becomes outdated, we need to switch to another valid path. +/// +/// We could switch to another path once the best address becomes outdated, but then we'd +/// already start sending on the relay for a couple of iterations! +/// +/// So instead, we switch to another path when it looks like the best address becomes +/// outdated. +/// Not just any path, but the path that we're currently receiving from for this node. +/// +/// Since we might not be receiving constantly from the remote side (e.g. if it's a +/// one-sided transfer), we need to take care to do consider switching early enough. +/// +/// So this duration is chosen as at least 1 keep alive interval (1s default in iroh atm) +/// + at maximum 400ms of latency spike. +const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400); #[derive(Debug, Default)] pub(super) struct BestAddr(Option); @@ -38,12 +82,12 @@ pub(super) enum Source { } impl Source { - fn trust_until(&self, from: Instant) -> Instant { + fn trust_duration(&self) -> Duration { match self { - Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, + Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, // TODO: Fix time - Source::BestCandidate => from + Duration::from_secs(60 * 60), - Source::Udp => from + TRUST_UDP_ADDR_DURATION, + Source::BestCandidate => Duration::from_secs(60 * 60), + Source::Udp => TRUST_UDP_ADDR_DURATION, } } } @@ -115,6 +159,7 @@ impl BestAddr { latency: Duration, source: Source, confirmed_at: Instant, + now: Instant, ) { match self.0.as_mut() { None => { @@ -122,23 +167,39 @@ impl BestAddr { } Some(state) => { let candidate = AddrLatency { addr, latency }; - if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { + if !state.is_trusted(now) || candidate.is_better_than(&state.addr) { self.insert(addr, latency, source, confirmed_at); } else if state.addr.addr == addr { state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); + state.trust_until = Some(confirmed_at + source.trust_duration()); } } } } - /// Reset the expiry, if the passed in addr matches the currently used one. - #[cfg(not(wasm_browser))] - pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) { - if let Some(state) = self.0.as_mut() { - if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); + pub fn insert_if_soon_outdated_or_reconfirm( + &mut self, + addr: SocketAddr, + latency: Duration, + source: Source, + confirmed_at: Instant, + now: Instant, + ) { + match self.0.as_mut() { + None => { + self.insert(addr, latency, source, confirmed_at); + } + Some(state) => { + // If the current best addr will soon be outdated + // and the given candidate will be trusted for longer + if !state.is_trusted(now + TRUST_UDP_ADDR_SOON_OUTDATED) + && state.confirmed_at < confirmed_at + { + self.insert(addr, latency, source, confirmed_at); + } else if state.addr.addr == addr { + state.confirmed_at = confirmed_at; + state.trust_until = Some(confirmed_at + source.trust_duration()); + } } } } @@ -150,7 +211,7 @@ impl BestAddr { source: Source, confirmed_at: Instant, ) { - let trust_until = source.trust_until(confirmed_at); + let trust_until = confirmed_at + source.trust_duration(); if self .0 diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index adbd155a791..bc2f54780a4 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -978,6 +978,7 @@ impl NodeState { latency, best_addr::Source::ReceivedPong, now, + now, ); } @@ -1064,18 +1065,26 @@ impl NodeState { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; - state.last_payload_msg = Some(now); + state.receive_payload(now); self.last_used = Some(now); + if let Some((latency, confirmed_at)) = state.valid_best_addr_candidate(now) { self.udp_paths .best_addr - .reconfirm_if_used(addr.into(), BestAddrSource::Udp, now); + .insert_if_soon_outdated_or_reconfirm( + addr.into(), + latency, + BestAddrSource::Udp, + confirmed_at, + now, + ); + } } pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) { match self.relay_url.as_mut() { Some((current_home, state)) if current_home == url => { // We received on the expected url. update state. - state.last_payload_msg = Some(now); + state.receive_payload(now); } Some((_current_home, _state)) => { // we have a different url. we only update on ping, not on receive_relay. diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 7241121722a..c115e90e828 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -14,7 +14,10 @@ use super::{ node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT}, IpPort, PingRole, Source, }; -use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL}; +use crate::{ + disco::SendAddr, + magicsock::{node_map::best_addr::TRUST_UDP_ADDR_DURATION, HEARTBEAT_INTERVAL}, +}; /// The minimum time between pings to an endpoint. /// @@ -53,6 +56,9 @@ pub(super) struct PathState { /// /// This excludes DISCO messages. pub(super) last_payload_msg: Option, + /// Whether the last payload msg was within [`TRUST_UDP_ADDR_DURATION`] of either + /// the last trusted payload message or a recent pong. + pub(super) last_payload_trusted: bool, /// Sources is a map of [`Source`]s to [`Instant`]s, keeping track of all the ways we have /// learned about this path /// @@ -73,6 +79,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: None, last_payload_msg: None, + last_payload_trusted: false, sources, } } @@ -100,6 +107,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: None, last_payload_msg: Some(now), + last_payload_trusted: false, sources, } } @@ -129,6 +137,26 @@ impl PathState { } } self.recent_pong = Some(r); + + if let Some(last_payload_msg) = self.last_payload_msg { + self.last_payload_trusted = self.within_trusted_pong_duration(last_payload_msg); + } + } + + fn within_trusted_pong_duration(&self, instant: Instant) -> bool { + if let Some(pong) = &self.recent_pong { + return pong.pong_at <= instant && instant < pong.pong_at + TRUST_UDP_ADDR_DURATION; + } + + false + } + + pub(super) fn receive_payload(&mut self, now: Instant) { + self.last_payload_msg = Some(now); + + if self.within_trusted_pong_duration(now) { + self.last_payload_trusted = true; + } } #[cfg(test)] @@ -141,6 +169,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: Some(r), last_payload_msg: None, + last_payload_trusted: false, sources: HashMap::new(), } } @@ -187,6 +216,21 @@ impl PathState { .copied() } + fn last_confirmed_at(&self) -> Option { + let last_trusted_payload_msg = if self.last_payload_trusted { + self.last_payload_msg + } else { + None + }; + + self.recent_pong + .as_ref() + .map(|pong| pong.pong_at) + .into_iter() + .chain(last_trusted_payload_msg) + .max() + } + /// The last control or DISCO message **about** this path. /// /// This is the most recent instant among: @@ -222,6 +266,18 @@ impl PathState { self.recent_pong.as_ref().map(|p| p.latency) } + /// Returns the latency and confirmed_at time if this path would make for a valid best addr `now` + pub(crate) fn valid_best_addr_candidate(&self, now: Instant) -> Option<(Duration, Instant)> { + let latency = self.recent_pong.as_ref()?.latency; + let confirmed_at = self.last_confirmed_at()?; + + if confirmed_at <= now && now < confirmed_at + TRUST_UDP_ADDR_DURATION { + return Some((latency, confirmed_at)); + } + + None + } + pub(super) fn needs_ping(&self, now: &Instant) -> bool { match self.last_ping { None => true, diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 0d96c3bba0b..53d32e861da 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -98,7 +98,7 @@ impl NodeUdpPaths { /// it should be `&self` and not `&mut self`. This is only possible once the state from /// [`NodeUdpPaths`] is no longer modified from outside. pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { - self.assign_best_addr_from_candidates_if_empty(); + self.assign_best_addr_from_candidates_if_empty(now); match self.best_addr.state(now) { best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), @@ -139,7 +139,7 @@ impl NodeUdpPaths { /// If somehow we end up in a state where we failed to set a best_addr, while we do have /// valid candidates, this will chose a candidate and set best_addr again. Most likely /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self) { + fn assign_best_addr_from_candidates_if_empty(&mut self, now: Instant) { if !self.best_addr.is_empty() { return; } @@ -173,6 +173,7 @@ impl NodeUdpPaths { pong.latency, best_addr::Source::BestCandidate, pong.pong_at, + now, ) } } From 38a7a6cbe13e0de7ae8585e02492cdae0ba6a409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 18 Jul 2025 18:37:39 +0200 Subject: [PATCH 02/11] Turn off forcing staging relays --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 10552695edf..b778f8e19c8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ env: RUSTDOCFLAGS: -Dwarnings MSRV: "1.81" SCCACHE_CACHE_SIZE: "10G" - IROH_FORCE_STAGING_RELAYS: "1" + # IROH_FORCE_STAGING_RELAYS: "1" jobs: tests: From 5db50dd09be27bdf2968fea5c650e1a76255cb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 18 Jul 2025 18:38:14 +0200 Subject: [PATCH 03/11] cargo make format --- iroh/src/magicsock/node_map/node_state.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index bc2f54780a4..ff495c947f6 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1068,8 +1068,8 @@ impl NodeState { state.receive_payload(now); self.last_used = Some(now); if let Some((latency, confirmed_at)) = state.valid_best_addr_candidate(now) { - self.udp_paths - .best_addr + self.udp_paths + .best_addr .insert_if_soon_outdated_or_reconfirm( addr.into(), latency, From 67ac9b06e3800d572a298a177ed07925fc6c0e4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 21 Jul 2025 10:25:58 +0200 Subject: [PATCH 04/11] Use production relay map in `simple_node_id_based_connection_transfer` test --- iroh/tests/integration.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/iroh/tests/integration.rs b/iroh/tests/integration.rs index 0ccac01cb2f..a0ed467ba71 100644 --- a/iroh/tests/integration.rs +++ b/iroh/tests/integration.rs @@ -36,8 +36,15 @@ const ECHO_ALPN: &[u8] = b"echo"; async fn simple_node_id_based_connection_transfer() -> TestResult { setup_logging(); - let client = Endpoint::builder().discovery_n0().bind().await?; + let relay_map = iroh::defaults::prod::default_relay_map(); + + let client = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map.clone())) + .discovery_n0() + .bind() + .await?; let server = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map)) .discovery_n0() .alpns(vec![ECHO_ALPN.to_vec()]) .bind() From e521208f89136450d39becd651125475766e689c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 21 Jul 2025 11:42:05 +0200 Subject: [PATCH 05/11] Track path validity for all paths --- iroh/src/magicsock/node_map.rs | 1 + iroh/src/magicsock/node_map/node_state.rs | 19 ++-- iroh/src/magicsock/node_map/path_state.rs | 95 ++++++-------------- iroh/src/magicsock/node_map/path_validity.rs | 92 +++++++++++++++++++ iroh/src/magicsock/node_map/udp_paths.rs | 4 +- 5 files changed, 136 insertions(+), 75 deletions(-) create mode 100644 iroh/src/magicsock/node_map/path_validity.rs diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index af0634a13bb..ce912143302 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -26,6 +26,7 @@ use crate::{ mod best_addr; mod node_state; mod path_state; +mod path_validity; mod udp_paths; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}; diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index ff495c947f6..cce7ef4d033 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -25,7 +25,10 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, NodeIdMappedAddr, HEARTBEAT_INTERVAL}, + magicsock::{ + node_map::path_validity::PathValidity, ActorMessage, MagicsockMetrics, NodeIdMappedAddr, + HEARTBEAT_INTERVAL, + }, watchable::{Watchable, Watcher}, }; @@ -242,7 +245,7 @@ impl NodeState { .iter() .map(|(addr, path_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), - latency: path_state.recent_pong.as_ref().map(|pong| pong.latency), + latency: path_state.validity.get_pong().map(|pong| pong.latency), last_control: path_state.last_control_msg(now), last_payload: path_state .last_payload_msg @@ -445,7 +448,7 @@ impl NodeState { // which we should have received the pong, clear best addr and // pong. Both are used to select this path again, but we know // it's not a usable path now. - path_state.recent_pong = None; + path_state.validity = PathValidity::empty(); self.udp_paths.best_addr.clear_if_equals( addr, ClearReason::PongTimeout, @@ -894,7 +897,7 @@ impl NodeState { event!( target: "iroh::_events::pong::recv", Level::DEBUG, - remote_node = self.node_id.fmt_short(), + remote_node = %self.node_id.fmt_short(), ?src, txn = ?m.tx_id, ); @@ -1034,9 +1037,9 @@ impl NodeState { if !call_me_maybe_ipps.contains(ipp) { // TODO: This seems like a weird way to signal that the endpoint no longer // thinks it has this IpPort as an available path. - if st.recent_pong.is_some() { + if !st.validity.is_empty() { debug!(path=?ipp ,"clearing recent pong"); - st.recent_pong = None; + st.validity = PathValidity::empty(); } } } @@ -1067,7 +1070,9 @@ impl NodeState { }; state.receive_payload(now); self.last_used = Some(now); - if let Some((latency, confirmed_at)) = state.valid_best_addr_candidate(now) { + if state.validity.is_valid(now) { + let confirmed_at = state.validity.confirmed_at().unwrap(); + let latency = state.validity.get_pong().unwrap().latency; self.udp_paths .best_addr .insert_if_soon_outdated_or_reconfirm( diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index c115e90e828..913f16be44f 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -16,7 +16,10 @@ use super::{ }; use crate::{ disco::SendAddr, - magicsock::{node_map::best_addr::TRUST_UDP_ADDR_DURATION, HEARTBEAT_INTERVAL}, + magicsock::{ + node_map::path_validity::{self, PathValidity}, + HEARTBEAT_INTERVAL, + }, }; /// The minimum time between pings to an endpoint. @@ -30,7 +33,7 @@ const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); /// This state is used for both the relay path and any direct UDP paths. /// /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub(super) struct PathState { /// The node for which this path exists. node_id: NodeId, @@ -47,18 +50,19 @@ pub(super) struct PathState { /// The time this endpoint was last advertised via a call-me-maybe DISCO message. pub(super) call_me_maybe_time: Option, - /// The most recent [`PongReply`]. + // /// The most recent [`PongReply`]. + // /// + // /// Previous replies are cleared when they are no longer relevant to determine whether + // /// this path can still be used to reach the remote node. + // pub(super) recent_pong: Option, + /// Tracks whether this path is valid. /// - /// Previous replies are cleared when they are no longer relevant to determine whether - /// this path can still be used to reach the remote node. - pub(super) recent_pong: Option, + /// See [`PathValidity`] docs. + pub(super) validity: PathValidity, /// When the last payload data was **received** via this path. /// /// This excludes DISCO messages. pub(super) last_payload_msg: Option, - /// Whether the last payload msg was within [`TRUST_UDP_ADDR_DURATION`] of either - /// the last trusted payload message or a recent pong. - pub(super) last_payload_trusted: bool, /// Sources is a map of [`Source`]s to [`Instant`]s, keeping track of all the ways we have /// learned about this path /// @@ -77,9 +81,8 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: None, - last_payload_trusted: false, sources, } } @@ -105,9 +108,8 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, + validity: PathValidity::empty(), last_payload_msg: Some(now), - last_payload_trusted: false, sources, } } @@ -126,7 +128,7 @@ impl PathState { pub(super) fn add_pong_reply(&mut self, r: PongReply) { if let SendAddr::Udp(ref path) = self.path { - if self.recent_pong.is_none() { + if self.validity.is_empty() { event!( target: "iroh::_events::holepunched", Level::DEBUG, @@ -136,27 +138,16 @@ impl PathState { ); } } - self.recent_pong = Some(r); - if let Some(last_payload_msg) = self.last_payload_msg { - self.last_payload_trusted = self.within_trusted_pong_duration(last_payload_msg); - } - } - - fn within_trusted_pong_duration(&self, instant: Instant) -> bool { - if let Some(pong) = &self.recent_pong { - return pong.pong_at <= instant && instant < pong.pong_at + TRUST_UDP_ADDR_DURATION; - } - - false + self.validity = PathValidity::new(r); } pub(super) fn receive_payload(&mut self, now: Instant) { self.last_payload_msg = Some(now); - - if self.within_trusted_pong_duration(now) { - self.last_payload_trusted = true; - } + // TODO(matheus23): It's not necessarily UDP. Kinda weird to have this thing + // Also, it all results in the same 6.5s timeout anyways, maybe we just remove it? + self.validity + .receive_payload(now, path_validity::Source::Udp); } #[cfg(test)] @@ -167,9 +158,8 @@ impl PathState { last_ping: None, last_got_ping: None, call_me_maybe_time: None, - recent_pong: Some(r), + validity: PathValidity::new(r), last_payload_msg: None, - last_payload_trusted: false, sources: HashMap::new(), } } @@ -205,8 +195,8 @@ impl PathState { /// - When the last payload transmission occurred. /// - when the last ping from them was received. pub(super) fn last_alive(&self) -> Option { - self.recent_pong - .as_ref() + self.validity + .get_pong() .map(|pong| &pong.pong_at) .into_iter() .chain(self.last_payload_msg.as_ref()) @@ -216,21 +206,6 @@ impl PathState { .copied() } - fn last_confirmed_at(&self) -> Option { - let last_trusted_payload_msg = if self.last_payload_trusted { - self.last_payload_msg - } else { - None - }; - - self.recent_pong - .as_ref() - .map(|pong| pong.pong_at) - .into_iter() - .chain(last_trusted_payload_msg) - .max() - } - /// The last control or DISCO message **about** this path. /// /// This is the most recent instant among: @@ -242,8 +217,8 @@ impl PathState { pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { // get every control message and assign it its kind let last_pong = self - .recent_pong - .as_ref() + .validity + .get_pong() .map(|pong| (pong.pong_at, ControlMsg::Pong)); let last_call_me_maybe = self .call_me_maybe_time @@ -263,19 +238,7 @@ impl PathState { /// Returns the latency from the most recent pong, if available. pub(super) fn latency(&self) -> Option { - self.recent_pong.as_ref().map(|p| p.latency) - } - - /// Returns the latency and confirmed_at time if this path would make for a valid best addr `now` - pub(crate) fn valid_best_addr_candidate(&self, now: Instant) -> Option<(Duration, Instant)> { - let latency = self.recent_pong.as_ref()?.latency; - let confirmed_at = self.last_confirmed_at()?; - - if confirmed_at <= now && now < confirmed_at + TRUST_UDP_ADDR_DURATION { - return Some((latency, confirmed_at)); - } - - None + self.validity.get_pong().map(|p| p.latency) } pub(super) fn needs_ping(&self, now: &Instant) -> bool { @@ -336,7 +299,7 @@ impl PathState { self.last_ping = None; self.last_got_ping = None; self.call_me_maybe_time = None; - self.recent_pong = None; + self.validity = PathValidity::empty(); } fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { @@ -344,7 +307,7 @@ impl PathState { if self.is_active() { write!(w, "active ")?; } - if let Some(ref pong) = self.recent_pong { + if let Some(pong) = self.validity.get_pong() { write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; } if let Some(when) = self.last_incoming_ping() { diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs new file mode 100644 index 00000000000..811ea8c1168 --- /dev/null +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -0,0 +1,92 @@ +use n0_future::time::{Duration, Instant}; + +use crate::magicsock::node_map::node_state::PongReply; + +/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. +const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + +/// Tracks a path's validity. +/// +/// A path is valid: +/// - For [`Source::trust_duration`] after a successful [`PongReply`]. +/// - For [`Source::trust_duration`] longer starting at the most recent +/// received application payload *while the path was valid*. +#[derive(Debug, Clone, Default)] +pub(super) struct PathValidity(Option); + +#[derive(Debug, Clone)] +struct Inner { + recent_pong: PongReply, + confirmed_at: Instant, + trust_until: Instant, +} + +#[derive(Debug)] +pub(super) enum Source { + ReceivedPong, + // BestCandidate, + Udp, +} + +impl Source { + fn trust_duration(&self) -> Duration { + match self { + Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, + // // TODO: Fix time + // Source::BestCandidate => Duration::from_secs(60 * 60), + Source::Udp => TRUST_UDP_ADDR_DURATION, + } + } +} + +impl PathValidity { + pub(super) fn new(recent_pong: PongReply) -> Self { + Self(Some(Inner { + confirmed_at: recent_pong.pong_at, + trust_until: recent_pong.pong_at + Source::ReceivedPong.trust_duration(), + recent_pong, + })) + } + + pub(super) fn empty() -> Self { + Self(None) + } + + pub(super) fn is_empty(&self) -> bool { + self.0.is_none() + } + + pub(super) fn is_valid(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + state.is_valid(now) + } + + /// Reconfirms path validity, if a payload was received while the + /// path was valid. + pub(super) fn receive_payload(&mut self, now: Instant, source: Source) { + let Some(state) = self.0.as_mut() else { + return; + }; + + if state.is_valid(now) { + state.trust_until = now + source.trust_duration(); + } + } + + pub(super) fn get_pong(&self) -> Option<&PongReply> { + self.0.as_ref().map(|inner| &inner.recent_pong) + } + + pub(super) fn confirmed_at(&self) -> Option { + self.0.as_ref().map(|inner| inner.confirmed_at) + } +} + +impl Inner { + fn is_valid(&self, now: Instant) -> bool { + self.confirmed_at <= now && now < self.trust_until + } +} diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 53d32e861da..c1c1bd2c36a 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -151,10 +151,10 @@ impl NodeUdpPaths { let best_latency = best_pong .map(|p: &PongReply| p.latency) .unwrap_or(MAX_LATENCY); - match state.recent_pong { + match state.validity.get_pong() { // This pong is better if it has a lower latency, or if it has the same // latency but on an IPv6 path. - Some(ref pong) + Some(pong) if pong.latency < best_latency || (pong.latency == best_latency && ipp.ip().is_ipv6()) => { From 14e9bb947639e2f0e9e062963052faa57a4417cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 22 Jul 2025 11:45:08 +0200 Subject: [PATCH 06/11] Use `&self` instead of `&mut self` in `get_send_addr` --- iroh/src/magicsock/node_map.rs | 21 +- iroh/src/magicsock/node_map/node_state.rs | 169 ++++++++-------- iroh/src/magicsock/node_map/path_validity.rs | 33 ++++ iroh/src/magicsock/node_map/udp_paths.rs | 198 ++++++++++--------- 4 files changed, 226 insertions(+), 195 deletions(-) diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index ce912143302..952e325acad 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -200,7 +200,7 @@ impl NodeMap { .expect("poisoned") .get_mut(NodeStateKey::Idx(id)) { - ep.ping_timeout(tx_id); + ep.ping_timeout(tx_id, Instant::now()); } } @@ -277,9 +277,10 @@ impl NodeMap { } pub(super) fn reset_node_states(&self) { + let now = Instant::now(); let mut inner = self.inner.lock().expect("poisoned"); for (_, ep) in inner.node_states_mut() { - ep.note_connectivity_change(); + ep.note_connectivity_change(now); } } @@ -328,7 +329,7 @@ impl NodeMap { self.inner .lock() .expect("poisoned") - .on_direct_addr_discovered(discovered); + .on_direct_addr_discovered(discovered, Instant::now()); } } @@ -389,18 +390,22 @@ impl NodeMapInner { } /// Prunes direct addresses from nodes that claim to share an address we know points to us. - pub(super) fn on_direct_addr_discovered(&mut self, discovered: BTreeSet) { + pub(super) fn on_direct_addr_discovered( + &mut self, + discovered: BTreeSet, + now: Instant, + ) { for addr in discovered { - self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr) + self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr, now) } } /// Removes a direct address from a node. - fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason) { + fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason, now: Instant) { if let Some(id) = self.by_ip_port.remove(&ipp) { if let Entry::Occupied(mut entry) = self.by_id.entry(id) { let node = entry.get_mut(); - node.remove_direct_addr(&ipp, reason); + node.remove_direct_addr(&ipp, reason, now); if node.direct_addresses().count() == 0 { let node_id = node.public_key(); let mapped_addr = node.quic_mapped_addr(); @@ -853,7 +858,7 @@ mod tests { } info!("Pruning addresses"); - endpoint.prune_direct_addresses(); + endpoint.prune_direct_addresses(Instant::now()); // Half the offline addresses should have been pruned. All the active and alive // addresses should have been kept. diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index cce7ef4d033..1bfb8f7734b 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -2,6 +2,7 @@ use std::{ collections::{btree_map::Entry, BTreeSet, HashMap}, hash::Hash, net::{IpAddr, SocketAddr}, + sync::atomic::AtomicBool, }; use data_encoding::HEXLOWER; @@ -16,7 +17,7 @@ use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ - best_addr::{self, ClearReason, Source as BestAddrSource}, + best_addr::ClearReason, path_state::{summarize_node_paths, PathState}, udp_paths::{NodeUdpPaths, UdpSendAddr}, IpPort, Source, @@ -140,7 +141,7 @@ pub(super) struct NodeState { /// Whether the conn_type was ever observed to be `Direct` at some point. /// /// Used for metric reporting. - has_been_direct: bool, + has_been_direct: AtomicBool, /// Configuration for what path selection to use #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection, @@ -187,7 +188,7 @@ impl NodeState { last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: options.path_selection, } @@ -281,8 +282,7 @@ impl NodeState { /// /// This may return to send on one, both or no paths. fn addr_for_send( - &mut self, - now: &Instant, + &self, have_ipv6: bool, metrics: &MagicsockMetrics, ) -> (Option, Option) { @@ -291,22 +291,22 @@ impl NodeState { debug!("in `RelayOnly` mode, giving the relay address as the only viable address for this endpoint"); return (None, self.relay_url()); } - let (best_addr, relay_url) = match self.udp_paths.send_addr(*now, have_ipv6) { + let (best_addr, relay_url) = match self.udp_paths.send_addr(have_ipv6) { UdpSendAddr::Valid(addr) => { // If we have a valid address we use it. trace!(%addr, "UdpSendAddr is valid, use it"); - (Some(addr), None) + (Some(*addr), None) } UdpSendAddr::Outdated(addr) => { // If the address is outdated we use it, but send via relay at the same time. // We also send disco pings so that it will become valid again if it still // works (i.e. we don't need to holepunch again). trace!(%addr, "UdpSendAddr is outdated, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::Unconfirmed(addr) => { trace!(%addr, "UdpSendAddr is unconfirmed, use it together with relay"); - (Some(addr), self.relay_url()) + (Some(*addr), self.relay_url()) } UdpSendAddr::None => { trace!("No UdpSendAddr, use relay"); @@ -319,9 +319,13 @@ impl NodeState { (None, Some(relay_url)) => ConnectionType::Relay(relay_url), (None, None) => ConnectionType::None, }; - if !self.has_been_direct && matches!(&typ, ConnectionType::Direct(_)) { - self.has_been_direct = true; - metrics.nodes_contacted_directly.inc(); + if matches!(&typ, ConnectionType::Direct(_)) { + let before = self + .has_been_direct + .swap(true, std::sync::atomic::Ordering::Relaxed); + if !before { + metrics.nodes_contacted_directly.inc(); + } } if let Ok(prev_typ) = self.conn_type.set(typ.clone()) { // The connection type has changed. @@ -368,7 +372,12 @@ impl NodeState { /// Removes a direct address for this node. /// /// If this is also the best address, it will be cleared as well. - pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, reason: ClearReason) { + pub(super) fn remove_direct_addr( + &mut self, + ip_port: &IpPort, + reason: ClearReason, + now: Instant, + ) { let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; @@ -378,11 +387,7 @@ impl NodeState { None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), } - self.udp_paths.best_addr.clear_if_equals( - (*ip_port).into(), - reason, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } /// Whether we need to send another call-me-maybe to the endpoint. @@ -400,20 +405,27 @@ impl NodeState { debug!("no previous full ping: need full ping"); return true; }; - match self.udp_paths.best_addr.state(*now) { - best_addr::State::Empty => { + match &self.udp_paths.best { + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) => { debug!("best addr not set: need full ping"); true } - best_addr::State::Outdated(_) => { + UdpSendAddr::Outdated(_) => { debug!("best addr expired: need full ping"); true } - best_addr::State::Valid(addr) => { - if addr.latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { + UdpSendAddr::Valid(addr) => { + let latency = self + .udp_paths + .paths + .get(&(*addr).into()) + .expect("send path not tracked?") + .latency() + .expect("send_addr marked valid incorrectly"); + if latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { debug!( "full ping interval expired and latency is only {}ms: need full ping", - addr.latency.as_millis() + latency.as_millis() ); true } else { @@ -432,7 +444,7 @@ impl NodeState { /// Cleanup the expired ping for the passed in txid. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn ping_timeout(&mut self, txid: stun::TransactionId) { + pub(super) fn ping_timeout(&mut self, txid: stun::TransactionId, now: Instant) { if let Some(sp) = self.sent_pings.remove(&txid) { debug!(tx = %HEXLOWER.encode(&txid), addr = %sp.to, "pong not received in timeout"); match sp.to { @@ -449,20 +461,12 @@ impl NodeState { // pong. Both are used to select this path again, but we know // it's not a usable path now. path_state.validity = PathValidity::empty(); - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url().is_some(), - ) + self.udp_paths.update_to_best_addr(now); } } else { // If we have no state for the best addr it should have been cleared // anyway. - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url.is_some(), - ); + self.udp_paths.update_to_best_addr(now); } } SendAddr::Relay(ref url) => { @@ -638,7 +642,7 @@ impl NodeState { return ping_msgs; } - self.prune_direct_addresses(); + self.prune_direct_addresses(now); let mut ping_dsts = String::from("["); self.udp_paths .paths @@ -670,7 +674,10 @@ impl NodeState { source: super::Source, metrics: &MagicsockMetrics, ) { - if self.udp_paths.best_addr.is_empty() { + if matches!( + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) + ) { // we do not have a direct connection, so changing the relay information may // have an effect on our connection status if self.relay_url.is_none() && new_relay_url.is_some() { @@ -716,13 +723,12 @@ impl NodeState { #[instrument(skip_all, fields(node = %self.node_id.fmt_short()))] pub(super) fn reset(&mut self) { self.last_full_ping = None; - self.udp_paths - .best_addr - .clear(ClearReason::Reset, self.relay_url.is_some()); for es in self.udp_paths.paths.values_mut() { es.last_ping = None; } + + self.udp_paths.update_to_best_addr(Instant::now()); } /// Handle a received Disco Ping. @@ -802,14 +808,14 @@ impl NodeState { ); if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { - self.prune_direct_addresses(); + self.prune_direct_addresses(now); } - // if the endpoint does not yet have a best_addrr + // if the endpoint does not yet have a best_addr let needs_ping_back = if matches!(path, SendAddr::Udp(_)) && matches!( - self.udp_paths.best_addr.state(now), - best_addr::State::Empty | best_addr::State::Outdated(_) + self.udp_paths.best, + UdpSendAddr::None | UdpSendAddr::Unconfirmed(_) | UdpSendAddr::Outdated(_) ) { // We also need to send a ping to make this path available to us as well. This // is always sent together with a pong. So in the worst case the pong gets lost @@ -837,7 +843,7 @@ impl NodeState { /// /// This trims the list of inactive paths for an endpoint. At most /// [`MAX_INACTIVE_DIRECT_ADDRESSES`] are kept. - pub(super) fn prune_direct_addresses(&mut self) { + pub(super) fn prune_direct_addresses(&mut self, now: Instant) { // prune candidates are addresses that are not active let mut prune_candidates: Vec<_> = self .udp_paths @@ -867,7 +873,7 @@ impl NodeState { prune_candidates.sort_unstable_by_key(|(_ip_port, last_alive)| *last_alive); prune_candidates.truncate(prune_count); for (ip_port, _last_alive) in prune_candidates.into_iter() { - self.remove_direct_addr(&ip_port, ClearReason::Inactive) + self.remove_direct_addr(&ip_port, ClearReason::Inactive, now); } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), @@ -878,11 +884,11 @@ impl NodeState { /// Called when connectivity changes enough that we should question our earlier /// assumptions about which paths work. #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn note_connectivity_change(&mut self) { - self.udp_paths.best_addr.clear_trust("connectivity changed"); + pub(super) fn note_connectivity_change(&mut self, now: Instant) { for es in self.udp_paths.paths.values_mut() { es.clear(); } + self.udp_paths.update_to_best_addr(now); } /// Handles a Pong message (a reply to an earlier ping). @@ -974,15 +980,9 @@ impl NodeState { // Promote this pong response to our current best address if it's lower latency. // TODO(bradfitz): decide how latency vs. preference order affects decision - if let SendAddr::Udp(to) = sp.to { + if let SendAddr::Udp(_to) = sp.to { debug_assert!(!is_relay, "mismatching relay & udp"); - self.udp_paths.best_addr.insert_if_better_or_reconfirm( - to, - latency, - best_addr::Source::ReceivedPong, - now, - now, - ); + self.udp_paths.update_to_best_addr(now); } node_map_insert @@ -1045,15 +1045,8 @@ impl NodeState { } // Clear trust on our best_addr if it is not included in the updated set. Also // clear the last call-me-maybe send time so we will send one again. - if let Some(addr) = self.udp_paths.best_addr.addr() { - let ipp: IpPort = addr.into(); - if !call_me_maybe_ipps.contains(&ipp) { - self.udp_paths - .best_addr - .clear_trust("best_addr not in new call-me-maybe"); - self.last_call_me_maybe = None; - } - } + self.udp_paths.update_to_best_addr(now); + self.last_call_me_maybe = None; debug!( paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", @@ -1070,19 +1063,7 @@ impl NodeState { }; state.receive_payload(now); self.last_used = Some(now); - if state.validity.is_valid(now) { - let confirmed_at = state.validity.confirmed_at().unwrap(); - let latency = state.validity.get_pong().unwrap().latency; - self.udp_paths - .best_addr - .insert_if_soon_outdated_or_reconfirm( - addr.into(), - latency, - BestAddrSource::Udp, - confirmed_at, - now, - ); - } + self.udp_paths.update_to_best_addr(now); } pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) { @@ -1150,7 +1131,7 @@ impl NodeState { } // Send heartbeat ping to keep the current addr going as long as we need it. - if let Some(udp_addr) = self.udp_paths.best_addr.addr() { + if let Some(udp_addr) = self.udp_paths.best.get_addr() { let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); // Send a ping if the last ping is older than 2 seconds. let needs_ping = match elapsed { @@ -1178,6 +1159,8 @@ impl NodeState { /// Returns the addresses on which a payload should be sent right now. /// /// This is in the hot path of `.poll_send()`. + // TODO(matheus23): Make this take &self. That's not quite possible yet due to `send_call_me_maybe` + // eventually calling `prune_direct_addresses` (which needs &mut self) #[instrument("get_send_addrs", skip_all, fields(node = %self.node_id.fmt_short()))] pub(crate) fn get_send_addrs( &mut self, @@ -1190,7 +1173,7 @@ impl NodeState { // this is the first time we are trying to connect to this node metrics.nodes_contacted.inc(); } - let (udp_addr, relay_url) = self.addr_for_send(&now, have_ipv6, metrics); + let (udp_addr, relay_url) = self.addr_for_send(have_ipv6, metrics); let mut ping_msgs = Vec::new(); if self.want_call_me_maybe(&now) { @@ -1471,7 +1454,6 @@ pub enum ConnectionType { mod tests { use std::{collections::BTreeMap, net::Ipv4Addr}; - use best_addr::BestAddr; use iroh_base::SecretKey; use super::*; @@ -1528,18 +1510,19 @@ mod tests { relay_url: None, udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), - ), + UdpSendAddr::Valid(ip_port.into()), + // BestAddr::from_parts( + // ip_port.into(), + // latency, + // now, + // now + Duration::from_secs(100), //TODO(matheus23): Is the 100s validity needed for the test? + // ), ), sent_pings: HashMap::new(), last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Direct(ip_port.into())), - has_been_direct: true, + has_been_direct: AtomicBool::new(true), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, @@ -1561,7 +1544,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1590,7 +1573,7 @@ mod tests { last_used: Some(now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), } @@ -1623,7 +1606,9 @@ mod tests { relay_url: relay_and_state(key.public(), send_addr.clone()), udp_paths: NodeUdpPaths::from_parts( endpoint_state, - BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), + UdpSendAddr::Outdated(socket_addr), + // TODO(matheus23): Test might need adjustments + // BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), ), sent_pings: HashMap::new(), last_used: Some(now), @@ -1632,7 +1617,7 @@ mod tests { socket_addr, send_addr.clone(), )), - has_been_direct: false, + has_been_direct: AtomicBool::new(false), #[cfg(any(test, feature = "test-utils"))] path_selection: PathSelection::default(), }, diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs index 811ea8c1168..3f34a2df989 100644 --- a/iroh/src/magicsock/node_map/path_validity.rs +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -64,6 +64,33 @@ impl PathValidity { state.is_valid(now) } + pub(super) fn latency_if_valid(&self, now: Instant) -> Option { + let Some(state) = self.0.as_ref() else { + return None; + }; + + state.is_valid(now).then_some(state.recent_pong.latency) + } + + pub(super) fn is_outdated(&self, now: Instant) -> bool { + let Some(state) = self.0.as_ref() else { + return false; + }; + + // We *used* to be valid, but are now outdated. + // This happens when we had a DISCO pong but didn't receive + // any payload data or further pongs for at least TRUST_UDP_ADDR_DURATION + state.is_outdated(now) + } + + pub(super) fn latency_if_outdated(&self, now: Instant) -> Option { + let Some(state) = self.0.as_ref() else { + return None; + }; + + state.is_outdated(now).then_some(state.recent_pong.latency) + } + /// Reconfirms path validity, if a payload was received while the /// path was valid. pub(super) fn receive_payload(&mut self, now: Instant, source: Source) { @@ -72,6 +99,7 @@ impl PathValidity { }; if state.is_valid(now) { + state.confirmed_at = now; state.trust_until = now + source.trust_duration(); } } @@ -80,6 +108,7 @@ impl PathValidity { self.0.as_ref().map(|inner| &inner.recent_pong) } + // TODO(matheus23): Use this to bias the choice of best outdated addr maybe? pub(super) fn confirmed_at(&self) -> Option { self.0.as_ref().map(|inner| inner.confirmed_at) } @@ -89,4 +118,8 @@ impl Inner { fn is_valid(&self, now: Instant) -> bool { self.confirmed_at <= now && now < self.trust_until } + + fn is_outdated(&self, now: Instant) -> bool { + self.confirmed_at <= now && self.trust_until <= now + } } diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index c1c1bd2c36a..e51ea0e0f1f 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -7,17 +7,9 @@ //! [`NodeState`]: super::node_state::NodeState use std::{collections::BTreeMap, net::SocketAddr}; -use n0_future::time::{Duration, Instant}; -use rand::seq::IteratorRandom; -use tracing::warn; +use n0_future::time::Instant; -use super::{ - best_addr::{self, BestAddr}, - node_state::PongReply, - path_state::PathState, - IpPort, -}; -use crate::disco::SendAddr; +use super::{path_state::PathState, IpPort}; /// The address on which to send datagrams over UDP. /// @@ -32,7 +24,7 @@ use crate::disco::SendAddr; /// /// [`MagicSock`]: crate::magicsock::MagicSock /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug)] +#[derive(Debug, Default, Clone, Copy)] pub(super) enum UdpSendAddr { /// The UDP address can be relied on to deliver data to the remote node. /// @@ -54,9 +46,39 @@ pub(super) enum UdpSendAddr { /// establish a connection. Unconfirmed(SocketAddr), /// No known UDP path exists to the remote node. + #[default] None, } +impl UdpSendAddr { + pub fn get_addr(&self) -> Option { + match self { + UdpSendAddr::Valid(addr) + | UdpSendAddr::Outdated(addr) + | UdpSendAddr::Unconfirmed(addr) => Some(*addr), + UdpSendAddr::None => None, + } + } + + pub fn is_better_than(&self, other: &Self) -> bool { + match (other, self) { + // Other being valid, we'll never be better + (UdpSendAddr::Valid(_), _) => false, + // Anything anything above outdated is better + (UdpSendAddr::Outdated(_), UdpSendAddr::Valid(_)) => true, + (UdpSendAddr::Outdated(_), _) => false, + // Anything above unconfirmed is better + (UdpSendAddr::Unconfirmed(_), UdpSendAddr::Valid(_)) => true, + (UdpSendAddr::Unconfirmed(_), UdpSendAddr::Outdated(_)) => true, + (UdpSendAddr::Unconfirmed(_), _) => false, + // None compared to none is equally bad + (UdpSendAddr::None, UdpSendAddr::None) => false, + // Anything above none is better + (UdpSendAddr::None, _) => true, + } + } +} + /// The UDP paths for a single node. /// /// Paths are identified by the [`IpPort`] of their UDP address. @@ -72,10 +94,16 @@ pub(super) enum UdpSendAddr { pub(super) struct NodeUdpPaths { /// The state for each of this node's direct paths. pub(super) paths: BTreeMap, - /// Best UDP path currently selected. - pub(super) best_addr: BestAddr, - /// If we had to choose a path because we had no `best_addr` it is stored here. - chosen_candidate: Option, + /// The current address we use to send on. + /// + /// This is *almost* the same as going through `paths` and finding + /// the best one, except that this is + /// 1. Not updated in `send_addr`, but instead when there's changes to `paths`, so that `send_addr` can take `&self`. + /// 2. Slightly sticky: It only changes when + /// - the current send addr is not a validated path anymore or + /// - we received a pong with lower latency. + pub(super) best: UdpSendAddr, + pub(super) best_non_ipv6: UdpSendAddr, } impl NodeUdpPaths { @@ -84,98 +112,78 @@ impl NodeUdpPaths { } #[cfg(test)] - pub(super) fn from_parts(paths: BTreeMap, best_addr: BestAddr) -> Self { + pub(super) fn from_parts(paths: BTreeMap, best: UdpSendAddr) -> Self { Self { paths, - best_addr, - chosen_candidate: None, + best_non_ipv6: best, + best, } } /// Returns the current UDP address to send on. - /// - /// TODO: The goal here is for this to simply return the already known send address, so - /// it should be `&self` and not `&mut self`. This is only possible once the state from - /// [`NodeUdpPaths`] is no longer modified from outside. - pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { - self.assign_best_addr_from_candidates_if_empty(now); - match self.best_addr.state(now) { - best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), - best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), - best_addr::State::Empty => { - // No direct connection has been used before. If we know of any possible - // candidate addresses, randomly try to use one. This path is most - // effective when folks use a NodeAddr with exactly one direct address which - // they know to work, effectively like using a traditional socket or QUIC - // endpoint. - let addr = self - .chosen_candidate - .and_then(|ipp| self.paths.get(&ipp)) - .and_then(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .or_else(|| { - // Look for a new candidate in all the known paths. This may look - // like a RNG use on the hot-path but this is normally invoked at - // most most once at startup. - let addr = self - .paths - .values() - .filter_map(|path| path.udp_addr()) - .filter(|addr| addr.is_ipv4() || have_ipv6) - .choose(&mut rand::thread_rng()); - self.chosen_candidate = addr.map(IpPort::from); - addr - }); - match addr { - Some(addr) => UdpSendAddr::Unconfirmed(addr), - None => UdpSendAddr::None, - } - } + pub(super) fn send_addr(&self, have_ipv6: bool) -> &UdpSendAddr { + if !have_ipv6 { + return &self.best_non_ipv6; + } + if self.best_non_ipv6.is_better_than(&self.best) { + return &self.best_non_ipv6; } + &self.best } - /// Fixup best_addr from candidates. - /// - /// If somehow we end up in a state where we failed to set a best_addr, while we do have - /// valid candidates, this will chose a candidate and set best_addr again. Most likely - /// this is a bug elsewhere though. - fn assign_best_addr_from_candidates_if_empty(&mut self, now: Instant) { - if !self.best_addr.is_empty() { - return; - } + pub(super) fn update_to_best_addr(&mut self, now: Instant) { + self.best_non_ipv6 = self.best_addr(false, now); + self.best = self.best_addr(true, now); + } - // The highest acceptable latency for an endpoint path. If the latency is higher - // then this the path will be ignored. - const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.validity.get_pong() { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) + pub(super) fn best_addr(&self, have_ipv6: bool, now: Instant) -> UdpSendAddr { + let Some((ipp, path)) = self + .paths + .iter() + .filter(|(ipp, _)| have_ipv6 || ipp.ip.is_ipv4()) + .max_by_key(|(ipp, path)| { + // We find the best by sorting on a key of type (Option>, Option>, bool) + // where the first is set to Some(ReverseOrd(latency)) iff path.is_valid(now) and + // the second is set to Some(ReverseOrd(latency)) if path.is_outdated(now) and + // the third is set to whether the ipp is ipv6. + // This makes max_by_key sort for the lowest valid latency first, then sort for + // the lowest outdated latency second, and if latencies are equal, it'll sort IPv6 paths first. + let is_ipv6 = ipp.ip.is_ipv6(); + if let Some(latency) = path.validity.latency_if_valid(now) { + (Some(ReverseOrd(latency)), None, is_ipv6) + } else if let Some(latency) = path.validity.latency_if_outdated(now) { + (None, Some(ReverseOrd(latency)), is_ipv6) + } else { + (None, None, is_ipv6) } - _ => best_pong, - } - }); + }) + else { + return UdpSendAddr::None; + }; - // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - now, - ) - } + if path.validity.is_valid(now) { + UdpSendAddr::Valid((*ipp).into()) + } else if path.validity.is_outdated(now) { + UdpSendAddr::Outdated((*ipp).into()) + } else { + UdpSendAddr::Unconfirmed((*ipp).into()) } } } + +#[derive(PartialEq, Eq)] +struct ReverseOrd(N); + +impl Ord for ReverseOrd { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.0.cmp(&other.0).reverse() + } +} + +impl PartialOrd for ReverseOrd { + fn partial_cmp(&self, other: &Self) -> Option { + self.0 + .partial_cmp(&other.0) + .map(std::cmp::Ordering::reverse) + } +} From e2e97087278f8acb1572ab650b8508044c58fcf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 22 Jul 2025 11:48:49 +0200 Subject: [PATCH 07/11] Remove `best_addr` module --- iroh/src/magicsock/node_map.rs | 3 +- iroh/src/magicsock/node_map/best_addr.rs | 283 ------------------- iroh/src/magicsock/node_map/node_state.rs | 2 +- iroh/src/magicsock/node_map/path_validity.rs | 8 + 4 files changed, 10 insertions(+), 286 deletions(-) delete mode 100644 iroh/src/magicsock/node_map/best_addr.rs diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 952e325acad..ff4e8c62c5a 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -12,8 +12,8 @@ use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; use self::{ - best_addr::ClearReason, node_state::{NodeState, Options, PingHandled}, + path_validity::ClearReason, }; use super::{metrics::Metrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr}; #[cfg(any(test, feature = "test-utils"))] @@ -23,7 +23,6 @@ use crate::{ watchable::Watcher, }; -mod best_addr; mod node_state; mod path_state; mod path_validity; diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs deleted file mode 100644 index 7e846bf1676..00000000000 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ /dev/null @@ -1,283 +0,0 @@ -//! The [`BestAddr`] is the currently active best address for UDP sends. - -use std::net::SocketAddr; - -use n0_future::time::{Duration, Instant}; -use tracing::{debug, info}; - -/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. -pub(super) const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); - -/// The grace period at which we consider switching away from our best addr -/// to another address that we've received data on. -/// -/// The trusted address lifecycle goes as follows: -/// - A UDP DICSO pong is received, this validates that the path is for sure valid. -/// - The disco path that seems to have the lowest latency is the path we use to send on. -/// - We trust this path as a path to send on for at least TRUST_UDP_ADDR_DURATION. -/// - This time is extended every time we receive a UDP DISCO pong on the address. -/// - This time is *also* extended every time we receive *application* payloads on this -/// address (i.e. QUIC datagrams). -/// - If our best address becomes outdated (TRUST_UDP_ADDR_DURATION expires) without -/// another pong or payload data, then we'll start sending over the relay, too! -/// (we switch to ConnectionType::Mixed) -/// -/// However, we might not get any UDP DISCO pongs because they're UDP packets and get -/// lost under e.g. high load. -/// -/// This is usually fine, because we also receive on the best addr, and that extends its -/// "trust period" just as well. -/// -/// However, if *additionally* we send on a different address than the one we receive on, -/// then this extension doesn't happen. -/// -/// To fix this, we also apply the same path validation logic to non-best addresses. -/// I.e. we keep track of when they last received a pong, at which point we consider them -/// validated, and then extend this validation period when we receive application data -/// while they're valid (or when we receive another pong). -/// -/// Now, when our best address becomes outdated, we need to switch to another valid path. -/// -/// We could switch to another path once the best address becomes outdated, but then we'd -/// already start sending on the relay for a couple of iterations! -/// -/// So instead, we switch to another path when it looks like the best address becomes -/// outdated. -/// Not just any path, but the path that we're currently receiving from for this node. -/// -/// Since we might not be receiving constantly from the remote side (e.g. if it's a -/// one-sided transfer), we need to take care to do consider switching early enough. -/// -/// So this duration is chosen as at least 1 keep alive interval (1s default in iroh atm) -/// + at maximum 400ms of latency spike. -const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400); - -#[derive(Debug, Default)] -pub(super) struct BestAddr(Option); - -#[derive(Debug)] -struct BestAddrInner { - addr: AddrLatency, - trust_until: Option, - confirmed_at: Instant, -} - -impl BestAddrInner { - fn is_trusted(&self, now: Instant) -> bool { - self.trust_until - .map(|trust_until| trust_until >= now) - .unwrap_or(false) - } - - fn addr(&self) -> SocketAddr { - self.addr.addr - } -} - -#[derive(Debug)] -pub(super) enum Source { - ReceivedPong, - BestCandidate, - Udp, -} - -impl Source { - fn trust_duration(&self) -> Duration { - match self { - Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, - // TODO: Fix time - Source::BestCandidate => Duration::from_secs(60 * 60), - Source::Udp => TRUST_UDP_ADDR_DURATION, - } - } -} - -#[derive(Debug)] -pub(super) enum State<'a> { - Valid(&'a AddrLatency), - Outdated(&'a AddrLatency), - Empty, -} - -#[derive(Debug, Clone, Copy)] -pub enum ClearReason { - Reset, - Inactive, - PongTimeout, - MatchesOurLocalAddr, -} - -impl BestAddr { - #[cfg(test)] - pub fn from_parts( - addr: SocketAddr, - latency: Duration, - confirmed_at: Instant, - trust_until: Instant, - ) -> Self { - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - confirmed_at, - trust_until: Some(trust_until), - }; - Self(Some(inner)) - } - - pub fn is_empty(&self) -> bool { - self.0.is_none() - } - - /// Unconditionally clears the best address. - pub fn clear(&mut self, reason: ClearReason, has_relay: bool) { - let old = self.0.take(); - if let Some(old_addr) = old.as_ref().map(BestAddrInner::addr) { - info!(?reason, ?has_relay, %old_addr, "clearing best_addr"); - } - } - - /// Clears the best address if equal to `addr`. - pub fn clear_if_equals(&mut self, addr: SocketAddr, reason: ClearReason, has_relay: bool) { - if self.addr() == Some(addr) { - self.clear(reason, has_relay) - } - } - - pub fn clear_trust(&mut self, why: &'static str) { - if let Some(state) = self.0.as_mut() { - info!( - %why, - prev_trust_until = ?state.trust_until, - "clearing best_addr trust", - ); - state.trust_until = None; - } - } - - pub fn insert_if_better_or_reconfirm( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - now: Instant, - ) { - match self.0.as_mut() { - None => { - self.insert(addr, latency, source, confirmed_at); - } - Some(state) => { - let candidate = AddrLatency { addr, latency }; - if !state.is_trusted(now) || candidate.is_better_than(&state.addr) { - self.insert(addr, latency, source, confirmed_at); - } else if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(confirmed_at + source.trust_duration()); - } - } - } - } - - pub fn insert_if_soon_outdated_or_reconfirm( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - now: Instant, - ) { - match self.0.as_mut() { - None => { - self.insert(addr, latency, source, confirmed_at); - } - Some(state) => { - // If the current best addr will soon be outdated - // and the given candidate will be trusted for longer - if !state.is_trusted(now + TRUST_UDP_ADDR_SOON_OUTDATED) - && state.confirmed_at < confirmed_at - { - self.insert(addr, latency, source, confirmed_at); - } else if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(confirmed_at + source.trust_duration()); - } - } - } - } - - fn insert( - &mut self, - addr: SocketAddr, - latency: Duration, - source: Source, - confirmed_at: Instant, - ) { - let trust_until = confirmed_at + source.trust_duration(); - - if self - .0 - .as_ref() - .map(|prev| prev.addr.addr == addr) - .unwrap_or_default() - { - debug!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "re-selecting direct path for node" - ); - } else { - info!( - %addr, - latency = ?latency, - trust_for = ?trust_until.duration_since(Instant::now()), - "selecting new direct path for node" - ); - } - let inner = BestAddrInner { - addr: AddrLatency { addr, latency }, - trust_until: Some(trust_until), - confirmed_at, - }; - self.0 = Some(inner); - } - - pub fn state(&self, now: Instant) -> State { - match &self.0 { - None => State::Empty, - Some(state) => match state.trust_until { - Some(expiry) if now < expiry => State::Valid(&state.addr), - Some(_) | None => State::Outdated(&state.addr), - }, - } - } - - pub fn addr(&self) -> Option { - self.0.as_ref().map(BestAddrInner::addr) - } -} - -/// A `SocketAddr` with an associated latency. -#[derive(Debug, Clone)] -pub struct AddrLatency { - pub addr: SocketAddr, - pub latency: Duration, -} - -impl AddrLatency { - /// Reports whether `self` is a better addr to use than `other`. - fn is_better_than(&self, other: &Self) -> bool { - if self.addr == other.addr { - return false; - } - if self.addr.is_ipv6() && other.addr.is_ipv4() { - // Prefer IPv6 for being a bit more robust, as long as - // the latencies are roughly equivalent. - if self.latency / 10 * 9 < other.latency { - return true; - } - } else if self.addr.is_ipv4() && other.addr.is_ipv6() && other.is_better_than(self) { - return false; - } - self.latency < other.latency - } -} diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 1bfb8f7734b..2999c6677ad 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -17,8 +17,8 @@ use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ - best_addr::ClearReason, path_state::{summarize_node_paths, PathState}, + path_validity::ClearReason, udp_paths::{NodeUdpPaths, UdpSendAddr}, IpPort, Source, }; diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs index 3f34a2df989..392b4c51448 100644 --- a/iroh/src/magicsock/node_map/path_validity.rs +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -28,6 +28,14 @@ pub(super) enum Source { Udp, } +#[derive(Debug, Clone, Copy)] +pub enum ClearReason { + Reset, + Inactive, + PongTimeout, + MatchesOurLocalAddr, +} + impl Source { fn trust_duration(&self) -> Duration { match self { From 7b03147fb8bdd02176f81830994fa7cd43288519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 22 Jul 2025 12:04:20 +0200 Subject: [PATCH 08/11] Cleanup --- iroh/src/magicsock/node_map/node_state.rs | 9 ------ iroh/src/magicsock/node_map/path_state.rs | 12 +------- iroh/src/magicsock/node_map/path_validity.rs | 18 +++++++---- iroh/src/magicsock/node_map/udp_paths.rs | 32 +++++--------------- 4 files changed, 20 insertions(+), 51 deletions(-) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 2999c6677ad..5c2c9fdf2c1 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1511,12 +1511,6 @@ mod tests { udp_paths: NodeUdpPaths::from_parts( endpoint_state, UdpSendAddr::Valid(ip_port.into()), - // BestAddr::from_parts( - // ip_port.into(), - // latency, - // now, - // now + Duration::from_secs(100), //TODO(matheus23): Is the 100s validity needed for the test? - // ), ), sent_pings: HashMap::new(), last_used: Some(now), @@ -1582,7 +1576,6 @@ mod tests { // endpoint w/ expired best addr and relay w/ latency let (d_endpoint, d_socket_addr) = { let socket_addr: SocketAddr = "0.0.0.0:7".parse().unwrap(); - let expired = now.checked_sub(Duration::from_secs(100)).unwrap(); let key = SecretKey::generate(rand::thread_rng()); let node_id = key.public(); let endpoint_state = BTreeMap::from([( @@ -1607,8 +1600,6 @@ mod tests { udp_paths: NodeUdpPaths::from_parts( endpoint_state, UdpSendAddr::Outdated(socket_addr), - // TODO(matheus23): Test might need adjustments - // BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), ), sent_pings: HashMap::new(), last_used: Some(now), diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 913f16be44f..4cc0205f355 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -1,9 +1,6 @@ //! The state kept for each network path to a remote node. -use std::{ - collections::{BTreeMap, HashMap}, - net::SocketAddr, -}; +use std::collections::{BTreeMap, HashMap}; use iroh_base::NodeId; use iroh_relay::protos::stun; @@ -87,13 +84,6 @@ impl PathState { } } - pub(super) fn udp_addr(&self) -> Option { - match self.path { - SendAddr::Udp(addr) => Some(addr), - SendAddr::Relay(_) => None, - } - } - pub(super) fn with_last_payload( node_id: NodeId, path: SendAddr, diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs index 392b4c51448..8d5d10c804b 100644 --- a/iroh/src/magicsock/node_map/path_validity.rs +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -2,7 +2,13 @@ use n0_future::time::{Duration, Instant}; use crate::magicsock::node_map::node_state::PongReply; -/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. +/// How long we trust a UDP address as the exclusive path (i.e. without also sending via the relay). +/// +/// Trust for a UDP address begins when we receive a DISCO UDP pong on that address. +/// It is then further extended by this duration every time we receive QUIC payload data while it's +/// currently trusted. +/// +/// If trust goes away, it can be brought back with another valid DISCO UDP pong. const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); /// Tracks a path's validity. @@ -30,9 +36,9 @@ pub(super) enum Source { #[derive(Debug, Clone, Copy)] pub enum ClearReason { - Reset, + // Reset, // TODO(matheus23): unused Inactive, - PongTimeout, + // PongTimeout, MatchesOurLocalAddr, } @@ -117,9 +123,9 @@ impl PathValidity { } // TODO(matheus23): Use this to bias the choice of best outdated addr maybe? - pub(super) fn confirmed_at(&self) -> Option { - self.0.as_ref().map(|inner| inner.confirmed_at) - } + // pub(super) fn confirmed_at(&self) -> Option { + // self.0.as_ref().map(|inner| inner.confirmed_at) + // } } impl Inner { diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index e51ea0e0f1f..86db241ac00 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -59,24 +59,6 @@ impl UdpSendAddr { UdpSendAddr::None => None, } } - - pub fn is_better_than(&self, other: &Self) -> bool { - match (other, self) { - // Other being valid, we'll never be better - (UdpSendAddr::Valid(_), _) => false, - // Anything anything above outdated is better - (UdpSendAddr::Outdated(_), UdpSendAddr::Valid(_)) => true, - (UdpSendAddr::Outdated(_), _) => false, - // Anything above unconfirmed is better - (UdpSendAddr::Unconfirmed(_), UdpSendAddr::Valid(_)) => true, - (UdpSendAddr::Unconfirmed(_), UdpSendAddr::Outdated(_)) => true, - (UdpSendAddr::Unconfirmed(_), _) => false, - // None compared to none is equally bad - (UdpSendAddr::None, UdpSendAddr::None) => false, - // Anything above none is better - (UdpSendAddr::None, _) => true, - } - } } /// The UDP paths for a single node. @@ -103,7 +85,10 @@ pub(super) struct NodeUdpPaths { /// - the current send addr is not a validated path anymore or /// - we received a pong with lower latency. pub(super) best: UdpSendAddr, - pub(super) best_non_ipv6: UdpSendAddr, + /// The current best address to send on from all IPv4 addresses we have available. + /// + /// Follows the same logic as `best` above, but doesn't include any IPv6 addresses. + pub(super) best_ipv4: UdpSendAddr, } impl NodeUdpPaths { @@ -115,7 +100,7 @@ impl NodeUdpPaths { pub(super) fn from_parts(paths: BTreeMap, best: UdpSendAddr) -> Self { Self { paths, - best_non_ipv6: best, + best_ipv4: best, // we only use ipv4 addrs in tests best, } } @@ -123,16 +108,13 @@ impl NodeUdpPaths { /// Returns the current UDP address to send on. pub(super) fn send_addr(&self, have_ipv6: bool) -> &UdpSendAddr { if !have_ipv6 { - return &self.best_non_ipv6; - } - if self.best_non_ipv6.is_better_than(&self.best) { - return &self.best_non_ipv6; + return &self.best_ipv4; } &self.best } pub(super) fn update_to_best_addr(&mut self, now: Instant) { - self.best_non_ipv6 = self.best_addr(false, now); + self.best_ipv4 = self.best_addr(false, now); self.best = self.best_addr(true, now); } From b08df6a04d011fa4681fb5cd44ceaa6e53e6b32f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Tue, 22 Jul 2025 17:16:02 +0200 Subject: [PATCH 09/11] Remove unused structs/variants, rename `Source::Udp` -> `Source::QuicPayload` --- iroh/src/magicsock/node_map.rs | 13 +++++-------- iroh/src/magicsock/node_map/node_state.rs | 14 ++++---------- iroh/src/magicsock/node_map/path_state.rs | 11 +++-------- iroh/src/magicsock/node_map/path_validity.rs | 15 ++------------- 4 files changed, 14 insertions(+), 39 deletions(-) diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index ff4e8c62c5a..0597b30ea3b 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -11,10 +11,7 @@ use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; -use self::{ - node_state::{NodeState, Options, PingHandled}, - path_validity::ClearReason, -}; +use self::node_state::{NodeState, Options, PingHandled}; use super::{metrics::Metrics, ActorMessage, DiscoMessageSource, NodeIdMappedAddr}; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; @@ -395,22 +392,22 @@ impl NodeMapInner { now: Instant, ) { for addr in discovered { - self.remove_by_ipp(addr.into(), ClearReason::MatchesOurLocalAddr, now) + self.remove_by_ipp(addr.into(), now, "matches our local addr") } } /// Removes a direct address from a node. - fn remove_by_ipp(&mut self, ipp: IpPort, reason: ClearReason, now: Instant) { + fn remove_by_ipp(&mut self, ipp: IpPort, now: Instant, why: &'static str) { if let Some(id) = self.by_ip_port.remove(&ipp) { if let Entry::Occupied(mut entry) = self.by_id.entry(id) { let node = entry.get_mut(); - node.remove_direct_addr(&ipp, reason, now); + node.remove_direct_addr(&ipp, now, why); if node.direct_addresses().count() == 0 { let node_id = node.public_key(); let mapped_addr = node.quic_mapped_addr(); self.by_node_key.remove(node_id); self.by_quic_mapped_addr.remove(mapped_addr); - debug!(node_id=%node_id.fmt_short(), ?reason, "removing node"); + debug!(node_id=%node_id.fmt_short(), why, "removing node"); entry.remove(); } } diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 5c2c9fdf2c1..faa5a1766f8 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -18,7 +18,6 @@ use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ path_state::{summarize_node_paths, PathState}, - path_validity::ClearReason, udp_paths::{NodeUdpPaths, UdpSendAddr}, IpPort, Source, }; @@ -372,19 +371,14 @@ impl NodeState { /// Removes a direct address for this node. /// /// If this is also the best address, it will be cleared as well. - pub(super) fn remove_direct_addr( - &mut self, - ip_port: &IpPort, - reason: ClearReason, - now: Instant, - ) { + pub(super) fn remove_direct_addr(&mut self, ip_port: &IpPort, now: Instant, why: &'static str) { let Some(state) = self.udp_paths.paths.remove(ip_port) else { return; }; match state.last_alive().map(|instant| instant.elapsed()) { - Some(last_alive) => debug!(%ip_port, ?last_alive, ?reason, "pruning address"), - None => debug!(%ip_port, last_seen=%"never", ?reason, "pruning address"), + Some(last_alive) => debug!(%ip_port, ?last_alive, why, "pruning address"), + None => debug!(%ip_port, last_seen=%"never", why, "pruning address"), } self.udp_paths.update_to_best_addr(now); @@ -873,7 +867,7 @@ impl NodeState { prune_candidates.sort_unstable_by_key(|(_ip_port, last_alive)| *last_alive); prune_candidates.truncate(prune_count); for (ip_port, _last_alive) in prune_candidates.into_iter() { - self.remove_direct_addr(&ip_port, ClearReason::Inactive, now); + self.remove_direct_addr(&ip_port, now, "inactive"); } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 4cc0205f355..025658a7c04 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -47,13 +47,10 @@ pub(super) struct PathState { /// The time this endpoint was last advertised via a call-me-maybe DISCO message. pub(super) call_me_maybe_time: Option, - // /// The most recent [`PongReply`]. - // /// - // /// Previous replies are cleared when they are no longer relevant to determine whether - // /// this path can still be used to reach the remote node. - // pub(super) recent_pong: Option, /// Tracks whether this path is valid. /// + /// Also stores the latest [`PongReply`], if there is one. + /// /// See [`PathValidity`] docs. pub(super) validity: PathValidity, /// When the last payload data was **received** via this path. @@ -134,10 +131,8 @@ impl PathState { pub(super) fn receive_payload(&mut self, now: Instant) { self.last_payload_msg = Some(now); - // TODO(matheus23): It's not necessarily UDP. Kinda weird to have this thing - // Also, it all results in the same 6.5s timeout anyways, maybe we just remove it? self.validity - .receive_payload(now, path_validity::Source::Udp); + .receive_payload(now, path_validity::Source::QuicPayload); } #[cfg(test)] diff --git a/iroh/src/magicsock/node_map/path_validity.rs b/iroh/src/magicsock/node_map/path_validity.rs index 8d5d10c804b..ed91f266aae 100644 --- a/iroh/src/magicsock/node_map/path_validity.rs +++ b/iroh/src/magicsock/node_map/path_validity.rs @@ -30,25 +30,14 @@ struct Inner { #[derive(Debug)] pub(super) enum Source { ReceivedPong, - // BestCandidate, - Udp, -} - -#[derive(Debug, Clone, Copy)] -pub enum ClearReason { - // Reset, // TODO(matheus23): unused - Inactive, - // PongTimeout, - MatchesOurLocalAddr, + QuicPayload, } impl Source { fn trust_duration(&self) -> Duration { match self { Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, - // // TODO: Fix time - // Source::BestCandidate => Duration::from_secs(60 * 60), - Source::Udp => TRUST_UDP_ADDR_DURATION, + Source::QuicPayload => TRUST_UDP_ADDR_DURATION, } } } From 8e4d739c2bf8814e207c318585f689738dc0a460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Wed, 30 Jul 2025 11:13:29 +0200 Subject: [PATCH 10/11] fix: Only free up sending a call me maybe when the best addr was invalidated --- iroh/src/magicsock/node_map/node_state.rs | 10 ++++--- iroh/src/magicsock/node_map/udp_paths.rs | 34 ++++++++++++++++++++--- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index faa5a1766f8..ba8ec0d42c0 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1037,10 +1037,12 @@ impl NodeState { } } } - // Clear trust on our best_addr if it is not included in the updated set. Also - // clear the last call-me-maybe send time so we will send one again. - self.udp_paths.update_to_best_addr(now); - self.last_call_me_maybe = None; + // Clear trust on our best_addr if it is not included in the updated set. + let changed = self.udp_paths.update_to_best_addr(now); + if changed { + // Clear the last call-me-maybe send time so we will send one again. + self.last_call_me_maybe = None; + } debug!( paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 86db241ac00..12cd7ee8e04 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -8,6 +8,7 @@ use std::{collections::BTreeMap, net::SocketAddr}; use n0_future::time::Instant; +use tracing::{event, Level}; use super::{path_state::PathState, IpPort}; @@ -24,7 +25,7 @@ use super::{path_state::PathState, IpPort}; /// /// [`MagicSock`]: crate::magicsock::MagicSock /// [`NodeState`]: super::node_state::NodeState -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub(super) enum UdpSendAddr { /// The UDP address can be relied on to deliver data to the remote node. /// @@ -113,9 +114,34 @@ impl NodeUdpPaths { &self.best } - pub(super) fn update_to_best_addr(&mut self, now: Instant) { - self.best_ipv4 = self.best_addr(false, now); - self.best = self.best_addr(true, now); + /// Changes the current best address(es) to ones chosen as described in [`Self::best_addr`] docs. + /// + /// Returns whether one of the best addresses had to change. + /// + /// This should be called any time that `paths` is modified. + pub(super) fn update_to_best_addr(&mut self, now: Instant) -> bool { + let best_ipv4 = self.best_addr(false, now); + let best = self.best_addr(true, now); + let mut changed = false; + if best_ipv4 != self.best_ipv4 { + event!( + target: "iroh::_events::udp::best_ipv4", + Level::DEBUG, + ?best_ipv4, + ); + changed = true; + } + if best != self.best { + event!( + target: "iroh::_events::udp::best", + Level::DEBUG, + ?best, + ); + changed = true; + } + self.best_ipv4 = best_ipv4; + self.best = best; + changed } pub(super) fn best_addr(&self, have_ipv6: bool, now: Instant) -> UdpSendAddr { From d827e0a997a77ec4f7981e55bfffc357d399dd04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 1 Aug 2025 09:38:03 +0200 Subject: [PATCH 11/11] fix: Make sure to update the best addr when `add_node_addr` was called --- iroh/src/magicsock/node_map/node_state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index ba8ec0d42c0..b96bfe33fd7 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -709,6 +709,7 @@ impl NodeState { PathState::new(self.node_id, SendAddr::from(addr), source.clone(), now) }); } + self.udp_paths.update_to_best_addr(now); let paths = summarize_node_paths(&self.udp_paths.paths); debug!(new = ?new_addrs , %paths, "added new direct paths for endpoint"); }