Skip to content

Commit 681135a

Browse files
committed
relay-pool: change Relay::send_msg and Relay::batch_msg signatures
Make sure that messages are sent in `Relay::batch_msg` before continuing with code execution. Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent c1a2438 commit 681135a

File tree

5 files changed

+105
-44
lines changed

5 files changed

+105
-44
lines changed

crates/nostr-relay-pool/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@
2323
2424
-->
2525

26+
## Unreleased
27+
28+
### Breaking changes
29+
30+
- Change `Relay::send_msg` and `Relay::batch_msg` signatures (TBD)
31+
2632
## v0.44.0 - 2025/11/06
2733

2834
### Breaking changes

crates/nostr-relay-pool/src/pool/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,12 +705,23 @@ impl RelayPool {
705705
}
706706
}
707707

708+
let mut urls: Vec<RelayUrl> = Vec::with_capacity(set.len());
709+
let mut futures = Vec::with_capacity(set.len());
708710
let mut output: Output<()> = Output::default();
709711

710-
// Batch messages and construct outputs
712+
// Compose futures
711713
for url in set.into_iter() {
712714
let relay: &Relay = self.internal_relay(&relays, &url)?;
713-
match relay.batch_msg(msgs.clone()) {
715+
urls.push(url);
716+
futures.push(relay.batch_msg(msgs.clone()));
717+
}
718+
719+
// Join futures
720+
let list = future::join_all(futures).await;
721+
722+
// Iter results and construct output
723+
for (url, result) in urls.into_iter().zip(list.into_iter()) {
724+
match result {
714725
Ok(..) => {
715726
// Success, insert relay url in 'success' set result
716727
output.success.insert(url);

crates/nostr-relay-pool/src/relay/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub enum Error {
3434
Negentropy(negentropy::Error),
3535
/// Database error
3636
Database(DatabaseError),
37+
/// Can't receive send confirmation
38+
CantReceiveSendConfirmation,
3739
/// Generic timeout
3840
Timeout,
3941
/// Not replied to ping
@@ -139,6 +141,7 @@ impl fmt::Display for Error {
139141
Self::Hex(e) => e.fmt(f),
140142
Self::Negentropy(e) => e.fmt(f),
141143
Self::Database(e) => e.fmt(f),
144+
Self::CantReceiveSendConfirmation => f.write_str("can't receive send confirmation"),
142145
Self::Timeout => f.write_str("timeout"),
143146
Self::NotRepliedToPing => f.write_str("not replied to ping"),
144147
Self::CantParsePong => f.write_str("can't parse pong"),

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use negentropy::{Id, Negentropy, NegentropyStorageVector};
1717
use nostr::secp256k1::rand::{self, Rng};
1818
use nostr_database::prelude::*;
1919
use tokio::sync::mpsc::{self, Receiver, Sender};
20-
use tokio::sync::{broadcast, Mutex, MutexGuard, Notify, RwLock, RwLockWriteGuard};
20+
use tokio::sync::{broadcast, oneshot, Mutex, MutexGuard, Notify, RwLock, RwLockWriteGuard};
2121

2222
use super::constants::{
2323
DEFAULT_CONNECTION_TIMEOUT, JITTER_RANGE, MAX_RETRY_INTERVAL, MIN_ATTEMPTS, MIN_SUCCESS_RATE,
@@ -58,11 +58,16 @@ struct HandleAutoClosing {
5858
reason: Option<SubscriptionAutoClosedReason>,
5959
}
6060

61+
struct SendMessageRequest {
62+
msgs: Vec<ClientMessageJson>,
63+
confirmation: Option<oneshot::Sender<()>>,
64+
}
65+
6166
#[derive(Debug)]
6267
struct RelayChannels {
6368
nostr: (
64-
Sender<Vec<ClientMessageJson>>,
65-
Mutex<Receiver<Vec<ClientMessageJson>>>,
69+
Sender<SendMessageRequest>,
70+
Mutex<Receiver<SendMessageRequest>>,
6671
),
6772
ping: Notify,
6873
terminate: Notify,
@@ -79,21 +84,28 @@ impl RelayChannels {
7984
}
8085
}
8186

82-
pub fn send_client_msgs(&self, msgs: Vec<ClientMessage>) -> Result<(), Error> {
87+
fn send_client_msgs(
88+
&self,
89+
msgs: Vec<ClientMessage>,
90+
confirmation: Option<oneshot::Sender<()>>,
91+
) -> Result<(), Error> {
8392
// Serialize messages to JSON
8493
let msgs: Vec<ClientMessageJson> = msgs.into_iter().map(|msg| msg.as_json()).collect();
8594

95+
// Build request
96+
let req: SendMessageRequest = SendMessageRequest { msgs, confirmation };
97+
8698
// Send
8799
self.nostr
88100
.0
89-
.try_send(msgs)
101+
.try_send(req)
90102
.map_err(|_| Error::CantSendChannelMessage {
91103
channel: String::from("nostr"),
92104
})
93105
}
94106

95107
#[inline]
96-
pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver<Vec<ClientMessageJson>>> {
108+
async fn rx_nostr(&self) -> MutexGuard<'_, Receiver<SendMessageRequest>> {
97109
self.nostr.1.lock().await
98110
}
99111

@@ -700,7 +712,7 @@ impl InnerRelay {
700712
async fn connect_and_run(
701713
&self,
702714
stream: Option<(WebSocketSink, WebSocketStream)>,
703-
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessageJson>>>,
715+
rx_nostr: &mut MutexGuard<'_, Receiver<SendMessageRequest>>,
704716
last_ws_error: &mut Option<String>,
705717
) {
706718
match stream {
@@ -743,7 +755,7 @@ impl InnerRelay {
743755
&self,
744756
mut ws_tx: WebSocketSink,
745757
ws_rx: WebSocketStream,
746-
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessageJson>>>,
758+
rx_nostr: &mut MutexGuard<'_, Receiver<SendMessageRequest>>,
747759
) {
748760
// (Re)subscribe to relay
749761
if self.flags.can_read() {
@@ -792,7 +804,7 @@ impl InnerRelay {
792804
async fn sender_message_handler(
793805
&self,
794806
ws_tx: &mut WebSocketSink,
795-
rx_nostr: &mut MutexGuard<'_, Receiver<Vec<ClientMessageJson>>>,
807+
rx_nostr: &mut MutexGuard<'_, Receiver<SendMessageRequest>>,
796808
ping: &PingTracker,
797809
) -> Result<(), Error> {
798810
#[cfg(target_arch = "wasm32")]
@@ -801,7 +813,7 @@ impl InnerRelay {
801813
loop {
802814
tokio::select! {
803815
// Nostr channel receiver
804-
Some(msgs) = rx_nostr.recv() => {
816+
Some(SendMessageRequest { msgs, confirmation }) = rx_nostr.recv() => {
805817
// Compose WebSocket text messages
806818
let msgs: Vec<Message> = msgs
807819
.into_iter()
@@ -823,6 +835,14 @@ impl InnerRelay {
823835
// Send WebSocket messages
824836
send_ws_msgs(ws_tx, msgs).await?;
825837

838+
// Send confirmation that messages has been sent
839+
if let Some(confirmation) = confirmation {
840+
match confirmation.send(()) {
841+
Ok(()) => tracing::trace!("Message confirmation sent."),
842+
Err(_) => tracing::error!("Can't send message confirmation."),
843+
}
844+
}
845+
826846
// Increase sent bytes
827847
self.stats.add_bytes_sent(size);
828848
}
@@ -1304,11 +1324,11 @@ impl InnerRelay {
13041324
}
13051325

13061326
#[inline]
1307-
pub fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> {
1308-
self.batch_msg(vec![msg])
1327+
pub async fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> {
1328+
self.batch_msg(vec![msg]).await
13091329
}
13101330

1311-
pub fn batch_msg(&self, msgs: Vec<ClientMessage<'_>>) -> Result<(), Error> {
1331+
pub async fn batch_msg(&self, msgs: Vec<ClientMessage<'_>>) -> Result<(), Error> {
13121332
// Check if relay is operational
13131333
self.ensure_operational()?;
13141334

@@ -1327,21 +1347,31 @@ impl InnerRelay {
13271347
return Err(Error::ReadDisabled);
13281348
}
13291349

1350+
let (tx, rx) = oneshot::channel();
1351+
13301352
// Send messages
1331-
self.atomic.channels.send_client_msgs(msgs)
1353+
self.atomic.channels.send_client_msgs(msgs, Some(tx))?;
1354+
1355+
// Wait for confirmation and propagate error, if any
1356+
time::timeout(Some(Duration::from_secs(10)), rx)
1357+
.await
1358+
.ok_or(Error::Timeout)?
1359+
.map_err(|_| Error::CantReceiveSendConfirmation)
13321360
}
13331361

1334-
fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> {
1362+
async fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> {
13351363
self.send_msg(ClientMessage::NegMsg {
13361364
subscription_id: Cow::Borrowed(id),
13371365
message: Cow::Borrowed(message),
13381366
})
1367+
.await
13391368
}
13401369

1341-
fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> {
1370+
async fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> {
13421371
self.send_msg(ClientMessage::NegClose {
13431372
subscription_id: Cow::Borrowed(id),
13441373
})
1374+
.await
13451375
}
13461376

13471377
async fn auth(&self, challenge: String) -> Result<(), Error> {
@@ -1357,7 +1387,8 @@ impl InnerRelay {
13571387
let mut notifications = self.internal_notification_sender.subscribe();
13581388

13591389
// Send the AUTH message
1360-
self.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))?;
1390+
self.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))
1391+
.await?;
13611392

13621393
// Wait for OK
13631394
// The event ID is already checked in `wait_for_ok` method
@@ -1416,7 +1447,7 @@ impl InnerRelay {
14161447
let subscriptions = self.subscriptions().await;
14171448
for (id, filters) in subscriptions.into_iter() {
14181449
if !filters.is_empty() && self.should_resubscribe(&id).await {
1419-
self.send_msg(ClientMessage::req(id, filters))?;
1450+
self.send_msg(ClientMessage::req(id, filters)).await?;
14201451
} else {
14211452
tracing::debug!("Skip re-subscription of '{id}'");
14221453
}
@@ -1464,7 +1495,9 @@ impl InnerRelay {
14641495
// Close subscription
14651496
let send_result = if to_close {
14661497
tracing::debug!(id = %id, "Auto-closing subscription.");
1467-
relay.send_msg(ClientMessage::Close(Cow::Borrowed(&id)))
1498+
relay
1499+
.send_msg(ClientMessage::Close(Cow::Borrowed(&id)))
1500+
.await
14681501
} else {
14691502
Ok(())
14701503
};
@@ -1620,7 +1653,7 @@ impl InnerRelay {
16201653
subscription_id: Cow::Borrowed(id),
16211654
filters: filters.iter().map(Cow::Borrowed).collect(),
16221655
};
1623-
let _ = self.send_msg(msg);
1656+
let _ = self.send_msg(msg).await;
16241657
}
16251658
}
16261659
RelayNotification::AuthenticationFailed => {
@@ -1695,10 +1728,10 @@ impl InnerRelay {
16951728
.await?
16961729
}
16971730

1698-
fn _unsubscribe_long_lived_subscription(
1731+
async fn _unsubscribe_long_lived_subscription(
16991732
&self,
1700-
subscriptions: &mut RwLockWriteGuard<HashMap<SubscriptionId, SubscriptionData>>,
1701-
id: Cow<SubscriptionId>,
1733+
subscriptions: &mut RwLockWriteGuard<'_, HashMap<SubscriptionId, SubscriptionData>>,
1734+
id: Cow<'_, SubscriptionId>,
17021735
) -> Result<(), Error> {
17031736
// Remove the subscription from the map
17041737
if let Some(sub) = subscriptions.remove(&id) {
@@ -1710,12 +1743,13 @@ impl InnerRelay {
17101743
}
17111744

17121745
// Send CLOSE message
1713-
self.send_msg(ClientMessage::Close(id))
1746+
self.send_msg(ClientMessage::Close(id)).await
17141747
}
17151748

17161749
pub async fn unsubscribe(&self, id: &SubscriptionId) -> Result<(), Error> {
17171750
let mut subscriptions = self.atomic.subscriptions.write().await;
17181751
self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Borrowed(id))
1752+
.await
17191753
}
17201754

17211755
pub async fn unsubscribe_all(&self) -> Result<(), Error> {
@@ -1726,14 +1760,15 @@ impl InnerRelay {
17261760

17271761
// Unsubscribe
17281762
for id in ids.into_iter() {
1729-
self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Owned(id))?;
1763+
self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Owned(id))
1764+
.await?;
17301765
}
17311766

17321767
Ok(())
17331768
}
17341769

17351770
#[inline(never)]
1736-
fn handle_neg_msg<I>(
1771+
async fn handle_neg_msg<I>(
17371772
&self,
17381773
subscription_id: &SubscriptionId,
17391774
msg: Option<Vec<u8>>,
@@ -1775,13 +1810,16 @@ impl InnerRelay {
17751810
}
17761811

17771812
match msg {
1778-
Some(query) => self.send_neg_msg(subscription_id, &hex::encode(query)),
1813+
Some(query) => {
1814+
self.send_neg_msg(subscription_id, &hex::encode(query))
1815+
.await
1816+
}
17791817
None => {
17801818
// Mark sync as done
17811819
*sync_done = true;
17821820

17831821
// Send NEG-CLOSE message
1784-
self.send_neg_close(subscription_id)
1822+
self.send_neg_close(subscription_id).await
17851823
}
17861824
}
17871825
}
@@ -1805,7 +1843,7 @@ impl InnerRelay {
18051843
match self.state.database().event_by_id(&id).await {
18061844
Ok(Some(event)) => {
18071845
in_flight_up.insert(id);
1808-
self.send_msg(ClientMessage::event(event))?;
1846+
self.send_msg(ClientMessage::event(event)).await?;
18091847
num_sent += 1;
18101848
}
18111849
Ok(None) => {
@@ -1886,7 +1924,7 @@ impl InnerRelay {
18861924
.await;
18871925

18881926
// Send msg
1889-
if let Err(e) = self.send_msg(msg) {
1927+
if let Err(e) = self.send_msg(msg).await {
18901928
// Remove previously added subscription
18911929
self.remove_subscription(down_sub_id).await;
18921930

@@ -1960,7 +1998,7 @@ impl InnerRelay {
19601998
id_size: None,
19611999
initial_message: Cow::Owned(hex::encode(initial_message)),
19622000
};
1963-
self.send_msg(open_msg)?;
2001+
self.send_msg(open_msg).await?;
19642002

19652003
// Check if negentropy is supported
19662004
check_negentropy_support(&sub_id, opts, &mut temp_notifications).await?;
@@ -2006,7 +2044,8 @@ impl InnerRelay {
20062044
&mut have_ids,
20072045
&mut need_ids,
20082046
&mut sync_done,
2009-
)?;
2047+
)
2048+
.await?;
20102049
}
20112050
}
20122051
RelayMessage::NegErr {
@@ -2046,7 +2085,8 @@ impl InnerRelay {
20462085
self.remove_subscription(&down_sub_id).await;
20472086

20482087
// Close subscription
2049-
self.send_msg(ClientMessage::Close(Cow::Borrowed(&down_sub_id)))?;
2088+
self.send_msg(ClientMessage::Close(Cow::Borrowed(&down_sub_id)))
2089+
.await?;
20502090
}
20512091
}
20522092
RelayMessage::Closed {

0 commit comments

Comments
 (0)