Skip to content

Commit 914a92f

Browse files
committed
pool: better handling of auto-closing subscription activity when fetching events
Use a dedicated mpsc channel for sending auto-closing subscription activity to the `Relay::fetch_events_with_callback` method. Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent 05ab131 commit 914a92f

File tree

3 files changed

+274
-95
lines changed

3 files changed

+274
-95
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
* lmdb: avoid long-lived read txn when ingesting event ([Yuki Kishimoto])
6666
* ndb: return `None` in `NostrEventsDatabase::event_by_id` if event doesn't exist ([Yuki Kishimoto])
6767
* ndb: avoid event clone when calling `NostrEventsDatabase::save_event` ([Yuki Kishimoto])
68+
* pool: better handling of auto-closing subscription activity when fetching events ([Yuki Kishimoto])
6869
* sdk: auto-update the gossip data when sending an event ([Yuki Kishimoto])
6970
* sdk: avoid full clone of relays when only urls are needed ([Yuki Kishimoto])
7071
* nwc: allow usage of multiple relays ([Yuki Kishimoto])

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 81 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use super::flags::AtomicRelayServiceFlags;
3131
use super::options::{RelayOptions, ReqExitPolicy, SubscribeAutoCloseOptions, SyncOptions};
3232
use super::ping::PingTracker;
3333
use super::stats::RelayConnectionStats;
34-
use super::{Error, Reconciliation, RelayNotification, RelayStatus, SubscriptionAutoClosedReason};
34+
use super::{
35+
Error, Reconciliation, RelayNotification, RelayStatus, SubscriptionActivity,
36+
SubscriptionAutoClosedReason,
37+
};
3538
use crate::policy::AdmitStatus;
3639
use crate::pool::RelayPoolNotification;
3740
use crate::relay::status::AtomicRelayStatus;
@@ -44,6 +47,11 @@ enum IngesterCommand {
4447
Authenticate { challenge: String },
4548
}
4649

50+
struct HandleAutoClosing {
51+
to_close: bool,
52+
reason: Option<SubscriptionAutoClosedReason>,
53+
}
54+
4755
#[derive(Debug)]
4856
struct RelayChannels {
4957
nostr: (
@@ -384,7 +392,6 @@ impl InnerRelay {
384392
RelayNotification::RelayStatus { .. } => None,
385393
RelayNotification::Authenticated => None,
386394
RelayNotification::AuthenticationFailed => None,
387-
RelayNotification::SubscriptionAutoClosed { .. } => None,
388395
RelayNotification::Shutdown => Some(RelayPoolNotification::Shutdown),
389396
};
390397

@@ -1197,21 +1204,22 @@ impl InnerRelay {
11971204
filter: Filter,
11981205
opts: SubscribeAutoCloseOptions,
11991206
notifications: broadcast::Receiver<RelayNotification>,
1207+
activity: Option<Sender<SubscriptionActivity>>,
12001208
) {
12011209
let relay = self.clone(); // <-- FULL RELAY CLONE HERE
12021210
task::spawn(async move {
12031211
// Check if CLOSE needed
12041212
let to_close: bool = match relay
1205-
.handle_auto_closing(&id, &filter, opts, notifications)
1213+
.handle_auto_closing(&id, &filter, opts, notifications, &activity)
12061214
.await
12071215
{
1208-
Some((to_close, reason)) => {
1209-
// Send subscription auto-closed notification
1216+
Some(HandleAutoClosing { to_close, reason }) => {
1217+
// Send activity
12101218
if let Some(reason) = reason {
1211-
relay.send_notification(
1212-
RelayNotification::SubscriptionAutoClosed { reason },
1213-
false,
1214-
);
1219+
if let Some(activity) = &activity {
1220+
// TODO: handle error?
1221+
let _ = activity.send(SubscriptionActivity::Closed(reason)).await;
1222+
}
12151223
}
12161224

12171225
to_close
@@ -1223,6 +1231,9 @@ impl InnerRelay {
12231231
}
12241232
};
12251233

1234+
// Drop activity sender to terminate the receiver activity loop
1235+
drop(activity);
1236+
12261237
// Close subscription
12271238
if to_close {
12281239
tracing::debug!(id = %id, "Auto-closing subscription.");
@@ -1239,7 +1250,8 @@ impl InnerRelay {
12391250
filter: &Filter,
12401251
opts: SubscribeAutoCloseOptions,
12411252
mut notifications: broadcast::Receiver<RelayNotification>,
1242-
) -> Option<(bool, Option<SubscriptionAutoClosedReason>)> {
1253+
activity: &Option<Sender<SubscriptionActivity>>,
1254+
) -> Option<HandleAutoClosing> {
12431255
time::timeout(opts.timeout, async move {
12441256
let mut counter: u16 = 0;
12451257
let mut received_eose: bool = false;
@@ -1255,16 +1267,30 @@ impl InnerRelay {
12551267
if let (Some(idle_timeout), Some(last_event)) = (opts.idle_timeout, last_event) {
12561268
if last_event.elapsed() > idle_timeout {
12571269
// Close the subscription
1258-
return Some((true, None)); // TODO: use SubscriptionAutoClosedReason::Timeout?
1270+
return Some(HandleAutoClosing {
1271+
to_close: true,
1272+
reason: None,
1273+
});
12591274
}
12601275
}
12611276

12621277
match notification {
12631278
RelayNotification::Message { message, .. } => match message {
12641279
RelayMessage::Event {
1265-
subscription_id, ..
1280+
subscription_id,
1281+
event,
12661282
} => {
12671283
if subscription_id.as_ref() == id {
1284+
// Send activity
1285+
if let Some(activity) = activity {
1286+
// TODO: handle error?
1287+
let _ = activity
1288+
.send(SubscriptionActivity::ReceivedEvent(
1289+
event.into_owned(),
1290+
))
1291+
.await;
1292+
}
1293+
12681294
// If no-events timeout is enabled, update instant of last event received
12691295
if opts.idle_timeout.is_some() {
12701296
last_event = Some(Instant::now());
@@ -1302,21 +1328,21 @@ impl InnerRelay {
13021328
if self.state.is_auto_authentication_enabled() {
13031329
require_resubscription = true;
13041330
} else {
1305-
return Some((
1306-
false,
1307-
Some(SubscriptionAutoClosedReason::Closed(
1331+
return Some(HandleAutoClosing {
1332+
to_close: false, // No need to send CLOSE msg
1333+
reason: Some(SubscriptionAutoClosedReason::Closed(
13081334
message.into_owned(),
13091335
)),
1310-
)); // No need to send CLOSE msg
1336+
});
13111337
}
13121338
}
13131339
_ => {
1314-
return Some((
1315-
false,
1316-
Some(SubscriptionAutoClosedReason::Closed(
1340+
return Some(HandleAutoClosing {
1341+
to_close: false, // No need to send CLOSE msg
1342+
reason: Some(SubscriptionAutoClosedReason::Closed(
13171343
message.into_owned(),
13181344
)),
1319-
)); // No need to send CLOSE msg
1345+
});
13201346
}
13211347
}
13221348
}
@@ -1335,18 +1361,24 @@ impl InnerRelay {
13351361
}
13361362
}
13371363
RelayNotification::AuthenticationFailed => {
1338-
return Some((
1339-
false,
1340-
Some(SubscriptionAutoClosedReason::AuthenticationFailed),
1341-
)); // No need to send CLOSE msg
1364+
return Some(HandleAutoClosing {
1365+
to_close: false, // No need to send CLOSE msg
1366+
reason: Some(SubscriptionAutoClosedReason::AuthenticationFailed),
1367+
});
13421368
}
13431369
RelayNotification::RelayStatus { status } => {
13441370
if status.is_disconnected() {
1345-
return Some((false, None)); // No need to send CLOSE msg
1371+
return Some(HandleAutoClosing {
1372+
to_close: false, // No need to send CLOSE msg
1373+
reason: None,
1374+
});
13461375
}
13471376
}
13481377
RelayNotification::Shutdown => {
1349-
return Some((false, None)); // No need to send CLOSE msg
1378+
return Some(HandleAutoClosing {
1379+
to_close: false, // No need to send CLOSE msg
1380+
reason: None,
1381+
});
13501382
}
13511383
_ => (),
13521384
}
@@ -1356,6 +1388,25 @@ impl InnerRelay {
13561388
time::timeout(Some(duration), async {
13571389
while let Ok(notification) = notifications.recv().await {
13581390
match notification {
1391+
RelayNotification::Message {
1392+
message:
1393+
RelayMessage::Event {
1394+
subscription_id,
1395+
event,
1396+
},
1397+
} => {
1398+
if subscription_id.as_ref() == id {
1399+
// Send activity
1400+
if let Some(activity) = activity {
1401+
// TODO: handle error?
1402+
let _ = activity
1403+
.send(SubscriptionActivity::ReceivedEvent(
1404+
event.into_owned(),
1405+
))
1406+
.await;
1407+
}
1408+
}
1409+
}
13591410
RelayNotification::RelayStatus { status } => {
13601411
if status.is_disconnected() {
13611412
return Ok(());
@@ -1373,7 +1424,10 @@ impl InnerRelay {
13731424
.await;
13741425
}
13751426

1376-
Some((true, Some(SubscriptionAutoClosedReason::Completed))) // Need to send CLOSE msg
1427+
Some(HandleAutoClosing {
1428+
to_close: true, // Need to send CLOSE msg
1429+
reason: Some(SubscriptionAutoClosedReason::Completed),
1430+
})
13771431
})
13781432
.await?
13791433
}

0 commit comments

Comments
 (0)