Skip to content

Commit 26ec667

Browse files
committed
Fix more tests
1 parent b2f83f7 commit 26ec667

File tree

3 files changed

+84
-77
lines changed

3 files changed

+84
-77
lines changed

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 75 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -541,84 +541,84 @@ export class DequeueSystem {
541541
}) {
542542
const prisma = tx ?? this.$.prisma;
543543

544-
return startSpan(
545-
this.$.tracer,
546-
"#pendingVersion",
547-
async (span) => {
548-
return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => {
549-
//mark run as waiting for deploy
550-
const run = await prisma.taskRun.update({
551-
where: { id: runId },
552-
data: {
553-
status: "PENDING_VERSION",
554-
statusReason,
555-
},
544+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
545+
runId,
546+
reason,
547+
statusReason,
548+
});
549+
550+
return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => {
551+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version lock acquired", {
552+
runId,
553+
reason,
554+
statusReason,
555+
});
556+
557+
//mark run as waiting for deploy
558+
const run = await prisma.taskRun.update({
559+
where: { id: runId },
560+
data: {
561+
status: "PENDING_VERSION",
562+
statusReason,
563+
},
564+
select: {
565+
id: true,
566+
status: true,
567+
attemptNumber: true,
568+
updatedAt: true,
569+
createdAt: true,
570+
runtimeEnvironment: {
556571
select: {
557572
id: true,
558-
status: true,
559-
attemptNumber: true,
560-
updatedAt: true,
561-
createdAt: true,
562-
runtimeEnvironment: {
563-
select: {
564-
id: true,
565-
type: true,
566-
projectId: true,
567-
project: { select: { id: true, organizationId: true } },
568-
},
569-
},
570-
},
571-
});
572-
573-
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
574-
runId,
575-
run,
576-
});
577-
578-
await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
579-
run,
580-
snapshot: {
581-
executionStatus: "RUN_CREATED",
582-
description:
583-
reason ??
584-
"The run doesn't have a background worker, so we're going to ack it for now.",
585-
},
586-
environmentId: run.runtimeEnvironment.id,
587-
environmentType: run.runtimeEnvironment.type,
588-
projectId: run.runtimeEnvironment.projectId,
589-
organizationId: run.runtimeEnvironment.project.organizationId,
590-
workerId,
591-
runnerId,
592-
});
593-
594-
//we ack because when it's deployed it will be requeued
595-
await this.$.runQueue.acknowledgeMessage(orgId, runId);
596-
597-
this.$.eventBus.emit("runStatusChanged", {
598-
time: new Date(),
599-
run: {
600-
id: runId,
601-
status: run.status,
602-
updatedAt: run.updatedAt,
603-
createdAt: run.createdAt,
604-
},
605-
organization: {
606-
id: run.runtimeEnvironment.project.organizationId,
573+
type: true,
574+
projectId: true,
575+
project: { select: { id: true, organizationId: true } },
607576
},
608-
project: {
609-
id: run.runtimeEnvironment.projectId,
610-
},
611-
environment: {
612-
id: run.runtimeEnvironment.id,
613-
},
614-
});
615-
});
616-
},
617-
{
618-
attributes: {
619-
runId,
577+
},
620578
},
621-
}
622-
);
579+
});
580+
581+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
582+
runId,
583+
run,
584+
});
585+
586+
await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
587+
run,
588+
snapshot: {
589+
executionStatus: "RUN_CREATED",
590+
description:
591+
reason ?? "The run doesn't have a background worker, so we're going to ack it for now.",
592+
},
593+
environmentId: run.runtimeEnvironment.id,
594+
environmentType: run.runtimeEnvironment.type,
595+
projectId: run.runtimeEnvironment.projectId,
596+
organizationId: run.runtimeEnvironment.project.organizationId,
597+
workerId,
598+
runnerId,
599+
});
600+
601+
//we ack because when it's deployed it will be requeued
602+
await this.$.runQueue.acknowledgeMessage(orgId, runId);
603+
604+
this.$.eventBus.emit("runStatusChanged", {
605+
time: new Date(),
606+
run: {
607+
id: runId,
608+
status: run.status,
609+
updatedAt: run.updatedAt,
610+
createdAt: run.createdAt,
611+
},
612+
organization: {
613+
id: run.runtimeEnvironment.project.organizationId,
614+
},
615+
project: {
616+
id: run.runtimeEnvironment.projectId,
617+
},
618+
environment: {
619+
id: run.runtimeEnvironment.id,
620+
},
621+
});
622+
});
623623
}
624624
}

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ describe("RunEngine heartbeats", () => {
350350
assertNonNullable(executionData2);
351351
expect(executionData2.snapshot.executionStatus).toBe("QUEUED");
352352

353-
await setTimeout(1_000);
353+
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id);
354354

355355
//have to dequeue again
356356
const dequeued2 = await engine.dequeueFromWorkerQueue({

internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ describe("RunEngine pending version", () => {
192192
//set this so we have to requeue the runs in two batches
193193
queueRunsWaitingForWorkerBatchSize: 1,
194194
tracer: trace.getTracer("test", "0.0.0"),
195+
logLevel: "debug",
195196
});
196197

197198
try {
@@ -247,7 +248,7 @@ describe("RunEngine pending version", () => {
247248
expect(executionDataR1.snapshot.executionStatus).toBe("QUEUED");
248249
expect(executionDataR2.snapshot.executionStatus).toBe("QUEUED");
249250

250-
await setTimeout(500);
251+
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id);
251252

252253
//dequeuing should fail
253254
const dequeued = await engine.dequeueFromWorkerQueue({
@@ -256,6 +257,12 @@ describe("RunEngine pending version", () => {
256257
});
257258
expect(dequeued.length).toBe(0);
258259

260+
const dequeued2 = await engine.dequeueFromWorkerQueue({
261+
consumerId: "test_12345",
262+
workerQueue: "main",
263+
});
264+
expect(dequeued2.length).toBe(0);
265+
259266
//queue should be empty
260267
const queueLength = await engine.runQueue.lengthOfQueue(
261268
authenticatedEnvironment,

0 commit comments

Comments
 (0)