Skip to content

Commit 50c462f

Browse files
committed
Add a short delay for the master queue consumer loop
1 parent 08753e0 commit 50c462f

File tree

1 file changed

+16
-17
lines changed
  • packages/redis-worker/src/fair-queue

1 file changed

+16
-17
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -788,23 +788,22 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
788788
this.batchedSpanManager.markForRotation(loopId);
789789
}
790790

791-
// Only wait if there was no work (avoid spinning when idle)
792-
// When there's work, immediately process the next batch
793-
if (!hadWork) {
794-
await new Promise<void>((resolve, reject) => {
795-
const abortHandler = () => {
796-
clearTimeout(timeout);
797-
reject(new Error("AbortError"));
798-
};
799-
const timeout = setTimeout(() => {
800-
// Must remove listener when timeout fires, otherwise listeners accumulate
801-
// (the { once: true } option only removes on abort, not on timeout)
802-
this.abortController.signal.removeEventListener("abort", abortHandler);
803-
resolve();
804-
}, this.consumerIntervalMs);
805-
this.abortController.signal.addEventListener("abort", abortHandler, { once: true });
806-
});
807-
}
791+
// Wait between iterations to prevent CPU spin
792+
// Short delay when there's work (yield to event loop), longer delay when idle
793+
const waitMs = hadWork ? 1 : this.consumerIntervalMs;
794+
await new Promise<void>((resolve, reject) => {
795+
const abortHandler = () => {
796+
clearTimeout(timeout);
797+
reject(new Error("AbortError"));
798+
};
799+
const timeout = setTimeout(() => {
800+
// Must remove listener when timeout fires, otherwise listeners accumulate
801+
// (the { once: true } option only removes on abort, not on timeout)
802+
this.abortController.signal.removeEventListener("abort", abortHandler);
803+
resolve();
804+
}, waitMs);
805+
this.abortController.signal.addEventListener("abort", abortHandler, { once: true });
806+
});
808807
}
809808
} catch (error) {
810809
if (error instanceof Error && error.message === "AbortError") {

0 commit comments

Comments
 (0)