Skip to content

Commit 2dff981

Browse files
ovrigorlukanin
authored andcommitted
fix(native): Be happy, don't panic when unable to schedule task to PyRuntime (#9983)
1 parent c76c5ef commit 2dff981

File tree

1 file changed

+46
-10
lines changed
  • packages/cubejs-backend-native/src/python

1 file changed

+46
-10
lines changed

packages/cubejs-backend-native/src/python/runtime.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ enum PyScheduledFunResult {
4646

4747
pub struct PyRuntime {
4848
sender: tokio::sync::mpsc::Sender<PyScheduledFun>,
49+
js_channel: neon::event::Channel,
4950
}
5051

5152
impl PyRuntime {
@@ -55,16 +56,49 @@ impl PyRuntime {
5556
args: Vec<CLRepr>,
5657
deferred: Deferred,
5758
) {
58-
let res = self.sender.blocking_send(PyScheduledFun {
59+
// Try to reserve immediately for the fast path
60+
let permit = match self.sender.try_reserve() {
61+
Ok(permit) => permit,
62+
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
63+
log::warn!("Python channel is full, this may cause performance issues. Consider increasing the channel size for PyRuntime.");
64+
65+
// Channel is full, use async reserve with blocking for efficiency
66+
match futures::executor::block_on(self.sender.reserve()) {
67+
Ok(permit) => permit,
68+
Err(_) => {
69+
// Channel was closed while waiting
70+
deferred.settle_with(
71+
&self.js_channel,
72+
move |mut cx| -> NeonResult<Handle<JsError>> {
73+
cx.throw_error(
74+
"Unable to schedule python function call: channel is closed",
75+
)
76+
},
77+
);
78+
return;
79+
}
80+
}
81+
}
82+
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
83+
// Channel is closed, settle deferred with error
84+
deferred.settle_with(
85+
&self.js_channel,
86+
move |mut cx| -> NeonResult<Handle<JsError>> {
87+
cx.throw_error("Unable to schedule python function call: channel is closed")
88+
},
89+
);
90+
return;
91+
}
92+
};
93+
94+
let scheduled_fun = PyScheduledFun {
5995
fun,
6096
args,
6197
callback: PyScheduledCallback::NodeDeferred(deferred),
62-
});
63-
if let Err(err) = res {
64-
// TODO: We need to return this error to deferred, but for now
65-
// neon will handle this issue on Drop
66-
error!("Unable to schedule python function call: {}", err)
67-
}
98+
};
99+
100+
// This should never fail since we have a permit
101+
permit.send(scheduled_fun);
68102
}
69103

70104
pub async fn call_async(
@@ -202,7 +236,9 @@ impl PyRuntime {
202236

203237
trace!("New Python runtime");
204238

205-
std::thread::spawn(|| {
239+
let js_channel_clone = js_channel.clone();
240+
241+
std::thread::spawn(move || {
206242
trace!("Initializing executor in a separate thread");
207243

208244
std::thread::spawn(|| {
@@ -216,7 +252,7 @@ impl PyRuntime {
216252
if let Some(task) = receiver.recv().await {
217253
trace!("New task");
218254

219-
if let Err(err) = Self::process_task(task, &js_channel) {
255+
if let Err(err) = Self::process_task(task, &js_channel_clone) {
220256
error!("Error while processing python task: {:?}", err)
221257
};
222258
}
@@ -229,7 +265,7 @@ impl PyRuntime {
229265
}
230266
});
231267

232-
Self { sender }
268+
Self { sender, js_channel }
233269
}
234270
}
235271

0 commit comments

Comments
 (0)