Skip to content

Commit 5c8cb37

Browse files
committed
wip: Signal when the nodes are available, not when they are discovered
1 parent 5f79a70 commit 5c8cb37

File tree

2 files changed

+131
-47
lines changed

2 files changed

+131
-47
lines changed

dht-cache/src/cache.rs

Lines changed: 77 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,30 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
1616
use crate::domolibp2p::{self, generate_rsa_key};
1717
use crate::{
1818
cache::local::DomoCacheStateMessage,
19-
data::DomoEvent,
20-
dht::{dht_channel, Command, Event},
19+
dht::{dht_channel, Command, Event as DhtEvent},
2120
domolibp2p::DomoBehaviour,
2221
utils, Error,
2322
};
2423

2524
use self::local::{DomoCacheElement, LocalCache, Query};
2625

26+
/// DHT state change
27+
#[derive(Debug)]
28+
pub enum Event {
29+
/// Persistent, structured data
30+
///
31+
/// The information is persisted across nodes.
32+
/// Newly joining nodes will receive it from other participants and
33+
/// the local cache can be queried for it.
34+
PersistentData(DomoCacheElement),
35+
/// Volatile, unstructured data
36+
///
37+
/// The information is transmitted across all the nodes participating
38+
VolatileData(Value),
39+
/// Notify the peer availability
40+
ReadyPeers(Vec<String>),
41+
}
42+
2743
/// Builder for a Cached DHT Node
2844
// TODO: make it Clone
2945
pub struct Builder {
@@ -37,9 +53,7 @@ impl Builder {
3753
}
3854

3955
/// Instantiate a new DHT node a return
40-
pub async fn make_channel(
41-
self,
42-
) -> Result<(Cache, impl Stream<Item = DomoEvent>), crate::Error> {
56+
pub async fn make_channel(self) -> Result<(Cache, impl Stream<Item = Event>), crate::Error> {
4357
let loopback_only = self.cfg.loopback;
4458
let shared_key = domolibp2p::parse_hex_key(&self.cfg.shared_key)?;
4559
let private_key_file = self.cfg.private_key.as_ref();
@@ -239,7 +253,7 @@ pub fn cache_channel(
239253
local: LocalCache,
240254
swarm: Swarm<DomoBehaviour>,
241255
resend_interval: u64,
242-
) -> (Cache, impl Stream<Item = DomoEvent>) {
256+
) -> (Cache, impl Stream<Item = Event>) {
243257
let local_peer_id = swarm.local_peer_id().to_string();
244258

245259
let (cmd, r, _j) = dht_channel(swarm);
@@ -289,7 +303,7 @@ pub fn cache_channel(
289303
let cmd = cmd.clone();
290304
async move {
291305
match ev {
292-
Event::Config(cfg) => {
306+
DhtEvent::Config(cfg) => {
293307
let m: DomoCacheStateMessage = serde_json::from_str(&cfg).unwrap();
294308

295309
let hash = local_write.get_hash().await;
@@ -338,16 +352,16 @@ pub fn cache_channel(
338352

339353
None
340354
}
341-
Event::Discovered(who) => Some(DomoEvent::NewPeers(
355+
DhtEvent::Discovered(_who) => None /* Some(DomoEvent::NewPeers(
342356
who.into_iter().map(|w| w.to_string()).collect(),
343-
)),
344-
Event::VolatileData(data) => {
357+
))*/,
358+
DhtEvent::VolatileData(data) => {
345359
// TODO we swallow errors quietly here
346360
serde_json::from_str(&data)
347361
.ok()
348-
.map(DomoEvent::VolatileData)
362+
.map(Event::VolatileData)
349363
}
350-
Event::PersistentData(data) => {
364+
DhtEvent::PersistentData(data) => {
351365
if let Ok(mut elem) = serde_json::from_str::<DomoCacheElement>(&data) {
352366
if elem.republication_timestamp != 0 {
353367
log::debug!("Retransmission");
@@ -358,7 +372,15 @@ pub fn cache_channel(
358372
.try_put(&elem)
359373
.await
360374
.ok()
361-
.map(|_| DomoEvent::PersistentData(elem))
375+
.map(|_| Event::PersistentData(elem))
376+
} else {
377+
None
378+
}
379+
}
380+
DhtEvent::Ready(peers) => {
381+
if !peers.is_empty() {
382+
Some(Event::ReadyPeers(
383+
peers.into_iter().map(|p| p.to_string()).collect()))
362384
} else {
363385
None
364386
}
@@ -416,38 +438,52 @@ mod test {
416438
expected_peers.insert(b_c.peer_id.clone());
417439
expected_peers.insert(c_c.peer_id.clone());
418440

419-
tokio::task::spawn(async move {
420-
let a_ev = pin!(a_ev);
421-
let b_ev = pin!(b_ev);
422-
let c_ev = pin!(c_ev);
423-
for uuid in 0..10 {
424-
let _ = a_c
425-
.put(
426-
"Topic",
427-
&format!("uuid-{uuid}"),
428-
serde_json::json!({"key": uuid}),
429-
)
430-
.await;
441+
let mut a_ev = pin!(a_ev);
442+
let b_ev = pin!(b_ev);
443+
let c_ev = pin!(c_ev);
444+
445+
while let Some(ev) = a_ev.next().await {
446+
match ev {
447+
Event::ReadyPeers(peers) => {
448+
log::info!("Ready peers {peers:?}");
449+
break;
450+
}
451+
_ => log::debug!("waiting for ready {ev:?}"),
431452
}
453+
}
432454

433-
let mut s = (
434-
a_ev.map(|ev| ("a", ev)),
435-
b_ev.map(|ev| ("b", ev)),
436-
c_ev.map(|ev| ("c", ev)),
437-
)
438-
.merge();
439-
440-
while let Some((node, ev)) = s.next().await {
441-
match ev {
442-
DomoEvent::PersistentData(data) => {
443-
log::debug!("{node}: Got data {data:?}");
444-
}
445-
_ => {
446-
log::debug!("{node}: Other {ev:?}");
455+
for uuid in 0..10 {
456+
let _ = a_c
457+
.put(
458+
"Topic",
459+
&format!("uuid-{uuid}"),
460+
serde_json::json!({"key": uuid}),
461+
)
462+
.await;
463+
}
464+
let mut s = (
465+
a_ev.map(|ev| ("a", ev)),
466+
b_ev.map(|ev| ("b", ev)),
467+
c_ev.map(|ev| ("c", ev)),
468+
)
469+
.merge();
470+
471+
// wait for the nodes to have at least some elements
472+
let mut seen = 0;
473+
while let Some((node, ev)) = s.next().await {
474+
match ev {
475+
Event::PersistentData(data) => {
476+
log::debug!("{node}: Got data {data:?}");
477+
seen += 1;
478+
if seen > 10 {
479+
break;
447480
}
448481
}
482+
_ => {
483+
log::debug!("{node}: Other {ev:?}");
484+
}
449485
}
450-
});
486+
}
451487

452488
log::info!("Adding D");
453489

@@ -457,7 +493,7 @@ mod test {
457493
while !expected.is_empty() {
458494
let ev = d_ev.next().await.unwrap();
459495
match ev {
460-
DomoEvent::PersistentData(data) => {
496+
Event::PersistentData(data) => {
461497
assert!(expected.remove(&data.topic_uuid));
462498
log::warn!("d: Got data {data:?}");
463499
}

dht-cache/src/dht.rs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! DHT Abstraction
22
//!
33
4+
use std::time::Duration;
5+
46
use crate::domolibp2p::{DomoBehaviour, OutEvent};
57
use futures::prelude::*;
68
use libp2p::{gossipsub::IdentTopic as Topic, swarm::SwarmEvent, Swarm};
@@ -26,6 +28,7 @@ pub enum Event {
2628
VolatileData(String),
2729
Config(String),
2830
Discovered(Vec<PeerId>),
31+
Ready(Vec<PeerId>),
2932
}
3033

3134
fn handle_command(swarm: &mut Swarm<DomoBehaviour>, cmd: Command) -> bool {
@@ -117,6 +120,11 @@ fn handle_swarm_event<E>(
117120
}
118121
}
119122
}
123+
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
124+
libp2p::gossipsub::Event::Subscribed { peer_id, topic },
125+
)) => {
126+
log::debug!("Peer {peer_id} subscribed to {}", topic.as_str());
127+
}
120128
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(mdns::Event::Expired(list))) => {
121129
let local = OffsetDateTime::now_utc();
122130

@@ -154,16 +162,37 @@ pub fn dht_channel(
154162
let (ev_send, ev_recv) = mpsc::unbounded_channel();
155163

156164
let handle = tokio::task::spawn(async move {
165+
let mut interval = tokio::time::interval(Duration::from_secs(1));
166+
let volatile = Topic::new("domo-volatile-data").hash();
167+
let persistent = Topic::new("domo-persistent-data").hash();
168+
let config = Topic::new("domo-config").hash();
157169
loop {
170+
log::info!("Looping {}", swarm.local_peer_id());
158171
tokio::select! {
172+
// the mdns event is not enough to ensure we can send messages
173+
_ = interval.tick() => {
174+
log::info!("Checking for peers");
175+
let peers: Vec<_> = swarm.behaviour_mut().gossipsub.all_peers().filter_map(|(p, topics)| {
176+
log::info!("{p}, {topics:?}");
177+
(topics.contains(&&volatile) &&
178+
topics.contains(&&persistent) &&
179+
topics.contains(&&config)).then(
180+
||p.to_owned())
181+
}).collect();
182+
if !peers.is_empty() &&
183+
ev_send.send(Event::Ready(peers)).is_err() {
184+
return swarm;
185+
}
186+
}
159187
cmd = cmd_recv.recv() => {
160-
log::debug!("command {cmd:?}");
188+
log::info!("command {cmd:?}");
161189
if !cmd.is_some_and(|cmd| handle_command(&mut swarm, cmd)) {
162190
log::debug!("Exiting cmd");
163191
return swarm
164192
}
165193
}
166194
ev = swarm.select_next_some() => {
195+
log::info!("event {ev:?}");
167196
if handle_swarm_event(&mut swarm, ev, &ev_send).is_err() {
168197
log::debug!("Exiting ev");
169198
return swarm
@@ -217,10 +246,9 @@ pub(crate) mod test {
217246
}
218247

219248
pub async fn make_peer(variant: u8) -> Swarm<DomoBehaviour> {
220-
let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
221-
a.listen().await;
222-
223-
a
249+
let mut swarm = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
250+
swarm.listen().await;
251+
swarm
224252
}
225253

226254
pub async fn connect_peer(a: &mut Swarm<DomoBehaviour>, b: &mut Swarm<DomoBehaviour>) {
@@ -235,10 +263,15 @@ pub(crate) mod test {
235263

236264
pub async fn make_peers(variant: u8) -> [Swarm<DomoBehaviour>; 3] {
237265
let _ = env_logger::builder().is_test(true).try_init();
266+
238267
let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
239268
let mut b = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
240269
let mut c = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
241-
270+
/*
271+
let mut a = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
272+
let mut b = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
273+
let mut c = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
274+
*/
242275
for a in a.external_addresses() {
243276
log::info!("{a:?}");
244277
}
@@ -251,8 +284,14 @@ pub(crate) mod test {
251284
b.connect(&mut c).await;
252285
c.connect(&mut a).await;
253286

287+
println!("a {}", a.local_peer_id());
288+
println!("b {}", b.local_peer_id());
289+
println!("c {}", c.local_peer_id());
290+
254291
let peers: Vec<_> = a.connected_peers().cloned().collect();
255292

293+
log::info!("Peers {peers:#?}");
294+
256295
for peer in peers {
257296
a.behaviour_mut().gossipsub.add_explicit_peer(&peer);
258297
}
@@ -280,6 +319,8 @@ pub(crate) mod test {
280319
let (b_s, br, _) = dht_channel(b);
281320
let (c_s, cr, _) = dht_channel(c);
282321

322+
log::info!("Waiting for peers");
323+
283324
// Wait until peers are discovered
284325
while let Some(ev) = ar.recv().await {
285326
match ev {
@@ -288,6 +329,9 @@ pub(crate) mod test {
288329
Event::Config(cfg) => log::info!("config {cfg}"),
289330
Event::Discovered(peers) => {
290331
log::info!("found peers: {peers:?}");
332+
}
333+
Event::Ready(peers) => {
334+
log::info!("ready peers: {peers:?}");
291335
break;
292336
}
293337
}
@@ -297,6 +341,7 @@ pub(crate) mod test {
297341

298342
a_s.send(Command::Broadcast(msg.clone())).unwrap();
299343

344+
log::info!("Sent volatile");
300345
for r in [br, cr].iter_mut() {
301346
while let Some(ev) = r.recv().await {
302347
match ev {
@@ -311,6 +356,9 @@ pub(crate) mod test {
311356
Event::Discovered(peers) => {
312357
log::info!("found peers: {peers:?}");
313358
}
359+
Event::Ready(peers) => {
360+
log::info!("peers ready: {peers:?}");
361+
}
314362
}
315363
}
316364
}

0 commit comments

Comments
 (0)