Skip to content

Commit 8efe87d

Browse files
committed
pool: add relay monitor
Introduce the `Monitor` struct to track relay status changes via channel notifications. Closes #850 Pull-Request: #851 Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent 7900a0b commit 8efe87d

File tree

13 files changed

+145
-0
lines changed

13 files changed

+145
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
- mls-memory-storage: add an in-memory implementation for MLS ([JeffG] at https://github.com/rust-nostr/nostr/pull/839)
4545
- mls-sqlite-storage: a sqlite implementation for MLS ([JeffG] at https://github.com/rust-nostr/nostr/pull/842)
4646
- mls: add new crate for implementing MLS messaging ([JeffG] at https://github.com/rust-nostr/nostr/pull/843)
47+
- pool: add relay monitor ([Yuki Kishimoto] at https://github.com/rust-nostr/nostr/pull/851)
4748
- sdk: add `Options::pool` ([Yuki Kishimoto])
4849

4950
### Deprecated

crates/nostr-relay-pool/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ extern crate test;
1818

1919
pub use async_wsocket::ConnectionMode;
2020

21+
pub mod monitor;
2122
pub mod policy;
2223
pub mod pool;
2324
pub mod prelude;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2022-2023 Yuki Kishimoto
2+
// Copyright (c) 2023-2025 Rust Nostr Developers
3+
// Distributed under the MIT software license
4+
5+
//! Monitor
6+
7+
use nostr::RelayUrl;
8+
use tokio::sync::broadcast::{self, Receiver, Sender};
9+
10+
use crate::relay::RelayStatus;
11+
12+
/// Relay monitor notification
13+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
14+
pub enum MonitorNotification {
15+
/// Relay status changed
16+
StatusChanged {
17+
/// Relay URL
18+
relay_url: RelayUrl,
19+
/// Status
20+
status: RelayStatus,
21+
},
22+
}
23+
24+
/// Relay monitor
25+
#[derive(Debug, Clone)]
26+
pub struct Monitor {
27+
channel: Sender<MonitorNotification>,
28+
}
29+
30+
impl Monitor {
31+
/// Create a new monitor with the given channel size
32+
///
33+
/// # Panics
34+
///
35+
/// This will panic if the channel size is equal to `0` or larger than `usize::MAX / 2`;
36+
pub fn new(channel_size: usize) -> Self {
37+
let (tx, ..) = broadcast::channel(channel_size);
38+
39+
Self { channel: tx }
40+
}
41+
42+
/// Subscribe to monitor notifications
43+
///
44+
/// <div class="warning">When you call this method, you subscribe to the notifications channel from that precise moment. Anything received by relay/s before that moment is not included in the channel!</div>
45+
#[inline]
46+
pub fn subscribe(&self) -> Receiver<MonitorNotification> {
47+
self.channel.subscribe()
48+
}
49+
50+
#[inline]
51+
fn notify(&self, notification: MonitorNotification) {
52+
let _ = self.channel.send(notification);
53+
}
54+
55+
#[inline]
56+
pub(crate) fn notify_status_change(&self, relay_url: RelayUrl, status: RelayStatus) {
57+
self.notify(MonitorNotification::StatusChanged { relay_url, status });
58+
}
59+
}

crates/nostr-relay-pool/src/pool/builder.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use nostr_database::{MemoryDatabase, NostrDatabase};
1111

1212
use super::options::RelayPoolOptions;
1313
use super::RelayPool;
14+
use crate::monitor::Monitor;
1415
use crate::policy::AdmitPolicy;
1516
use crate::transport::websocket::{DefaultWebsocketTransport, WebSocketTransport};
1617

@@ -21,6 +22,8 @@ pub struct RelayPoolBuilder {
2122
pub websocket_transport: Arc<dyn WebSocketTransport>,
2223
/// Admission policy
2324
pub admit_policy: Option<Arc<dyn AdmitPolicy>>,
25+
/// Relay monitor
26+
pub monitor: Option<Monitor>,
2427
/// Relay pool options
2528
pub opts: RelayPoolOptions,
2629
// Private stuff
@@ -35,6 +38,7 @@ impl Default for RelayPoolBuilder {
3538
Self {
3639
websocket_transport: Arc::new(DefaultWebsocketTransport),
3740
admit_policy: None,
41+
monitor: None,
3842
opts: RelayPoolOptions::default(),
3943
__database: Arc::new(MemoryDatabase::default()),
4044
__signer: None,
@@ -69,6 +73,13 @@ impl RelayPoolBuilder {
6973
self
7074
}
7175

76+
/// Set monitor
77+
#[inline]
78+
pub fn monitor(mut self, monitor: Monitor) -> Self {
79+
self.monitor = Some(monitor);
80+
self
81+
}
82+
7283
/// Set options
7384
#[inline]
7485
pub fn opts(mut self, opts: RelayPoolOptions) -> Self {

crates/nostr-relay-pool/src/pool/inner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl InnerRelayPool {
5555
builder.__signer,
5656
builder.admit_policy,
5757
builder.opts.nip42_auto_authentication,
58+
builder.monitor,
5859
),
5960
atomic: Arc::new(AtomicPrivateData {
6061
relays: RwLock::new(HashMap::new()),

crates/nostr-relay-pool/src/pool/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub use self::error::Error;
2828
use self::inner::{InnerRelayPool, Relays};
2929
pub use self::options::RelayPoolOptions;
3030
pub use self::output::Output;
31+
use crate::monitor::Monitor;
3132
use crate::relay::flags::FlagCheck;
3233
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
3334
use crate::relay::Relay;
@@ -139,6 +140,13 @@ impl RelayPool {
139140
self.inner.notification_sender.subscribe()
140141
}
141142

143+
/// Returns the reference to the monitor, if any.
144+
///
145+
/// Returns `None` if the monitor is not configured (see [`RelayPoolBuilder::monitor`] ).
146+
pub fn monitor(&self) -> Option<&Monitor> {
147+
self.inner.state.monitor.as_ref()
148+
}
149+
142150
/// Get shared state
143151
#[inline]
144152
pub fn state(&self) -> &SharedState {

crates/nostr-relay-pool/src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub use nostr::prelude::*;
1414
pub use nostr_database::*;
1515

1616
// Internal modules
17+
pub use crate::monitor::{self, *};
1718
pub use crate::policy::*;
1819
pub use crate::pool::builder::*;
1920
pub use crate::pool::constants::*;

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ impl InnerRelay {
224224

225225
// Send notification
226226
self.send_notification(RelayNotification::RelayStatus { status }, false);
227+
228+
// If monitor is enabled, notify status change.
229+
if let Some(monitor) = &self.state.monitor {
230+
monitor.notify_status_change(self.url.clone(), status);
231+
}
227232
}
228233

229234
/// Perform health checks

crates/nostr-relay-pool/src/shared.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use nostr::{EventId, NostrSigner};
1515
use nostr_database::{IntoNostrDatabase, MemoryDatabase, NostrDatabase};
1616
use tokio::sync::RwLock;
1717

18+
use crate::monitor::Monitor;
1819
use crate::policy::AdmitPolicy;
1920
use crate::transport::websocket::{DefaultWebsocketTransport, WebSocketTransport};
2021

@@ -47,6 +48,7 @@ pub struct SharedState {
4748
nip42_auto_authentication: Arc<AtomicBool>,
4849
verification_cache: Arc<Mutex<LruCache<u64, ()>>>,
4950
pub(crate) admit_policy: Option<Arc<dyn AdmitPolicy>>,
51+
pub(crate) monitor: Option<Monitor>,
5052
}
5153

5254
impl Default for SharedState {
@@ -57,6 +59,7 @@ impl Default for SharedState {
5759
None,
5860
None,
5961
true,
62+
None,
6063
)
6164
}
6265
}
@@ -68,6 +71,7 @@ impl SharedState {
6871
signer: Option<Arc<dyn NostrSigner>>,
6972
admit_policy: Option<Arc<dyn AdmitPolicy>>,
7073
nip42_auto_authentication: bool,
74+
monitor: Option<Monitor>,
7175
) -> Self {
7276
let max_verification_cache_size: NonZeroUsize =
7377
NonZeroUsize::new(MAX_VERIFICATION_CACHE_SIZE)
@@ -80,6 +84,7 @@ impl SharedState {
8084
nip42_auto_authentication: Arc::new(AtomicBool::new(nip42_auto_authentication)),
8185
verification_cache: Arc::new(Mutex::new(LruCache::new(max_verification_cache_size))),
8286
admit_policy,
87+
monitor,
8388
}
8489
}
8590

crates/nostr-sdk/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ name = "fetch-events"
7676
name = "gossip"
7777
required-features = ["all-nips"]
7878

79+
[[example]]
80+
name = "monitor"
81+
7982
[[example]]
8083
name = "nostr-connect"
8184
required-features = ["nip59"]

0 commit comments

Comments
 (0)