Skip to content

Commit 47ea5f6

Browse files
committed
relay-pool: add TransportSink for WebSocket
Add the `TransportSink` struct as a replacement for the `sink_map_err` method that was causing panics in case of connection interruption (i.e., broken pipe error). Fixes #984 Pull-Request: #1007 Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent ca0d48c commit 47ea5f6

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

crates/nostr-relay-pool/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444
- An option to ban relays that send events which don't match the subscription filter (https://github.com/rust-nostr/nostr/pull/981)
4545
- Add `RelayOptions::verify_subscriptions` option (https://github.com/rust-nostr/nostr/pull/997)
4646

47+
### Fixed
48+
49+
- Fix panic after a broken pipe error in the relay connection (https://github.com/rust-nostr/nostr/pull/1007)
50+
4751
## v0.42.0 - 2025/05/20
4852

4953
### Breaking changes

crates/nostr-relay-pool/src/transport/websocket.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
//! WebSocket transport
66
77
use std::fmt;
8+
use std::pin::Pin;
89
use std::sync::Arc;
10+
use std::task::{Context, Poll};
911
use std::time::Duration;
1012

13+
use async_utility::futures_util::stream::SplitSink;
1114
use async_wsocket::futures_util::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
1215
use async_wsocket::{ConnectionMode, Message, WebSocket};
1316
use nostr::util::BoxedFuture;
@@ -94,9 +97,43 @@ impl WebSocketTransport for DefaultWebsocketTransport {
9497

9598
// Split sink and stream
9699
let (tx, rx) = socket.split();
97-
let sink: BoxSink = Box::new(tx.sink_map_err(TransportError::backend)) as BoxSink;
100+
101+
// NOTE: don't use sink_map_err here, as it may cause panics!
102+
// Issue: https://github.com/rust-nostr/nostr/issues/984
103+
let sink: BoxSink = Box::new(TransportSink(tx)) as BoxSink;
98104
let stream: BoxStream = Box::new(rx.map_err(TransportError::backend)) as BoxStream;
105+
99106
Ok((sink, stream))
100107
})
101108
}
102109
}
110+
111+
struct TransportSink(SplitSink<WebSocket, Message>);
112+
113+
impl Sink<Message> for TransportSink {
114+
type Error = TransportError;
115+
116+
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
117+
Pin::new(&mut self.0)
118+
.poll_ready_unpin(cx)
119+
.map_err(TransportError::backend)
120+
}
121+
122+
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
123+
Pin::new(&mut self.0)
124+
.start_send_unpin(item)
125+
.map_err(TransportError::backend)
126+
}
127+
128+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129+
Pin::new(&mut self.0)
130+
.poll_flush_unpin(cx)
131+
.map_err(TransportError::backend)
132+
}
133+
134+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135+
Pin::new(&mut self.0)
136+
.poll_close_unpin(cx)
137+
.map_err(TransportError::backend)
138+
}
139+
}

0 commit comments

Comments
 (0)