Skip to content

Commit 7189966

Browse files
committed
split unsafe jar into smaller, self-contained module
1 parent 0e4c8fa commit 7189966

File tree

4 files changed

+194
-183
lines changed

4 files changed

+194
-183
lines changed

src/factory.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
database::OwnedDatabase,
3-
transaction::unsafe_jar,
3+
transaction::{unsafe_jar, RunnableTransaction, TransactionResult},
44
utils::{non_transaction_request, str_slice_to_array},
55
Database, ObjectStore, Transaction,
66
};
@@ -127,7 +127,7 @@ impl Factory {
127127
ran_upgrade_cb.set(true);
128128
on_upgrade_needed(event).await
129129
};
130-
unsafe_jar::RunnableTransaction::new(transaction, fut, result, finished_tx)
130+
RunnableTransaction::new(transaction, fut, result, finished_tx)
131131
},
132132
),
133133
async move |s| {
@@ -150,10 +150,10 @@ impl Factory {
150150
.take()
151151
.expect("Finished was called without the result being available");
152152
match result {
153-
unsafe_jar::TransactionResult::PolledForbiddenThing => {
153+
TransactionResult::PolledForbiddenThing => {
154154
panic!("Transaction blocked without any request under way")
155155
}
156-
unsafe_jar::TransactionResult::Done(upgrade_res) => upgrade_res?,
156+
TransactionResult::Done(upgrade_res) => upgrade_res?,
157157
}
158158
}
159159
completion_res.map_err(crate::Error::from_js_event)?;

src/transaction.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ use web_sys::{
1515
IdbDatabase, IdbRequest, IdbTransaction, IdbTransactionMode,
1616
};
1717

18+
mod runner;
1819
pub(crate) mod unsafe_jar;
1920

21+
pub use runner::{RunnableTransaction, TransactionResult};
22+
2023
/// Wrapper for [`IDBTransaction`](https://developer.mozilla.org/en-US/docs/Web/API/IDBTransaction)
2124
#[derive(Debug)]
2225
pub struct Transaction<Err> {
@@ -119,7 +122,7 @@ impl TransactionBuilder {
119122
let (finished_tx, finished_rx) = futures_channel::oneshot::channel();
120123
unsafe_jar::extend_lifetime_to_scope_and_run(
121124
Box::new(move |()| {
122-
unsafe_jar::RunnableTransaction::new(
125+
RunnableTransaction::new(
123126
t.clone(),
124127
transaction(Transaction::from_sys(t)),
125128
result,
@@ -134,10 +137,10 @@ impl TransactionBuilder {
134137
.take()
135138
.expect("Transaction finished without setting result");
136139
match result {
137-
unsafe_jar::TransactionResult::PolledForbiddenThing => {
140+
TransactionResult::PolledForbiddenThing => {
138141
panic!("Transaction blocked without any request under way")
139142
}
140-
unsafe_jar::TransactionResult::Done(r) => r,
143+
TransactionResult::Done(r) => r,
141144
}
142145
},
143146
)
@@ -171,7 +174,7 @@ pub(crate) async fn transaction_request(req: IdbRequest) -> Result<JsValue, JsVa
171174
let result = Rc::new(RefCell::new(None));
172175

173176
// Keep the callbacks alive until execution completed
174-
let _callbacks = unsafe_jar::add_request(req, &result);
177+
let _callbacks = runner::add_request(req, &result);
175178

176179
match FakeFuture::new(&result).await {
177180
Ok(evt) => {

src/transaction/runner.rs

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
//! All the required to run a transaction
2+
3+
use std::{
4+
cell::{Cell, RefCell},
5+
future::Future,
6+
pin::Pin,
7+
rc::Rc,
8+
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
9+
};
10+
11+
use futures_channel::oneshot;
12+
use scoped_tls::scoped_thread_local;
13+
use web_sys::{
14+
js_sys::Function,
15+
wasm_bindgen::{closure::Closure, JsCast as _},
16+
IdbRequest, IdbTransaction,
17+
};
18+
19+
pub enum TransactionResult<R> {
20+
PolledForbiddenThing,
21+
Done(R),
22+
}
23+
24+
pub struct RunnableTransaction<'f> {
25+
transaction: IdbTransaction,
26+
inflight_requests: Cell<usize>,
27+
future: RefCell<Pin<Box<dyn 'f + Future<Output = ()>>>>,
28+
polled_forbidden_thing: Box<dyn 'f + Fn()>,
29+
finished: RefCell<Option<oneshot::Sender<()>>>,
30+
}
31+
32+
impl<'f> RunnableTransaction<'f> {
33+
pub fn new<R, E>(
34+
transaction: IdbTransaction,
35+
transaction_contents: impl 'f + Future<Output = Result<R, E>>,
36+
result: &'f RefCell<Option<TransactionResult<Result<R, E>>>>,
37+
finished: oneshot::Sender<()>,
38+
) -> RunnableTransaction<'f>
39+
where
40+
R: 'f,
41+
E: 'f,
42+
{
43+
RunnableTransaction {
44+
transaction: transaction.clone(),
45+
inflight_requests: Cell::new(0),
46+
future: RefCell::new(Box::pin(async move {
47+
let transaction_result = transaction_contents.await;
48+
if transaction_result.is_err() {
49+
// The transaction failed. We should abort it.
50+
let _ = transaction.abort();
51+
}
52+
assert!(
53+
result
54+
.replace(Some(TransactionResult::Done(transaction_result)))
55+
.is_none(),
56+
"Transaction completed multiple times",
57+
);
58+
})),
59+
polled_forbidden_thing: Box::new(move || {
60+
*result.borrow_mut() = Some(TransactionResult::PolledForbiddenThing);
61+
}),
62+
finished: RefCell::new(Some(finished)),
63+
}
64+
}
65+
}
66+
67+
fn panic_waker() -> Waker {
68+
fn clone(_: *const ()) -> RawWaker {
69+
RawWaker::new(
70+
std::ptr::null(),
71+
&RawWakerVTable::new(clone, wake, wake, drop),
72+
)
73+
}
74+
fn wake(_: *const ()) {
75+
panic!("IndexedDB transaction tried to await on something other than a request")
76+
}
77+
fn drop(_: *const ()) {}
78+
unsafe {
79+
Waker::new(
80+
std::ptr::null(),
81+
&RawWakerVTable::new(clone, wake, wake, drop),
82+
)
83+
}
84+
}
85+
86+
scoped_thread_local!(static CURRENT: Rc<RunnableTransaction<'static>>);
87+
88+
pub fn poll_it(state: &Rc<RunnableTransaction<'static>>) {
89+
CURRENT.set(&state, || {
90+
// Poll once, in order to run the transaction until its next await on a request
91+
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
92+
state
93+
.future
94+
.borrow_mut()
95+
.as_mut()
96+
.poll(&mut Context::from_waker(&panic_waker()))
97+
}));
98+
99+
// Try catching the panic and aborting. This currently does not work in wasm due to panic=abort, but will
100+
// hopefully work some day. The transaction _should_ auto-abort if the wasm module aborts, so hopefully we're
101+
// fine around there.
102+
let res = match res {
103+
Ok(res) => res,
104+
Err(err) => {
105+
// The poll panicked, abort the transaction
106+
let _ = state.transaction.abort();
107+
std::panic::resume_unwind(err);
108+
}
109+
};
110+
111+
// Finally, check the poll result
112+
match res {
113+
Poll::Pending => {
114+
// Still some work to do. Is there at least one request in flight?
115+
if state.inflight_requests.get() == 0 {
116+
// Returned `Pending` despite no request being inflight. This means there was
117+
// an `await` on something other than transaction requests. Abort in order to
118+
// avoid the default auto-commit behavior.
119+
let _ = state.transaction.abort();
120+
let _ = (state.polled_forbidden_thing)();
121+
}
122+
}
123+
Poll::Ready(()) => {
124+
// Everything went well! Just signal that we're done
125+
let finished = state
126+
.finished
127+
.borrow_mut()
128+
.take()
129+
.expect("Transaction finished multiple times");
130+
if finished.send(()).is_err() {
131+
// Transaction aborted by not awaiting on it
132+
let _ = state.transaction.abort();
133+
return;
134+
}
135+
}
136+
}
137+
});
138+
}
139+
140+
pub fn add_request(
141+
req: IdbRequest,
142+
result: &Rc<RefCell<Option<Result<web_sys::Event, web_sys::Event>>>>,
143+
) -> impl Sized {
144+
CURRENT.with(move |state| {
145+
state
146+
.inflight_requests
147+
.set(state.inflight_requests.get() + 1);
148+
149+
let on_success = Closure::once({
150+
let state = state.clone();
151+
let result = result.clone();
152+
move |evt: web_sys::Event| {
153+
state
154+
.inflight_requests
155+
.set(state.inflight_requests.get() - 1);
156+
assert!(result.replace(Some(Ok(evt))).is_none());
157+
poll_it(&state);
158+
}
159+
});
160+
161+
let on_error = Closure::once({
162+
let state = state.clone();
163+
let result = result.clone();
164+
move |evt: web_sys::Event| {
165+
evt.prevent_default(); // Do not abort the transaction, we're dealing with it ourselves
166+
state
167+
.inflight_requests
168+
.set(state.inflight_requests.get() - 1);
169+
assert!(result.replace(Some(Err(evt))).is_none());
170+
poll_it(&state);
171+
}
172+
});
173+
174+
req.set_onsuccess(Some(&on_success.as_ref().dyn_ref::<Function>().unwrap()));
175+
req.set_onerror(Some(&on_error.as_ref().dyn_ref::<Function>().unwrap()));
176+
177+
// Keep the callbacks alive until they're no longer needed
178+
(on_success, on_error)
179+
})
180+
}

0 commit comments

Comments
 (0)