Skip to content

Commit 5480978

Browse files
authored
fix(native): Handle JsAsyncChannel error (without panic) (#7729)
1 parent cb9698e commit 5480978

File tree

1 file changed

+63
-30
lines changed

1 file changed

+63
-30
lines changed

packages/cubejs-backend-native/src/channel.rs

Lines changed: 63 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,33 @@
11
use std::cell::RefCell;
22
use std::collections::HashMap;
3+
#[cfg(debug_assertions)]
4+
use std::sync::atomic::{AtomicU64, Ordering};
35
use std::sync::Arc;
46

57
use crate::transport::MapCubeErrExt;
68
use async_trait::async_trait;
79
use cubesql::transport::{SqlGenerator, SqlTemplates};
810
use cubesql::CubeError;
9-
#[cfg(build = "debug")]
11+
#[cfg(debug_assertions)]
1012
use log::trace;
1113
use neon::prelude::*;
1214
use tokio::sync::oneshot;
1315

1416
use crate::utils::bind_method;
1517

16-
type JsAsyncStringChannelCallback = Box<dyn FnOnce(Result<String, CubeError>) + Send>;
17-
type JsAsyncChannelCallback =
18-
Box<dyn FnOnce(&mut FunctionContext, Result<Handle<JsValue>, CubeError>) + Send>;
18+
type JsAsyncStringChannelCallback =
19+
Box<dyn FnOnce(Result<String, CubeError>) -> Result<(), CubeError> + Send>;
20+
type JsAsyncChannelCallback = Box<
21+
dyn FnOnce(&mut FunctionContext, Result<Handle<JsValue>, CubeError>) -> Result<(), CubeError>
22+
+ Send,
23+
>;
24+
25+
#[cfg(debug_assertions)]
26+
static JS_ASYNC_CHANNEL_DEBUG_ID_SEQ: AtomicU64 = AtomicU64::new(0);
1927

2028
pub struct JsAsyncChannel {
29+
#[cfg(debug_assertions)]
30+
_id: u64,
2131
callback: Option<JsAsyncChannelCallback>,
2232
}
2333

@@ -26,32 +36,39 @@ type BoxedChannel = JsBox<RefCell<JsAsyncChannel>>;
2636
impl Finalize for JsAsyncChannel {}
2737

2838
fn js_async_channel_resolve(mut cx: FunctionContext) -> JsResult<JsUndefined> {
29-
#[cfg(build = "debug")]
30-
trace!("JsAsyncChannel.resolved");
31-
3239
let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;
40+
41+
#[cfg(debug_assertions)]
42+
trace!("JsAsyncChannel.resolved {}", this.borrow()._id);
43+
3344
let result = cx.argument::<JsValue>(0)?;
3445

35-
if this.borrow_mut().resolve(&mut cx, result) {
36-
Ok(cx.undefined())
46+
let tricky_rust_scope_hack = if let Err(err) = this.borrow_mut().resolve(&mut cx, result) {
47+
cx.throw_error(format!("JsAsyncChannel resolving error: {}", err))
3748
} else {
38-
cx.throw_error("Resolve was called on AsyncChannel that was already used")
39-
}
49+
Ok(cx.undefined())
50+
};
51+
52+
tricky_rust_scope_hack
4053
}
4154

4255
fn js_async_channel_reject(mut cx: FunctionContext) -> JsResult<JsUndefined> {
43-
#[cfg(build = "debug")]
44-
trace!("JsAsyncChannel.reject");
45-
4656
let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;
57+
58+
#[cfg(debug_assertions)]
59+
trace!("JsAsyncChannel.reject {}", this.borrow()._id);
60+
4761
let error = cx.argument::<JsString>(0)?;
4862

4963
let error_str = error.value(&mut cx);
50-
if this.borrow_mut().reject(&mut cx, error_str) {
51-
Ok(cx.undefined())
64+
65+
let tricky_rust_scope_hack = if let Err(err) = this.borrow_mut().reject(&mut cx, error_str) {
66+
cx.throw_error(format!("JsAsyncChannel rejecting error: {}", err))
5267
} else {
53-
cx.throw_error("Reject was called on AsyncChannel that was already used")
54-
}
68+
Ok(cx.undefined())
69+
};
70+
71+
tricky_rust_scope_hack
5572
}
5673

5774
impl JsAsyncChannel {
@@ -70,6 +87,8 @@ impl JsAsyncChannel {
7087

7188
pub fn new_raw(callback: JsAsyncChannelCallback) -> Self {
7289
Self {
90+
#[cfg(debug_assertions)]
91+
_id: JS_ASYNC_CHANNEL_DEBUG_ID_SEQ.fetch_add(1, Ordering::SeqCst),
7392
callback: Some(callback),
7493
}
7594
}
@@ -91,23 +110,27 @@ impl JsAsyncChannel {
91110
Ok(obj)
92111
}
93112

94-
fn resolve(&mut self, cx: &mut FunctionContext, result: Handle<JsValue>) -> bool {
113+
fn resolve(
114+
&mut self,
115+
cx: &mut FunctionContext,
116+
result: Handle<JsValue>,
117+
) -> Result<(), CubeError> {
95118
if let Some(callback) = self.callback.take() {
96-
callback(cx, Ok(result));
97-
98-
true
119+
callback(cx, Ok(result))
99120
} else {
100-
false
121+
Err(CubeError::internal(
122+
"Resolve was called on AsyncChannel that was already used".to_string(),
123+
))
101124
}
102125
}
103126

104-
fn reject(&mut self, cx: &mut FunctionContext, error: String) -> bool {
127+
fn reject(&mut self, cx: &mut FunctionContext, error: String) -> Result<(), CubeError> {
105128
if let Some(callback) = self.callback.take() {
106-
callback(cx, Err(CubeError::internal(error)));
107-
108-
true
129+
callback(cx, Err(CubeError::internal(error)))
109130
} else {
110-
false
131+
Err(CubeError::internal(
132+
"Reject was called on AsyncChannel that was already used".to_string(),
133+
))
111134
}
112135
}
113136
}
@@ -132,7 +155,12 @@ where
132155
Err(err) => Err(CubeError::internal(err.to_string())),
133156
};
134157

135-
tx.send(to_channel).unwrap();
158+
tx.send(to_channel).map_err(|_| {
159+
CubeError::internal(
160+
"AsyncChannel: Unable to send result from JS back to Rust, channel closed"
161+
.to_string(),
162+
)
163+
})
136164
}));
137165

138166
channel
@@ -185,7 +213,12 @@ where
185213
let async_channel = JsAsyncChannel::new_raw(Box::new(move |cx, result| {
186214
let to_channel = result.and_then(|res| result_from_js_value(cx, res));
187215

188-
tx.send(to_channel).unwrap();
216+
tx.send(to_channel).map_err(|_| {
217+
CubeError::internal(
218+
"AsyncChannel: Unable to send result from JS back to Rust, channel closed"
219+
.to_string(),
220+
)
221+
})
189222
}));
190223

191224
channel.send(move |mut cx| {

0 commit comments

Comments
 (0)