Skip to content

Commit 59d1432

Browse files
fix(iroh): fix very slow initial connection establishment (#3434)
## Description Fixes very slow (~1s) initial connection establishment Also adds a test for initial connection delay. Initial connection was *very* slow even with relays disabled and static discovery. There was a fixed delay. ## Breaking Changes None ## Notes & open questions ~~Question: not sure how we can turn this into a proper test without making it super flaky, since it is a timing test. Maybe measure the ratio between first and second round and have some very conservative bounds?~~ Changed the test name and put the timings in the assertion. I think the test is good to go as is. The ratio is extremely conservative, so it should rarely flake out. Note: I do 32 connections as an attempt to make the timings more reliable, but strictly speaking it would also work with 1 connection. ``` thread 'endpoint::tests::initial_connection_time' panicked at iroh/src/endpoint.rs:3269:9: First round: 32.230450542s, second round 0.063144708s ``` ## Change checklist <!-- Remove any that are not relevant. --> - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme) --------- Co-authored-by: dignifiedquire <[email protected]>
1 parent 8c558a1 commit 59d1432

File tree

3 files changed

+197
-13
lines changed

3 files changed

+197
-13
lines changed

iroh/src/endpoint.rs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2253,7 +2253,7 @@ mod tests {
22532253
};
22542254

22552255
use iroh_base::{NodeAddr, NodeId, SecretKey};
2256-
use n0_future::{StreamExt, task::AbortOnDropHandle};
2256+
use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle};
22572257
use n0_snafu::{Error, Result, ResultExt};
22582258
use n0_watcher::Watcher;
22592259
use quinn::ConnectionError;
@@ -2264,7 +2264,9 @@ mod tests {
22642264
use super::Endpoint;
22652265
use crate::{
22662266
RelayMode,
2267+
discovery::static_provider::StaticProvider,
22672268
endpoint::{ConnectOptions, Connection, ConnectionType, RemoteInfo},
2269+
protocol::{AcceptError, ProtocolHandler, Router},
22682270
test_utils::{run_relay_server, run_relay_server_with},
22692271
};
22702272

@@ -3194,4 +3196,79 @@ mod tests {
31943196

31953197
Ok(())
31963198
}
3199+
3200+
/// Tests that initial connection establishment isn't extremely slow compared
3201+
/// to subsequent connections.
3202+
///
3203+
/// This is a time based test, but uses a very large ratio to reduce flakiness.
3204+
/// It also does a number of connections to average out any anomalies.
3205+
#[tokio::test]
3206+
#[traced_test]
3207+
async fn connect_multi_time() -> Result {
3208+
let n = 32;
3209+
3210+
const NOOP_ALPN: &[u8] = b"noop";
3211+
3212+
#[derive(Debug, Clone)]
3213+
struct Noop;
3214+
3215+
impl ProtocolHandler for Noop {
3216+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
3217+
connection.closed().await;
3218+
Ok(())
3219+
}
3220+
}
3221+
3222+
async fn noop_server() -> Result<(Router, NodeAddr)> {
3223+
let endpoint = Endpoint::builder()
3224+
.relay_mode(RelayMode::Disabled)
3225+
.bind()
3226+
.await
3227+
.e()?;
3228+
let addr = endpoint.node_addr().initialized().await;
3229+
let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
3230+
Ok((router, addr))
3231+
}
3232+
3233+
let routers = stream::iter(0..n)
3234+
.map(|_| noop_server())
3235+
.buffered_unordered(32)
3236+
.collect::<Vec<_>>()
3237+
.await
3238+
.into_iter()
3239+
.collect::<Result<Vec<_>, _>>()
3240+
.e()?;
3241+
3242+
let addrs = routers
3243+
.iter()
3244+
.map(|(_, addr)| addr.clone())
3245+
.collect::<Vec<_>>();
3246+
let ids = addrs.iter().map(|addr| addr.node_id).collect::<Vec<_>>();
3247+
let discovery = StaticProvider::from_node_info(addrs);
3248+
let endpoint = Endpoint::builder()
3249+
.relay_mode(RelayMode::Disabled)
3250+
.discovery(discovery)
3251+
.bind()
3252+
.await
3253+
.e()?;
3254+
// wait for the endpoint to be initialized. This should not be needed,
3255+
// but we don't want to measure endpoint init time but connection time
3256+
// from a fully initialized endpoint.
3257+
endpoint.node_addr().initialized().await;
3258+
let t0 = Instant::now();
3259+
for id in &ids {
3260+
let conn = endpoint.connect(*id, NOOP_ALPN).await?;
3261+
conn.close(0u32.into(), b"done");
3262+
}
3263+
let dt0 = t0.elapsed().as_secs_f64();
3264+
let t1 = Instant::now();
3265+
for id in &ids {
3266+
let conn = endpoint.connect(*id, NOOP_ALPN).await?;
3267+
conn.close(0u32.into(), b"done");
3268+
}
3269+
let dt1 = t1.elapsed().as_secs_f64();
3270+
3271+
assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
3272+
Ok(())
3273+
}
31973274
}

iroh/src/magicsock.rs

Lines changed: 116 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ impl MagicSock {
485485
#[instrument(skip_all)]
486486
fn prepare_send(
487487
&self,
488+
udp_sender: &UdpSender,
488489
transmit: &quinn_udp::Transmit,
489490
) -> io::Result<SmallVec<[transports::Addr; 3]>> {
490491
self.metrics
@@ -526,9 +527,7 @@ impl MagicSock {
526527
) {
527528
Some((node_id, udp_addr, relay_url, ping_actions)) => {
528529
if !ping_actions.is_empty() {
529-
self.actor_sender
530-
.try_send(ActorMessage::PingActions(ping_actions))
531-
.ok();
530+
self.try_send_ping_actions(udp_sender, ping_actions).ok();
532531
}
533532
if let Some(addr) = udp_addr {
534533
active_paths.push(transports::Addr::from(addr));
@@ -1018,6 +1017,118 @@ impl MagicSock {
10181017
}
10191018
}
10201019
}
1020+
/// Tries to send out the given ping actions out.
1021+
fn try_send_ping_actions(&self, sender: &UdpSender, msgs: Vec<PingAction>) -> io::Result<()> {
1022+
for msg in msgs {
1023+
// Abort sending as soon as we know we are shutting down.
1024+
if self.is_closing() || self.is_closed() {
1025+
return Ok(());
1026+
}
1027+
match msg {
1028+
PingAction::SendCallMeMaybe {
1029+
relay_url,
1030+
dst_node,
1031+
} => {
1032+
// Sends the call-me-maybe DISCO message, queuing if addresses are too stale.
1033+
//
1034+
// To send the call-me-maybe message, we need to know our current direct addresses. If
1035+
// this information is too stale, the call-me-maybe is queued while a net_report run is
1036+
// scheduled. Once this run finishes, the call-me-maybe will be sent.
1037+
match self.direct_addrs.fresh_enough() {
1038+
Ok(()) => {
1039+
let msg = disco::Message::CallMeMaybe(
1040+
self.direct_addrs.to_call_me_maybe_message(),
1041+
);
1042+
if !self.disco.try_send(
1043+
SendAddr::Relay(relay_url.clone()),
1044+
dst_node,
1045+
msg.clone(),
1046+
) {
1047+
warn!(dstkey = %dst_node.fmt_short(), %relay_url, "relay channel full, dropping call-me-maybe");
1048+
} else {
1049+
debug!(dstkey = %dst_node.fmt_short(), %relay_url, "call-me-maybe sent");
1050+
}
1051+
}
1052+
Err(last_refresh_ago) => {
1053+
debug!(
1054+
?last_refresh_ago,
1055+
"want call-me-maybe but direct addrs stale; queuing after restun",
1056+
);
1057+
self.actor_sender
1058+
.try_send(ActorMessage::ScheduleDirectAddrUpdate(
1059+
UpdateReason::RefreshForPeering,
1060+
Some((dst_node, relay_url)),
1061+
))
1062+
.ok();
1063+
}
1064+
}
1065+
}
1066+
PingAction::SendPing(SendPing {
1067+
id,
1068+
dst,
1069+
dst_node,
1070+
tx_id,
1071+
purpose,
1072+
}) => {
1073+
let msg = disco::Message::Ping(disco::Ping {
1074+
tx_id,
1075+
node_key: self.public_key,
1076+
});
1077+
1078+
self.try_send_disco_message(sender, dst.clone(), dst_node, msg)?;
1079+
debug!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent");
1080+
let msg_sender = self.actor_sender.clone();
1081+
self.node_map
1082+
.notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
1083+
}
1084+
}
1085+
}
1086+
Ok(())
1087+
}
1088+
1089+
/// Tries to send out a disco message.
1090+
fn try_send_disco_message(
1091+
&self,
1092+
sender: &UdpSender,
1093+
dst: SendAddr,
1094+
dst_key: PublicKey,
1095+
msg: disco::Message,
1096+
) -> io::Result<()> {
1097+
let dst = match dst {
1098+
SendAddr::Udp(addr) => transports::Addr::Ip(addr),
1099+
SendAddr::Relay(url) => transports::Addr::Relay(url, dst_key),
1100+
};
1101+
1102+
trace!(?dst, %msg, "send disco message (UDP)");
1103+
if self.is_closed() {
1104+
return Err(io::Error::new(
1105+
io::ErrorKind::NotConnected,
1106+
"connection closed",
1107+
));
1108+
}
1109+
1110+
let pkt = self.disco.encode_and_seal(self.public_key, dst_key, &msg);
1111+
1112+
let transmit = transports::Transmit {
1113+
contents: &pkt,
1114+
ecn: None,
1115+
segment_size: None,
1116+
};
1117+
1118+
let dst2 = dst.clone();
1119+
match sender.inner_try_send(&dst2, None, &transmit) {
1120+
Ok(()) => {
1121+
trace!(?dst, %msg, "sent disco message");
1122+
self.metrics.magicsock.sent_disco_udp.inc();
1123+
disco_message_sent(&msg, &self.metrics.magicsock);
1124+
Ok(())
1125+
}
1126+
Err(err) => {
1127+
warn!(?dst, ?msg, ?err, "failed to send disco message");
1128+
Err(err)
1129+
}
1130+
}
1131+
}
10211132

10221133
/// Publishes our address to a discovery service, if configured.
10231134
///
@@ -1649,7 +1760,6 @@ impl AsyncUdpSocket for MagicUdpSocket {
16491760

16501761
#[derive(Debug)]
16511762
enum ActorMessage {
1652-
PingActions(Vec<PingAction>),
16531763
EndpointPingExpired(usize, stun_rs::TransactionId),
16541764
NetworkChange,
16551765
ScheduleDirectAddrUpdate(UpdateReason, Option<(NodeId, RelayUrl)>),
@@ -1790,7 +1900,7 @@ impl Actor {
17901900

17911901
trace!(?msg, "tick: msg");
17921902
self.msock.metrics.magicsock.actor_tick_msg.inc();
1793-
self.handle_actor_message(msg, &sender).await;
1903+
self.handle_actor_message(msg).await;
17941904
}
17951905
tick = self.periodic_re_stun_timer.tick() => {
17961906
trace!("tick: re_stun {:?}", tick);
@@ -1949,7 +2059,7 @@ impl Actor {
19492059
/// Processes an incoming actor message.
19502060
///
19512061
/// Returns `true` if it was a shutdown.
1952-
async fn handle_actor_message(&mut self, msg: ActorMessage, sender: &UdpSender) {
2062+
async fn handle_actor_message(&mut self, msg: ActorMessage) {
19532063
match msg {
19542064
ActorMessage::EndpointPingExpired(id, txid) => {
19552065
self.msock.node_map.notify_ping_timeout(id, txid);
@@ -1969,9 +2079,6 @@ impl Actor {
19692079
ActorMessage::ForceNetworkChange(is_major) => {
19702080
self.handle_network_change(is_major).await;
19712081
}
1972-
ActorMessage::PingActions(ping_actions) => {
1973-
self.handle_ping_actions(sender, ping_actions).await;
1974-
}
19752082
}
19762083
}
19772084

iroh/src/magicsock/transports.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ impl UdpSender {
454454
}
455455

456456
/// Best effort sending
457-
fn inner_try_send(
457+
pub(crate) fn inner_try_send(
458458
&self,
459459
destination: &Addr,
460460
src: Option<IpAddr>,
@@ -504,7 +504,7 @@ impl quinn::UdpSender for UdpSender {
504504
transmit: &quinn_udp::Transmit,
505505
cx: &mut Context,
506506
) -> Poll<io::Result<()>> {
507-
let active_paths = self.msock.prepare_send(transmit)?;
507+
let active_paths = self.msock.prepare_send(&self, transmit)?;
508508

509509
if active_paths.is_empty() {
510510
// Returning Ok here means we let QUIC timeout.
@@ -556,7 +556,7 @@ impl quinn::UdpSender for UdpSender {
556556
}
557557

558558
fn try_send(self: Pin<&mut Self>, transmit: &quinn_udp::Transmit) -> io::Result<()> {
559-
let active_paths = self.msock.prepare_send(transmit)?;
559+
let active_paths = self.msock.prepare_send(&self, transmit)?;
560560
if active_paths.is_empty() {
561561
// Returning Ok here means we let QUIC timeout.
562562
// Returning an error would immediately fail a connection.

0 commit comments

Comments
 (0)