Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 56 additions & 37 deletions packages/cubejs-backend-native/src/python/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::python::neon_py::*;
use crate::python::utils::PyAnyHelpers;
use crate::tokio_runtime_node;
use cubesql::CubeError;
use futures::FutureExt;
use log::{error, trace};
use neon::prelude::*;
use neon::types::Deferred;
Expand All @@ -11,6 +12,7 @@ use pyo3::prelude::*;
use pyo3::types::{PyFunction, PyTuple};
use std::fmt::Formatter;
use std::future::Future;
use std::panic;
use std::pin::Pin;

#[derive(Debug)]
Expand Down Expand Up @@ -129,30 +131,42 @@ impl PyRuntime {
) -> Result<(), CubeError> {
let (fun, args, callback) = task.split();

let task_result = Python::with_gil(move |py| -> PyResult<PyScheduledFunResult> {
let mut prep_tuple = Vec::with_capacity(args.len());
let mut py_kwargs = None;
let task_block = panic::AssertUnwindSafe(|| {
Python::with_gil(move |py| -> PyResult<PyScheduledFunResult> {
let mut prep_tuple = Vec::with_capacity(args.len());
let mut py_kwargs = None;

for arg in args {
if arg.is_kwarg() {
py_kwargs = Some(arg.into_py_dict(py)?);
} else {
prep_tuple.push(arg.into_py(py)?);
for arg in args {
if arg.is_kwarg() {
py_kwargs = Some(arg.into_py_dict(py)?);
} else {
prep_tuple.push(arg.into_py(py)?);
}
}
}

let py_args = PyTuple::new(py, prep_tuple);
let call_res = fun.call(py, py_args, py_kwargs)?;
let py_args = PyTuple::new(py, prep_tuple);
let call_res = fun.call(py, py_args, py_kwargs)?;

if call_res.is_coroutine()? {
let fut = pyo3_asyncio::tokio::into_future(call_res.as_ref(py))?;
Ok(PyScheduledFunResult::Poll(Box::pin(fut)))
} else {
Ok(PyScheduledFunResult::Ready(CLRepr::from_python_ref(
call_res.as_ref(py),
)?))
}
if call_res.is_coroutine()? {
let fut = pyo3_asyncio::tokio::into_future(call_res.as_ref(py))?;
Ok(PyScheduledFunResult::Poll(Box::pin(fut)))
} else {
Ok(PyScheduledFunResult::Ready(CLRepr::from_python_ref(
call_res.as_ref(py),
)?))
}
})
});

let task_result = match panic::catch_unwind(task_block) {
Ok(Ok(r)) => Ok(r),
Ok(Err(err)) => Err(CubeError::user(format_python_error(err))),
Err(panic_payload) => Err(CubeError::panic_with_message(
panic_payload,
"Unexpected panic while calling python function",
)),
};

let task_result = match task_result {
Ok(r) => r,
Err(err) => {
Expand All @@ -161,13 +175,12 @@ impl PyRuntime {
deferred.settle_with(
js_channel,
move |mut cx| -> NeonResult<Handle<JsError>> {
cx.throw_from_python_error(err)
cx.throw_error(err.to_string())
},
);
}
PyScheduledCallback::Channel(chan) => {
let send_res =
chan.send(Err(CubeError::internal(format_python_error(err))));
let send_res = chan.send(Err(err));
if send_res.is_err() {
return Err(CubeError::internal(
"Unable to send result back to consumer".to_string(),
Expand All @@ -185,31 +198,37 @@ impl PyRuntime {
let js_channel_to_move = js_channel.clone();

tokio::spawn(async move {
let fut_res = fut.await;
let safe_py_fut_poll = panic::AssertUnwindSafe(async {
let fut_res = fut.await;

let res = Python::with_gil(move |py| -> Result<CLRepr, PyErr> {
let res = match fut_res {
Ok(r) => CLRepr::from_python_ref(r.as_ref(py)),
Err(err) => Err(err),
};
Python::with_gil(move |py| -> Result<CLRepr, PyErr> {
let res = match fut_res {
Ok(r) => CLRepr::from_python_ref(r.as_ref(py)),
Err(err) => Err(err),
};

res
res
})
});

let fut_res = match safe_py_fut_poll.catch_unwind().await {
Ok(Ok(r)) => Ok(r),
Ok(Err(err)) => Err(CubeError::internal(format_python_error(err))),
Err(panic_payload) => Err(CubeError::panic_with_message(
panic_payload,
"Unexpected panic while polling python future",
)),
};

match callback {
PyScheduledCallback::NodeDeferred(deferred) => {
deferred.settle_with(&js_channel_to_move, |mut cx| match res {
Err(err) => cx.throw_error(format!("Python error: {}", err)),
deferred.settle_with(&js_channel_to_move, |mut cx| match fut_res {
Ok(r) => r.into_js(&mut cx),
Err(err) => cx.throw_error(format!("{}", err)),
});
}
PyScheduledCallback::Channel(chan) => {
let _ = match res {
Ok(r) => chan.send(Ok(r)),
Err(err) => {
chan.send(Err(CubeError::internal(format_python_error(err))))
}
};
let _ = chan.send(fut_res);
}
}
});
Expand Down
10 changes: 7 additions & 3 deletions rust/cubesql/cubesql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ impl CubeError {
}

pub fn panic(error: Box<dyn Any + Send>) -> Self {
Self::panic_with_message(error, "Unexpected panic")
}

pub fn panic_with_message(error: Box<dyn Any + Send>, message: &str) -> Self {
if let Some(reason) = error.downcast_ref::<&str>() {
CubeError::internal(format!("Unexpected panic. Reason: {}", reason))
CubeError::internal(format!("{}. Reason: {}", message, reason))
} else if let Some(reason) = error.downcast_ref::<String>() {
CubeError::internal(format!("Unexpected panic. Reason: {}", reason))
CubeError::internal(format!("{}. Reason: {}", message, reason))
} else {
CubeError::internal("Unexpected panic without reason".to_string())
CubeError::internal(format!("{} without reason", message))
}
}
}
Expand Down
Loading