diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index cc9a1cc1fd..6833e0c88f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -2253,7 +2253,7 @@ mod tests { }; use iroh_base::{NodeAddr, NodeId, SecretKey}; - use n0_future::{StreamExt, task::AbortOnDropHandle}; + use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle}; use n0_snafu::{Error, Result, ResultExt}; use n0_watcher::Watcher; use quinn::ConnectionError; @@ -2264,7 +2264,9 @@ mod tests { use super::Endpoint; use crate::{ RelayMode, + discovery::static_provider::StaticProvider, endpoint::{ConnectOptions, Connection, ConnectionType, RemoteInfo}, + protocol::{AcceptError, ProtocolHandler, Router}, test_utils::{run_relay_server, run_relay_server_with}, }; @@ -3194,4 +3196,79 @@ mod tests { Ok(()) } + + /// Tests that initial connection establishment isn't extremely slow compared + /// to subsequent connections. + /// + /// This is a time based test, but uses a very large ratio to reduce flakiness. + /// It also does a number of connections to average out any anomalies. 
+ #[tokio::test] + #[traced_test] + async fn connect_multi_time() -> Result { + let n = 32; + + const NOOP_ALPN: &[u8] = b"noop"; + + #[derive(Debug, Clone)] + struct Noop; + + impl ProtocolHandler for Noop { + async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { + connection.closed().await; + Ok(()) + } + } + + async fn noop_server() -> Result<(Router, NodeAddr)> { + let endpoint = Endpoint::builder() + .relay_mode(RelayMode::Disabled) + .bind() + .await + .e()?; + let addr = endpoint.node_addr().initialized().await; + let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn(); + Ok((router, addr)) + } + + let routers = stream::iter(0..n) + .map(|_| noop_server()) + .buffered_unordered(32) + .collect::>() + .await + .into_iter() + .collect::, _>>() + .e()?; + + let addrs = routers + .iter() + .map(|(_, addr)| addr.clone()) + .collect::>(); + let ids = addrs.iter().map(|addr| addr.node_id).collect::>(); + let discovery = StaticProvider::from_node_info(addrs); + let endpoint = Endpoint::builder() + .relay_mode(RelayMode::Disabled) + .discovery(discovery) + .bind() + .await + .e()?; + // wait for the endpoint to be initialized. This should not be needed, + // but we don't want to measure endpoint init time but connection time + // from a fully initialized endpoint. 
+ endpoint.node_addr().initialized().await; + let t0 = Instant::now(); + for id in &ids { + let conn = endpoint.connect(*id, NOOP_ALPN).await?; + conn.close(0u32.into(), b"done"); + } + let dt0 = t0.elapsed().as_secs_f64(); + let t1 = Instant::now(); + for id in &ids { + let conn = endpoint.connect(*id, NOOP_ALPN).await?; + conn.close(0u32.into(), b"done"); + } + let dt1 = t1.elapsed().as_secs_f64(); + + assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s"); + Ok(()) + } } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 6a0ea12e64..76233bccc6 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -485,6 +485,7 @@ impl MagicSock { #[instrument(skip_all)] fn prepare_send( &self, + udp_sender: &UdpSender, transmit: &quinn_udp::Transmit, ) -> io::Result> { self.metrics @@ -526,9 +527,7 @@ impl MagicSock { ) { Some((node_id, udp_addr, relay_url, ping_actions)) => { if !ping_actions.is_empty() { - self.actor_sender - .try_send(ActorMessage::PingActions(ping_actions)) - .ok(); + self.try_send_ping_actions(udp_sender, ping_actions).ok(); } if let Some(addr) = udp_addr { active_paths.push(transports::Addr::from(addr)); @@ -1018,6 +1017,118 @@ impl MagicSock { } } } + /// Tries to send out the given ping actions. + fn try_send_ping_actions(&self, sender: &UdpSender, msgs: Vec) -> io::Result<()> { + for msg in msgs { + // Abort sending as soon as we know we are shutting down. + if self.is_closing() || self.is_closed() { + return Ok(()); + } + match msg { + PingAction::SendCallMeMaybe { + relay_url, + dst_node, + } => { + // Sends the call-me-maybe DISCO message, queuing if addresses are too stale. + // + // To send the call-me-maybe message, we need to know our current direct addresses. If + // this information is too stale, the call-me-maybe is queued while a net_report run is + // scheduled. Once this run finishes, the call-me-maybe will be sent. 
+ match self.direct_addrs.fresh_enough() { + Ok(()) => { + let msg = disco::Message::CallMeMaybe( + self.direct_addrs.to_call_me_maybe_message(), + ); + if !self.disco.try_send( + SendAddr::Relay(relay_url.clone()), + dst_node, + msg.clone(), + ) { + warn!(dstkey = %dst_node.fmt_short(), %relay_url, "relay channel full, dropping call-me-maybe"); + } else { + debug!(dstkey = %dst_node.fmt_short(), %relay_url, "call-me-maybe sent"); + } + } + Err(last_refresh_ago) => { + debug!( + ?last_refresh_ago, + "want call-me-maybe but direct addrs stale; queuing after restun", + ); + self.actor_sender + .try_send(ActorMessage::ScheduleDirectAddrUpdate( + UpdateReason::RefreshForPeering, + Some((dst_node, relay_url)), + )) + .ok(); + } + } + } + PingAction::SendPing(SendPing { + id, + dst, + dst_node, + tx_id, + purpose, + }) => { + let msg = disco::Message::Ping(disco::Ping { + tx_id, + node_key: self.public_key, + }); + + self.try_send_disco_message(sender, dst.clone(), dst_node, msg)?; + debug!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent"); + let msg_sender = self.actor_sender.clone(); + self.node_map + .notify_ping_sent(id, dst, tx_id, purpose, msg_sender); + } + } + } + Ok(()) + } + + /// Tries to send out a disco message. 
+ fn try_send_disco_message( + &self, + sender: &UdpSender, + dst: SendAddr, + dst_key: PublicKey, + msg: disco::Message, + ) -> io::Result<()> { + let dst = match dst { + SendAddr::Udp(addr) => transports::Addr::Ip(addr), + SendAddr::Relay(url) => transports::Addr::Relay(url, dst_key), + }; + + trace!(?dst, %msg, "send disco message (UDP)"); + if self.is_closed() { + return Err(io::Error::new( + io::ErrorKind::NotConnected, + "connection closed", + )); + } + + let pkt = self.disco.encode_and_seal(self.public_key, dst_key, &msg); + + let transmit = transports::Transmit { + contents: &pkt, + ecn: None, + segment_size: None, + }; + + let dst2 = dst.clone(); + match sender.inner_try_send(&dst2, None, &transmit) { + Ok(()) => { + trace!(?dst, %msg, "sent disco message"); + self.metrics.magicsock.sent_disco_udp.inc(); + disco_message_sent(&msg, &self.metrics.magicsock); + Ok(()) + } + Err(err) => { + warn!(?dst, ?msg, ?err, "failed to send disco message"); + Err(err) + } + } + } /// Publishes our address to a discovery service, if configured. /// @@ -1649,7 +1760,6 @@ impl AsyncUdpSocket for MagicUdpSocket { #[derive(Debug)] enum ActorMessage { - PingActions(Vec), EndpointPingExpired(usize, stun_rs::TransactionId), NetworkChange, ScheduleDirectAddrUpdate(UpdateReason, Option<(NodeId, RelayUrl)>), @@ -1790,7 +1900,7 @@ impl Actor { trace!(?msg, "tick: msg"); self.msock.metrics.magicsock.actor_tick_msg.inc(); - self.handle_actor_message(msg, &sender).await; + self.handle_actor_message(msg).await; } tick = self.periodic_re_stun_timer.tick() => { trace!("tick: re_stun {:?}", tick); @@ -1949,7 +2059,7 @@ impl Actor { /// Processes an incoming actor message. /// /// Returns `true` if it was a shutdown. 
- async fn handle_actor_message(&mut self, msg: ActorMessage, sender: &UdpSender) { + async fn handle_actor_message(&mut self, msg: ActorMessage) { match msg { ActorMessage::EndpointPingExpired(id, txid) => { self.msock.node_map.notify_ping_timeout(id, txid); @@ -1969,9 +2079,6 @@ impl Actor { ActorMessage::ForceNetworkChange(is_major) => { self.handle_network_change(is_major).await; } - ActorMessage::PingActions(ping_actions) => { - self.handle_ping_actions(sender, ping_actions).await; - } } } diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index c4703b303d..d579118c2e 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -454,7 +454,7 @@ impl UdpSender { } /// Best effort sending - fn inner_try_send( + pub(crate) fn inner_try_send( &self, destination: &Addr, src: Option, @@ -504,7 +504,7 @@ impl quinn::UdpSender for UdpSender { transmit: &quinn_udp::Transmit, cx: &mut Context, ) -> Poll> { - let active_paths = self.msock.prepare_send(transmit)?; + let active_paths = self.msock.prepare_send(&self, transmit)?; if active_paths.is_empty() { // Returning Ok here means we let QUIC timeout. @@ -556,7 +556,7 @@ impl quinn::UdpSender for UdpSender { } fn try_send(self: Pin<&mut Self>, transmit: &quinn_udp::Transmit) -> io::Result<()> { - let active_paths = self.msock.prepare_send(transmit)?; + let active_paths = self.msock.prepare_send(&self, transmit)?; if active_paths.is_empty() { // Returning Ok here means we let QUIC timeout. // Returning an error would immediately fail a connection.