Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2253,7 +2253,7 @@ mod tests {
};

use iroh_base::{NodeAddr, NodeId, SecretKey};
use n0_future::{StreamExt, task::AbortOnDropHandle};
use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle};
use n0_snafu::{Error, Result, ResultExt};
use n0_watcher::Watcher;
use quinn::ConnectionError;
Expand All @@ -2264,7 +2264,9 @@ mod tests {
use super::Endpoint;
use crate::{
RelayMode,
discovery::static_provider::StaticProvider,
endpoint::{ConnectOptions, Connection, ConnectionType, RemoteInfo},
protocol::{AcceptError, ProtocolHandler, Router},
test_utils::{run_relay_server, run_relay_server_with},
};

Expand Down Expand Up @@ -3194,4 +3196,79 @@ mod tests {

Ok(())
}

/// Tests that initial connection establishment isn't extremely slow compared
/// to subsequent connections.
///
/// This is a time based test, but uses a very large ratio to reduce flakiness.
/// It also does a number of connections to average out any anomalies.
#[tokio::test]
#[traced_test]
async fn connect_multi_time() -> Result {
let n = 32;

const NOOP_ALPN: &[u8] = b"noop";

#[derive(Debug, Clone)]
struct Noop;

impl ProtocolHandler for Noop {
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
connection.closed().await;
Ok(())
}
}

async fn noop_server() -> Result<(Router, NodeAddr)> {
let endpoint = Endpoint::builder()
.relay_mode(RelayMode::Disabled)
.bind()
.await
.e()?;
let addr = endpoint.node_addr().initialized().await;
let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
Ok((router, addr))
}

let routers = stream::iter(0..n)
.map(|_| noop_server())
.buffered_unordered(32)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.e()?;

let addrs = routers
.iter()
.map(|(_, addr)| addr.clone())
.collect::<Vec<_>>();
let ids = addrs.iter().map(|addr| addr.node_id).collect::<Vec<_>>();
let discovery = StaticProvider::from_node_info(addrs);
let endpoint = Endpoint::builder()
.relay_mode(RelayMode::Disabled)
.discovery(discovery)
.bind()
.await
.e()?;
// wait for the endpoint to be initialized. This should not be needed,
// but we don't want to measure endpoint init time but connection time
// from a fully initialized endpoint.
endpoint.node_addr().initialized().await;
let t0 = Instant::now();
for id in &ids {
let conn = endpoint.connect(*id, NOOP_ALPN).await?;
conn.close(0u32.into(), b"done");
}
let dt0 = t0.elapsed().as_secs_f64();
let t1 = Instant::now();
for id in &ids {
let conn = endpoint.connect(*id, NOOP_ALPN).await?;
conn.close(0u32.into(), b"done");
}
let dt1 = t1.elapsed().as_secs_f64();

assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
Ok(())
}
}
125 changes: 116 additions & 9 deletions iroh/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl MagicSock {
#[instrument(skip_all)]
fn prepare_send(
&self,
udp_sender: &UdpSender,
transmit: &quinn_udp::Transmit,
) -> io::Result<SmallVec<[transports::Addr; 3]>> {
self.metrics
Expand Down Expand Up @@ -526,9 +527,7 @@ impl MagicSock {
) {
Some((node_id, udp_addr, relay_url, ping_actions)) => {
if !ping_actions.is_empty() {
self.actor_sender
.try_send(ActorMessage::PingActions(ping_actions))
.ok();
self.try_send_ping_actions(udp_sender, ping_actions).ok();
}
if let Some(addr) = udp_addr {
active_paths.push(transports::Addr::from(addr));
Expand Down Expand Up @@ -1018,6 +1017,118 @@ impl MagicSock {
}
}
}
/// Tries to send out the given ping actions.
///
/// The actions are processed in order without awaiting:
///
/// - [`PingAction::SendCallMeMaybe`] is handed to the disco channel if our direct
///   addresses are fresh enough, otherwise a direct address update is scheduled with
///   the actor. A full channel is logged (call-me-maybe) or ignored (actor message)
///   and never reported as an error.
/// - [`PingAction::SendPing`] is sent through `sender`, and the node map is notified
///   once the ping went out.
///
/// Processing stops early, returning `Ok(())`, as soon as the socket is closing or
/// closed.
///
/// # Errors
///
/// Returns the error of the first ping that could not be sent. Any actions after
/// that one are dropped.
fn try_send_ping_actions(&self, sender: &UdpSender, msgs: Vec<PingAction>) -> io::Result<()> {
    for msg in msgs {
        // Abort sending as soon as we know we are shutting down.
        if self.is_closing() || self.is_closed() {
            return Ok(());
        }
        match msg {
            PingAction::SendCallMeMaybe {
                relay_url,
                dst_node,
            } => {
                // Sends the call-me-maybe DISCO message, queuing if addresses are too stale.
                //
                // To send the call-me-maybe message, we need to know our current direct addresses. If
                // this information is too stale, the call-me-maybe is queued while a net_report run is
                // scheduled. Once this run finishes, the call-me-maybe will be sent.
                match self.direct_addrs.fresh_enough() {
                    Ok(()) => {
                        let msg = disco::Message::CallMeMaybe(
                            self.direct_addrs.to_call_me_maybe_message(),
                        );
                        // `relay_url` is cloned because it is still logged below; `msg` is
                        // moved since nothing uses it after this call.
                        if !self.disco.try_send(
                            SendAddr::Relay(relay_url.clone()),
                            dst_node,
                            msg,
                        ) {
                            warn!(dstkey = %dst_node.fmt_short(), %relay_url, "relay channel full, dropping call-me-maybe");
                        } else {
                            debug!(dstkey = %dst_node.fmt_short(), %relay_url, "call-me-maybe sent");
                        }
                    }
                    Err(last_refresh_ago) => {
                        debug!(
                            ?last_refresh_ago,
                            "want call-me-maybe but direct addrs stale; queuing after restun",
                        );
                        // Best effort: if the actor inbox is full the request is dropped.
                        self.actor_sender
                            .try_send(ActorMessage::ScheduleDirectAddrUpdate(
                                UpdateReason::RefreshForPeering,
                                Some((dst_node, relay_url)),
                            ))
                            .ok();
                    }
                }
            }
            PingAction::SendPing(SendPing {
                id,
                dst,
                dst_node,
                tx_id,
                purpose,
            }) => {
                let msg = disco::Message::Ping(disco::Ping {
                    tx_id,
                    node_key: self.public_key,
                });

                // `dst` is cloned because it is needed again for logging and for the
                // node map notification below.
                self.try_send_disco_message(sender, dst.clone(), dst_node, msg)?;
                debug!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent");
                let msg_sender = self.actor_sender.clone();
                self.node_map
                    .notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
            }
        }
    }
    Ok(())
}

/// Tries to send out a disco message.
///
/// `dst` selects the transport address: a UDP address is used as is, a relay URL is
/// paired with `dst_key`. The message is encoded and sealed with `encode_and_seal`
/// and passed to `sender` via its `inner_try_send` method, without awaiting.
///
/// On success the disco send metrics are updated.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::NotConnected`] error if the socket is already closed,
/// otherwise whatever error `inner_try_send` reports (which is also logged).
fn try_send_disco_message(
    &self,
    sender: &UdpSender,
    dst: SendAddr,
    dst_key: PublicKey,
    msg: disco::Message,
) -> io::Result<()> {
    let dst = match dst {
        SendAddr::Udp(addr) => transports::Addr::Ip(addr),
        SendAddr::Relay(url) => transports::Addr::Relay(url, dst_key),
    };

    trace!(?dst, %msg, "send disco message (UDP)");
    if self.is_closed() {
        return Err(io::Error::new(
            io::ErrorKind::NotConnected,
            "connection closed",
        ));
    }

    let pkt = self.disco.encode_and_seal(self.public_key, dst_key, &msg);

    let transmit = transports::Transmit {
        contents: &pkt,
        ecn: None,
        segment_size: None,
    };

    // `inner_try_send` only borrows the destination, so `dst` can be passed by
    // reference and reused for logging afterwards without cloning it.
    match sender.inner_try_send(&dst, None, &transmit) {
        Ok(()) => {
            trace!(?dst, %msg, "sent disco message");
            self.metrics.magicsock.sent_disco_udp.inc();
            disco_message_sent(&msg, &self.metrics.magicsock);
            Ok(())
        }
        Err(err) => {
            warn!(?dst, ?msg, ?err, "failed to send disco message");
            Err(err)
        }
    }
}

/// Publishes our address to a discovery service, if configured.
///
Expand Down Expand Up @@ -1649,7 +1760,6 @@ impl AsyncUdpSocket for MagicUdpSocket {

#[derive(Debug)]
enum ActorMessage {
PingActions(Vec<PingAction>),
EndpointPingExpired(usize, stun_rs::TransactionId),
NetworkChange,
ScheduleDirectAddrUpdate(UpdateReason, Option<(NodeId, RelayUrl)>),
Expand Down Expand Up @@ -1790,7 +1900,7 @@ impl Actor {

trace!(?msg, "tick: msg");
self.msock.metrics.magicsock.actor_tick_msg.inc();
self.handle_actor_message(msg, &sender).await;
self.handle_actor_message(msg).await;
}
tick = self.periodic_re_stun_timer.tick() => {
trace!("tick: re_stun {:?}", tick);
Expand Down Expand Up @@ -1949,7 +2059,7 @@ impl Actor {
/// Processes an incoming actor message.
///
/// Each message is handled purely for its side effects; nothing is returned.
async fn handle_actor_message(&mut self, msg: ActorMessage, sender: &UdpSender) {
async fn handle_actor_message(&mut self, msg: ActorMessage) {
match msg {
ActorMessage::EndpointPingExpired(id, txid) => {
self.msock.node_map.notify_ping_timeout(id, txid);
Expand All @@ -1969,9 +2079,6 @@ impl Actor {
ActorMessage::ForceNetworkChange(is_major) => {
self.handle_network_change(is_major).await;
}
ActorMessage::PingActions(ping_actions) => {
self.handle_ping_actions(sender, ping_actions).await;
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions iroh/src/magicsock/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ impl UdpSender {
}

/// Best effort sending
fn inner_try_send(
pub(crate) fn inner_try_send(
&self,
destination: &Addr,
src: Option<IpAddr>,
Expand Down Expand Up @@ -504,7 +504,7 @@ impl quinn::UdpSender for UdpSender {
transmit: &quinn_udp::Transmit,
cx: &mut Context,
) -> Poll<io::Result<()>> {
let active_paths = self.msock.prepare_send(transmit)?;
let active_paths = self.msock.prepare_send(&self, transmit)?;

if active_paths.is_empty() {
// Returning Ok here means we let QUIC timeout.
Expand Down Expand Up @@ -556,7 +556,7 @@ impl quinn::UdpSender for UdpSender {
}

fn try_send(self: Pin<&mut Self>, transmit: &quinn_udp::Transmit) -> io::Result<()> {
let active_paths = self.msock.prepare_send(transmit)?;
let active_paths = self.msock.prepare_send(&self, transmit)?;
if active_paths.is_empty() {
// Returning Ok here means we let QUIC timeout.
// Returning an error would immediately fail a connection.
Expand Down
Loading