Skip to content

Commit e0c06a9

Browse files
committed
fix(client): connectvity caching
OMG, I'm so dumb.
1 parent af4fcc3 commit e0c06a9

File tree

5 files changed

+33
-16
lines changed

5 files changed

+33
-16
lines changed

crates/rostra-client/src/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ use tracing::{debug, info, trace, warn};
3434
use super::{RRECORD_HEAD_KEY, RRECORD_P2P_KEY, get_rrecord_typed};
3535
use crate::LOG_TARGET;
3636
use crate::error::{
37-
ActivateResult, ActivateSnafu, IdResolveError, IdResolveResult, IdSecretReadResult,
38-
InitIrohClientSnafu, InitPkarrClientSnafu, InitResult, InvalidIdSnafu, IoSnafu,
39-
MissingTicketSnafu, ParsingSnafu, PkarrResolveSnafu, PostResult, RRecordSnafu,
37+
ActivateResult, ActivateSnafu, ConnectResult, IdResolveError, IdResolveResult,
38+
IdSecretReadResult, InitIrohClientSnafu, InitPkarrClientSnafu, InitResult, InvalidIdSnafu,
39+
IoSnafu, MissingTicketSnafu, ParsingSnafu, PkarrResolveSnafu, PostResult, RRecordSnafu,
4040
SecretMismatchSnafu,
4141
};
4242
use crate::id::{CompactTicket, IdPublishedData, IdResolvedData};
@@ -227,7 +227,7 @@ impl ClientRef<'_> {
227227
/// Returns a cached connection if available, otherwise creates a new one.
228228
/// This is more efficient than `connect_uncached` when making repeated
229229
/// connections to the same peer.
230-
pub async fn connect_cached(&self, id: RostraId) -> Option<Connection> {
230+
pub async fn connect_cached(&self, id: RostraId) -> ConnectResult<Connection> {
231231
self.connection_cache.get_or_connect(self, id).await
232232
}
233233
}

crates/rostra-client/src/connection_cache.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ use tokio::sync::{Mutex, OnceCell};
77
use tracing::trace;
88

99
use crate::ClientRef;
10+
use crate::error::ConnectResult;
1011

1112
const LOG_TARGET: &str = "rostra-client::connection-cache";
1213

13-
type LazySharedConnection = Arc<OnceCell<Option<Connection>>>;
14+
type LazySharedConnection = Arc<OnceCell<Connection>>;
1415

1516
#[derive(Clone)]
1617
pub struct ConnectionCache {
@@ -30,14 +31,18 @@ impl ConnectionCache {
3031
}
3132
}
3233

33-
pub async fn get_or_connect(&self, client: &ClientRef<'_>, id: RostraId) -> Option<Connection> {
34+
pub async fn get_or_connect(
35+
&self,
36+
client: &ClientRef<'_>,
37+
id: RostraId,
38+
) -> ConnectResult<Connection> {
3439
let mut pool_lock = self.connections.lock().await;
3540

3641
let entry_arc = pool_lock
3742
.entry(id)
3843
.and_modify(|entry_arc| {
3944
// Check if existing connection is disconnected and remove it
40-
if let Some(Some(existing_conn)) = entry_arc.get()
45+
if let Some(existing_conn) = entry_arc.get()
4146
&& existing_conn.is_closed() {
4247
trace!(target: LOG_TARGET, %id, "Existing connection is disconnected, removing from pool");
4348
*entry_arc = Arc::new(OnceCell::new());
@@ -50,21 +55,21 @@ impl ConnectionCache {
5055
drop(pool_lock);
5156

5257
let result = entry_arc
53-
.get_or_init(|| async {
58+
.get_or_try_init(|| async {
5459
trace!(target: LOG_TARGET, %id, "Creating new connection");
5560
match client.connect_uncached(id).await {
5661
Ok(conn) => {
5762
trace!(target: LOG_TARGET, %id, "Connection successful");
58-
Some(conn)
63+
Ok(conn)
5964
}
6065
Err(err) => {
6166
trace!(target: LOG_TARGET, %id, err = %err, "Connection failed");
62-
None
67+
Err(err)
6368
}
6469
}
6570
})
6671
.await;
6772

68-
result.clone()
73+
result.cloned()
6974
}
7075
}

crates/rostra-client/src/task/followee_head_checker.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl FolloweeHeadChecker {
160160
id: RostraId,
161161
connections: &ConnectionCache,
162162
) -> BoxedErrorResult<Option<ShortEventId>> {
163-
let Some(conn) = connections.get_or_connect(client, id).await else {
163+
let Ok(conn) = connections.get_or_connect(client, id).await else {
164164
return Ok(None);
165165
};
166166

@@ -239,8 +239,19 @@ impl FolloweeHeadChecker {
239239
};
240240

241241
// ConnectionCache handles its own interior mutability
242-
let Some(conn) = connections.get_or_connect(&client, *follower_id).await else {
243-
continue;
242+
let conn = match connections.get_or_connect(&client, *follower_id).await {
243+
Ok(conn) => conn,
244+
Err(err) => {
245+
debug!(target: LOG_TARGET,
246+
rostra_id = %rostra_id,
247+
head = %head,
248+
follower_id = %follower_id,
249+
err = %err.fmt_compact(),
250+
"Could not connect to fetch updates"
251+
);
252+
253+
continue;
254+
}
244255
};
245256

246257
debug!(target: LOG_TARGET,

crates/rostra-client/src/task/missing_event_fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl MissingEventFetcher {
8888
follower_id = %follower_id,
8989
"Looking for a missing events from"
9090
);
91-
let Some(conn) = connections.get_or_connect(&client, *follower_id).await else {
91+
let Ok(conn) = connections.get_or_connect(&client, *follower_id).await else {
9292
debug!(
9393
target: LOG_TARGET,
9494
author_id = %author_id,

crates/rostra-client/src/util/rpc.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ pub async fn get_event_content_from_followers(
5151

5252
let conn = (connections_cache
5353
.get_or_connect(&client_ref, follower_id)
54-
.await)?;
54+
.await)
55+
.ok()?;
5556

5657
debug!(target: LOG_TARGET,
5758
author_id = %author_id.to_short(),

0 commit comments

Comments
 (0)