@@ -3,6 +3,7 @@ use crate::python::neon_py::*;
33use crate :: python:: utils:: PyAnyHelpers ;
44use crate :: tokio_runtime_node;
55use cubesql:: CubeError ;
6+ use futures:: FutureExt ;
67use log:: { error, trace} ;
78use neon:: prelude:: * ;
89use neon:: types:: Deferred ;
@@ -11,6 +12,7 @@ use pyo3::prelude::*;
1112use pyo3:: types:: { PyFunction , PyTuple } ;
1213use std:: fmt:: Formatter ;
1314use std:: future:: Future ;
15+ use std:: panic;
1416use std:: pin:: Pin ;
1517
1618#[ derive( Debug ) ]
@@ -129,30 +131,42 @@ impl PyRuntime {
129131 ) -> Result < ( ) , CubeError > {
130132 let ( fun, args, callback) = task. split ( ) ;
131133
132- let task_result = Python :: with_gil ( move |py| -> PyResult < PyScheduledFunResult > {
133- let mut prep_tuple = Vec :: with_capacity ( args. len ( ) ) ;
134- let mut py_kwargs = None ;
134+ let task_block = panic:: AssertUnwindSafe ( || {
135+ Python :: with_gil ( move |py| -> PyResult < PyScheduledFunResult > {
136+ let mut prep_tuple = Vec :: with_capacity ( args. len ( ) ) ;
137+ let mut py_kwargs = None ;
135138
136- for arg in args {
137- if arg. is_kwarg ( ) {
138- py_kwargs = Some ( arg. into_py_dict ( py) ?) ;
139- } else {
140- prep_tuple. push ( arg. into_py ( py) ?) ;
139+ for arg in args {
140+ if arg. is_kwarg ( ) {
141+ py_kwargs = Some ( arg. into_py_dict ( py) ?) ;
142+ } else {
143+ prep_tuple. push ( arg. into_py ( py) ?) ;
144+ }
141145 }
142- }
143146
144- let py_args = PyTuple :: new ( py, prep_tuple) ;
145- let call_res = fun. call ( py, py_args, py_kwargs) ?;
147+ let py_args = PyTuple :: new ( py, prep_tuple) ;
148+ let call_res = fun. call ( py, py_args, py_kwargs) ?;
146149
147- if call_res. is_coroutine ( ) ? {
148- let fut = pyo3_asyncio:: tokio:: into_future ( call_res. as_ref ( py) ) ?;
149- Ok ( PyScheduledFunResult :: Poll ( Box :: pin ( fut) ) )
150- } else {
151- Ok ( PyScheduledFunResult :: Ready ( CLRepr :: from_python_ref (
152- call_res. as_ref ( py) ,
153- ) ?) )
154- }
150+ if call_res. is_coroutine ( ) ? {
151+ let fut = pyo3_asyncio:: tokio:: into_future ( call_res. as_ref ( py) ) ?;
152+ Ok ( PyScheduledFunResult :: Poll ( Box :: pin ( fut) ) )
153+ } else {
154+ Ok ( PyScheduledFunResult :: Ready ( CLRepr :: from_python_ref (
155+ call_res. as_ref ( py) ,
156+ ) ?) )
157+ }
158+ } )
155159 } ) ;
160+
161+ let task_result = match panic:: catch_unwind ( task_block) {
162+ Ok ( Ok ( r) ) => Ok ( r) ,
163+ Ok ( Err ( err) ) => Err ( CubeError :: user ( format_python_error ( err) ) ) ,
164+ Err ( panic_payload) => Err ( CubeError :: panic_with_message (
165+ panic_payload,
166+ "Unexpected panic while calling python function" ,
167+ ) ) ,
168+ } ;
169+
156170 let task_result = match task_result {
157171 Ok ( r) => r,
158172 Err ( err) => {
@@ -161,13 +175,12 @@ impl PyRuntime {
161175 deferred. settle_with (
162176 js_channel,
163177 move |mut cx| -> NeonResult < Handle < JsError > > {
164- cx. throw_from_python_error ( err)
178+ cx. throw_error ( err. to_string ( ) )
165179 } ,
166180 ) ;
167181 }
168182 PyScheduledCallback :: Channel ( chan) => {
169- let send_res =
170- chan. send ( Err ( CubeError :: internal ( format_python_error ( err) ) ) ) ;
183+ let send_res = chan. send ( Err ( err) ) ;
171184 if send_res. is_err ( ) {
172185 return Err ( CubeError :: internal (
173186 "Unable to send result back to consumer" . to_string ( ) ,
@@ -185,31 +198,37 @@ impl PyRuntime {
185198 let js_channel_to_move = js_channel. clone ( ) ;
186199
187200 tokio:: spawn ( async move {
188- let fut_res = fut. await ;
201+ let safe_py_fut_poll = panic:: AssertUnwindSafe ( async {
202+ let fut_res = fut. await ;
189203
190- let res = Python :: with_gil ( move |py| -> Result < CLRepr , PyErr > {
191- let res = match fut_res {
192- Ok ( r) => CLRepr :: from_python_ref ( r. as_ref ( py) ) ,
193- Err ( err) => Err ( err) ,
194- } ;
204+ Python :: with_gil ( move |py| -> Result < CLRepr , PyErr > {
205+ let res = match fut_res {
206+ Ok ( r) => CLRepr :: from_python_ref ( r. as_ref ( py) ) ,
207+ Err ( err) => Err ( err) ,
208+ } ;
195209
196- res
210+ res
211+ } )
197212 } ) ;
198213
214+ let fut_res = match safe_py_fut_poll. catch_unwind ( ) . await {
215+ Ok ( Ok ( r) ) => Ok ( r) ,
216+ Ok ( Err ( err) ) => Err ( CubeError :: internal ( format_python_error ( err) ) ) ,
217+ Err ( panic_payload) => Err ( CubeError :: panic_with_message (
218+ panic_payload,
219+ "Unexpected panic while polling python future" ,
220+ ) ) ,
221+ } ;
222+
199223 match callback {
200224 PyScheduledCallback :: NodeDeferred ( deferred) => {
201- deferred. settle_with ( & js_channel_to_move, |mut cx| match res {
202- Err ( err) => cx. throw_error ( format ! ( "Python error: {}" , err) ) ,
225+ deferred. settle_with ( & js_channel_to_move, |mut cx| match fut_res {
203226 Ok ( r) => r. into_js ( & mut cx) ,
227+ Err ( err) => cx. throw_error ( format ! ( "{}" , err) ) ,
204228 } ) ;
205229 }
206230 PyScheduledCallback :: Channel ( chan) => {
207- let _ = match res {
208- Ok ( r) => chan. send ( Ok ( r) ) ,
209- Err ( err) => {
210- chan. send ( Err ( CubeError :: internal ( format_python_error ( err) ) ) )
211- }
212- } ;
231+ let _ = chan. send ( fut_res) ;
213232 }
214233 }
215234 } ) ;
0 commit comments