Skip to content

Commit 6f14c38

Browse files
committed
more eager dequeuing, queue cooloff periods, return workerQueueLength when dequeueing
1 parent 96243ef commit 6f14c38

File tree

5 files changed

+314
-55
lines changed

5 files changed

+314
-55
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,8 @@ export class RunEngine {
566566
runnerId,
567567
tx,
568568
skipObserving,
569+
blockingPop,
570+
blockingPopTimeoutSeconds,
569571
}: {
570572
consumerId: string;
571573
workerQueue: string;
@@ -574,6 +576,8 @@ export class RunEngine {
574576
runnerId?: string;
575577
tx?: PrismaClientOrTransaction;
576578
skipObserving?: boolean;
579+
blockingPop?: boolean;
580+
blockingPopTimeoutSeconds?: number;
577581
}): Promise<DequeuedMessage[]> {
578582
if (!skipObserving) {
579583
// We only do this with "prod" worker queues because we don't want to observe dev (e.g. environment) worker queues
@@ -587,6 +591,8 @@ export class RunEngine {
587591
workerId,
588592
runnerId,
589593
tx,
594+
blockingPop,
595+
blockingPopTimeoutSeconds,
590596
});
591597

592598
if (!dequeuedMessage) {
@@ -619,6 +625,8 @@ export class RunEngine {
619625
runnerId,
620626
tx,
621627
skipObserving: true,
628+
blockingPop: true,
629+
blockingPopTimeoutSeconds: 10,
622630
});
623631
}
624632

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,17 @@ export class DequeueSystem {
4646
workerId,
4747
runnerId,
4848
tx,
49+
blockingPop,
50+
blockingPopTimeoutSeconds,
4951
}: {
5052
consumerId: string;
5153
workerQueue: string;
5254
backgroundWorkerId?: string;
5355
workerId?: string;
5456
runnerId?: string;
5557
tx?: PrismaClientOrTransaction;
58+
blockingPop?: boolean;
59+
blockingPopTimeoutSeconds?: number;
5660
}): Promise<DequeuedMessage | undefined> {
5761
const prisma = tx ?? this.$.prisma;
5862

@@ -63,7 +67,11 @@ export class DequeueSystem {
6367
//gets multiple runs from the queue
6468
const message = await this.$.runQueue.dequeueMessageFromWorkerQueue(
6569
consumerId,
66-
workerQueue
70+
workerQueue,
71+
{
72+
blockingPop,
73+
blockingPopTimeoutSeconds,
74+
}
6775
);
6876
if (!message) {
6977
return;
@@ -452,6 +460,7 @@ export class DequeueSystem {
452460
return {
453461
version: "1" as const,
454462
dequeuedAt: new Date(),
463+
workerQueueLength: message.workerQueueLength,
455464
snapshot: {
456465
id: newSnapshot.id,
457466
friendlyId: newSnapshot.friendlyId,

0 commit comments

Comments
 (0)