Skip to content

Commit dbcb8d7

Browse files
committed
pool: move some methods from InnerRelayPool to RelayPool
In continuation of 3d4baee and fcfdb38 Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent c492be2 commit dbcb8d7

File tree

2 files changed

+123
-148
lines changed

2 files changed

+123
-148
lines changed

crates/nostr-relay-pool/src/pool/inner.rs

Lines changed: 4 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ use nostr_database::prelude::*;
1414
use tokio::sync::{broadcast, RwLock};
1515

1616
use super::options::RelayPoolOptions;
17-
use super::{Error, RelayPoolBuilder, RelayPoolNotification};
18-
use crate::relay::flags::RelayServiceFlags;
19-
use crate::relay::options::RelayOptions;
17+
use super::{RelayPoolBuilder, RelayPoolNotification};
2018
use crate::relay::Relay;
2119
use crate::shared::SharedState;
2220

@@ -27,7 +25,7 @@ pub(super) type Relays = HashMap<RelayUrl, Relay>;
2725
#[derive(Debug)]
2826
pub(super) struct AtomicPrivateData {
2927
pub(super) relays: RwLock<Relays>,
30-
subscriptions: RwLock<HashMap<SubscriptionId, Filter>>,
28+
pub(super) subscriptions: RwLock<HashMap<SubscriptionId, Filter>>,
3129
shutdown: AtomicBool,
3230
}
3331

@@ -36,7 +34,7 @@ pub struct InnerRelayPool {
3634
pub(super) state: SharedState,
3735
pub(super) atomic: Arc<AtomicPrivateData>,
3836
pub(super) notification_sender: broadcast::Sender<RelayPoolNotification>, // TODO: move to shared state?
39-
opts: RelayPoolOptions,
37+
pub(super) opts: RelayPoolOptions,
4038
}
4139

4240
impl AtomicDestroyer for InnerRelayPool {
@@ -73,6 +71,7 @@ impl InnerRelayPool {
7371
}
7472

7573
pub async fn shutdown(&self) {
74+
// TODO: use atomic swap?
7675
if self.is_shutdown() {
7776
return;
7877
}
@@ -89,118 +88,6 @@ impl InnerRelayPool {
8988
self.atomic.shutdown.store(true, Ordering::SeqCst);
9089
}
9190

92-
pub async fn subscriptions(&self) -> HashMap<SubscriptionId, Filter> {
93-
self.atomic.subscriptions.read().await.clone()
94-
}
95-
96-
pub async fn subscription(&self, id: &SubscriptionId) -> Option<Filter> {
97-
let subscriptions = self.atomic.subscriptions.read().await;
98-
subscriptions.get(id).cloned()
99-
}
100-
101-
pub async fn save_subscription(&self, id: SubscriptionId, filter: Filter) {
102-
let mut subscriptions = self.atomic.subscriptions.write().await;
103-
let current: &mut Filter = subscriptions.entry(id).or_default();
104-
*current = filter;
105-
}
106-
107-
pub(crate) async fn remove_subscription(&self, id: &SubscriptionId) {
108-
let mut subscriptions = self.atomic.subscriptions.write().await;
109-
subscriptions.remove(id);
110-
}
111-
112-
pub(crate) async fn remove_all_subscriptions(&self) {
113-
let mut subscriptions = self.atomic.subscriptions.write().await;
114-
subscriptions.clear();
115-
}
116-
117-
pub async fn add_relay<U>(&self, url: U, opts: RelayOptions) -> Result<bool, Error>
118-
where
119-
U: TryIntoUrl,
120-
Error: From<<U as TryIntoUrl>::Err>,
121-
{
122-
// Convert into url
123-
let url: RelayUrl = url.try_into_url()?;
124-
125-
// Check if the pool has been shutdown
126-
if self.is_shutdown() {
127-
return Err(Error::Shutdown);
128-
}
129-
130-
// Get relays
131-
let mut relays = self.atomic.relays.write().await;
132-
133-
// Check if map already contains url
134-
if relays.contains_key(&url) {
135-
return Ok(false);
136-
}
137-
138-
// Check number fo relays and limit
139-
if let Some(max) = self.opts.max_relays {
140-
if relays.len() >= max {
141-
return Err(Error::TooManyRelays { limit: max });
142-
}
143-
}
144-
145-
// Compose new relay
146-
let mut relay: Relay = Relay::new(url, self.state.clone(), opts);
147-
148-
// Set notification sender
149-
relay
150-
.inner
151-
.set_notification_sender(self.notification_sender.clone());
152-
153-
// If relay has `READ` flag, inherit pool subscriptions
154-
if relay.flags().has_read() {
155-
let subscriptions = self.subscriptions().await;
156-
for (id, filters) in subscriptions.into_iter() {
157-
relay.inner.update_subscription(id, filters, false).await;
158-
}
159-
}
160-
161-
// Insert relay into map
162-
relays.insert(relay.url().clone(), relay);
163-
164-
Ok(true)
165-
}
166-
167-
pub async fn remove_relay<U>(&self, url: U, force: bool) -> Result<(), Error>
168-
where
169-
U: TryIntoUrl,
170-
Error: From<<U as TryIntoUrl>::Err>,
171-
{
172-
// Convert into url
173-
let url: RelayUrl = url.try_into_url()?;
174-
175-
// Acquire write lock
176-
let mut relays = self.atomic.relays.write().await;
177-
178-
// Remove relay
179-
let relay: Relay = relays.remove(&url).ok_or(Error::RelayNotFound)?;
180-
181-
// If NOT force, check if it has `GOSSIP` flag
182-
if !force {
183-
// If can't be removed, re-insert it.
184-
if !can_remove_relay(&relay) {
185-
relays.insert(url, relay);
186-
return Ok(());
187-
}
188-
}
189-
190-
// Disconnect
191-
relay.disconnect();
192-
193-
Ok(())
194-
}
195-
196-
pub async fn remove_all_relays(&self) {
197-
// Acquire write lock
198-
let mut relays = self.atomic.relays.write().await;
199-
200-
// Retains all relays that can't be removed
201-
relays.retain(|_, r| !can_remove_relay(r));
202-
}
203-
20491
pub async fn force_remove_all_relays(&self) {
20592
// Acquire write lock
20693
let mut relays = self.atomic.relays.write().await;
@@ -214,23 +101,3 @@ impl InnerRelayPool {
214101
relays.clear();
215102
}
216103
}
217-
218-
/// Return `true` if the relay can be removed
219-
///
220-
/// If it CAN'T be removed,
221-
/// the flags are automatically updated (remove `READ`, `WRITE` and `DISCOVERY` flags).
222-
fn can_remove_relay(relay: &Relay) -> bool {
223-
let flags = relay.flags();
224-
if flags.has_any(RelayServiceFlags::GOSSIP) {
225-
// Remove READ, WRITE and DISCOVERY flags
226-
flags.remove(
227-
RelayServiceFlags::READ | RelayServiceFlags::WRITE | RelayServiceFlags::DISCOVERY,
228-
);
229-
230-
// Relay has `GOSSIP` flag so it can't be removed.
231-
return false;
232-
}
233-
234-
// Relay can be removed
235-
true
236-
}

crates/nostr-relay-pool/src/pool/mod.rs

Lines changed: 119 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,49 @@ impl RelayPool {
253253
U: TryIntoUrl,
254254
Error: From<<U as TryIntoUrl>::Err>,
255255
{
256-
self.inner.add_relay(url, opts).await
256+
// Convert into url
257+
let url: RelayUrl = url.try_into_url()?;
258+
259+
// Check if the pool has been shutdown
260+
if self.inner.is_shutdown() {
261+
return Err(Error::Shutdown);
262+
}
263+
264+
// Get relays
265+
let mut relays = self.inner.atomic.relays.write().await;
266+
267+
// Check if map already contains url
268+
if relays.contains_key(&url) {
269+
return Ok(false);
270+
}
271+
272+
// Check number fo relays and limit
273+
if let Some(max) = self.inner.opts.max_relays {
274+
if relays.len() >= max {
275+
return Err(Error::TooManyRelays { limit: max });
276+
}
277+
}
278+
279+
// Compose new relay
280+
let mut relay: Relay = Relay::new(url, self.inner.state.clone(), opts);
281+
282+
// Set notification sender
283+
relay
284+
.inner
285+
.set_notification_sender(self.inner.notification_sender.clone());
286+
287+
// If relay has `READ` flag, inherit pool subscriptions
288+
if relay.flags().has_read() {
289+
let subscriptions = self.subscriptions().await;
290+
for (id, filters) in subscriptions.into_iter() {
291+
relay.inner.update_subscription(id, filters, false).await;
292+
}
293+
}
294+
295+
// Insert relay into map
296+
relays.insert(relay.url().clone(), relay);
297+
298+
Ok(true)
257299
}
258300

259301
// Private API
@@ -270,12 +312,41 @@ impl RelayPool {
270312
match self.relay(&url).await {
271313
Ok(relay) => Ok(Some(relay)),
272314
Err(..) => {
273-
self.inner.add_relay(url, opts).await?;
315+
self.add_relay(url, opts).await?;
274316
Ok(None)
275317
}
276318
}
277319
}
278320

321+
async fn _remove_relay<U>(&self, url: U, force: bool) -> Result<(), Error>
322+
where
323+
U: TryIntoUrl,
324+
Error: From<<U as TryIntoUrl>::Err>,
325+
{
326+
// Convert into url
327+
let url: RelayUrl = url.try_into_url()?;
328+
329+
// Acquire write lock
330+
let mut relays = self.inner.atomic.relays.write().await;
331+
332+
// Remove relay
333+
let relay: Relay = relays.remove(&url).ok_or(Error::RelayNotFound)?;
334+
335+
// If NOT force, check if it has `GOSSIP` flag
336+
if !force {
337+
// If can't be removed, re-insert it.
338+
if !can_remove_relay(&relay) {
339+
relays.insert(url, relay);
340+
return Ok(());
341+
}
342+
}
343+
344+
// Disconnect
345+
relay.disconnect();
346+
347+
Ok(())
348+
}
349+
279350
/// Remove and disconnect relay
280351
///
281352
/// If the relay has [`RelayServiceFlags::GOSSIP`], it will not be removed from the pool and its
@@ -289,7 +360,7 @@ impl RelayPool {
289360
U: TryIntoUrl,
290361
Error: From<<U as TryIntoUrl>::Err>,
291362
{
292-
self.inner.remove_relay(url, false).await
363+
self._remove_relay(url, false).await
293364
}
294365

295366
/// Force remove and disconnect relay
@@ -301,7 +372,7 @@ impl RelayPool {
301372
U: TryIntoUrl,
302373
Error: From<<U as TryIntoUrl>::Err>,
303374
{
304-
self.inner.remove_relay(url, true).await
375+
self._remove_relay(url, true).await
305376
}
306377

307378
/// Disconnect and remove all relays
@@ -310,7 +381,11 @@ impl RelayPool {
310381
/// Use [`RelayPool::force_remove_all_relays`] to remove every relay.
311382
#[inline]
312383
pub async fn remove_all_relays(&self) {
313-
self.inner.remove_all_relays().await
384+
// Acquire write lock
385+
let mut relays = self.inner.atomic.relays.write().await;
386+
387+
// Retains all relays that can't be removed
388+
relays.retain(|_, r| !can_remove_relay(r));
314389
}
315390

316391
/// Disconnect and force remove all relays
@@ -486,21 +561,34 @@ impl RelayPool {
486561
/// Get subscriptions
487562
#[inline]
488563
pub async fn subscriptions(&self) -> HashMap<SubscriptionId, Filter> {
489-
self.inner.subscriptions().await
564+
self.inner.atomic.subscriptions.read().await.clone()
490565
}
491566

492-
/// Get subscription
567+
/// Get a subscription
493568
#[inline]
494569
pub async fn subscription(&self, id: &SubscriptionId) -> Option<Filter> {
495-
self.inner.subscription(id).await
570+
let subscriptions = self.inner.atomic.subscriptions.read().await;
571+
subscriptions.get(id).cloned()
496572
}
497573

498574
/// Register subscription in the [RelayPool]
499575
///
500576
/// When a new relay will be added, saved subscriptions will be automatically used for it.
501577
#[inline]
502578
pub async fn save_subscription(&self, id: SubscriptionId, filter: Filter) {
503-
self.inner.save_subscription(id, filter).await
579+
let mut subscriptions = self.inner.atomic.subscriptions.write().await;
580+
let current: &mut Filter = subscriptions.entry(id).or_default();
581+
*current = filter;
582+
}
583+
584+
async fn remove_subscription(&self, id: &SubscriptionId) {
585+
let mut subscriptions = self.inner.atomic.subscriptions.write().await;
586+
subscriptions.remove(id);
587+
}
588+
589+
async fn remove_all_subscriptions(&self) {
590+
let mut subscriptions = self.inner.atomic.subscriptions.write().await;
591+
subscriptions.clear();
504592
}
505593

506594
/// Send a client message to specific relays
@@ -833,7 +921,7 @@ impl RelayPool {
833921
/// Unsubscribe from subscription
834922
pub async fn unsubscribe(&self, id: &SubscriptionId) {
835923
// Remove subscription from pool
836-
self.inner.remove_subscription(id).await;
924+
self.remove_subscription(id).await;
837925

838926
// Lock with read shared access
839927
let relays = self.inner.atomic.relays.read().await;
@@ -851,7 +939,7 @@ impl RelayPool {
851939
/// Unsubscribe from all subscriptions
852940
pub async fn unsubscribe_all(&self) {
853941
// Remove subscriptions from pool
854-
self.inner.remove_all_subscriptions().await;
942+
self.remove_all_subscriptions().await;
855943

856944
// Lock with read shared access
857945
let relays = self.inner.atomic.relays.read().await;
@@ -1165,6 +1253,26 @@ impl RelayPool {
11651253
}
11661254
}
11671255

1256+
/// Return `true` if the relay can be removed
1257+
///
1258+
/// If it CAN'T be removed,
1259+
/// the flags are automatically updated (remove `READ`, `WRITE` and `DISCOVERY` flags).
1260+
fn can_remove_relay(relay: &Relay) -> bool {
1261+
let flags = relay.flags();
1262+
if flags.has_any(RelayServiceFlags::GOSSIP) {
1263+
// Remove READ, WRITE and DISCOVERY flags
1264+
flags.remove(
1265+
RelayServiceFlags::READ | RelayServiceFlags::WRITE | RelayServiceFlags::DISCOVERY,
1266+
);
1267+
1268+
// Relay has `GOSSIP` flag so it can't be removed.
1269+
return false;
1270+
}
1271+
1272+
// Relay can be removed
1273+
true
1274+
}
1275+
11681276
#[cfg(test)]
11691277
mod tests {
11701278
use nostr_relay_builder::MockRelay;

0 commit comments

Comments
 (0)