@@ -23,6 +23,7 @@ import {
2323 TaskRunExecution ,
2424 timeout ,
2525 TriggerConfig ,
26+ UsageMeasurement ,
2627 waitUntil ,
2728 WorkerManifest ,
2829 WorkerToExecutorMessageCatalog ,
@@ -232,7 +233,10 @@ async function bootstrap() {
232233
233234let _execution : TaskRunExecution | undefined ;
234235let _isRunning = false ;
236+ let _isCancelled = false ;
235237let _tracingSDK : TracingSDK | undefined ;
238+ let _executionMeasurement : UsageMeasurement | undefined ;
239+ const cancelController = new AbortController ( ) ;
236240
237241const zodIpc = new ZodIpcConnection ( {
238242 listenSchema : WorkerToExecutorMessageCatalog ,
@@ -403,18 +407,33 @@ const zodIpc = new ZodIpcConnection({
403407 getNumberEnvVar ( "TRIGGER_RUN_METADATA_FLUSH_INTERVAL" , 1000 )
404408 ) ;
405409
406- const measurement = usage . start ( ) ;
410+ _executionMeasurement = usage . start ( ) ;
407411
408412 // This lives outside of the executor because this will eventually be moved to the controller level
409- const signal = execution . run . maxDuration
410- ? timeout . abortAfterTimeout ( execution . run . maxDuration )
411- : undefined ;
413+ const timeoutController = timeout . abortAfterTimeout ( execution . run . maxDuration ) ;
412414
413- const { result } = await executor . execute ( execution , metadata , traceContext , signal ) ;
415+ timeoutController . signal . addEventListener ( "abort" , ( ) => {
416+ if ( _isCancelled ) {
417+ return ;
418+ }
414419
415- const usageSample = usage . stop ( measurement ) ;
420+ if ( cancelController . signal . aborted ) {
421+ return ;
422+ }
423+
424+ cancelController . abort ( timeoutController . signal . reason ) ;
425+ } ) ;
426+
427+ const { result } = await executor . execute (
428+ execution ,
429+ metadata ,
430+ traceContext ,
431+ cancelController . signal
432+ ) ;
433+
434+ if ( _isRunning && ! _isCancelled ) {
435+ const usageSample = usage . stop ( _executionMeasurement ) ;
416436
417- if ( _isRunning ) {
418437 return sender . send ( "TASK_RUN_COMPLETED" , {
419438 execution,
420439 result : {
@@ -458,7 +477,16 @@ const zodIpc = new ZodIpcConnection({
458477 WAIT_COMPLETED_NOTIFICATION : async ( ) => {
459478 await managedWorkerRuntime . completeWaitpoints ( [ ] ) ;
460479 } ,
461- FLUSH : async ( { timeoutInMs } , sender ) => {
480+ CANCEL : async ( { timeoutInMs } ) => {
481+ _isCancelled = true ;
482+ cancelController . abort ( "run cancelled" ) ;
483+ await callCancelHooks ( timeoutInMs ) ;
484+ if ( _executionMeasurement ) {
485+ usage . stop ( _executionMeasurement ) ;
486+ }
487+ await flushAll ( timeoutInMs ) ;
488+ } ,
489+ FLUSH : async ( { timeoutInMs } ) => {
462490 await flushAll ( timeoutInMs ) ;
463491 } ,
464492 WAITPOINT_CREATED : async ( { wait, waitpoint } ) => {
@@ -470,6 +498,18 @@ const zodIpc = new ZodIpcConnection({
470498 } ,
471499} ) ;
472500
501+ async function callCancelHooks ( timeoutInMs : number = 10_000 ) {
502+ const now = performance . now ( ) ;
503+
504+ try {
505+ await Promise . race ( [ lifecycleHooks . callOnCancelHookListeners ( ) , setTimeout ( timeoutInMs ) ] ) ;
506+ } finally {
507+ const duration = performance . now ( ) - now ;
508+
509+ log ( `Called cancel hooks in ${ duration } ms` ) ;
510+ }
511+ }
512+
473513async function flushAll ( timeoutInMs : number = 10_000 ) {
474514 const now = performance . now ( ) ;
475515
0 commit comments