Skip to content

Commit e927579

Browse files
committed
chore: connect to all endpoints in parallel
1 parent ea01b3a commit e927579

File tree

3 files changed

+58
-45
lines changed

3 files changed

+58
-45
lines changed

crates/rostra-client/src/client.rs

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,30 @@ use std::sync::{Arc, Weak};
1111
use std::time::Duration;
1212

1313
use backon::Retryable as _;
14+
use iroh::NodeAddr;
15+
use iroh::discovery::ConcurrentDiscovery;
1416
use iroh::discovery::dns::DnsDiscovery;
1517
use iroh::discovery::pkarr::PkarrPublisher;
16-
use iroh::discovery::ConcurrentDiscovery;
17-
use iroh::NodeAddr;
1818
use itertools::Itertools as _;
1919
use rostra_client_db::{Database, DbResult, IdsFolloweesRecord, IdsFollowersRecord};
2020
use rostra_core::event::{
21-
content_kind, Event, EventExt as _, EventKind, IrohNodeId, PersonaId, SignedEvent,
22-
VerifiedEvent, VerifiedEventContent,
21+
Event, EventExt as _, EventKind, IrohNodeId, PersonaId, SignedEvent, VerifiedEvent,
22+
VerifiedEventContent, content_kind,
2323
};
2424
use rostra_core::id::{RostraId, RostraIdSecretKey, ToShort as _};
2525
use rostra_core::{ExternalEventId, ShortEventId};
2626
use rostra_p2p::connection::{Connection, FeedEventRequest, FeedEventResponse, PingRequest};
27-
use rostra_p2p::RpcError;
27+
use rostra_p2p::{ConnectionSnafu, RpcError};
2828
use rostra_p2p_api::ROSTRA_P2P_V0_ALPN;
2929
use rostra_util_error::FmtCompact as _;
3030
use rostra_util_fmt::AsFmtOption as _;
31-
use snafu::{ensure, Location, OptionExt as _, ResultExt as _, Snafu};
31+
use snafu::{Location, OptionExt as _, ResultExt as _, Snafu, ensure};
3232
use tokio::sync::{broadcast, watch};
3333
use tokio::time::Instant;
3434
use tracing::{debug, info, trace, warn};
3535

36-
use super::{get_rrecord_typed, RRECORD_HEAD_KEY, RRECORD_P2P_KEY};
36+
use super::{RRECORD_HEAD_KEY, RRECORD_P2P_KEY, get_rrecord_typed};
37+
use crate::LOG_TARGET;
3738
use crate::error::{
3839
ActivateResult, ConnectIrohSnafu, ConnectResult, IdResolveError, IdResolveResult,
3940
IdSecretReadResult, InitIrohClientSnafu, InitPkarrClientSnafu, InitResult, InvalidIdSnafu,
@@ -46,7 +47,6 @@ use crate::task::missing_event_content_fetcher::MissingEventContentFetcher;
4647
use crate::task::missing_event_fetcher::MissingEventFetcher;
4748
use crate::task::pkarr_id_publisher::PkarrIdPublisher;
4849
use crate::task::request_handler::RequestHandler;
49-
use crate::LOG_TARGET;
5050

5151
#[derive(Debug, Snafu)]
5252
#[snafu(visibility(pub))]
@@ -254,51 +254,59 @@ impl Client {
254254
}
255255

256256
pub async fn connect(&self, id: RostraId) -> ConnectResult<Connection> {
257-
// TODO: maintain connection attempt stats and use them to prioritize best
258-
// endpoints
259-
for ((_ts, node_id), _stats) in self.db.get_id_endpoints(id).await.into_iter().rev() {
257+
let endpoints = self.db.get_id_endpoints(id).await;
258+
259+
// Try all known endpoints in parallel
260+
let mut connection_attempts = Vec::new();
261+
for ((_ts, node_id), _stats) in endpoints {
260262
let Ok(node_id) = iroh::NodeId::from_bytes(&node_id.to_bytes()) else {
261263
debug!(target: LOG_TARGET, %id, "Invalid iroh id for rostra id found");
262264
continue;
263265
};
264266

265267
if node_id == self.endpoint.node_id() {
266-
// If we are trying to connect to our own Id, we want to connect (if possible)
267-
// with some other node.
268+
// Skip connecting to our own Id
268269
continue;
269270
}
270271

271-
let conn = match self.endpoint.connect(node_id, ROSTRA_P2P_V0_ALPN).await {
272-
Ok(conn) => Connection::from(conn),
273-
Err(err) => {
274-
debug!(
275-
target: LOG_TARGET,
276-
%id,
277-
%node_id,
278-
our_id = %self.endpoint.node_id(),
279-
err = %format!("{err:#}"),
280-
"Failed to connect to a know iroh endpoint"
281-
);
282-
continue;
283-
}
284-
};
272+
let endpoint = self.endpoint.clone();
273+
connection_attempts.push(tokio::spawn(async move {
274+
let conn = endpoint
275+
.connect(node_id, ROSTRA_P2P_V0_ALPN)
276+
.await
277+
.context(ConnectionSnafu)?;
278+
let conn = Connection::from(conn);
279+
280+
// Verify connection with ping
281+
conn.make_rpc(&PingRequest(0)).await?;
282+
Ok::<_, RpcError>(conn)
283+
}));
284+
}
285285

286-
// Make a ping request, just to make sure we can talk
287-
match conn.make_rpc(&PingRequest(0)).await {
288-
Ok(_) => return Ok(conn),
289-
Err(err) => {
290-
debug!(
291-
target: LOG_TARGET,
292-
%id,
293-
%node_id,
294-
err = %format!("{err:#}"),
295-
"Failed to ping a know iroh endpoint"
296-
);
297-
continue;
286+
if !connection_attempts.is_empty() {
287+
// Wait for first successful connection or all failures
288+
for attempt in connection_attempts {
289+
match attempt.await {
290+
Ok(Ok(conn)) => return Ok(conn),
291+
Ok(Err(err)) => {
292+
debug!(
293+
target: LOG_TARGET,
294+
%id,
295+
err = %err.fmt_compact(),
296+
"Failed to connect to endpoint"
297+
);
298+
}
299+
Err(_) => continue,
298300
}
299301
}
300302
}
303+
debug!(
304+
target: LOG_TARGET,
305+
%id,
306+
"All known endpoints failed, trying pkarr resolution"
307+
);
301308

309+
// Fall back to pkarr if no known endpoints worked
302310
self.connect_by_pkarr_resolution(id).await
303311
}
304312

crates/rostra-p2p/src/connection.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::pin::Pin;
44

55
use bao_tree::io::outboard::{EmptyOutboard, PreOrderMemOutboard};
66
use bao_tree::io::round_up_to_chunks;
7-
use bao_tree::{blake3, BlockSize, ByteRanges};
7+
use bao_tree::{BlockSize, ByteRanges, blake3};
88
use bincode::{Decode, Encode};
99
use convi::{CastInto, ExpectFrom};
1010
use iroh::endpoint::{RecvStream, SendStream};
@@ -20,8 +20,9 @@ use snafu::{OptionExt as _, ResultExt as _};
2020
use tracing::trace;
2121

2222
use crate::{
23-
ConnectionSnafu, DecodingBaoSnafu, DecodingSnafu, EncodingBaoSnafu, EventVerificationSnafu,
24-
FailedSnafu, MessageTooLargeSnafu, ReadSnafu, RpcResult, TrailerSnafu, WriteSnafu, LOG_TARGET,
23+
DecodingBaoSnafu, DecodingSnafu, EncodingBaoSnafu, EventVerificationSnafu, FailedSnafu,
24+
LOG_TARGET, MessageTooLargeSnafu, ReadSnafu, RpcResult, StreamConnectionSnafu, TrailerSnafu,
25+
WriteSnafu,
2526
};
2627

2728
#[derive(Debug)]
@@ -241,7 +242,7 @@ fn rpc_request_to_bytes_test() {
241242

242243
impl Connection {
243244
pub async fn make_rpc<R: Rpc>(&self, request: &R) -> RpcResult<<R as Rpc>::Response> {
244-
let (mut send, mut recv) = self.0.open_bi().await.context(ConnectionSnafu)?;
245+
let (mut send, mut recv) = self.0.open_bi().await.context(StreamConnectionSnafu)?;
245246

246247
Self::write_rpc_request(&mut send, request).await?;
247248

@@ -270,7 +271,7 @@ impl Connection {
270271
)
271272
-> Pin<Box<dyn Future<Output = BoxedErrorResult<()>> + 's + Send + Sync>>,
272273
{
273-
let (mut send, mut recv) = self.0.open_bi().await.context(ConnectionSnafu)?;
274+
let (mut send, mut recv) = self.0.open_bi().await.context(StreamConnectionSnafu)?;
274275

275276
Self::write_rpc_request(&mut send, request).await?;
276277

@@ -296,7 +297,7 @@ impl Connection {
296297
)
297298
-> Pin<Box<dyn Future<Output = BoxedErrorResult<T>> + 's + Send + Sync>>,
298299
{
299-
let (mut send, mut recv) = self.0.open_bi().await.context(ConnectionSnafu)?;
300+
let (mut send, mut recv) = self.0.open_bi().await.context(StreamConnectionSnafu)?;
300301

301302
Self::write_rpc_request(&mut send, request).await?;
302303

crates/rostra-p2p/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ pub const LOG_TARGET: &str = "rostra::p2p";
1111

1212
#[derive(Debug, Snafu)]
1313
pub enum RpcError {
14+
#[snafu(visibility(pub))]
1415
Connection {
16+
source: anyhow::Error,
17+
},
18+
StreamConnection {
1519
source: iroh::endpoint::ConnectionError,
1620
},
1721
Write {

0 commit comments

Comments
 (0)