Skip to content

Commit b412927

Browse files
feat(iroh)!: remove node events (#2274)
These seem to be mostly unused by consumers, and add complexity. ## Breaking Changes - remove: - `iroh::node::Event` - `iroh::node::Node::subscribe` ## Notes - [x] Still need to figure out how to migrate the tests --------- Co-authored-by: Ruediger Klaehn <[email protected]>
1 parent 8ad6ff1 commit b412927

File tree

5 files changed

+66
-264
lines changed

5 files changed

+66
-264
lines changed

iroh/src/node.rs

Lines changed: 2 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,21 @@
22
//!
33
//! A node is a server that serves various protocols.
44
//!
5-
//! You can monitor what is happening in the node using [`Node::subscribe`].
6-
//!
75
//! To shut down the node, call [`Node::shutdown`].
86
use std::fmt::Debug;
97
use std::net::SocketAddr;
108
use std::path::Path;
119
use std::sync::Arc;
1210

1311
use anyhow::{anyhow, Result};
14-
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
12+
use futures_lite::StreamExt;
1513
use iroh_base::key::PublicKey;
1614
use iroh_blobs::downloader::Downloader;
1715
use iroh_blobs::store::Store as BaoStore;
1816
use iroh_net::util::AbortingJoinHandle;
1917
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
2018
use quic_rpc::transport::flume::FlumeConnection;
2119
use quic_rpc::RpcClient;
22-
use tokio::sync::{mpsc, RwLock};
2320
use tokio::task::JoinHandle;
2421
use tokio_util::sync::CancellationToken;
2522
use tokio_util::task::LocalPoolHandle;
@@ -35,38 +32,6 @@ mod rpc_status;
3532
pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
3633
pub use self::rpc_status::RpcStatus;
3734

38-
type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;
39-
40-
#[derive(Default, derive_more::Debug, Clone)]
41-
struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);
42-
43-
impl Callbacks {
44-
async fn push(&self, cb: EventCallback) {
45-
self.0.write().await.push(cb);
46-
}
47-
48-
#[allow(dead_code)]
49-
async fn send(&self, event: Event) {
50-
let cbs = self.0.read().await;
51-
for cb in &*cbs {
52-
cb(event.clone()).await;
53-
}
54-
}
55-
}
56-
57-
impl iroh_blobs::provider::EventSender for Callbacks {
58-
fn send(&self, event: iroh_blobs::provider::Event) -> BoxFuture<()> {
59-
let this = self.clone();
60-
async move {
61-
let cbs = this.0.read().await;
62-
for cb in &*cbs {
63-
cb(Event::ByteProvide(event.clone())).await;
64-
}
65-
}
66-
.boxed()
67-
}
68-
}
69-
7035
/// A server which implements the iroh node.
7136
///
7237
/// Clients can connect to this server and requests hashes from it.
@@ -91,9 +56,6 @@ struct NodeInner<D> {
9156
secret_key: SecretKey,
9257
cancel_token: CancellationToken,
9358
controller: FlumeConnection<Response, Request>,
94-
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
95-
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
96-
callbacks: Callbacks,
9759
#[allow(dead_code)]
9860
gc_task: Option<AbortingJoinHandle<()>>,
9961
#[debug("rt")]
@@ -102,15 +64,6 @@ struct NodeInner<D> {
10264
downloader: Downloader,
10365
}
10466

105-
/// Events emitted by the [`Node`] informing about the current status.
106-
#[derive(Debug, Clone)]
107-
pub enum Event {
108-
/// Events from the iroh-blobs transfer protocol.
109-
ByteProvide(iroh_blobs::provider::Event),
110-
/// Events from database
111-
Db(iroh_blobs::store::Event),
112-
}
113-
11467
/// In memory node.
11568
pub type MemNode = Node<iroh_blobs::store::mem::Store>;
11669

@@ -177,18 +130,6 @@ impl<D: BaoStore> Node<D> {
177130
self.inner.secret_key.public()
178131
}
179132

180-
/// Subscribe to [`Event`]s emitted from the node, informing about connections and
181-
/// progress.
182-
///
183-
/// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
184-
pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
185-
&self,
186-
cb: F,
187-
) -> Result<()> {
188-
self.inner.cb_sender.send(Box::new(cb)).await?;
189-
Ok(())
190-
}
191-
192133
/// Returns a handle that can be used to do RPC calls to the node internally.
193134
pub fn controller(&self) -> crate::client::MemRpcClient {
194135
RpcClient::new(self.inner.controller.clone())
@@ -319,23 +260,7 @@ mod tests {
319260

320261
let _drop_guard = node.cancel_token().drop_guard();
321262

322-
let (r, mut s) = mpsc::channel(1);
323-
node.subscribe(move |event| {
324-
let r = r.clone();
325-
async move {
326-
if let Event::ByteProvide(iroh_blobs::provider::Event::TaggedBlobAdded {
327-
hash,
328-
..
329-
}) = event
330-
{
331-
r.send(hash).await.ok();
332-
}
333-
}
334-
.boxed()
335-
})
336-
.await?;
337-
338-
let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
263+
let _got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
339264
let mut stream = node
340265
.controller()
341266
.server_streaming(BlobAddPathRequest {
@@ -364,9 +289,6 @@ mod tests {
364289
.context("timeout")?
365290
.context("get failed")?;
366291

367-
let event_hash = s.recv().await.expect("missing add tagged blob event");
368-
assert_eq!(got_hash, event_hash);
369-
370292
Ok(())
371293
}
372294

iroh/src/node/builder.rs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,18 @@ use quic_rpc::{
2727
RpcServer, ServiceEndpoint,
2828
};
2929
use serde::{Deserialize, Serialize};
30-
use tokio::sync::mpsc;
3130
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
3231
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
3332

3433
use crate::{
3534
client::RPC_ALPN,
3635
docs_engine::Engine,
37-
node::{Event, NodeInner},
36+
node::NodeInner,
3837
rpc_protocol::{Request, Response, RpcService},
3938
util::{fs::load_secret_key, path::IrohPaths},
4039
};
4140

42-
use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};
41+
use super::{rpc, rpc_status::RpcStatus, Node};
4342

4443
pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];
4544

@@ -69,7 +68,7 @@ const MAX_STREAMS: u64 = 10;
6968
///
7069
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
7170
/// using [`Node::shutdown`].
72-
#[derive(Debug)]
71+
#[derive(derive_more::Debug)]
7372
pub struct Builder<D, E = DummyServerEndpoint>
7473
where
7574
D: Map,
@@ -88,6 +87,9 @@ where
8887
docs_store: iroh_docs::store::fs::Store,
8988
#[cfg(any(test, feature = "test-utils"))]
9089
insecure_skip_relay_cert_verify: bool,
90+
/// Callback to register when a gc loop is done
91+
#[debug("callback")]
92+
gc_done_callback: Option<Box<dyn Fn() + Send>>,
9193
}
9294

9395
/// Configuration for storage.
@@ -135,6 +137,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
135137
node_discovery: Default::default(),
136138
#[cfg(any(test, feature = "test-utils"))]
137139
insecure_skip_relay_cert_verify: false,
140+
gc_done_callback: None,
138141
}
139142
}
140143
}
@@ -160,6 +163,7 @@ impl<D: Map> Builder<D> {
160163
node_discovery: Default::default(),
161164
#[cfg(any(test, feature = "test-utils"))]
162165
insecure_skip_relay_cert_verify: false,
166+
gc_done_callback: None,
163167
}
164168
}
165169
}
@@ -222,6 +226,7 @@ where
222226
node_discovery: self.node_discovery,
223227
#[cfg(any(test, feature = "test-utils"))]
224228
insecure_skip_relay_cert_verify: false,
229+
gc_done_callback: self.gc_done_callback,
225230
})
226231
}
227232

@@ -242,6 +247,7 @@ where
242247
node_discovery: self.node_discovery,
243248
#[cfg(any(test, feature = "test-utils"))]
244249
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
250+
gc_done_callback: self.gc_done_callback,
245251
}
246252
}
247253

@@ -267,6 +273,7 @@ where
267273
node_discovery: self.node_discovery,
268274
#[cfg(any(test, feature = "test-utils"))]
269275
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
276+
gc_done_callback: self.gc_done_callback,
270277
})
271278
}
272279

@@ -337,6 +344,13 @@ where
337344
self
338345
}
339346

347+
/// Register a callback for when GC is done.
348+
#[cfg(any(test, feature = "test-utils"))]
349+
pub fn register_gc_done_cb(mut self, cb: Box<dyn Fn() + Send>) -> Self {
350+
self.gc_done_callback.replace(cb);
351+
self
352+
}
353+
340354
/// Whether to log the SSL pre-master key.
341355
///
342356
/// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this
@@ -352,7 +366,7 @@ where
352366
/// This will create the underlying network server and spawn a tokio task accepting
353367
/// connections. The returned [`Node`] can be used to control the task as well as
354368
/// get information about it.
355-
pub async fn spawn(self) -> Result<Node<D>> {
369+
pub async fn spawn(mut self) -> Result<Node<D>> {
356370
trace!("spawning node");
357371
let lp = LocalPoolHandle::new(num_cpus::get());
358372

@@ -406,7 +420,6 @@ where
406420
let endpoint = endpoint.bind(bind_port).await?;
407421
trace!("created quinn endpoint");
408422

409-
let (cb_sender, cb_receiver) = mpsc::channel(8);
410423
let cancel_token = CancellationToken::new();
411424

412425
debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());
@@ -427,12 +440,13 @@ where
427440
);
428441
let sync_db = sync.sync.clone();
429442

430-
let callbacks = Callbacks::default();
431443
let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
432444
tracing::info!("Starting GC task with interval {:?}", gc_period);
433445
let db = self.blobs_store.clone();
434-
let callbacks = callbacks.clone();
435-
let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks));
446+
let gc_done_callback = self.gc_done_callback.take();
447+
448+
let task =
449+
lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, gc_done_callback));
436450
Some(task.into())
437451
} else {
438452
None
@@ -446,8 +460,6 @@ where
446460
secret_key: self.secret_key,
447461
controller,
448462
cancel_token,
449-
callbacks: callbacks.clone(),
450-
cb_sender,
451463
gc_task,
452464
rt: lp.clone(),
453465
sync,
@@ -464,8 +476,6 @@ where
464476
async move {
465477
Self::run(
466478
ep,
467-
callbacks,
468-
cb_receiver,
469479
handler,
470480
self.rpc_endpoint,
471481
internal_rpc,
@@ -508,8 +518,6 @@ where
508518
#[allow(clippy::too_many_arguments)]
509519
async fn run(
510520
server: Endpoint,
511-
callbacks: Callbacks,
512-
mut cb_receiver: mpsc::Receiver<EventCallback>,
513521
handler: rpc::Handler<D>,
514522
rpc: E,
515523
internal_rpc: impl ServiceEndpoint<RpcService>,
@@ -586,10 +594,6 @@ where
586594
}
587595
});
588596
},
589-
// Handle new callbacks
590-
Some(cb) = cb_receiver.recv() => {
591-
callbacks.push(cb).await;
592-
}
593597
else => break,
594598
}
595599
}
@@ -609,7 +613,7 @@ where
609613
db: D,
610614
ds: iroh_docs::actor::SyncHandle,
611615
gc_period: Duration,
612-
callbacks: Callbacks,
616+
done_cb: Option<Box<dyn Fn() + Send>>,
613617
) {
614618
let mut live = BTreeSet::new();
615619
tracing::debug!("GC loop starting {:?}", gc_period);
@@ -623,14 +627,11 @@ where
623627
// do delay before the two phases of GC
624628
tokio::time::sleep(gc_period).await;
625629
tracing::debug!("Starting GC");
626-
callbacks
627-
.send(Event::Db(iroh_blobs::store::Event::GcStarted))
628-
.await;
629630
live.clear();
630631
let doc_hashes = match ds.content_hashes().await {
631632
Ok(hashes) => hashes,
632633
Err(err) => {
633-
tracing::error!("Error getting doc hashes: {}", err);
634+
tracing::warn!("Error getting doc hashes: {}", err);
634635
continue 'outer;
635636
}
636637
};
@@ -680,9 +681,9 @@ where
680681
}
681682
}
682683
}
683-
callbacks
684-
.send(Event::Db(iroh_blobs::store::Event::GcCompleted))
685-
.await;
684+
if let Some(ref cb) = done_cb {
685+
cb();
686+
}
686687
}
687688
}
688689
}
@@ -719,7 +720,7 @@ async fn handle_connection<D: BaoStore>(
719720
iroh_blobs::provider::handle_connection(
720721
connection,
721722
node.db.clone(),
722-
node.callbacks.clone(),
723+
MockEventSender,
723724
node.rt.clone(),
724725
)
725726
.await
@@ -776,3 +777,12 @@ fn make_rpc_endpoint(
776777

777778
Ok((rpc_endpoint, actual_rpc_port))
778779
}
780+
781+
#[derive(Debug, Clone)]
782+
struct MockEventSender;
783+
784+
impl iroh_blobs::provider::EventSender for MockEventSender {
785+
fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
786+
Box::pin(std::future::ready(()))
787+
}
788+
}

iroh/src/node/rpc.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::rpc_protocol::{
5252
NodeWatchResponse, Request, RpcService, SetTagOption,
5353
};
5454

55-
use super::{Event, NodeInner};
55+
use super::NodeInner;
5656

5757
const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
5858
/// Chunk size for getting blobs over RPC
@@ -761,13 +761,6 @@ impl<D: BaoStore> Handler<D> {
761761
tag: tag.clone(),
762762
})
763763
.await?;
764-
self.inner
765-
.callbacks
766-
.send(Event::ByteProvide(
767-
iroh_blobs::provider::Event::TaggedBlobAdded { hash, format, tag },
768-
))
769-
.await;
770-
771764
Ok(())
772765
}
773766

0 commit comments

Comments
 (0)