@@ -72,6 +72,11 @@ export class Context extends RpcTarget {
7272 }
7373
7474 const status = await this . #engine. getStatus ( ) ;
75+
76+ if ( status === InstanceStatus . Paused ) {
77+ throw new Error ( ABORT_REASONS . USER_PAUSE ) ;
78+ }
79+
7580 if ( status === InstanceStatus . WaitingForPause ) {
7681 await this . #state. storage . put ( PAUSE_DATETIME , new Date ( ) ) ;
7782 const metadata =
@@ -83,7 +88,7 @@ export class Context extends RpcTarget {
8388 InstanceStatus . Paused
8489 ) ;
8590 }
86- await this . #engine . abort ( ABORT_REASONS . USER_PAUSE ) ;
91+ throw new Error ( ABORT_REASONS . USER_PAUSE ) ;
8792 }
8893 }
8994
@@ -511,8 +516,34 @@ export class Context extends RpcTarget {
511516 type : "retry" ,
512517 } ) ;
513518 await this . #engine. timeoutHandler . release ( this . #engine) ;
514- // this may never finish because of the grace period - but waker will take of it
515- await scheduler . wait ( durationMs ) ;
519+ // Race retry wait against the pause signal so pause
520+ // takes effect immediately during retries
521+ {
522+ const retryPauseSignal = this . #engine. pauseController . signal ;
523+ let pausedDuringRetry = false ;
524+ await Promise . race ( [
525+ scheduler . wait ( durationMs ) ,
526+ new Promise < void > ( ( resolve ) => {
527+ if ( retryPauseSignal . aborted ) {
528+ resolve ( ) ;
529+ return ;
530+ }
531+ retryPauseSignal . addEventListener ( "abort" , ( ) => resolve ( ) , {
532+ once : true ,
533+ } ) ;
534+ } ) ,
535+ ] ) ;
536+ const retryStatus = await this . #engine. getStatus ( ) ;
537+ if (
538+ retryStatus === InstanceStatus . Paused ||
539+ retryStatus === InstanceStatus . WaitingForPause
540+ ) {
541+ pausedDuringRetry = true ;
542+ }
543+ if ( pausedDuringRetry ) {
544+ throw new Error ( ABORT_REASONS . USER_PAUSE ) ;
545+ }
546+ }
516547
517548 // if it ever reaches here, we can try to remove it from the priority queue since it's no longer useful
518549 // @ts -expect-error priorityQueue is initiated in init
@@ -634,8 +665,37 @@ export class Context extends RpcTarget {
634665 type : "sleep" ,
635666 } ) ;
636667
637- // this probably will never finish except if sleep is less than the grace period
638- await scheduler . wait ( disableSleep ? 0 : duration ) ;
668+ // Race the sleep against the pause signal
669+ const pauseSignal = this . #engine. pauseController . signal ;
670+ const sleepDuration = disableSleep ? 0 : duration ;
671+
672+ let pausedDuringSleep = false ;
673+ await Promise . race ( [
674+ scheduler . wait ( sleepDuration ) ,
675+ new Promise < void > ( ( resolve ) => {
676+ if ( pauseSignal . aborted ) {
677+ resolve ( ) ;
678+ return ;
679+ }
680+ pauseSignal . addEventListener ( "abort" , ( ) => resolve ( ) , {
681+ once : true ,
682+ } ) ;
683+ } ) ,
684+ ] ) ;
685+
686+ // Check if we were paused during the sleep
687+ const statusAfterSleep = await this . #engine. getStatus ( ) ;
688+ if (
689+ statusAfterSleep === InstanceStatus . Paused ||
690+ statusAfterSleep === InstanceStatus . WaitingForPause
691+ ) {
692+ pausedDuringSleep = true ;
693+ }
694+
695+ if ( pausedDuringSleep ) {
696+ // Throw pause error
697+ throw new Error ( ABORT_REASONS . USER_PAUSE ) ;
698+ }
639699
640700 this . #engine. writeLog (
641701 InstanceEvent . SLEEP_COMPLETE ,
@@ -806,33 +866,49 @@ export class Context extends RpcTarget {
806866 this . #engine. waiters . set ( options . type , callbacks ) ;
807867 } ) ;
808868
809- const result = await Promise . race ( [
869+ // Race event, timeout, and pause signal. The pause promise resolves
870+ // when the race settles via event/timeout before the pause signal fires
871+ const pauseSignal = this . #engine. pauseController . signal ;
872+ const pausePromise = new Promise < void > ( ( resolve ) => {
873+ if ( pauseSignal . aborted ) {
874+ resolve ( ) ;
875+ return ;
876+ }
877+ pauseSignal . addEventListener ( "abort" , ( ) => resolve ( ) , {
878+ once : true ,
879+ } ) ;
880+ } ) ;
881+
882+ const raceResult = await Promise . race ( [
810883 eventPromise ,
811884 timeoutEntryPQ !== undefined
812885 ? timeoutPromise ( timeoutEntryPQ . targetTimestamp - Date . now ( ) , false )
813886 : timeoutPromise ( ms ( options . timeout ) , true ) ,
814- ] )
815- . then ( async ( event ) => {
816- this . #engine. writeLog (
817- InstanceEvent . WAIT_COMPLETE ,
818- cacheKey ,
819- waitForEventNameWithCounter ,
820- event as Event
821- ) ;
822- await this . #state. storage . put ( waitForEventKey , event ) ;
823- return event ;
824- } )
825- . catch ( async ( error ) => {
826- this . #engine. writeLog (
827- InstanceEvent . WAIT_TIMED_OUT ,
828- cacheKey ,
829- waitForEventNameWithCounter ,
830- error
831- ) ;
832- await this . #state. storage . put ( errorKey , error ) ;
833- throw error ;
834- } ) ;
887+ pausePromise ,
888+ ] ) . catch ( async ( error ) => {
889+ this . #engine. writeLog (
890+ InstanceEvent . WAIT_TIMED_OUT ,
891+ cacheKey ,
892+ waitForEventNameWithCounter ,
893+ error
894+ ) ;
895+ await this . #state. storage . put ( errorKey , error ) ;
896+ throw error ;
897+ } ) ;
898+
899+ // Pause signal won the race — throw to stop the workflow
900+ if ( raceResult === undefined ) {
901+ throw new Error ( ABORT_REASONS . USER_PAUSE ) ;
902+ }
903+
904+ this . #engine. writeLog (
905+ InstanceEvent . WAIT_COMPLETE ,
906+ cacheKey ,
907+ waitForEventNameWithCounter ,
908+ raceResult as Event
909+ ) ;
910+ await this . #state. storage . put ( waitForEventKey , raceResult ) ;
835911
836- return result as WorkflowStepEvent < T > ;
912+ return raceResult as WorkflowStepEvent < T > ;
837913 }
838914}
0 commit comments