Skip to content

Commit ea01b3a

Browse files
committed
chore: more robust followee_head_checker
1 parent 26cfccb commit ea01b3a

File tree

8 files changed

+160
-62
lines changed

8 files changed

+160
-62
lines changed

.aider.conf.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,2 @@
1-
lint-cmd: cargo check -q
21
test-cmd: cargo nextest run --cargo-quiet
32
cache-prompts: true
4-
stream: false

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ resolver = "2"
1616

1717
[workspace.package]
1818
version = "0.1.0"
19-
edition = "2021"
19+
edition = "2024"
2020
license = "MIT OR APACHE-2.0 OR MPL-2.0"
2121
description = "A p2p (f2f) social network."
2222

crates/rostra-client/src/task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub(crate) mod connection_cache;
12
pub(crate) mod followee_head_checker;
23
pub(crate) mod head_merger;
34
pub(crate) mod head_update_broadcaster;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use std::collections::BTreeMap;
2+
use std::collections::btree_map::Entry;
3+
4+
use rostra_core::id::RostraId;
5+
use rostra_p2p::Connection;
6+
7+
use crate::ClientRef;
8+
9+
#[derive(Debug)]
10+
pub enum ConnectionState {
11+
Connected(Connection),
12+
Failed,
13+
}
14+
15+
#[derive(Default)]
16+
pub struct ConnectionCache {
17+
connections: BTreeMap<RostraId, ConnectionState>,
18+
}
19+
20+
impl ConnectionCache {
21+
pub fn new() -> Self {
22+
Self::default()
23+
}
24+
25+
pub async fn get_or_connect(
26+
&mut self,
27+
client: &ClientRef<'_>,
28+
id: RostraId,
29+
) -> Option<&mut Connection> {
30+
match self.connections.entry(id) {
31+
Entry::Occupied(entry) => match entry.get() {
32+
ConnectionState::Connected(_) => {}
33+
ConnectionState::Failed => return None,
34+
},
35+
Entry::Vacant(entry) => match client.connect(id).await {
36+
Ok(conn) => {
37+
entry.insert(ConnectionState::Connected(conn));
38+
}
39+
Err(_) => {
40+
entry.insert(ConnectionState::Failed);
41+
return None;
42+
}
43+
},
44+
}
45+
46+
let ConnectionState::Connected(conn) =
47+
self.connections.get_mut(&id).expect("Just inserted")
48+
else {
49+
unreachable!()
50+
};
51+
Some(conn)
52+
}
53+
}

crates/rostra-client/src/task/followee_head_checker.rs

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use std::collections::{BinaryHeap, HashMap};
1+
use std::collections::{BTreeMap, BinaryHeap, HashMap};
22
use std::sync::Arc;
33
use std::time::Duration;
44

55
use rostra_client_db::{Database, IdsFolloweesRecord, InsertEventOutcome};
6+
use rostra_core::ShortEventId;
67
use rostra_core::event::PersonaId;
78
use rostra_core::id::RostraId;
8-
use rostra_core::ShortEventId;
9+
use rostra_p2p::Connection;
910
use rostra_p2p::connection::GetHeadRequest;
1011
use rostra_util::is_rostra_dev_mode_set;
1112
use rostra_util_error::{BoxedErrorResult, FmtCompact, WhateverResult};
@@ -14,8 +15,9 @@ use snafu::ResultExt as _;
1415
use tokio::sync::watch;
1516
use tracing::{debug, info, instrument, trace};
1617

17-
use crate::client::Client;
18+
use super::connection_cache::ConnectionCache;
1819
use crate::ClientRef;
20+
use crate::client::Client;
1921
const LOG_TARGET: &str = "rostra::head_checker";
2022

2123
pub struct FolloweeHeadChecker {
@@ -33,7 +35,6 @@ impl FolloweeHeadChecker {
3335
client: client.handle(),
3436
db: client.db().to_owned(),
3537
self_id: client.rostra_id(),
36-
3738
followee_updated: client.self_followees_subscribe(),
3839
check_for_updates_rx: client.check_for_updates_tx_subscribe(),
3940
}
@@ -70,6 +71,9 @@ impl FolloweeHeadChecker {
7071
break;
7172
};
7273

74+
let mut connections = ConnectionCache::new();
75+
let mut followers_by_followee = BTreeMap::new();
76+
7377
let self_followees = storage.get_self_followees().await;
7478

7579
for (followee, _persona_id) in [&(self.self_id, PersonaId::default())]
@@ -98,9 +102,16 @@ impl FolloweeHeadChecker {
98102
}
99103
Ok(Some(head)) => {
100104
info!(target: LOG_TARGET, id = %followee, %source, "Has updates");
101-
if let Err(err) = self.download_new_data(&client, *followee, head).await
105+
if let Err(err) = self
106+
.download_new_data(
107+
*followee,
108+
head,
109+
&mut connections,
110+
&mut followers_by_followee,
111+
)
112+
.await
102113
{
103-
info!(target: LOG_TARGET, err = %err.fmt_compact(), id = %followee, "Failed to download new data");
114+
info!(target: LOG_TARGET, err = %(&*err).fmt_compact(), id = %followee, "Failed to download new data");
104115
}
105116
}
106117
}
@@ -147,20 +158,73 @@ impl FolloweeHeadChecker {
147158
}
148159

149160
async fn download_new_data(
161+
&self,
162+
rostra_id: RostraId,
163+
head: ShortEventId,
164+
connections: &mut ConnectionCache,
165+
followers_by_followee: &mut BTreeMap<RostraId, Vec<RostraId>>,
166+
) -> BoxedErrorResult<()> {
167+
let followers = if let Some(followers) = followers_by_followee.get(&rostra_id) {
168+
followers
169+
} else {
170+
let client = self.client.client_ref().boxed()?;
171+
let storage = client.db();
172+
let followers = storage.get_followers(rostra_id).await;
173+
followers_by_followee.insert(rostra_id, followers);
174+
175+
followers_by_followee
176+
.get(&rostra_id)
177+
.expect("Just inserted")
178+
};
179+
180+
for follower_id in followers.iter().chain([rostra_id, self.self_id].iter()) {
181+
let Ok(client) = self.client.client_ref().boxed() else {
182+
break;
183+
};
184+
let Some(conn) = connections.get_or_connect(&client, *follower_id).await else {
185+
continue;
186+
};
187+
188+
debug!(target: LOG_TARGET,
189+
rostra_id = %rostra_id,
190+
head = %head,
191+
follower_id = %follower_id,
192+
"Getting event data from a peer"
193+
);
194+
195+
match self
196+
.download_new_data_from(&client, rostra_id, conn, head)
197+
.await
198+
{
199+
Ok(true) => {
200+
return Ok(());
201+
}
202+
Ok(false) => {}
203+
Err(err) => {
204+
debug!(target: LOG_TARGET,
205+
rostra_id = %rostra_id,
206+
head = %head,
207+
follower_id = %follower_id,
208+
err = %err.fmt_compact(),
209+
"Error getting event from a peer"
210+
);
211+
}
212+
}
213+
}
214+
Ok(())
215+
}
216+
async fn download_new_data_from(
150217
&self,
151218
client: &ClientRef<'_>,
152219
rostra_id: RostraId,
220+
conn: &mut Connection,
153221
head: ShortEventId,
154-
) -> WhateverResult<()> {
222+
) -> WhateverResult<bool> {
155223
let mut events = BinaryHeap::from([(0, head)]);
224+
let mut downloaded_anything = false;
156225

157226
let storage = client.db();
158227

159-
let conn = client
160-
.connect(rostra_id)
161-
.await
162-
.whatever_context("Failed to connect")?;
163-
164228
let peer_id = conn.remote_node_id();
165229

166230
while let Some((depth, event_id)) = events.pop() {
@@ -187,6 +251,7 @@ impl FolloweeHeadChecker {
187251
);
188252
continue;
189253
};
254+
downloaded_anything = true;
190255
let (insert_outcome, process_state) = storage.process_event(&event).await;
191256

192257
if let InsertEventOutcome::Inserted {
@@ -226,6 +291,6 @@ impl FolloweeHeadChecker {
226291
}
227292
}
228293

229-
Ok(())
294+
Ok(downloaded_anything)
230295
}
231296
}

crates/rostra-client/src/task/missing_event_content_fetcher.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
use std::collections::BTreeMap;
22
use std::time::Duration;
33

4+
use rostra_core::ShortEventId;
45
use rostra_core::event::VerifiedEvent;
56
use rostra_core::id::RostraId;
6-
use rostra_core::ShortEventId;
7-
use rostra_p2p::Connection;
87
use rostra_util::is_rostra_dev_mode_set;
98
use rostra_util_error::{BoxedErrorResult, FmtCompact, WhateverResult};
109
use snafu::ResultExt as _;
1110
use tracing::{debug, instrument, trace};
1211

12+
use super::connection_cache::ConnectionCache;
13+
use crate::LOG_TARGET;
1314
use crate::client::Client;
14-
use crate::{ClientHandle, LOG_TARGET};
1515

1616
#[derive(Clone)]
1717
pub struct MissingEventContentFetcher {
@@ -50,16 +50,15 @@ impl MissingEventContentFetcher {
5050

5151
let mut cursor: Option<ShortEventId> = None;
5252

53-
let mut connections = BTreeMap::new();
53+
let mut connections = ConnectionCache::new();
5454
let mut followers_by_followee = BTreeMap::new();
5555

5656
loop {
5757
let (events, new_cursor) = db.paginate_missing_events_contents(cursor, 100).await;
5858

5959
for (author_id, event_id) in events {
6060
let _ = self
61-
.get_event_from_followers(
62-
&self.client,
61+
.get_event_content_from_followers(
6362
author_id,
6463
event_id,
6564
&mut connections,
@@ -78,12 +77,11 @@ impl MissingEventContentFetcher {
7877
}
7978
}
8079

81-
async fn get_event_from_followers(
80+
async fn get_event_content_from_followers(
8281
&self,
83-
client: &ClientHandle,
8482
author_id: RostraId,
8583
event_id: ShortEventId,
86-
connections: &mut BTreeMap<RostraId, Connection>,
84+
connections: &mut ConnectionCache,
8785
followers_by_followee: &mut BTreeMap<RostraId, Vec<RostraId>>,
8886
db: &rostra_client_db::Database,
8987
) -> BoxedErrorResult<()> {
@@ -99,16 +97,12 @@ impl MissingEventContentFetcher {
9997
};
10098

10199
for follower_id in followers.iter().chain([author_id, self.self_id].iter()) {
102-
if connections.get(follower_id).is_none() {
103-
let conn = client
104-
.client_ref()
105-
.boxed()?
106-
.connect(*follower_id)
107-
.await
108-
.boxed()?;
109-
connections.insert(*follower_id, conn);
110-
}
111-
let conn = connections.get_mut(follower_id).expect("Must exist");
100+
let Ok(client) = self.client.client_ref().boxed() else {
101+
break;
102+
};
103+
let Some(conn) = connections.get_or_connect(&client, *follower_id).await else {
104+
continue;
105+
};
112106

113107
debug!(target: LOG_TARGET,
114108
author_id = %author_id,

crates/rostra-client/src/task/missing_event_fetcher.rs

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
use std::collections::BTreeMap;
2-
1+
use rostra_core::ShortEventId;
32
use rostra_core::event::{EventExt as _, SignedEventExt as _, VerifiedEvent};
43
use rostra_core::id::RostraId;
5-
use rostra_core::ShortEventId;
64
use rostra_p2p::Connection;
75
use rostra_util_error::{BoxedErrorResult, FmtCompact, WhateverResult};
86
use snafu::ResultExt as _;
97
use tracing::{debug, instrument, trace, warn};
108

9+
use super::connection_cache::ConnectionCache;
10+
use crate::LOG_TARGET;
1111
use crate::client::Client;
12-
use crate::{ClientHandle, LOG_TARGET};
1312

1413
#[derive(Clone)]
1514
pub struct MissingEventFetcher {
@@ -69,22 +68,22 @@ impl MissingEventFetcher {
6968
continue;
7069
}
7170

72-
let mut connections = BTreeMap::new();
71+
let mut connections = ConnectionCache::new();
7372

7473
for follower_id in followers.iter().chain([self.self_id].iter()) {
74+
let Ok(client) = self.client.client_ref().boxed() else {
75+
break;
76+
};
77+
let Some(conn) = connections.get_or_connect(&client, *follower_id).await else {
78+
continue;
79+
};
80+
7581
for missing_event in &missing_events {
7682
if db.has_event(*missing_event).await {
7783
continue;
7884
}
7985
match self
80-
.get_event_from(
81-
&self.client,
82-
author_id,
83-
*missing_event,
84-
*follower_id,
85-
&mut connections,
86-
&db,
87-
)
86+
.get_event_from(author_id, *missing_event, *follower_id, conn, &db)
8887
.await
8988
{
9089
Ok(_) => {}
@@ -97,7 +96,6 @@ impl MissingEventFetcher {
9796
err = %(&*err).fmt_compact(),
9897
"Error while getting id from a peer"
9998
);
100-
connections.remove(follower_id);
10199
}
102100
}
103101
}
@@ -107,24 +105,12 @@ impl MissingEventFetcher {
107105

108106
async fn get_event_from(
109107
&self,
110-
client: &ClientHandle,
111108
author_id: RostraId,
112109
event_id: ShortEventId,
113110
follower_id: RostraId,
114-
connections: &mut BTreeMap<RostraId, Connection>,
111+
conn: &mut Connection,
115112
db: &rostra_client_db::Database,
116113
) -> BoxedErrorResult<()> {
117-
if connections.get(&follower_id).is_none() {
118-
let conn = client
119-
.client_ref()
120-
.boxed()?
121-
.connect(follower_id)
122-
.await
123-
.boxed()?;
124-
connections.insert(follower_id, conn);
125-
}
126-
let conn = connections.get_mut(&follower_id).expect("Must exist");
127-
128114
debug!(target: LOG_TARGET,
129115
author_id = %author_id,
130116
event_id = %event_id,

0 commit comments

Comments
 (0)