Skip to content

Commit ccccd5f

Browse files
committed
Wait for wrapped storage to be dropped
1 parent 45f5345 commit ccccd5f

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

src/storage/send_wrapper/wrapper.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ use uuid::Uuid;
1616
/// a few types are `!Send` and any async function handling such types are also `!Send`.
1717
///
1818
/// On WASM, the wrapped storage runs in an async task, but not in a thread.
19-
#[derive(Clone)]
2019
pub(in crate::storage) struct Wrapper {
21-
sender: mpsc::UnboundedSender<ActorMessage>,
20+
// Both fields in this struct are `Option<..>` to allow them to be dropped individually
21+
// in `Wrapper::drop`.
22+
sender: Option<mpsc::UnboundedSender<ActorMessage>>,
23+
#[cfg(not(target_arch = "wasm32"))]
24+
thread: Option<std::thread::JoinHandle<()>>,
2225
}
2326

2427
impl Wrapper {
@@ -62,7 +65,7 @@ impl Wrapper {
6265
// Otherwise, spawn a new thread, and within that a local Tokio RT that can handle !Send
6366
// futures.
6467
#[cfg(not(target_arch = "wasm32"))]
65-
{
68+
let thread = {
6669
use std::thread;
6770
use tokio::runtime;
6871
thread::spawn(move || {
@@ -75,12 +78,16 @@ impl Wrapper {
7578
};
7679

7780
rt.block_on(in_thread(init_sender));
78-
});
79-
}
81+
})
82+
};
8083

8184
// Wait until the thread sends its initialization result.
8285
init_receiver.await??;
83-
Ok(Self { sender })
86+
Ok(Self {
87+
sender: Some(sender),
88+
#[cfg(not(target_arch = "wasm32"))]
89+
thread: Some(thread),
90+
})
8491
}
8592
}
8693

@@ -91,12 +98,27 @@ impl Storage for Wrapper {
9198
// communicates with the underlying sync txn.
9299
async fn txn<'a>(&'a mut self) -> Result<Box<dyn StorageTxn + Send + 'a>> {
93100
let (reply_tx, reply_rx) = oneshot::channel();
94-
self.sender.send(ActorMessage::BeginTxn(reply_tx))?;
101+
self.sender
102+
.as_mut()
103+
.expect("txn called after drop")
104+
.send(ActorMessage::BeginTxn(reply_tx))?;
95105
let txn_sender = reply_rx.await??;
96106
Ok(Box::new(WrapperTxn::new(txn_sender)))
97107
}
98108
}
99109

110+
#[cfg(not(target_arch = "wasm32"))]
111+
impl Drop for Wrapper {
112+
fn drop(&mut self) {
113+
// Deleting the sender signals to the actor thread that it should drop the
114+
// wrapped storage and terminate.
115+
self.sender = None;
116+
// Wait for the thread to terminate, indicating that the wrapped storage has
117+
// been fully dropped.
118+
let _ = self.thread.take().expect("thread joined twice").join();
119+
}
120+
}
121+
100122
/// An async proxy for a transaction running on the sync actor thread.
101123
struct WrapperTxn {
102124
sender: mpsc::UnboundedSender<TxnMessage>,

src/storage/sqlite/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ impl ToSql for StoredUuid {
4141
}
4242

4343
/// SqliteStorage stores task data in a file on disk.
44-
#[derive(Clone)]
4544
pub struct SqliteStorage(Wrapper);
4645

4746
impl SqliteStorage {

0 commit comments

Comments
 (0)