Skip to content

Commit d8d6496

Browse files
authored
Merge pull request #1120 from openmina/fix/core/channels/wasm/panic
Fix WebNode: tokio channels sometimes panic when used in wasm main browser context
2 parents 15eb933 + 4614437 commit d8d6496

File tree

7 files changed

+93
-36
lines changed

7 files changed

+93
-36
lines changed

Cargo.lock

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ binprot_derive = { git = "https://github.com/openmina/binprot-rs", rev = "400b52
1717
rand = "0.8.0"
1818
redux = { workspace = true }
1919
tokio = { version = "1.26", features = ["sync"] }
20+
flume = { version = "0.11.1", features = ["async", "spin"] }
2021
time = { version = "0.3", features = ["formatting", "macros", "parsing"] }
2122
md5 = "0.7.0"
2223
multihash = { version = "0.18.1", features = ["blake2b"] }

core/src/channels.rs

Lines changed: 62 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
pub use tokio::sync::oneshot;
22

33
pub mod mpsc {
4-
use std::{
5-
sync::{Arc, Weak},
6-
task::{Context, Poll},
7-
};
8-
use tokio::sync::mpsc::error::*;
9-
pub use tokio::sync::mpsc::{self, *};
4+
use std::sync::{Arc, Weak};
105

11-
pub struct UnboundedSender<T>(mpsc::UnboundedSender<T>, Arc<()>);
12-
pub struct UnboundedReceiver<T>(mpsc::UnboundedReceiver<T>);
6+
pub use flume::{SendError, TryRecvError, TrySendError};
7+
8+
pub type RecvStream<T> = flume::r#async::RecvStream<'static, T>;
9+
10+
pub struct Sender<T>(flume::Sender<T>);
11+
pub struct Receiver<T>(flume::Receiver<T>);
12+
13+
pub struct UnboundedSender<T>(flume::Sender<T>, Arc<()>);
14+
pub struct UnboundedReceiver<T>(flume::Receiver<T>);
1315

1416
pub type TrackedUnboundedSender<T> = UnboundedSender<Tracked<T>>;
1517
pub type TrackedUnboundedReceiver<T> = UnboundedReceiver<Tracked<T>>;
@@ -31,6 +33,12 @@ pub mod mpsc {
3133
}
3234
}
3335

36+
impl<T> Clone for Sender<T> {
37+
fn clone(&self) -> Self {
38+
Self(self.0.clone())
39+
}
40+
}
41+
3442
impl<T> Clone for UnboundedSender<T> {
3543
fn clone(&self) -> Self {
3644
Self(self.0.clone(), self.1.clone())
@@ -51,6 +59,34 @@ pub mod mpsc {
5159
}
5260
}
5361

62+
impl<T> Sender<T> {
63+
pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
64+
self.0.send_async(message).await
65+
}
66+
67+
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
68+
self.0.try_send(message)
69+
}
70+
}
71+
72+
impl<T> Receiver<T> {
73+
pub fn is_empty(&self) -> bool {
74+
self.0.is_empty()
75+
}
76+
77+
pub fn len(&self) -> usize {
78+
self.0.len()
79+
}
80+
81+
pub async fn recv(&mut self) -> Option<T> {
82+
self.0.recv_async().await.ok()
83+
}
84+
85+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
86+
self.0.try_recv()
87+
}
88+
}
89+
5490
impl<T> UnboundedSender<T> {
5591
pub fn is_empty(&self) -> bool {
5692
self.len() == 0
@@ -82,31 +118,37 @@ pub mod mpsc {
82118
}
83119

84120
pub async fn recv(&mut self) -> Option<T> {
85-
self.0.recv().await
121+
self.0.recv_async().await.ok()
86122
}
87123

88124
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
89125
self.0.try_recv()
90126
}
91127

92-
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
93-
self.0.poll_recv(cx)
128+
pub fn stream(&self) -> RecvStream<T> {
129+
self.0.clone().into_stream()
94130
}
95131

96132
pub fn blocking_recv(&mut self) -> Option<T> {
97-
self.0.blocking_recv()
133+
self.0.recv().ok()
98134
}
99135
}
100136

137+
pub fn channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
138+
let (tx, rx) = flume::bounded(bound);
139+
140+
(Sender(tx), Receiver(rx))
141+
}
142+
101143
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
102-
let (tx, rx) = mpsc::unbounded_channel();
144+
let (tx, rx) = flume::unbounded();
103145

104146
(UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
105147
}
106148

107149
pub fn tracked_unbounded_channel<T>(
108150
) -> (UnboundedSender<Tracked<T>>, UnboundedReceiver<Tracked<T>>) {
109-
let (tx, rx) = mpsc::unbounded_channel();
151+
let (tx, rx) = flume::unbounded();
110152

111153
(UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
112154
}
@@ -133,21 +175,21 @@ pub mod watch {
133175
}
134176

135177
#[allow(dead_code)]
136-
pub struct Aborter(mpsc::Receiver<()>, mpsc::Sender<()>);
178+
pub struct Aborter(flume::Receiver<()>, flume::Sender<()>);
137179

138180
#[derive(Clone)]
139-
pub struct Aborted(mpsc::Sender<()>);
181+
pub struct Aborted(flume::Sender<()>);
140182

141183
impl Default for Aborter {
142184
fn default() -> Self {
143-
let (tx, rx) = mpsc::channel(1);
185+
let (tx, rx) = flume::bounded(0);
144186
Self(rx, tx)
145187
}
146188
}
147189

148190
impl Aborter {
149191
pub fn listener_count(&self) -> usize {
150-
self.1.strong_count().saturating_sub(1)
192+
self.0.sender_count().saturating_sub(1)
151193
}
152194

153195
/// Simply drops the object. No need to call manually, unless you
@@ -163,6 +205,7 @@ impl Aborter {
163205

164206
impl Aborted {
165207
pub async fn wait(&self) {
166-
self.0.closed().await;
208+
// it returning an error means receiver was dropped
209+
while self.0.send_async(()).await.is_ok() {}
167210
}
168211
}

node/testing/src/cluster/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod p2p_task_spawner;
55

66
mod node_id;
77
pub use node_id::{ClusterNodeId, ClusterOcamlNodeId};
8+
use openmina_core::channels::Aborter;
89

910
pub mod runner;
1011

@@ -19,7 +20,6 @@ use libp2p::futures::{stream::FuturesUnordered, StreamExt};
1920

2021
use ledger::proofs::provers::BlockProver;
2122
use node::account::{AccountPublicKey, AccountSecretKey};
22-
use node::core::channels::mpsc;
2323
use node::core::consensus::ConsensusConstants;
2424
use node::core::constants::constraint_constants;
2525
use node::core::invariants::InvariantsState;
@@ -208,7 +208,8 @@ impl Cluster {
208208
let node_config = testing_config.clone();
209209
let node_id = ClusterNodeId::new_unchecked(self.nodes.len());
210210
let work_dir = TempDir::new().unwrap();
211-
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
211+
let shutdown_initiator = Aborter::default();
212+
let shutdown_listener = shutdown_initiator.aborted();
212213
let p2p_sec_key = match testing_config.peer_id {
213214
TestPeerId::Derived => P2pSecretKey::deterministic(node_id.index()),
214215
TestPeerId::Bytes(bytes) => P2pSecretKey::from_bytes(bytes),
@@ -306,7 +307,7 @@ impl Cluster {
306307
.ledger_init()
307308
.p2p_init_with_custom_task_spawner(
308309
p2p_sec_key.clone(),
309-
p2p_task_spawner::P2pTaskSpawner::new(shutdown_tx.clone()),
310+
p2p_task_spawner::P2pTaskSpawner::new(shutdown_listener.clone()),
310311
)
311312
.gather_stats()
312313
.record(match testing_config.recorder {
@@ -331,15 +332,15 @@ impl Cluster {
331332
.enable_all()
332333
.build()
333334
.unwrap();
334-
let shutdown = shutdown_tx.clone();
335+
let shutdown = shutdown_listener.clone();
335336
let rpc_sender = real_service.rpc_sender();
336337
thread::Builder::new()
337338
.name("openmina_http_server".to_owned())
338339
.spawn(move || {
339340
let local_set = tokio::task::LocalSet::new();
340341
let task = async {
341342
tokio::select! {
342-
_ = shutdown.closed() => {}
343+
_ = shutdown.wait() => {}
343344
_ = http_server::run(http_port, rpc_sender) => {}
344345
}
345346
};
@@ -349,7 +350,7 @@ impl Cluster {
349350

350351
let invariants_state = self.invariants_state.clone();
351352
let mut service =
352-
NodeTestingService::new(real_service, node_id, invariants_state, shutdown_rx);
353+
NodeTestingService::new(real_service, node_id, invariants_state, shutdown_initiator);
353354

354355
service.set_proof_kind(self.config.proof_kind());
355356
if self.config.all_rust_to_rust_use_webrtc() {

node/testing/src/cluster/p2p_task_spawner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
use node::core::channels::mpsc;
21
use node::core::thread;
32
use node::p2p::service_impl::TaskSpawner;
3+
use openmina_core::channels::Aborted;
44

55
#[derive(Clone)]
66
pub struct P2pTaskSpawner {
7-
shutdown: mpsc::Sender<()>,
7+
shutdown: Aborted,
88
}
99

1010
impl P2pTaskSpawner {
11-
pub fn new(shutdown: mpsc::Sender<()>) -> Self {
11+
pub fn new(shutdown: Aborted) -> Self {
1212
Self { shutdown }
1313
}
1414
}
@@ -28,7 +28,7 @@ impl TaskSpawner for P2pTaskSpawner {
2828
.spawn(move || {
2929
let fut = async {
3030
tokio::select! {
31-
_ = shutdown.closed() => {}
31+
_ = shutdown.wait() => {}
3232
_ = fut => {}
3333
}
3434
};

node/testing/src/service/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use node::{
5353
},
5454
};
5555
use node::{ActionWithMeta, State};
56+
use openmina_core::channels::Aborter;
5657
use openmina_node_native::NodeService;
5758
use redux::Instant;
5859

@@ -141,15 +142,15 @@ pub struct NodeTestingService {
141142

142143
cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
143144
/// Once dropped, it will cause all threads associated to shutdown.
144-
_shutdown: mpsc::Receiver<()>,
145+
_shutdown: Aborter,
145146
}
146147

147148
impl NodeTestingService {
148149
pub fn new(
149150
real: NodeService,
150151
id: ClusterNodeId,
151152
cluster_invariants_state: Arc<StdMutex<InvariantsState>>,
152-
_shutdown: mpsc::Receiver<()>,
153+
_shutdown: Aborter,
153154
) -> Self {
154155
Self {
155156
real,

p2p/testing/src/rust_node.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ impl RustNodeConfig {
7373

7474
pub struct RustNode {
7575
store: Store,
76-
event_receiver: mpsc::UnboundedReceiver<P2pEvent>,
76+
event_receiver: mpsc::RecvStream<P2pEvent>,
7777
}
7878

7979
impl RustNode {
8080
pub(super) fn new(store: Store, event_receiver: mpsc::UnboundedReceiver<P2pEvent>) -> Self {
8181
RustNode {
8282
store,
83-
event_receiver,
83+
event_receiver: event_receiver.stream(),
8484
}
8585
}
8686

@@ -109,7 +109,7 @@ impl RustNode {
109109
}
110110

111111
fn poll_event_receiver(&mut self, cx: &mut Context<'_>) -> Poll<Option<RustNodeEvent>> {
112-
let event = ready!(Pin::new(&mut self.event_receiver).poll_recv(cx));
112+
let event = ready!(Pin::new(&mut self.event_receiver).poll_next(cx));
113113
Poll::Ready(event.map(|event| {
114114
self.dispatch_event(event.clone());
115115
RustNodeEvent::P2p { event }

0 commit comments

Comments
 (0)