Skip to content

Commit 16287ca

Browse files
committed
pool: add ReqExitPolicy::WaitForEvents variant
Allow exiting from a REQ after the subscription received N events. Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent 914a92f commit 16287ca

File tree

6 files changed

+70
-6
lines changed

6 files changed

+70
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
* database: add `Events::force_insert` ([Yuki Kishimoto])
9797
* pool: event verification cache ([Yuki Kishimoto])
9898
* pool: add `AdmitPolicy` trait ([Yuki Kishimoto])
99+
* pool: add `ReqExitPolicy::WaitForEvents` variant ([Yuki Kishimoto])
99100
* ffi: add Mac Catalyst support in Swift package ([Yuki Kishimoto])
100101
* js: add `KindStandard` enum ([Yuki Kishimoto])
101102

bindings/nostr-sdk-ffi/src/relay/options.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ impl RelayOptions {
175175
pub enum ReqExitPolicy {
176176
/// Exit on EOSE
177177
ExitOnEOSE,
178+
/// Wait to receive N events and then exit.
179+
WaitForEvents { num: u16 },
178180
/// After EOSE is received, keep listening for N more events that match the filter.
179181
WaitForEventsAfterEOSE { num: u16 },
180182
/// After EOSE is received, keep listening for matching events for `Duration` more time.
@@ -185,6 +187,7 @@ impl From<ReqExitPolicy> for prelude::ReqExitPolicy {
185187
fn from(value: ReqExitPolicy) -> Self {
186188
match value {
187189
ReqExitPolicy::ExitOnEOSE => Self::ExitOnEOSE,
190+
ReqExitPolicy::WaitForEvents { num } => Self::WaitForEvents(num),
188191
ReqExitPolicy::WaitForEventsAfterEOSE { num } => Self::WaitForEventsAfterEOSE(num),
189192
ReqExitPolicy::WaitDurationAfterEOSE { duration } => {
190193
Self::WaitDurationAfterEOSE(duration)

bindings/nostr-sdk-js/src/relay/options.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ impl JsReqExitPolicy {
108108
}
109109
}
110110

111+
/// Wait to receive N events and then exit.
112+
#[wasm_bindgen(js_name = waitForEvents)]
113+
pub fn wait_for_events(num: u16) -> Self {
114+
Self {
115+
inner: ReqExitPolicy::WaitForEvents(num),
116+
}
117+
}
118+
111119
/// After EOSE is received, keep listening for N more events that match the filter
112120
#[wasm_bindgen(js_name = waitForEventsAfterEOSE)]
113121
pub fn wait_for_events_after_eose(num: u16) -> Self {

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,8 @@ impl InnerRelay {
12531253
activity: &Option<Sender<SubscriptionActivity>>,
12541254
) -> Option<HandleAutoClosing> {
12551255
time::timeout(opts.timeout, async move {
1256-
let mut counter: u16 = 0;
1256+
let mut wait_for_events_counter: u16 = 0;
1257+
let mut wait_for_events_after_eose_counter: u16 = 0;
12571258
let mut received_eose: bool = false;
12581259
let mut require_resubscription: bool = false;
12591260
let mut last_event: Option<Instant> = None;
@@ -1296,14 +1297,23 @@ impl InnerRelay {
12961297
last_event = Some(Instant::now());
12971298
}
12981299

1299-
if let ReqExitPolicy::WaitForEventsAfterEOSE(num) = opts.exit_policy
1300-
{
1301-
if received_eose {
1302-
counter += 1;
1303-
if counter >= num {
1300+
// Check exit policy
1301+
match opts.exit_policy {
1302+
ReqExitPolicy::WaitForEvents(num) => {
1303+
wait_for_events_counter += 1;
1304+
if wait_for_events_counter >= num {
13041305
break;
13051306
}
13061307
}
1308+
ReqExitPolicy::WaitForEventsAfterEOSE(num) => {
1309+
if received_eose {
1310+
wait_for_events_after_eose_counter += 1;
1311+
if wait_for_events_after_eose_counter >= num {
1312+
break;
1313+
}
1314+
}
1315+
}
1316+
_ => {}
13071317
}
13081318
}
13091319
}

crates/nostr-relay-pool/src/relay/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,46 @@ mod tests {
12101210
assert_eq!(events.len(), 3);
12111211
}
12121212

1213+
#[tokio::test]
1214+
async fn test_fetch_events_wait_for_events() {
1215+
let (relay, _mock) = setup_event_fetching_relay(5).await;
1216+
1217+
let events = relay
1218+
.fetch_events(
1219+
Filter::new().kind(Kind::TextNote),
1220+
Duration::from_secs(15),
1221+
ReqExitPolicy::WaitForEvents(2),
1222+
)
1223+
.await
1224+
.unwrap();
1225+
assert_eq!(events.len(), 2); // Requested all text notes but exit after receive 2
1226+
1227+
// Task to send additional event
1228+
let r = relay.clone();
1229+
tokio::spawn(async move {
1230+
tokio::time::sleep(Duration::from_secs(2)).await;
1231+
1232+
// Signer
1233+
let keys = Keys::generate();
1234+
1235+
// Build and send event
1236+
let event = EventBuilder::metadata(&Metadata::new().name("Test"))
1237+
.sign_with_keys(&keys)
1238+
.unwrap();
1239+
r.send_event(&event).await.unwrap();
1240+
});
1241+
1242+
let events = relay
1243+
.fetch_events(
1244+
Filter::new().kind(Kind::Metadata),
1245+
Duration::from_secs(5),
1246+
ReqExitPolicy::WaitForEvents(1),
1247+
)
1248+
.await
1249+
.unwrap();
1250+
assert_eq!(events.len(), 1);
1251+
}
1252+
12131253
#[tokio::test]
12141254
async fn test_fetch_events_wait_for_events_after_eose() {
12151255
let (relay, _mock) = setup_event_fetching_relay(10).await;

crates/nostr-relay-pool/src/relay/options.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ pub enum ReqExitPolicy {
175175
/// Exit on EOSE.
176176
#[default]
177177
ExitOnEOSE,
178+
/// Wait to receive N events and then exit.
179+
WaitForEvents(u16),
178180
/// After EOSE is received, keep listening for N more events that match the filter.
179181
WaitForEventsAfterEOSE(u16),
180182
/// After EOSE is received, keep listening for matching events for [`Duration`] more time.

0 commit comments

Comments
 (0)