Skip to content

Commit 1ec4f79

Browse files
committed
Convert run engine tests and run engine to use runQueue changes
1 parent d1a2723 commit 1ec4f79

23 files changed

+1169
-1254
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ export class RunEngine {
119119
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
120120
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
121121
},
122+
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
123+
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
122124
});
123125

124126
this.worker = new Worker({

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ export class DequeueSystem {
183183
latestSnapshot: snapshot.id,
184184
});
185185
await this.$.runQueue.acknowledgeMessage(orgId, runId);
186-
return null;
186+
return;
187187
}
188188
case "RUN_ENVIRONMENT_ARCHIVED": {
189189
//this happens if the preview branch was archived
@@ -196,18 +196,6 @@ export class DequeueSystem {
196196
}
197197
);
198198
await this.$.runQueue.acknowledgeMessage(orgId, runId);
199-
return null;
200-
}
201-
case "NO_WORKER":
202-
case "TASK_NEVER_REGISTERED":
203-
case "QUEUE_NOT_FOUND":
204-
case "TASK_NOT_IN_LATEST": {
205-
this.$.logger.warn(`RunEngine.dequeueFromMasterQueue(): ${result.code}`, {
206-
runId,
207-
latestSnapshot: snapshot.id,
208-
result,
209-
});
210-
211199
return;
212200
}
213201
case "NO_WORKER":

internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ describe("RunEngine attempt failures", () => {
2020
},
2121
queue: {
2222
redis: redisOptions,
23+
masterQueueConsumersDisabled: true,
24+
processWorkerQueueDebounceMs: 50,
2325
},
2426
runLock: {
2527
redis: redisOptions,
@@ -62,7 +64,7 @@ describe("RunEngine attempt failures", () => {
6264
traceContext: {},
6365
traceId: "t12345",
6466
spanId: "s12345",
65-
masterQueue: "main",
67+
workerQueue: "main",
6668
queue: "task/test-task",
6769
isTest: false,
6870
tags: [],
@@ -71,10 +73,10 @@ describe("RunEngine attempt failures", () => {
7173
);
7274

7375
//dequeue the run
74-
const dequeued = await engine.dequeueFromMasterQueue({
76+
await setTimeout(500);
77+
const dequeued = await engine.dequeueFromWorkerQueue({
7578
consumerId: "test_12345",
76-
masterQueue: run.masterQueue,
77-
maxRunCount: 10,
79+
workerQueue: "main",
7880
});
7981

8082
//create an attempt
@@ -173,6 +175,8 @@ describe("RunEngine attempt failures", () => {
173175
},
174176
queue: {
175177
redis: redisOptions,
178+
masterQueueConsumersDisabled: true,
179+
processWorkerQueueDebounceMs: 50,
176180
},
177181
runLock: {
178182
redis: redisOptions,
@@ -213,7 +217,7 @@ describe("RunEngine attempt failures", () => {
213217
traceContext: {},
214218
traceId: "t12345",
215219
spanId: "s12345",
216-
masterQueue: "main",
220+
workerQueue: "main",
217221
queue: "task/test-task",
218222
isTest: false,
219223
tags: [],
@@ -222,10 +226,10 @@ describe("RunEngine attempt failures", () => {
222226
);
223227

224228
//dequeue the run
225-
const dequeued = await engine.dequeueFromMasterQueue({
229+
await setTimeout(500);
230+
const dequeued = await engine.dequeueFromWorkerQueue({
226231
consumerId: "test_12345",
227-
masterQueue: run.masterQueue,
228-
maxRunCount: 10,
232+
workerQueue: "main",
229233
});
230234

231235
//create an attempt
@@ -284,6 +288,8 @@ describe("RunEngine attempt failures", () => {
284288
},
285289
queue: {
286290
redis: redisOptions,
291+
masterQueueConsumersDisabled: true,
292+
processWorkerQueueDebounceMs: 50,
287293
},
288294
runLock: {
289295
redis: redisOptions,
@@ -324,7 +330,7 @@ describe("RunEngine attempt failures", () => {
324330
traceContext: {},
325331
traceId: "t12345",
326332
spanId: "s12345",
327-
masterQueue: "main",
333+
workerQueue: "main",
328334
queue: "task/test-task",
329335
isTest: false,
330336
tags: [],
@@ -333,10 +339,10 @@ describe("RunEngine attempt failures", () => {
333339
);
334340

335341
//dequeue the run
336-
const dequeued = await engine.dequeueFromMasterQueue({
342+
await setTimeout(500);
343+
const dequeued = await engine.dequeueFromWorkerQueue({
337344
consumerId: "test_12345",
338-
masterQueue: run.masterQueue,
339-
maxRunCount: 10,
345+
workerQueue: "main",
340346
});
341347

342348
//create an attempt
@@ -393,6 +399,8 @@ describe("RunEngine attempt failures", () => {
393399
},
394400
queue: {
395401
redis: redisOptions,
402+
masterQueueConsumersDisabled: true,
403+
processWorkerQueueDebounceMs: 50,
396404
},
397405
runLock: {
398406
redis: redisOptions,
@@ -431,7 +439,7 @@ describe("RunEngine attempt failures", () => {
431439
traceContext: {},
432440
traceId: "t12345",
433441
spanId: "s12345",
434-
masterQueue: "main",
442+
workerQueue: "main",
435443
queue: "task/test-task",
436444
isTest: false,
437445
tags: [],
@@ -440,10 +448,10 @@ describe("RunEngine attempt failures", () => {
440448
);
441449

442450
//dequeue the run
443-
const dequeued = await engine.dequeueFromMasterQueue({
451+
await setTimeout(500);
452+
const dequeued = await engine.dequeueFromWorkerQueue({
444453
consumerId: "test_12345",
445-
masterQueue: run.masterQueue,
446-
maxRunCount: 10,
454+
workerQueue: "main",
447455
});
448456

449457
//create an attempt
@@ -500,6 +508,8 @@ describe("RunEngine attempt failures", () => {
500508
},
501509
queue: {
502510
redis: redisOptions,
511+
masterQueueConsumersDisabled: true,
512+
processWorkerQueueDebounceMs: 50,
503513
},
504514
runLock: {
505515
redis: redisOptions,
@@ -548,7 +558,7 @@ describe("RunEngine attempt failures", () => {
548558
traceContext: {},
549559
traceId: "t12345",
550560
spanId: "s12345",
551-
masterQueue: "main",
561+
workerQueue: "main",
552562
queue: "task/test-task",
553563
isTest: false,
554564
tags: [],
@@ -557,10 +567,10 @@ describe("RunEngine attempt failures", () => {
557567
);
558568

559569
//dequeue the run
560-
const dequeued = await engine.dequeueFromMasterQueue({
570+
await setTimeout(500);
571+
const dequeued = await engine.dequeueFromWorkerQueue({
561572
consumerId: "test_12345",
562-
masterQueue: run.masterQueue,
563-
maxRunCount: 10,
573+
workerQueue: "main",
564574
});
565575

566576
//create an attempt
@@ -657,6 +667,8 @@ describe("RunEngine attempt failures", () => {
657667
},
658668
queue: {
659669
redis: redisOptions,
670+
masterQueueConsumersDisabled: true,
671+
processWorkerQueueDebounceMs: 50,
660672
},
661673
runLock: {
662674
redis: redisOptions,
@@ -707,7 +719,7 @@ describe("RunEngine attempt failures", () => {
707719
traceContext: {},
708720
traceId: "t12345",
709721
spanId: "s12345",
710-
masterQueue: "main",
722+
workerQueue: "main",
711723
queue: "task/test-task",
712724
isTest: false,
713725
tags: [],
@@ -716,10 +728,10 @@ describe("RunEngine attempt failures", () => {
716728
);
717729

718730
//dequeue the run
719-
const dequeued = await engine.dequeueFromMasterQueue({
731+
await setTimeout(500);
732+
const dequeued = await engine.dequeueFromWorkerQueue({
720733
consumerId: "test_12345",
721-
masterQueue: run.masterQueue,
722-
maxRunCount: 10,
734+
workerQueue: "main",
723735
});
724736

725737
//create first attempt
@@ -762,10 +774,10 @@ describe("RunEngine attempt failures", () => {
762774
await setTimeout(5_000);
763775

764776
//dequeue again
765-
const dequeued2 = await engine.dequeueFromMasterQueue({
777+
await setTimeout(500);
778+
const dequeued2 = await engine.dequeueFromWorkerQueue({
766779
consumerId: "test_12345",
767-
masterQueue: run.masterQueue,
768-
maxRunCount: 10,
780+
workerQueue: "main",
769781
});
770782
expect(dequeued2.length).toBe(1);
771783

internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ describe("RunEngine batchTrigger", () => {
2424
},
2525
queue: {
2626
redis: redisOptions,
27+
masterQueueConsumersDisabled: true,
28+
processWorkerQueueDebounceMs: 50,
2729
},
2830
runLock: {
2931
redis: redisOptions,
@@ -73,7 +75,7 @@ describe("RunEngine batchTrigger", () => {
7375
traceContext: {},
7476
traceId: "t12345",
7577
spanId: "s12345",
76-
masterQueue: "main",
78+
workerQueue: "main",
7779
queue: "task/test-task",
7880
isTest: false,
7981
tags: [],
@@ -94,7 +96,7 @@ describe("RunEngine batchTrigger", () => {
9496
traceContext: {},
9597
traceId: "t12345",
9698
spanId: "s12345",
97-
masterQueue: "main",
99+
workerQueue: "main",
98100
queue: "task/test-task",
99101
isTest: false,
100102
tags: [],
@@ -116,13 +118,13 @@ describe("RunEngine batchTrigger", () => {
116118
expect(queueLength).toBe(2);
117119

118120
//dequeue
121+
await setTimeout(500);
119122
const dequeued: DequeuedMessage[] = [];
120123
for (let i = 0; i < 2; i++) {
121124
dequeued.push(
122-
...(await engine.dequeueFromMasterQueue({
125+
...(await engine.dequeueFromWorkerQueue({
123126
consumerId: "test_12345",
124-
masterQueue: "main",
125-
maxRunCount: 1,
127+
workerQueue: "main",
126128
}))
127129
);
128130
}

0 commit comments

Comments
 (0)