Skip to content

Commit c9decd6

Browse files
committed
feat: add configurable timeout to stop() method
Enhance stop() method to wait for in-flight jobs to complete before returning, with configurable timeout option. Changes: - Add optional timeout parameter (default: 30 seconds) - Wait for all concurrent jobs to finish gracefully - Log timeout events and remaining job count - Provide predictable stop behavior for production use Usage: - await queue.stop() // 30s default timeout - await queue.stop({ timeout: 5000 }) // custom timeout This enables reliable graceful shutdown and prevents orphaned work during application restarts or shutdowns.
1 parent 9dff497 commit c9decd6

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

src/PrismaQueue.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,34 @@ export class PrismaQueue<
151151

152152
/**
153153
* Stops the job processing in the queue.
154+
* Waits for all in-flight jobs to complete before returning.
155+
* @param options - Stop options
156+
* @param options.timeout - Maximum time in milliseconds to wait for in-flight jobs (default: 30000)
154157
*/
155-
public async stop(): Promise<void> {
158+
public async stop(options: { timeout?: number } = {}): Promise<void> {
156159
const { pollInterval } = this.config;
160+
const { timeout = 30000 } = options;
157161
debug(`stopping queue named="${this.name}"...`);
158162
this.stopped = true;
159-
// Wait for the queue to stop
163+
164+
// Wait for the polling loop to notice the stop flag
160165
await waitFor(pollInterval);
166+
167+
// Wait for all in-flight jobs to complete
168+
const checkInterval = 100; // Check every 100ms
169+
const startTime = Date.now();
170+
171+
while (this.concurrency > 0) {
172+
if (Date.now() - startTime > timeout) {
173+
debug(
174+
`stop() timed out after ${timeout}ms waiting for ${this.concurrency} in-flight jobs to complete for queue named="${this.name}"`,
175+
);
176+
break;
177+
}
178+
await waitFor(checkInterval);
179+
}
180+
181+
debug(`queue named="${this.name}" stopped with ${this.concurrency} remaining jobs`);
161182
}
162183

163184
/**

0 commit comments

Comments
 (0)