Skip to content

Commit 79e7a15

Browse files
committed
Fix more tests
1 parent 9037d3c commit 79e7a15

File tree

3 files changed

+84
-77
lines changed

3 files changed

+84
-77
lines changed

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 75 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -542,84 +542,84 @@ export class DequeueSystem {
542542
}) {
543543
const prisma = tx ?? this.$.prisma;
544544

545-
return startSpan(
546-
this.$.tracer,
547-
"#pendingVersion",
548-
async (span) => {
549-
return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => {
550-
//mark run as waiting for deploy
551-
const run = await prisma.taskRun.update({
552-
where: { id: runId },
553-
data: {
554-
status: "PENDING_VERSION",
555-
statusReason,
556-
},
545+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
546+
runId,
547+
reason,
548+
statusReason,
549+
});
550+
551+
return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => {
552+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version lock acquired", {
553+
runId,
554+
reason,
555+
statusReason,
556+
});
557+
558+
//mark run as waiting for deploy
559+
const run = await prisma.taskRun.update({
560+
where: { id: runId },
561+
data: {
562+
status: "PENDING_VERSION",
563+
statusReason,
564+
},
565+
select: {
566+
id: true,
567+
status: true,
568+
attemptNumber: true,
569+
updatedAt: true,
570+
createdAt: true,
571+
runtimeEnvironment: {
557572
select: {
558573
id: true,
559-
status: true,
560-
attemptNumber: true,
561-
updatedAt: true,
562-
createdAt: true,
563-
runtimeEnvironment: {
564-
select: {
565-
id: true,
566-
type: true,
567-
projectId: true,
568-
project: { select: { id: true, organizationId: true } },
569-
},
570-
},
571-
},
572-
});
573-
574-
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
575-
runId,
576-
run,
577-
});
578-
579-
await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
580-
run,
581-
snapshot: {
582-
executionStatus: "RUN_CREATED",
583-
description:
584-
reason ??
585-
"The run doesn't have a background worker, so we're going to ack it for now.",
586-
},
587-
environmentId: run.runtimeEnvironment.id,
588-
environmentType: run.runtimeEnvironment.type,
589-
projectId: run.runtimeEnvironment.projectId,
590-
organizationId: run.runtimeEnvironment.project.organizationId,
591-
workerId,
592-
runnerId,
593-
});
594-
595-
//we ack because when it's deployed it will be requeued
596-
await this.$.runQueue.acknowledgeMessage(orgId, runId);
597-
598-
this.$.eventBus.emit("runStatusChanged", {
599-
time: new Date(),
600-
run: {
601-
id: runId,
602-
status: run.status,
603-
updatedAt: run.updatedAt,
604-
createdAt: run.createdAt,
605-
},
606-
organization: {
607-
id: run.runtimeEnvironment.project.organizationId,
574+
type: true,
575+
projectId: true,
576+
project: { select: { id: true, organizationId: true } },
608577
},
609-
project: {
610-
id: run.runtimeEnvironment.projectId,
611-
},
612-
environment: {
613-
id: run.runtimeEnvironment.id,
614-
},
615-
});
616-
});
617-
},
618-
{
619-
attributes: {
620-
runId,
578+
},
621579
},
622-
}
623-
);
580+
});
581+
582+
this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", {
583+
runId,
584+
run,
585+
});
586+
587+
await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
588+
run,
589+
snapshot: {
590+
executionStatus: "RUN_CREATED",
591+
description:
592+
reason ?? "The run doesn't have a background worker, so we're going to ack it for now.",
593+
},
594+
environmentId: run.runtimeEnvironment.id,
595+
environmentType: run.runtimeEnvironment.type,
596+
projectId: run.runtimeEnvironment.projectId,
597+
organizationId: run.runtimeEnvironment.project.organizationId,
598+
workerId,
599+
runnerId,
600+
});
601+
602+
//we ack because when it's deployed it will be requeued
603+
await this.$.runQueue.acknowledgeMessage(orgId, runId);
604+
605+
this.$.eventBus.emit("runStatusChanged", {
606+
time: new Date(),
607+
run: {
608+
id: runId,
609+
status: run.status,
610+
updatedAt: run.updatedAt,
611+
createdAt: run.createdAt,
612+
},
613+
organization: {
614+
id: run.runtimeEnvironment.project.organizationId,
615+
},
616+
project: {
617+
id: run.runtimeEnvironment.projectId,
618+
},
619+
environment: {
620+
id: run.runtimeEnvironment.id,
621+
},
622+
});
623+
});
624624
}
625625
}

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ describe("RunEngine heartbeats", () => {
350350
assertNonNullable(executionData2);
351351
expect(executionData2.snapshot.executionStatus).toBe("QUEUED");
352352

353-
await setTimeout(1_000);
353+
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id);
354354

355355
//have to dequeue again
356356
const dequeued2 = await engine.dequeueFromWorkerQueue({

internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ describe("RunEngine pending version", () => {
192192
//set this so we have to requeue the runs in two batches
193193
queueRunsWaitingForWorkerBatchSize: 1,
194194
tracer: trace.getTracer("test", "0.0.0"),
195+
logLevel: "debug",
195196
});
196197

197198
try {
@@ -247,7 +248,7 @@ describe("RunEngine pending version", () => {
247248
expect(executionDataR1.snapshot.executionStatus).toBe("QUEUED");
248249
expect(executionDataR2.snapshot.executionStatus).toBe("QUEUED");
249250

250-
await setTimeout(500);
251+
await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id);
251252

252253
//dequeuing should fail
253254
const dequeued = await engine.dequeueFromWorkerQueue({
@@ -256,6 +257,12 @@ describe("RunEngine pending version", () => {
256257
});
257258
expect(dequeued.length).toBe(0);
258259

260+
const dequeued2 = await engine.dequeueFromWorkerQueue({
261+
consumerId: "test_12345",
262+
workerQueue: "main",
263+
});
264+
expect(dequeued2.length).toBe(0);
265+
259266
//queue should be empty
260267
const queueLength = await engine.runQueue.lengthOfQueue(
261268
authenticatedEnvironment,

0 commit comments

Comments
 (0)