@@ -50,6 +50,7 @@ use database::Transaction;
5050use deno_core:: v8;
5151use futures:: {
5252 channel:: mpsc,
53+ future:: BoxFuture ,
5354 select_biased,
5455 stream:: BoxStream ,
5556 Future ,
@@ -154,6 +155,7 @@ use crate::{
154155 } ,
155156 metrics:: {
156157 self ,
158+ log_isolate_request_cancelled,
157159 log_unawaited_pending_op,
158160 } ,
159161 ops:: OpProvider ,
@@ -257,6 +259,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
257259 isolate_clean : & mut bool ,
258260 validated_path : ValidatedHttpPath ,
259261 request : HttpActionRequest ,
262+ cancellation : BoxFuture < ' _ , ( ) > ,
260263 ) -> anyhow:: Result < HttpActionOutcome > {
261264 let client_id = Arc :: new ( client_id) ;
262265 let start_unix_timestamp = self . rt . unix_timestamp ( ) ;
@@ -278,6 +281,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
278281 & mut isolate_context,
279282 validated_path. canonicalized_udf_path ( ) ,
280283 request,
284+ cancellation,
281285 )
282286 . await ;
283287 // Override the returned result if we hit a termination error.
@@ -325,6 +329,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
325329 isolate : & mut RequestScope < ' _ , ' _ , RT , Self > ,
326330 router_path : & CanonicalizedUdfPath ,
327331 http_request : HttpActionRequest ,
332+ cancellation : BoxFuture < ' _ , ( ) > ,
328333 ) -> anyhow:: Result < ( HttpActionRoute , Result < HttpActionResponse , JsError > ) > {
329334 let handle = isolate. handle ( ) ;
330335 let mut v8_scope = isolate. scope ( ) ;
@@ -415,6 +420,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
415420 v8_function,
416421 & v8_args,
417422 Self :: collect_http_result,
423+ cancellation,
418424 )
419425 . await ?;
420426 Ok ( ( route, result) )
@@ -478,6 +484,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
478484 isolate : & mut Isolate < RT > ,
479485 isolate_clean : & mut bool ,
480486 request_params : ActionRequestParams ,
487+ cancellation : BoxFuture < ' _ , ( ) > ,
481488 ) -> anyhow:: Result < ActionOutcome > {
482489 let client_id = Arc :: new ( client_id) ;
483490 let start_unix_timestamp = self . rt . unix_timestamp ( ) ;
@@ -493,8 +500,13 @@ impl<RT: Runtime> ActionEnvironment<RT> {
493500 let mut isolate_context =
494501 RequestScope :: new ( & mut context_scope, handle. clone ( ) , state, true ) . await ?;
495502
496- let mut result =
497- Self :: run_action_inner ( client_id, & mut isolate_context, request_params. clone ( ) ) . await ;
503+ let mut result = Self :: run_action_inner (
504+ client_id,
505+ & mut isolate_context,
506+ request_params. clone ( ) ,
507+ cancellation,
508+ )
509+ . await ;
498510
499511 // Perform a microtask checkpoint one last time before taking the environment
500512 // to ensure the microtask queue is empty. Otherwise, JS from this request may
@@ -539,6 +551,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
539551 client_id : Arc < String > ,
540552 isolate : & mut RequestScope < ' _ , ' _ , RT , Self > ,
541553 request_params : ActionRequestParams ,
554+ cancellation : BoxFuture < ' _ , ( ) > ,
542555 ) -> anyhow:: Result < Result < ConvexValue , JsError > > {
543556 let handle = isolate. handle ( ) ;
544557 let mut v8_scope = isolate. scope ( ) ;
@@ -625,6 +638,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
625638 let result = deserialize_udf_result ( & udf_path, & result_str) ?;
626639 Ok ( async move { Ok ( result) } )
627640 } ,
641+ cancellation,
628642 )
629643 . await
630644 }
@@ -752,6 +766,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
752766 & mut ExecutionScope < ' a , ' b , RT , Self > ,
753767 String ,
754768 ) -> anyhow:: Result < Fut > ,
769+ cancellation : BoxFuture < ' _ , ( ) > ,
755770 ) -> anyhow:: Result < Result < T , JsError > >
756771 where
757772 Fut : Future < Output = anyhow:: Result < Result < T , JsError > > > + Send + ' static ,
@@ -784,6 +799,7 @@ impl<RT: Runtime> ActionEnvironment<RT> {
784799 // collecting a result. Using None would be nice, but `select_biased!`
785800 // does not like Options.
786801 let mut collecting_result = ( async { std:: future:: pending ( ) . await } ) . boxed ( ) . fuse ( ) ;
802+ let mut cancellation = cancellation. fuse ( ) ;
787803 let result = loop {
788804 // Advance the user's promise as far as it can go by draining the microtask
789805 // queue.
@@ -914,6 +930,10 @@ impl<RT: Runtime> ActionEnvironment<RT> {
914930 _ = timeout. fuse( ) => {
915931 continue ;
916932 } ,
933+ _ = cancellation => {
934+ log_isolate_request_cancelled( ) ;
935+ anyhow:: bail!( "Cancelled" ) ;
936+ } ,
917937 }
918938 let permit_acquire = scope. with_state_mut ( |state| {
919939 state
0 commit comments