Closed
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -18,7 +18,7 @@ env:
RUSTDOCFLAGS: -Dwarnings
MSRV: "1.81"
SCCACHE_CACHE_SIZE: "10G"
IROH_FORCE_STAGING_RELAYS: "1"
# IROH_FORCE_STAGING_RELAYS: "1"
Review comment (Member, Author):

Commenting out this variable is the only way to turn off staging relays.

Unfortunately, the staging relays are already on main, and main contains breaking changes to the relay wire protocol.

Review comment (Member, Author):

... And apparently it's still not quite sufficient.


jobs:
tests:
91 changes: 76 additions & 15 deletions iroh/src/magicsock/node_map/best_addr.rs
@@ -6,7 +6,51 @@ use n0_future::time::{Duration, Instant};
use tracing::{debug, info};

/// How long we trust a UDP address as the exclusive path (without using relay) without having heard a Pong reply.
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);
pub(super) const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);

/// How long before our best addr's trust expires we start to consider switching
/// away from it to another address that we've received data on.
///
/// The trusted address lifecycle goes as follows:
/// - A UDP DISCO pong is received, which validates the path.
/// - The disco path that seems to have the lowest latency is the path we use to send on.
/// - We trust this path as a path to send on for at least TRUST_UDP_ADDR_DURATION.
/// - This time is extended every time we receive a UDP DISCO pong on the address.
/// - This time is *also* extended every time we receive *application* payloads on this
/// address (i.e. QUIC datagrams).
/// - If our best address becomes outdated (TRUST_UDP_ADDR_DURATION expires) without
/// another pong or payload data, then we'll start sending over the relay, too!
/// (we switch to ConnectionType::Mixed)
///
/// However, we might not get any UDP DISCO pongs, because they're UDP packets and can
/// get lost, e.g. under high load.
///
/// This is usually fine, because we also receive on the best addr, and that extends its
/// "trust period" just as well.
///
/// However, if *additionally* we send on a different address than the one we receive on,
/// then this extension doesn't happen.
///
/// To fix this, we also apply the same path validation logic to non-best addresses.
/// I.e. we keep track of when they last received a pong, at which point we consider them
/// validated, and then extend this validation period when we receive application data
/// while they're valid (or when we receive another pong).
///
/// Now, when our best address becomes outdated, we need to switch to another valid path.
///
/// We could switch to another path once the best address becomes outdated, but then we'd
/// already start sending on the relay for a couple of iterations!
///
/// So instead, we switch to another path when it looks like the best address is about
/// to become outdated.
/// Not just any path, but the path that we're currently receiving from for this node.
///
/// Since we might not be receiving constantly from the remote side (e.g. if it's a
/// one-sided transfer), we need to take care to consider switching early enough.
///
/// So this duration is chosen as at least one keep-alive interval (1s default in iroh
/// at the moment) plus at most 400ms of latency spike.
const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400);
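
The timing described in the doc comment above can be checked with a little arithmetic. The following is a minimal sketch using only `std::time`. The two trust constants are copied from the diff. `KEEP_ALIVE_INTERVAL`, `MAX_LATENCY_SPIKE`, `trust_until` and `switch_considered_from` are hypothetical names introduced for illustration and are not part of iroh. The exact boundary depends on how `is_trusted` compares instants, which this hunk does not show.

```rust
use std::time::{Duration, Instant};

// Values copied from the diff above.
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);
const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400);

// Assumed inputs to the 1400ms figure, taken from the doc comment
// (hypothetical names, not iroh constants).
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_millis(1000);
const MAX_LATENCY_SPIKE: Duration = Duration::from_millis(400);

/// Hypothetical helper: the instant at which trust in an address expires
/// if nothing reconfirms it after `confirmed_at`.
fn trust_until(confirmed_at: Instant) -> Instant {
    confirmed_at + TRUST_UDP_ADDR_DURATION
}

/// Hypothetical helper: roughly the earliest instant at which a switch to
/// another validated path is considered.
fn switch_considered_from(confirmed_at: Instant) -> Instant {
    trust_until(confirmed_at) - TRUST_UDP_ADDR_SOON_OUTDATED
}
```

Under these assumptions, a best address that is never reconfirmed becomes a candidate for replacement about 5100ms after its last confirmation, which leaves 1400ms before the relay would otherwise be added to the send path.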

#[derive(Debug, Default)]
pub(super) struct BestAddr(Option<BestAddrInner>);
@@ -38,12 +82,12 @@ pub(super) enum Source {
}

impl Source {
fn trust_until(&self, from: Instant) -> Instant {
fn trust_duration(&self) -> Duration {
match self {
Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION,
Source::ReceivedPong => TRUST_UDP_ADDR_DURATION,
// TODO: Fix time
Source::BestCandidate => from + Duration::from_secs(60 * 60),
Source::Udp => from + TRUST_UDP_ADDR_DURATION,
Source::BestCandidate => Duration::from_secs(60 * 60),
Source::Udp => TRUST_UDP_ADDR_DURATION,
}
}
}
@@ -115,30 +159,47 @@ impl BestAddr {
latency: Duration,
source: Source,
confirmed_at: Instant,
now: Instant,
) {
match self.0.as_mut() {
None => {
self.insert(addr, latency, source, confirmed_at);
}
Some(state) => {
let candidate = AddrLatency { addr, latency };
if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) {
if !state.is_trusted(now) || candidate.is_better_than(&state.addr) {
self.insert(addr, latency, source, confirmed_at);
} else if state.addr.addr == addr {
state.confirmed_at = confirmed_at;
state.trust_until = Some(source.trust_until(confirmed_at));
state.trust_until = Some(confirmed_at + source.trust_duration());
}
}
}
}

/// Reset the expiry, if the passed in addr matches the currently used one.
#[cfg(not(wasm_browser))]
pub fn reconfirm_if_used(&mut self, addr: SocketAddr, source: Source, confirmed_at: Instant) {
if let Some(state) = self.0.as_mut() {
if state.addr.addr == addr {
state.confirmed_at = confirmed_at;
state.trust_until = Some(source.trust_until(confirmed_at));
pub fn insert_if_soon_outdated_or_reconfirm(
&mut self,
addr: SocketAddr,
latency: Duration,
source: Source,
confirmed_at: Instant,
now: Instant,
) {
match self.0.as_mut() {
None => {
self.insert(addr, latency, source, confirmed_at);
}
Some(state) => {
// If the current best addr will soon be outdated
// and the given candidate will be trusted for longer
if !state.is_trusted(now + TRUST_UDP_ADDR_SOON_OUTDATED)
&& state.confirmed_at < confirmed_at
{
self.insert(addr, latency, source, confirmed_at);
} else if state.addr.addr == addr {
state.confirmed_at = confirmed_at;
state.trust_until = Some(confirmed_at + source.trust_duration());
}
}
}
}
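
The branch structure of `insert_if_soon_outdated_or_reconfirm` can be read as a three-way decision. The sketch below models only that decision as a pure function so that it can be exercised in isolation. `Best`, `Decision` and `decide` are hypothetical stand-ins, not iroh types, and the `>=` comparison against `trust_until` is an assumption because `is_trusted` is not shown in this diff.

```rust
use std::net::SocketAddr;
use std::time::{Duration, Instant};

// Values copied from the diff above.
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);
const TRUST_UDP_ADDR_SOON_OUTDATED: Duration = Duration::from_millis(1400);

/// Hypothetical, simplified stand-in for the current best address state.
struct Best {
    addr: SocketAddr,
    confirmed_at: Instant,
    trust_until: Instant,
}

/// What the caller should do with a candidate address.
#[derive(Debug, PartialEq)]
enum Decision {
    /// Make the candidate the new best address.
    Replace,
    /// Candidate is the current best address: extend its trust period.
    Reconfirm,
    /// Leave the current best address untouched.
    Keep,
}

/// Mirrors the branch order in the diff: a soon-outdated best address is
/// replaced by a more recently confirmed candidate; otherwise a matching
/// address is reconfirmed.
fn decide(
    best: &Best,
    candidate: SocketAddr,
    candidate_confirmed_at: Instant,
    now: Instant,
) -> Decision {
    // Assumption: "trusted" means strictly before `trust_until`.
    let soon_outdated = now + TRUST_UDP_ADDR_SOON_OUTDATED >= best.trust_until;
    if soon_outdated && best.confirmed_at < candidate_confirmed_at {
        Decision::Replace
    } else if best.addr == candidate {
        Decision::Reconfirm
    } else {
        Decision::Keep
    }
}
```

Note that the comparison is on confirmation time, not latency: unlike `insert_if_better_or_reconfirm`, this path prefers whichever address was validated most recently once the current one is close to expiring.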
@@ -150,7 +211,7 @@ impl BestAddr {
source: Source,
confirmed_at: Instant,
) {
let trust_until = source.trust_until(confirmed_at);
let trust_until = confirmed_at + source.trust_duration();

if self
.0
19 changes: 14 additions & 5 deletions iroh/src/magicsock/node_map/node_state.rs
@@ -978,6 +978,7 @@ impl NodeState {
latency,
best_addr::Source::ReceivedPong,
now,
now,
);
}

@@ -1064,18 +1065,26 @@ impl NodeState {
debug_assert!(false, "node map inconsistency by_ip_port <-> direct addr");
return;
};
state.last_payload_msg = Some(now);
state.receive_payload(now);
self.last_used = Some(now);
self.udp_paths
.best_addr
.reconfirm_if_used(addr.into(), BestAddrSource::Udp, now);
if let Some((latency, confirmed_at)) = state.valid_best_addr_candidate(now) {
self.udp_paths
.best_addr
.insert_if_soon_outdated_or_reconfirm(
addr.into(),
latency,
BestAddrSource::Udp,
confirmed_at,
now,
);
}
}

pub(super) fn receive_relay(&mut self, url: &RelayUrl, src: NodeId, now: Instant) {
match self.relay_url.as_mut() {
Some((current_home, state)) if current_home == url => {
// We received on the expected url. update state.
state.last_payload_msg = Some(now);
state.receive_payload(now);
}
Some((_current_home, _state)) => {
// we have a different url. we only update on ping, not on receive_relay.
58 changes: 57 additions & 1 deletion iroh/src/magicsock/node_map/path_state.rs
@@ -14,7 +14,10 @@ use super::{
node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT},
IpPort, PingRole, Source,
};
use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL};
use crate::{
disco::SendAddr,
magicsock::{node_map::best_addr::TRUST_UDP_ADDR_DURATION, HEARTBEAT_INTERVAL},
};

/// The minimum time between pings to an endpoint.
///
@@ -53,6 +56,9 @@ pub(super) struct PathState {
///
/// This excludes DISCO messages.
pub(super) last_payload_msg: Option<Instant>,
/// Whether the last payload msg was within [`TRUST_UDP_ADDR_DURATION`] of either
/// the last trusted payload message or a recent pong.
pub(super) last_payload_trusted: bool,
/// Sources is a map of [`Source`]s to [`Instant`]s, keeping track of all the ways we have
/// learned about this path
///
@@ -73,6 +79,7 @@ impl PathState {
call_me_maybe_time: None,
recent_pong: None,
last_payload_msg: None,
last_payload_trusted: false,
sources,
}
}
@@ -100,6 +107,7 @@ impl PathState {
call_me_maybe_time: None,
recent_pong: None,
last_payload_msg: Some(now),
last_payload_trusted: false,
sources,
}
}
@@ -129,6 +137,26 @@ impl PathState {
}
}
self.recent_pong = Some(r);

if let Some(last_payload_msg) = self.last_payload_msg {
self.last_payload_trusted = self.within_trusted_pong_duration(last_payload_msg);
}
}

fn within_trusted_pong_duration(&self, instant: Instant) -> bool {
if let Some(pong) = &self.recent_pong {
return pong.pong_at <= instant && instant < pong.pong_at + TRUST_UDP_ADDR_DURATION;
}

false
}

pub(super) fn receive_payload(&mut self, now: Instant) {
self.last_payload_msg = Some(now);

if self.within_trusted_pong_duration(now) {
self.last_payload_trusted = true;
}
}

#[cfg(test)]
@@ -141,6 +169,7 @@ impl PathState {
call_me_maybe_time: None,
recent_pong: Some(r),
last_payload_msg: None,
last_payload_trusted: false,
sources: HashMap::new(),
}
}
@@ -187,6 +216,21 @@ impl PathState {
.copied()
}

fn last_confirmed_at(&self) -> Option<Instant> {
let last_trusted_payload_msg = if self.last_payload_trusted {
self.last_payload_msg
} else {
None
};

self.recent_pong
.as_ref()
.map(|pong| pong.pong_at)
.into_iter()
.chain(last_trusted_payload_msg)
.max()
}

/// The last control or DISCO message **about** this path.
///
/// This is the most recent instant among:
@@ -222,6 +266,18 @@ impl PathState {
self.recent_pong.as_ref().map(|p| p.latency)
}

/// Returns the latency and `confirmed_at` time if this path would make a valid best addr at `now`.
pub(crate) fn valid_best_addr_candidate(&self, now: Instant) -> Option<(Duration, Instant)> {
let latency = self.recent_pong.as_ref()?.latency;
let confirmed_at = self.last_confirmed_at()?;

if confirmed_at <= now && now < confirmed_at + TRUST_UDP_ADDR_DURATION {
return Some((latency, confirmed_at));
}

None
}
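
The per-path bookkeeping added in this file (pong validates, payload inside the trust window extends) can be modelled in isolation. The sketch below is a simplified, hypothetical `PathTrust` type that mirrors the logic of `add_pong_reply`, `receive_payload`, `last_confirmed_at` and `valid_best_addr_candidate` as shown in the diff. It drops latency and every other `PathState` field, and its method names are made up for illustration.

```rust
use std::time::{Duration, Instant};

// Value copied from the diff above.
const TRUST_UDP_ADDR_DURATION: Duration = Duration::from_millis(6500);

/// Hypothetical, stripped-down model of the trust fields on `PathState`.
#[derive(Default)]
struct PathTrust {
    last_pong_at: Option<Instant>,
    last_payload_at: Option<Instant>,
    last_payload_trusted: bool,
}

impl PathTrust {
    /// True if `t` falls inside the trust window opened by the last pong.
    fn within_pong_window(&self, t: Instant) -> bool {
        self.last_pong_at
            .map_or(false, |p| p <= t && t < p + TRUST_UDP_ADDR_DURATION)
    }

    /// A pong (re)validates the path and re-evaluates the last payload.
    fn on_pong(&mut self, at: Instant) {
        self.last_pong_at = Some(at);
        if let Some(payload_at) = self.last_payload_at {
            self.last_payload_trusted = self.within_pong_window(payload_at);
        }
    }

    /// A payload counts as trusted if it arrives inside the pong window.
    fn on_payload(&mut self, at: Instant) {
        self.last_payload_at = Some(at);
        if self.within_pong_window(at) {
            self.last_payload_trusted = true;
        }
    }

    /// Most recent of: last pong, last trusted payload.
    fn last_confirmed_at(&self) -> Option<Instant> {
        let trusted_payload = self.last_payload_at.filter(|_| self.last_payload_trusted);
        self.last_pong_at.into_iter().chain(trusted_payload).max()
    }

    /// Whether the path is still inside its trust window at `now`.
    fn is_valid_candidate(&self, now: Instant) -> bool {
        self.last_confirmed_at()
            .map_or(false, |c| c <= now && now < c + TRUST_UDP_ADDR_DURATION)
    }
}
```

In this model, as in the diff, a payload never clears `last_payload_trusted`. Only a later pong re-evaluates the flag, so a payload that arrives after the flag is set moves `last_confirmed_at` forward even if it falls outside the pong window.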

pub(super) fn needs_ping(&self, now: &Instant) -> bool {
match self.last_ping {
None => true,
5 changes: 3 additions & 2 deletions iroh/src/magicsock/node_map/udp_paths.rs
@@ -98,7 +98,7 @@ impl NodeUdpPaths {
/// it should be `&self` and not `&mut self`. This is only possible once the state from
/// [`NodeUdpPaths`] is no longer modified from outside.
pub(super) fn send_addr(&mut self, now: Instant, have_ipv6: bool) -> UdpSendAddr {
self.assign_best_addr_from_candidates_if_empty();
self.assign_best_addr_from_candidates_if_empty(now);
match self.best_addr.state(now) {
best_addr::State::Valid(addr) => UdpSendAddr::Valid(addr.addr),
best_addr::State::Outdated(addr) => UdpSendAddr::Outdated(addr.addr),
@@ -139,7 +139,7 @@ impl NodeUdpPaths {
/// If somehow we end up in a state where we failed to set a best_addr, while we do have
/// valid candidates, this will choose a candidate and set best_addr again. Most likely
/// this is a bug elsewhere though.
fn assign_best_addr_from_candidates_if_empty(&mut self) {
fn assign_best_addr_from_candidates_if_empty(&mut self, now: Instant) {
if !self.best_addr.is_empty() {
return;
}
@@ -173,6 +173,7 @@ impl NodeUdpPaths {
pong.latency,
best_addr::Source::BestCandidate,
pong.pong_at,
now,
)
}
}
9 changes: 8 additions & 1 deletion iroh/tests/integration.rs
@@ -36,8 +36,15 @@ const ECHO_ALPN: &[u8] = b"echo";
async fn simple_node_id_based_connection_transfer() -> TestResult {
setup_logging();

let client = Endpoint::builder().discovery_n0().bind().await?;
let relay_map = iroh::defaults::prod::default_relay_map();

let client = Endpoint::builder()
.relay_mode(iroh::RelayMode::Custom(relay_map.clone()))
.discovery_n0()
.bind()
.await?;
let server = Endpoint::builder()
.relay_mode(iroh::RelayMode::Custom(relay_map))
.discovery_n0()
.alpns(vec![ECHO_ALPN.to_vec()])
.bind()