diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 10552695edf..b778f8e19c8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ env: RUSTDOCFLAGS: -Dwarnings MSRV: "1.81" SCCACHE_CACHE_SIZE: "10G" - IROH_FORCE_STAGING_RELAYS: "1" + # IROH_FORCE_STAGING_RELAYS: "1" jobs: tests: diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index 18d9ef960be..7e846bf1676 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -6,7 +6,51 @@ use n0_future::time::{Duration, Instant}; use tracing::{debug, info}; /// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply. -const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); +pub(super) const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500); + +/// The grace period at which we consider switching away from our best addr +/// to another address that we've received data on. +/// +/// The trusted address lifecycle goes as follows: +/// - A UDP DISCO pong is received, which confirms that the path is valid. +/// - The disco path that seems to have the lowest latency is the path we use to send on. +/// - We trust this path as a path to send on for at least TRUST_UDP_ADDR_DURATION. +/// - This time is extended every time we receive a UDP DISCO pong on the address. +/// - This time is *also* extended every time we receive *application* payloads on this +/// address (i.e. QUIC datagrams). +/// - If our best address becomes outdated (TRUST_UDP_ADDR_DURATION expires) without +/// another pong or payload data, then we'll start sending over the relay, too! +/// (we switch to ConnectionType::Mixed) +/// +/// However, we might not get any UDP DISCO pongs because they're UDP packets and get +/// lost under e.g. high load. 
+/// +/// This is usually fine, because we also receive on the best addr, and that extends its +/// "trust period" just as well. +/// +/// However, if *additionally* we send on a different address than the one we receive on, +/// then this extension doesn't happen. +/// +/// To fix this, we also apply the same path validation logic to non-best addresses. +/// I.e. we keep track of when they last received a pong, at which point we consider them +/// validated, and then extend this validation period when we receive application data +/// while they're valid (or when we receive another pong). +/// +/// Now, when our best address becomes outdated, we need to switch to another valid path. +/// +/// We could switch to another path once the best address becomes outdated, but then we'd +/// already start sending on the relay for a couple of iterations! +/// +/// So instead, we switch to another path when it looks like the best address will soon become +/// outdated. +/// Not just any path, but the path that we're currently receiving from for this node. +/// +/// Since we might not be receiving constantly from the remote side (e.g. if it's a +/// one-sided transfer), we need to take care to consider switching early enough. +/// +/// So this duration is chosen as at least 1 keep alive interval (1s default in iroh atm) +/// + at maximum 400ms of latency spike. 
+const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400); #[derive(Debug, Default)] pub(super) struct BestAddr(Option); @@ -38,12 +82,12 @@ pub(super) enum Source { } impl Source { - fn trust_until(&self, from: Instant) -> Instant { + fn trust_duration(&self) -> Duration { match self { - Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, + Source::ReceivedPong => TRUST_UDP_ADDR_DURATION, // TODO: Fix time - Source::BestCandidate => from + Duration::from_secs(60 * 60), - Source::Udp => from + TRUST_UDP_ADDR_DURATION, + Source::BestCandidate => Duration::from_secs(60 * 60), + Source::Udp => TRUST_UDP_ADDR_DURATION, } } } @@ -115,6 +159,7 @@ impl BestAddr { latency: Duration, source: Source, confirmed_at: Instant, + now: Instant, ) { match self.0.as_mut() { None => { @@ -122,23 +167,39 @@ impl BestAddr { } Some(state) => { let candidate = AddrLatency { addr, latency }; - if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { + if !state.is_trusted(now) || candidate.is_better_than(&state.addr) { self.insert(addr, latency, source, confirmed_at); } else if state.addr.addr == addr { state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); + state.trust_until = Some(confirmed_at + source.trust_duration()); } } } } - /// Reset the expiry, if the passed in addr matches the currently used one. 
- #[cfg(not(wasm_browser))] - pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) { - if let Some(state) = self.0.as_mut() { - if state.addr.addr == addr { - state.confirmed_at = confirmed_at; - state.trust_until = Some(source.trust_until(confirmed_at)); + pub fn insert_if_soon_outdated_or_reconfirm( + &mut self, + addr: SocketAddr, + latency: Duration, + source: Source, + confirmed_at: Instant, + now: Instant, + ) { + match self.0.as_mut() { + None => { + self.insert(addr, latency, source, confirmed_at); + } + Some(state) => { + // If the current best addr will soon be outdated + // and the given candidate will be trusted for longer + if !state.is_trusted(now + TRUST_UDP_ADDR_SOON_OUTDATED) + && state.confirmed_at < confirmed_at + { + self.insert(addr, latency, source, confirmed_at); + } else if state.addr.addr == addr { + state.confirmed_at = confirmed_at; + state.trust_until = Some(confirmed_at + source.trust_duration()); + } } } } @@ -150,7 +211,7 @@ impl BestAddr { source: Source, confirmed_at: Instant, ) { - let trust_until = source.trust_until(confirmed_at); + let trust_until = confirmed_at + source.trust_duration(); if self .0 diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index adbd155a791..ff495c947f6 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -978,6 +978,7 @@ impl NodeState { latency, best_addr::Source::ReceivedPong, now, + now, ); } @@ -1064,18 +1065,26 @@ impl NodeState { debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr"); return; }; - state.last_payload_msg = Some(now); + state.receive_payload(now); self.last_used = Some(now); - self.udp_paths - .best_addr - .reconfirm_if_used(addr.into(), BestAddrSource::Udp, now); + if let Some((latency, confirmed_at)) = state.valid_best_addr_candidate(now) { + self.udp_paths + .best_addr + .insert_if_soon_outdated_or_reconfirm( + 
 addr.into(), + latency, + BestAddrSource::Udp, + confirmed_at, + now, + ); + } } pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) { match self.relay_url.as_mut() { Some((current_home, state)) if current_home == url => { // We received on the expected url. update state. - state.last_payload_msg = Some(now); + state.receive_payload(now); } Some((_current_home, _state)) => { // we have a different url. we only update on ping, not on receive_relay. diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 7241121722a..c115e90e828 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -14,7 +14,10 @@ use super::{ node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT}, IpPort, PingRole, Source, }; -use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL}; +use crate::{ + disco::SendAddr, + magicsock::{node_map::best_addr::TRUST_UDP_ADDR_DURATION, HEARTBEAT_INTERVAL}, +}; /// The minimum time between pings to an endpoint. /// @@ -53,6 +56,9 @@ pub(super) struct PathState { /// /// This excludes DISCO messages. pub(super) last_payload_msg: Option<Instant>, + /// Whether the last payload msg was within [`TRUST_UDP_ADDR_DURATION`] of either + /// the last trusted payload message or a recent pong. 
+ pub(super) last_payload_trusted: bool, /// Sources is a map of [`Source`]s to [`Instant`]s, keeping track of all the ways we have /// learned about this path /// @@ -73,6 +79,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: None, last_payload_msg: None, + last_payload_trusted: false, sources, } } @@ -100,6 +107,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: None, last_payload_msg: Some(now), + last_payload_trusted: false, sources, } } @@ -129,6 +137,26 @@ impl PathState { } } self.recent_pong = Some(r); + + if let Some(last_payload_msg) = self.last_payload_msg { + self.last_payload_trusted = self.within_trusted_pong_duration(last_payload_msg); + } + } + + fn within_trusted_pong_duration(&self, instant: Instant) -> bool { + if let Some(pong) = &self.recent_pong { + return pong.pong_at <= instant && instant < pong.pong_at + TRUST_UDP_ADDR_DURATION; + } + + false + } + + pub(super) fn receive_payload(&mut self, now: Instant) { + self.last_payload_msg = Some(now); + + if self.within_trusted_pong_duration(now) { + self.last_payload_trusted = true; + } } #[cfg(test)] @@ -141,6 +169,7 @@ impl PathState { call_me_maybe_time: None, recent_pong: Some(r), last_payload_msg: None, + last_payload_trusted: false, sources: HashMap::new(), } } @@ -187,6 +216,21 @@ impl PathState { .copied() } + fn last_confirmed_at(&self) -> Option<Instant> { + let last_trusted_payload_msg = if self.last_payload_trusted { + self.last_payload_msg + } else { + None + }; + + self.recent_pong + .as_ref() + .map(|pong| pong.pong_at) + .into_iter() + .chain(last_trusted_payload_msg) + .max() + } + /// The last control or DISCO message **about** this path. 
/// /// This is the most recent instant among: @@ -222,6 +266,18 @@ impl PathState { self.recent_pong.as_ref().map(|p| p.latency) } + /// Returns the latency and confirmed_at time if this path would make for a valid best addr `now` + pub(crate) fn valid_best_addr_candidate(&self, now: Instant) -> Option<(Duration, Instant)> { + let latency = self.recent_pong.as_ref()?.latency; + let confirmed_at = self.last_confirmed_at()?; + + if confirmed_at <= now && now < confirmed_at + TRUST_UDP_ADDR_DURATION { + return Some((latency, confirmed_at)); + } + + None + } + pub(super) fn needs_ping(&self, now: &Instant) -> bool { match self.last_ping { None => true, diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 0d96c3bba0b..53d32e861da 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -98,7 +98,7 @@ impl NodeUdpPaths { /// it should be `&self` and not `&mut self`. This is only possible once the state from /// [`NodeUdpPaths`] is no longer modified from outside. pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr { - self.assign_best_addr_from_candidates_if_empty(); + self.assign_best_addr_from_candidates_if_empty(now); match self.best_addr.state(now) { best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr), best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr), @@ -139,7 +139,7 @@ impl NodeUdpPaths { /// If somehow we end up in a state where we failed to set a best_addr, while we do have /// valid candidates, this will chose a candidate and set best_addr again. Most likely /// this is a bug elsewhere though. 
- fn assign_best_addr_from_candidates_if_empty(&mut self) { + fn assign_best_addr_from_candidates_if_empty(&mut self, now: Instant) { if !self.best_addr.is_empty() { return; } @@ -173,6 +173,7 @@ impl NodeUdpPaths { pong.latency, best_addr::Source::BestCandidate, pong.pong_at, + now, ) } } diff --git a/iroh/tests/integration.rs b/iroh/tests/integration.rs index 0ccac01cb2f..a0ed467ba71 100644 --- a/iroh/tests/integration.rs +++ b/iroh/tests/integration.rs @@ -36,8 +36,15 @@ const ECHO_ALPN: &[u8] = b"echo"; async fn simple_node_id_based_connection_transfer() -> TestResult { setup_logging(); - let client = Endpoint::builder().discovery_n0().bind().await?; + let relay_map = iroh::defaults::prod::default_relay_map(); + + let client = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map.clone())) + .discovery_n0() + .bind() + .await?; let server = Endpoint::builder() + .relay_mode(iroh::RelayMode::Custom(relay_map)) .discovery_n0() .alpns(vec![ECHO_ALPN.to_vec()]) .bind()