@@ -13,7 +13,7 @@ use std::{
1313 convert:: { TryFrom , TryInto } ,
1414 env, fmt,
1515 future:: Future ,
16- sync :: Arc ,
16+ panic ,
1717} ;
1818use tokio:: io:: { AsyncRead , AsyncWrite } ;
1919use tokio_stream:: { Stream , StreamExt } ;
@@ -96,7 +96,7 @@ pub struct HandlerFn<F> {
9696impl < F , A , B , Error , Fut > Handler < A , B > for HandlerFn < F >
9797where
9898 F : Fn ( A , Context ) -> Fut ,
99- Fut : Future < Output = Result < B , Error > > + Send ,
99+ Fut : Future < Output = Result < B , Error > > ,
100100 Error : Into < Box < dyn std:: error:: Error + Send + Sync + ' static > > + fmt:: Display ,
101101{
102102 type Error = Error ;
@@ -139,14 +139,13 @@ where
139139 config : & Config ,
140140 ) -> Result < ( ) , Error >
141141 where
142- F : Handler < A , B > + Send + Sync + ' static ,
143- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
144- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
145- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
146- B : Serialize + Send + Sync + ' static ,
142+ F : Handler < A , B > ,
143+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
144+ <F as Handler < A , B > >:: Error : fmt:: Display ,
145+ A : for < ' de > Deserialize < ' de > ,
146+ B : Serialize ,
147147 {
148148 let client = & self . client ;
149- let handler = Arc :: new ( handler) ;
150149 tokio:: pin!( incoming) ;
151150 while let Some ( event) = incoming. next ( ) . await {
152151 trace ! ( "New event arrived (run loop)" ) ;
@@ -159,12 +158,10 @@ where
159158 trace ! ( "{}" , std:: str :: from_utf8( & body) ?) ; // this may be very verbose
160159 let body = serde_json:: from_slice ( & body) ?;
161160
162- let handler = Arc :: clone ( & handler) ;
163161 let request_id = & ctx. request_id . clone ( ) ;
164- #[ allow( clippy:: async_yields_async) ]
165- let task = tokio:: spawn ( async move { handler. call ( body, ctx) } ) ;
162+ let task = panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( body, ctx) ) ) ;
166163
167- let req = match task. await {
164+ let req = match task {
168165 Ok ( response) => match response. await {
169166 Ok ( response) => {
170167 trace ! ( "Ok response from handler (run loop)" ) ;
@@ -186,18 +183,21 @@ where
186183 . into_req ( )
187184 }
188185 } ,
189- Err ( err) if err . is_panic ( ) => {
186+ Err ( err) => {
190187 error ! ( "{:?}" , err) ; // inconsistent with other log record formats - to be reviewed
191188 EventErrorRequest {
192189 request_id,
193190 diagnostic : Diagnostic {
194191 error_type : type_name_of_val ( & err) . to_owned ( ) ,
195- error_message : format ! ( "Lambda panicked: {}" , err) ,
192+ error_message : if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
193+ format ! ( "Lambda panicked: {}" , msg)
194+ } else {
195+ "Lambda panicked" . to_string ( )
196+ } ,
196197 } ,
197198 }
198199 . into_req ( )
199200 }
200- Err ( _) => unreachable ! ( "tokio::task should not be canceled" ) ,
201201 } ;
202202 let req = req?;
203203 client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
@@ -296,11 +296,11 @@ where
296296/// ```
297297pub async fn run < A , B , F > ( handler : F ) -> Result < ( ) , Error >
298298where
299- F : Handler < A , B > + Send + Sync + ' static ,
300- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
301- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
302- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
303- B : Serialize + Send + Sync + ' static ,
299+ F : Handler < A , B > ,
300+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
301+ <F as Handler < A , B > >:: Error : fmt:: Display ,
302+ A : for < ' de > Deserialize < ' de > ,
303+ B : Serialize ,
304304{
305305 trace ! ( "Loading config from env" ) ;
306306 let config = Config :: from_env ( ) ?;
0 commit comments