Skip to content

Commit 6e9cb59

Browse files
committed
pool: add RelayPoolBuilder
- Add `RelayPool::builder` constructor - Change `RelayPool::new` constructor signature - Remove `set_admit_policy` method from `Relay` and `RelayPool` - Remove `RelayPool::__with_shared_state` Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent 9d755c6 commit 6e9cb59

File tree

11 files changed

+141
-96
lines changed

11 files changed

+141
-96
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* nostr: change `EventBuilder::delete` arguments ([Yuki Kishimoto])
5050
* pool: drop `RelayFiltering` ([Yuki Kishimoto])
5151
* pool: remove `Relay` constructors ([Yuki Kishimoto])
52+
* pool: change `RelayPool::new` signature ([Yuki Kishimoto])
5253
* sdk: change `Client::fetch_metadata` output ([Yuki Kishimoto])
5354

5455
### Changed
@@ -98,6 +99,7 @@
9899
* pool: event verification cache ([Yuki Kishimoto])
99100
* pool: add `AdmitPolicy` trait ([Yuki Kishimoto])
100101
* pool: add `ReqExitPolicy::WaitForEvents` variant ([Yuki Kishimoto])
102+
* pool: add `RelayPoolBuilder` ([Yuki Kishimoto])
101103
* ffi: add Mac Catalyst support in Swift package ([Yuki Kishimoto])
102104
* js: add `KindStandard` enum ([Yuki Kishimoto])
103105

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright (c) 2022-2023 Yuki Kishimoto
2+
// Copyright (c) 2023-2025 Rust Nostr Developers
3+
// Distributed under the MIT software license
4+
5+
//! Relay Pool builder
6+
7+
use std::sync::Arc;
8+
9+
use nostr::NostrSigner;
10+
use nostr_database::{MemoryDatabase, NostrDatabase};
11+
12+
use super::options::RelayPoolOptions;
13+
use super::RelayPool;
14+
use crate::policy::AdmitPolicy;
15+
use crate::transport::websocket::{DefaultWebsocketTransport, WebSocketTransport};
16+
17+
/// Relay Pool builder
18+
#[derive(Debug, Clone)]
19+
pub struct RelayPoolBuilder {
20+
/// WebSocket transport
21+
pub websocket_transport: Arc<dyn WebSocketTransport>,
22+
/// Admission policy
23+
pub admit_policy: Option<Arc<dyn AdmitPolicy>>,
24+
/// Relay pool options
25+
pub opts: RelayPoolOptions,
26+
// Private stuff
27+
#[doc(hidden)]
28+
pub __database: Arc<dyn NostrDatabase>,
29+
#[doc(hidden)]
30+
pub __signer: Option<Arc<dyn NostrSigner>>,
31+
}
32+
33+
impl Default for RelayPoolBuilder {
34+
fn default() -> Self {
35+
Self {
36+
websocket_transport: Arc::new(DefaultWebsocketTransport),
37+
admit_policy: None,
38+
opts: RelayPoolOptions::default(),
39+
__database: Arc::new(MemoryDatabase::default()),
40+
__signer: None,
41+
}
42+
}
43+
}
44+
45+
impl RelayPoolBuilder {
46+
/// New default builder
47+
#[inline]
48+
pub fn new() -> Self {
49+
Self::default()
50+
}
51+
52+
/// Set a WebSocket transport
53+
#[inline]
54+
pub fn websocket_transport<T>(mut self, transport: T) -> Self
55+
where
56+
T: WebSocketTransport + 'static,
57+
{
58+
self.websocket_transport = Arc::new(transport);
59+
self
60+
}
61+
62+
/// Admission policy
63+
#[inline]
64+
pub fn admit_policy<T>(mut self, policy: T) -> Self
65+
where
66+
T: AdmitPolicy + 'static,
67+
{
68+
self.admit_policy = Some(Arc::new(policy));
69+
self
70+
}
71+
72+
/// Set options
73+
#[inline]
74+
pub fn opts(mut self, opts: RelayPoolOptions) -> Self {
75+
self.opts = opts;
76+
self
77+
}
78+
79+
/// Build relay pool
80+
#[inline]
81+
pub fn build(self) -> RelayPool {
82+
RelayPool::from_builder(self)
83+
}
84+
}

crates/nostr-relay-pool/src/pool/inner.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use nostr_database::prelude::*;
1414
use tokio::sync::{broadcast, RwLock};
1515

1616
use super::options::RelayPoolOptions;
17-
use super::{Error, RelayPoolNotification};
17+
use super::{Error, RelayPoolBuilder, RelayPoolNotification};
1818
use crate::relay::flags::RelayServiceFlags;
1919
use crate::relay::options::RelayOptions;
2020
use crate::relay::Relay;
@@ -47,18 +47,24 @@ impl AtomicDestroyer for InnerRelayPool {
4747
}
4848

4949
impl InnerRelayPool {
50-
pub fn new(opts: RelayPoolOptions, state: SharedState) -> Self {
51-
let (notification_sender, _) = broadcast::channel(opts.notification_channel_size);
50+
pub(super) fn from_builder(builder: RelayPoolBuilder) -> Self {
51+
let (notification_sender, _) = broadcast::channel(builder.opts.notification_channel_size);
5252

5353
Self {
54-
state,
54+
state: SharedState::new(
55+
builder.__database,
56+
builder.websocket_transport,
57+
builder.__signer,
58+
builder.admit_policy,
59+
builder.opts.nip42_auto_authentication,
60+
),
5561
atomic: Arc::new(AtomicPrivateData {
5662
relays: RwLock::new(HashMap::new()),
5763
subscriptions: RwLock::new(HashMap::new()),
5864
shutdown: AtomicBool::new(false),
5965
}),
6066
notification_sender,
61-
opts,
67+
opts: builder.opts,
6268
}
6369
}
6470

crates/nostr-relay-pool/src/pool/mod.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@ use atomic_destructor::{AtomicDestructor, StealthClone};
1515
use nostr_database::prelude::*;
1616
use tokio::sync::{broadcast, mpsc, RwLockReadGuard};
1717

18+
pub mod builder;
1819
pub mod constants;
1920
mod error;
2021
mod inner;
2122
pub mod options;
2223
mod output;
2324

25+
pub use self::builder::RelayPoolBuilder;
2426
pub use self::error::Error;
2527
use self::inner::{InnerRelayPool, Relays};
2628
pub use self::options::RelayPoolOptions;
2729
pub use self::output::Output;
28-
use crate::policy::AdmitPolicy;
2930
use crate::relay::flags::FlagCheck;
3031
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
3132
use crate::relay::Relay;
@@ -79,7 +80,7 @@ pub struct RelayPool {
7980

8081
impl Default for RelayPool {
8182
fn default() -> Self {
82-
Self::new(RelayPoolOptions::default())
83+
Self::builder().build()
8384
}
8485
}
8586

@@ -92,28 +93,25 @@ impl StealthClone for RelayPool {
9293
}
9394

9495
impl RelayPool {
95-
/// Create new `RelayPool`
96+
/// Construct new default relay pool
97+
///
98+
/// Use [`RelayPool::builder`] to customize it.
9699
#[inline]
97-
pub fn new(opts: RelayPoolOptions) -> Self {
98-
Self::__with_shared_state(opts, SharedState::default())
100+
pub fn new() -> Self {
101+
Self::default()
99102
}
100103

104+
/// New relay pool builder
101105
#[inline]
102-
#[doc(hidden)]
103-
pub fn __with_shared_state(opts: RelayPoolOptions, state: SharedState) -> Self {
104-
Self {
105-
inner: AtomicDestructor::new(InnerRelayPool::new(opts, state)),
106-
}
106+
pub fn builder() -> RelayPoolBuilder {
107+
RelayPoolBuilder::default()
107108
}
108109

109-
/// Set an admission policy
110110
#[inline]
111-
pub fn set_admit_policy<T>(&self, policy: T) -> Result<(), Error>
112-
where
113-
T: AdmitPolicy + 'static,
114-
{
115-
self.inner.state.set_admit_policy(policy)?;
116-
Ok(())
111+
fn from_builder(builder: RelayPoolBuilder) -> Self {
112+
Self {
113+
inner: AtomicDestructor::new(InnerRelayPool::from_builder(builder)),
114+
}
117115
}
118116

119117
/// Completely shutdown pool

crates/nostr-relay-pool/src/pool/options.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ use super::constants::DEFAULT_NOTIFICATION_CHANNEL_SIZE;
1010
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1111
pub struct RelayPoolOptions {
1212
pub(super) max_relays: Option<usize>,
13+
pub(super) nip42_auto_authentication: bool,
1314
pub(super) notification_channel_size: usize,
1415
}
1516

1617
impl Default for RelayPoolOptions {
1718
fn default() -> Self {
1819
Self {
1920
max_relays: None,
21+
nip42_auto_authentication: true,
2022
notification_channel_size: DEFAULT_NOTIFICATION_CHANNEL_SIZE,
2123
}
2224
}
@@ -36,6 +38,15 @@ impl RelayPoolOptions {
3638
self
3739
}
3840

41+
/// Auto authenticate to relays (default: true)
42+
///
43+
/// <https://github.com/nostr-protocol/nips/blob/master/42.md>
44+
#[inline]
45+
pub fn automatic_authentication(mut self, enabled: bool) -> Self {
46+
self.nip42_auto_authentication = enabled;
47+
self
48+
}
49+
3950
/// Notification channel size (default: [`DEFAULT_NOTIFICATION_CHANNEL_SIZE`])
4051
#[inline]
4152
pub fn notification_channel_size(mut self, size: usize) -> Self {

crates/nostr-relay-pool/src/prelude.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub use nostr_database::*;
1515

1616
// Internal modules
1717
pub use crate::policy::*;
18+
pub use crate::pool::builder::*;
1819
pub use crate::pool::constants::*;
1920
pub use crate::pool::options::*;
2021
pub use crate::pool::{self, *};

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ impl InnerRelay {
999999
}
10001000

10011001
// Check event admission policy
1002-
if let Some(policy) = self.state.admit_policy.get() {
1002+
if let Some(policy) = &self.state.admit_policy {
10031003
if let AdmitStatus::Rejected = policy
10041004
.admit_event(&self.url, &subscription_id, &event)
10051005
.await?

crates/nostr-relay-pool/src/relay/mod.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ pub use self::options::{
3737
};
3838
pub use self::stats::RelayConnectionStats;
3939
pub use self::status::RelayStatus;
40-
use crate::policy::AdmitPolicy;
4140
use crate::shared::SharedState;
4241
use crate::transport::websocket::{BoxSink, BoxStream};
4342

@@ -157,16 +156,6 @@ impl Relay {
157156
}
158157
}
159158

160-
/// Set an admission policy
161-
#[inline]
162-
pub fn set_admit_policy<T>(&self, policy: T) -> Result<(), Error>
163-
where
164-
T: AdmitPolicy + 'static,
165-
{
166-
self.inner.state.set_admit_policy(policy)?;
167-
Ok(())
168-
}
169-
170159
/// Get relay url
171160
#[inline]
172161
pub fn url(&self) -> &RelayUrl {

crates/nostr-relay-pool/src/shared.rs

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::fmt;
77
use std::hash::{Hash, Hasher};
88
use std::num::NonZeroUsize;
99
use std::sync::atomic::{AtomicBool, Ordering};
10-
use std::sync::{Arc, Mutex, OnceLock};
10+
use std::sync::{Arc, Mutex};
1111

1212
use lru::LruCache;
1313
use nostr::prelude::IntoNostrSigner;
@@ -16,9 +16,7 @@ use nostr_database::{IntoNostrDatabase, MemoryDatabase, NostrDatabase};
1616
use tokio::sync::RwLock;
1717

1818
use crate::policy::AdmitPolicy;
19-
use crate::transport::websocket::{
20-
DefaultWebsocketTransport, IntoWebSocketTransport, WebSocketTransport,
21-
};
19+
use crate::transport::websocket::{DefaultWebsocketTransport, WebSocketTransport};
2220

2321
// LruCache pre-allocate, so keep this at a reasonable value.
2422
// A good value may be <= 128k, considering that stored values are the 64-bit hashes of the event IDs.
@@ -27,8 +25,6 @@ const MAX_VERIFICATION_CACHE_SIZE: usize = 128_000;
2725
#[derive(Debug)]
2826
pub enum SharedStateError {
2927
SignerNotConfigured,
30-
/// Admit policy already set
31-
AdmitPolicyAlreadySet,
3228
MutexPoisoned,
3329
}
3430

@@ -38,29 +34,26 @@ impl fmt::Display for SharedStateError {
3834
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3935
match self {
4036
Self::SignerNotConfigured => write!(f, "signer not configured"),
41-
Self::AdmitPolicyAlreadySet => write!(f, "admission policy already set"),
4237
Self::MutexPoisoned => write!(f, "mutex poisoned"),
4338
}
4439
}
4540
}
4641

47-
// TODO: reduce atomic operations
4842
#[derive(Debug, Clone)]
4943
pub struct SharedState {
5044
pub(crate) database: Arc<dyn NostrDatabase>,
5145
pub(crate) transport: Arc<dyn WebSocketTransport>,
5246
signer: Arc<RwLock<Option<Arc<dyn NostrSigner>>>>,
5347
nip42_auto_authentication: Arc<AtomicBool>,
5448
verification_cache: Arc<Mutex<LruCache<u64, ()>>>,
55-
pub(crate) admit_policy: OnceLock<Arc<dyn AdmitPolicy>>,
56-
// TODO: add a semaphore to limit number of concurrent websocket connections attempts?
49+
pub(crate) admit_policy: Option<Arc<dyn AdmitPolicy>>,
5750
}
5851

5952
impl Default for SharedState {
6053
fn default() -> Self {
6154
Self::new(
6255
MemoryDatabase::new().into_nostr_database(),
63-
DefaultWebsocketTransport.into_transport(),
56+
Arc::new(DefaultWebsocketTransport),
6457
None,
6558
None,
6659
true,
@@ -86,33 +79,10 @@ impl SharedState {
8679
signer: Arc::new(RwLock::new(signer)),
8780
nip42_auto_authentication: Arc::new(AtomicBool::new(nip42_auto_authentication)),
8881
verification_cache: Arc::new(Mutex::new(LruCache::new(max_verification_cache_size))),
89-
admit_policy: match admit_policy {
90-
Some(policy) => OnceLock::from(policy),
91-
None => OnceLock::new(),
92-
},
82+
admit_policy,
9383
}
9484
}
9585

96-
/// Set a custom transport
97-
pub fn custom_transport<T>(mut self, transport: T) -> Self
98-
where
99-
T: IntoWebSocketTransport,
100-
{
101-
self.transport = transport.into_transport();
102-
self
103-
}
104-
105-
/// Set an admission policy
106-
#[inline]
107-
pub(crate) fn set_admit_policy<T>(&self, policy: T) -> Result<(), SharedStateError>
108-
where
109-
T: AdmitPolicy + 'static,
110-
{
111-
self.admit_policy
112-
.set(Arc::new(policy))
113-
.map_err(|_| SharedStateError::AdmitPolicyAlreadySet)
114-
}
115-
11686
/// Check if auto authentication to relays is enabled
11787
#[inline]
11888
pub fn is_auto_authentication_enabled(&self) -> bool {

0 commit comments

Comments
 (0)