@@ -26,7 +26,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
2626use tokio_stream:: { Stream , StreamExt } ;
2727pub use tower:: { self , service_fn, Service } ;
2828use tower:: { util:: ServiceFn , ServiceExt } ;
29- use tracing:: { error, trace} ;
29+ use tracing:: { error, trace, Instrument } ;
3030
3131mod requests;
3232#[ cfg( test) ]
@@ -120,15 +120,6 @@ where
120120 continue ;
121121 }
122122
123- let body = hyper:: body:: to_bytes ( body) . await ?;
124- trace ! ( "response body - {}" , std:: str :: from_utf8( & body) ?) ;
125-
126- #[ cfg( debug_assertions) ]
127- if parts. status . is_server_error ( ) {
128- error ! ( "Lambda Runtime server returned an unexpected error" ) ;
129- return Err ( parts. status . to_string ( ) . into ( ) ) ;
130- }
131-
132123 let ctx: Context = Context :: try_from ( parts. headers ) ?;
133124 let ctx: Context = ctx. with_config ( config) ;
134125 let request_id = & ctx. request_id . clone ( ) ;
@@ -138,55 +129,80 @@ where
138129 Some ( trace_id) => env:: set_var ( "_X_AMZN_TRACE_ID" , trace_id) ,
139130 None => env:: remove_var ( "_X_AMZN_TRACE_ID" ) ,
140131 }
141- let body = match serde_json:: from_slice ( & body) {
142- Ok ( body) => body,
143- Err ( err) => {
144- let req = build_event_error_request ( request_id, err) ?;
145- client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
146- return Ok ( ( ) ) ;
147- }
132+ let request_span = match xray_trace_id {
133+ Some ( trace_id) => tracing:: span!(
134+ tracing:: Level :: INFO ,
135+ "Lambda runtime invoke" ,
136+ requestId = request_id,
137+ xrayTraceId = trace_id
138+ ) ,
139+ None => tracing:: span!( tracing:: Level :: INFO , "Lambda runtime invoke" , requestId = request_id) ,
148140 } ;
149141
150- let req = match handler. ready ( ) . await {
151- Ok ( handler) => {
152- // Catches panics outside of a `Future`
153- let task =
154- panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( LambdaEvent :: new ( body, ctx) ) ) ) ;
155-
156- let task = match task {
157- // Catches panics inside of the `Future`
158- Ok ( task) => panic:: AssertUnwindSafe ( task) . catch_unwind ( ) . await ,
159- Err ( err) => Err ( err) ,
160- } ;
161-
162- match task {
163- Ok ( response) => match response {
164- Ok ( response) => {
165- trace ! ( "Ok response from handler (run loop)" ) ;
166- EventCompletionRequest {
167- request_id,
168- body : response,
142+ // Group the handling in one future and instrument it with the span
143+ async {
144+ let body = hyper:: body:: to_bytes ( body) . await ?;
145+ trace ! ( "response body - {}" , std:: str :: from_utf8( & body) ?) ;
146+
147+ #[ cfg( debug_assertions) ]
148+ if parts. status . is_server_error ( ) {
149+ error ! ( "Lambda Runtime server returned an unexpected error" ) ;
150+ return Err ( parts. status . to_string ( ) . into ( ) ) ;
151+ }
152+
153+ let body = match serde_json:: from_slice ( & body) {
154+ Ok ( body) => body,
155+ Err ( err) => {
156+ let req = build_event_error_request ( request_id, err) ?;
157+ client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
158+ return Ok ( ( ) ) ;
159+ }
160+ } ;
161+
162+ let req = match handler. ready ( ) . await {
163+ Ok ( handler) => {
164+ // Catches panics outside of a `Future`
165+ let task =
166+ panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( LambdaEvent :: new ( body, ctx) ) ) ) ;
167+
168+ let task = match task {
169+ // Catches panics inside of the `Future`
170+ Ok ( task) => panic:: AssertUnwindSafe ( task) . catch_unwind ( ) . await ,
171+ Err ( err) => Err ( err) ,
172+ } ;
173+
174+ match task {
175+ Ok ( response) => match response {
176+ Ok ( response) => {
177+ trace ! ( "Ok response from handler (run loop)" ) ;
178+ EventCompletionRequest {
179+ request_id,
180+ body : response,
181+ }
182+ . into_req ( )
169183 }
170- . into_req ( )
184+ Err ( err) => build_event_error_request ( request_id, err) ,
185+ } ,
186+ Err ( err) => {
187+ error ! ( "{:?}" , err) ;
188+ let error_type = type_name_of_val ( & err) ;
189+ let msg = if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
190+ format ! ( "Lambda panicked: {}" , msg)
191+ } else {
192+ "Lambda panicked" . to_string ( )
193+ } ;
194+ EventErrorRequest :: new ( request_id, error_type, & msg) . into_req ( )
171195 }
172- Err ( err) => build_event_error_request ( request_id, err) ,
173- } ,
174- Err ( err) => {
175- error ! ( "{:?}" , err) ;
176- let error_type = type_name_of_val ( & err) ;
177- let msg = if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
178- format ! ( "Lambda panicked: {}" , msg)
179- } else {
180- "Lambda panicked" . to_string ( )
181- } ;
182- EventErrorRequest :: new ( request_id, error_type, & msg) . into_req ( )
183196 }
184197 }
185- }
186- Err ( err) => build_event_error_request ( request_id, err) ,
187- } ?;
198+ Err ( err) => build_event_error_request ( request_id, err) ,
199+ } ?;
188200
189- client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
201+ client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
202+ Ok :: < ( ) , Error > ( ( ) )
203+ }
204+ . instrument ( request_span)
205+ . await ?;
190206 }
191207 Ok ( ( ) )
192208 }
0 commit comments