diff --git a/packages/cubejs-backend-native/src/python/runtime.rs b/packages/cubejs-backend-native/src/python/runtime.rs index 00f0ea55fdb24..74535746ee693 100644 --- a/packages/cubejs-backend-native/src/python/runtime.rs +++ b/packages/cubejs-backend-native/src/python/runtime.rs @@ -3,6 +3,7 @@ use crate::python::neon_py::*; use crate::python::utils::PyAnyHelpers; use crate::tokio_runtime_node; use cubesql::CubeError; +use futures::FutureExt; use log::{error, trace}; use neon::prelude::*; use neon::types::Deferred; @@ -11,6 +12,7 @@ use pyo3::prelude::*; use pyo3::types::{PyFunction, PyTuple}; use std::fmt::Formatter; use std::future::Future; +use std::panic; use std::pin::Pin; #[derive(Debug)] @@ -129,30 +131,42 @@ impl PyRuntime { ) -> Result<(), CubeError> { let (fun, args, callback) = task.split(); - let task_result = Python::with_gil(move |py| -> PyResult { - let mut prep_tuple = Vec::with_capacity(args.len()); - let mut py_kwargs = None; + let task_block = panic::AssertUnwindSafe(|| { + Python::with_gil(move |py| -> PyResult { + let mut prep_tuple = Vec::with_capacity(args.len()); + let mut py_kwargs = None; - for arg in args { - if arg.is_kwarg() { - py_kwargs = Some(arg.into_py_dict(py)?); - } else { - prep_tuple.push(arg.into_py(py)?); + for arg in args { + if arg.is_kwarg() { + py_kwargs = Some(arg.into_py_dict(py)?); + } else { + prep_tuple.push(arg.into_py(py)?); + } } - } - let py_args = PyTuple::new(py, prep_tuple); - let call_res = fun.call(py, py_args, py_kwargs)?; + let py_args = PyTuple::new(py, prep_tuple); + let call_res = fun.call(py, py_args, py_kwargs)?; - if call_res.is_coroutine()? { - let fut = pyo3_asyncio::tokio::into_future(call_res.as_ref(py))?; - Ok(PyScheduledFunResult::Poll(Box::pin(fut))) - } else { - Ok(PyScheduledFunResult::Ready(CLRepr::from_python_ref( - call_res.as_ref(py), - )?)) - } + if call_res.is_coroutine()? { + let fut = pyo3_asyncio::tokio::into_future(call_res.as_ref(py))?; + Ok(PyScheduledFunResult::Poll(Box::pin(fut))) + } else { + Ok(PyScheduledFunResult::Ready(CLRepr::from_python_ref( + call_res.as_ref(py), + )?)) + } + }) }); + + let task_result = match panic::catch_unwind(task_block) { + Ok(Ok(r)) => Ok(r), + Ok(Err(err)) => Err(CubeError::user(format_python_error(err))), + Err(panic_payload) => Err(CubeError::panic_with_message( + panic_payload, + "Unexpected panic while calling python function", + )), + }; + let task_result = match task_result { Ok(r) => r, Err(err) => { @@ -161,13 +175,12 @@ impl PyRuntime { deferred.settle_with( js_channel, move |mut cx| -> NeonResult> { - cx.throw_from_python_error(err) + cx.throw_error(err.to_string()) }, ); } PyScheduledCallback::Channel(chan) => { - let send_res = - chan.send(Err(CubeError::internal(format_python_error(err)))); + let send_res = chan.send(Err(err)); if send_res.is_err() { return Err(CubeError::internal( "Unable to send result back to consumer".to_string(), @@ -185,31 +198,37 @@ impl PyRuntime { let js_channel_to_move = js_channel.clone(); tokio::spawn(async move { - let fut_res = fut.await; + let safe_py_fut_poll = panic::AssertUnwindSafe(async { + let fut_res = fut.await; - let res = Python::with_gil(move |py| -> Result { - let res = match fut_res { - Ok(r) => CLRepr::from_python_ref(r.as_ref(py)), - Err(err) => Err(err), - }; + Python::with_gil(move |py| -> Result { + let res = match fut_res { + Ok(r) => CLRepr::from_python_ref(r.as_ref(py)), + Err(err) => Err(err), + }; - res + res + }) }); + let fut_res = match safe_py_fut_poll.catch_unwind().await { + Ok(Ok(r)) => Ok(r), + Ok(Err(err)) => Err(CubeError::internal(format_python_error(err))), + Err(panic_payload) => Err(CubeError::panic_with_message( + panic_payload, + "Unexpected panic while polling python future", + )), + }; + match callback { PyScheduledCallback::NodeDeferred(deferred) => { - deferred.settle_with(&js_channel_to_move, |mut cx| match res { - Err(err) => cx.throw_error(format!("Python error: {}", err)), + deferred.settle_with(&js_channel_to_move, |mut cx| match fut_res { Ok(r) => r.into_js(&mut cx), + Err(err) => cx.throw_error(format!("{}", err)), }); } PyScheduledCallback::Channel(chan) => { - let _ = match res { - Ok(r) => chan.send(Ok(r)), - Err(err) => { - chan.send(Err(CubeError::internal(format_python_error(err)))) - } - }; + let _ = chan.send(fut_res); } } }); diff --git a/rust/cubesql/cubesql/src/error.rs b/rust/cubesql/cubesql/src/error.rs index e518f259b068e..d67b36f30fde7 100644 --- a/rust/cubesql/cubesql/src/error.rs +++ b/rust/cubesql/cubesql/src/error.rs @@ -51,12 +51,16 @@ impl CubeError { } pub fn panic(error: Box) -> Self { + Self::panic_with_message(error, "Unexpected panic") + } + + pub fn panic_with_message(error: Box, message: &str) -> Self { if let Some(reason) = error.downcast_ref::<&str>() { - CubeError::internal(format!("Unexpected panic. Reason: {}", reason)) + CubeError::internal(format!("{}. Reason: {}", message, reason)) } else if let Some(reason) = error.downcast_ref::() { - CubeError::internal(format!("Unexpected panic. Reason: {}", reason)) + CubeError::internal(format!("{}. Reason: {}", message, reason)) } else { - CubeError::internal("Unexpected panic without reason".to_string()) + CubeError::internal(format!("{} without reason", message)) } } }