@@ -31,6 +31,7 @@ import * as utils from '../utils';
3131
3232const abortSchedulingLoopReason = Symbol ( 'abort scheduling loop reason' ) ;
3333const abortQueuingLoopReason = Symbol ( 'abort queuing loop reason' ) ;
34+ const cancelTimerReason = Symbol ( 'cancel timer reason' ) ;
3435
3536interface TaskManager extends CreateDestroyStartStop { }
3637@CreateDestroyStartStop (
@@ -51,20 +52,23 @@ class TaskManager {
5152 handlers = { } ,
5253 lazy = false ,
5354 activeLimit = Infinity ,
55+ stopWarningTimeout = 10000 ,
5456 logger = new Logger ( this . name ) ,
5557 fresh = false ,
5658 } : {
5759 db : DB ;
5860 handlers ?: Record < TaskHandlerId , TaskHandler > ;
5961 lazy ?: boolean ;
6062 activeLimit ?: number ;
63+ stopWarningTimeout ?: number ;
6164 logger ?: Logger ;
6265 fresh ?: boolean ;
6366 } ) {
6467 logger . info ( `Creating ${ this . name } ` ) ;
6568 const tasks = new this ( {
6669 db,
6770 activeLimit,
71+ stopWarningTimeout,
6872 logger,
6973 } ) ;
7074 await tasks . start ( {
@@ -82,6 +86,7 @@ class TaskManager {
8286 protected db : DB ;
8387 protected handlers : Map < TaskHandlerId , TaskHandler > = new Map ( ) ;
8488 protected activeLimit : number ;
89+ protected stopWarningTimeout : number ;
8590 protected generateTaskId : ( ) => TaskId ;
8691 protected taskPromises : Map < TaskIdEncoded , PromiseCancellable < any > > =
8792 new Map ( ) ;
@@ -170,17 +175,20 @@ class TaskManager {
170175 public constructor ( {
171176 db,
172177 activeLimit,
178+ stopWarningTimeout,
173179 logger,
174180 } : {
175181 db : DB ;
176182 activeLimit : number ;
183+ stopWarningTimeout : number ;
177184 logger : Logger ;
178185 } ) {
179186 this . logger = logger ;
180187 this . schedulerLogger = logger . getChild ( 'scheduler' ) ;
181188 this . queueLogger = logger . getChild ( 'queue' ) ;
182189 this . db = db ;
183190 this . activeLimit = Math . max ( 1 , activeLimit ) ;
191+ this . stopWarningTimeout = stopWarningTimeout ;
184192 }
185193
186194 public async start ( {
@@ -246,7 +254,9 @@ class TaskManager {
246254 * This call is idempotent
247255 */
248256 public async stopProcessing ( ) : Promise < void > {
257+ this . logger . info ( 'Stopping Processing' ) ;
249258 await Promise . all ( [ this . stopQueueing ( ) , this . stopScheduling ( ) ] ) ;
259+ this . logger . info ( 'Stopped Processing' ) ;
250260 }
251261
252262 public isProcessing ( ) : boolean {
@@ -258,10 +268,32 @@ class TaskManager {
258268 * This call is idempotent
259269 */
260270 public async stopTasks ( ) : Promise < void > {
261- for ( const [ , activePromise ] of this . activePromises ) {
262- activePromise . cancel ( new tasksErrors . ErrorTaskStop ( ) ) ;
271+ this . logger . info ( 'Stopping Tasks' ) ;
272+ const watchdogTimer = new Timer ( {
273+ handler : async ( ) => {
274+ for ( const [ id ] of this . activePromises ) {
275+ const task = await this . getTask ( tasksUtils . decodeTaskId ( id ) ! ) ;
276+ if ( task == null ) continue ;
277+ this . logger . warn (
278+ `Failed to stop task (${ task . handlerId } ) after ${ this . stopWarningTimeout } ms` ,
279+ ) ;
280+ }
281+ } ,
282+ delay : this . stopWarningTimeout ,
283+ } ) ;
284+ try {
285+ for ( const [ , activePromise ] of this . activePromises ) {
286+ activePromise . cancel ( new tasksErrors . ErrorTaskStop ( ) ) ;
287+ }
288+ await Promise . allSettled ( this . activePromises . values ( ) ) ;
289+ } finally {
290+ watchdogTimer . cancel ( cancelTimerReason ) ;
291+ await watchdogTimer . catch ( ( e ) => {
292+ // Ignore cancellation reason
293+ if ( e !== cancelTimerReason ) throw e ;
294+ } ) ;
263295 }
264- await Promise . allSettled ( this . activePromises . values ( ) ) ;
296+ this . logger . info ( 'Stopped Tasks' ) ;
265297 }
266298
267299 public getHandler ( handlerId : TaskHandlerId ) : TaskHandler | undefined {
0 commit comments