Skip to content

Commit f5f12cb

Browse files
committed
Remove QUEUED_EXECUTING because we no longer "eagerly" release before checkpointing
1 parent dfc88e5 commit f5f12cb

File tree

5 files changed

+27
-784
lines changed

5 files changed

+27
-784
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -595,75 +595,45 @@ export class WaitpointSystem {
595595
return "skipped";
596596
}
597597
case "EXECUTING_WITH_WAITPOINTS": {
598-
const result = await this.$.runQueue.reacquireConcurrency(
599-
run.runtimeEnvironment.organization.id,
600-
runId
601-
);
602-
603-
if (result) {
604-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
605-
this.$.prisma,
606-
{
607-
run: {
608-
id: runId,
609-
status: snapshot.runStatus,
610-
attemptNumber: snapshot.attemptNumber,
611-
},
612-
snapshot: {
613-
executionStatus: "EXECUTING",
614-
description: "Run was continued, whilst still executing.",
615-
},
616-
previousSnapshotId: snapshot.id,
617-
environmentId: snapshot.environmentId,
618-
environmentType: snapshot.environmentType,
619-
projectId: snapshot.projectId,
620-
organizationId: snapshot.organizationId,
621-
batchId: snapshot.batchId ?? undefined,
622-
completedWaitpoints: blockingWaitpoints.map((b) => ({
623-
id: b.waitpoint.id,
624-
index: b.batchIndex ?? undefined,
625-
})),
626-
}
627-
);
628-
629-
this.$.logger.debug(
630-
`continueRunIfUnblocked: run was still executing, sending notification`,
631-
{
632-
runId,
633-
snapshot,
634-
newSnapshot,
635-
}
636-
);
637-
638-
await sendNotificationToWorker({
639-
runId,
640-
snapshot: newSnapshot,
641-
eventBus: this.$.eventBus,
642-
});
643-
} else {
644-
// Because we cannot reacquire the concurrency, we need to enqueue the run again
645-
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
646-
const newSnapshot = await this.enqueueSystem.enqueueRun({
647-
run,
648-
env: run.runtimeEnvironment,
598+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
599+
this.$.prisma,
600+
{
601+
run: {
602+
id: runId,
603+
status: snapshot.runStatus,
604+
attemptNumber: snapshot.attemptNumber,
605+
},
649606
snapshot: {
650-
status: "QUEUED_EXECUTING",
651-
description: "Run can continue, but is waiting for concurrency",
607+
executionStatus: "EXECUTING",
608+
description: "Run was continued, whilst still executing.",
652609
},
653610
previousSnapshotId: snapshot.id,
611+
environmentId: snapshot.environmentId,
612+
environmentType: snapshot.environmentType,
613+
projectId: snapshot.projectId,
614+
organizationId: snapshot.organizationId,
654615
batchId: snapshot.batchId ?? undefined,
655616
completedWaitpoints: blockingWaitpoints.map((b) => ({
656617
id: b.waitpoint.id,
657618
index: b.batchIndex ?? undefined,
658619
})),
659-
});
620+
}
621+
);
660622

661-
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
623+
this.$.logger.debug(
624+
`continueRunIfUnblocked: run was still executing, sending notification`,
625+
{
662626
runId,
663627
snapshot,
664628
newSnapshot,
665-
});
666-
}
629+
}
630+
);
631+
632+
await sendNotificationToWorker({
633+
runId,
634+
snapshot: newSnapshot,
635+
eventBus: this.$.eventBus,
636+
});
667637

668638
break;
669639
}

internal-packages/run-engine/src/engine/tests/checkpoints.test.ts

Lines changed: 0 additions & 231 deletions
Original file line numberDiff line numberDiff line change
@@ -775,237 +775,6 @@ describe("RunEngine checkpoints", () => {
775775
}
776776
);
777777

778-
containerTest(
779-
"when a checkpoint is created while the run is in QUEUED_EXECUTING state, the run is QUEUED",
780-
async ({ prisma, redisOptions }) => {
781-
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
782-
783-
const engine = new RunEngine({
784-
prisma,
785-
worker: {
786-
redis: redisOptions,
787-
workers: 1,
788-
tasksPerWorker: 10,
789-
pollIntervalMs: 100,
790-
},
791-
queue: {
792-
redis: redisOptions,
793-
masterQueueConsumersDisabled: true,
794-
processWorkerQueueDebounceMs: 50,
795-
},
796-
runLock: {
797-
redis: redisOptions,
798-
},
799-
machines: {
800-
defaultMachine: "small-1x",
801-
machines: {
802-
"small-1x": {
803-
name: "small-1x" as const,
804-
cpu: 0.5,
805-
memory: 0.5,
806-
centsPerMs: 0.0001,
807-
},
808-
},
809-
baseCostInCents: 0.0001,
810-
},
811-
tracer: trace.getTracer("test", "0.0.0"),
812-
});
813-
814-
try {
815-
const taskIdentifier = "test-task";
816-
817-
// Create background worker
818-
await setupBackgroundWorker(
819-
engine,
820-
authenticatedEnvironment,
821-
taskIdentifier,
822-
undefined,
823-
undefined,
824-
{
825-
concurrencyLimit: 1,
826-
}
827-
);
828-
829-
// Create first run with queue concurrency limit of 1
830-
const firstRun = await engine.trigger(
831-
{
832-
number: 1,
833-
friendlyId: "run_first",
834-
environment: authenticatedEnvironment,
835-
taskIdentifier,
836-
payload: "{}",
837-
payloadType: "application/json",
838-
context: {},
839-
traceContext: {},
840-
traceId: "t12345-first",
841-
spanId: "s12345-first",
842-
workerQueue: "main",
843-
queue: "task/test-task",
844-
isTest: false,
845-
tags: [],
846-
},
847-
prisma
848-
);
849-
850-
await setTimeout(500);
851-
852-
// Dequeue and start the first run
853-
const dequeuedFirst = await engine.dequeueFromWorkerQueue({
854-
consumerId: "test_12345",
855-
workerQueue: "main",
856-
});
857-
expect(dequeuedFirst.length).toBe(1);
858-
assertNonNullable(dequeuedFirst[0]);
859-
860-
const firstAttempt = await engine.startRunAttempt({
861-
runId: dequeuedFirst[0].run.id,
862-
snapshotId: dequeuedFirst[0].snapshot.id,
863-
});
864-
expect(firstAttempt.snapshot.executionStatus).toBe("EXECUTING");
865-
866-
// Create a manual waitpoint for the first run
867-
const waitpoint = await engine.createManualWaitpoint({
868-
environmentId: authenticatedEnvironment.id,
869-
projectId: authenticatedEnvironment.projectId,
870-
});
871-
expect(waitpoint.waitpoint.status).toBe("PENDING");
872-
873-
// Block the first run
874-
const blockedResult = await engine.blockRunWithWaitpoint({
875-
runId: firstRun.id,
876-
waitpoints: waitpoint.waitpoint.id,
877-
projectId: authenticatedEnvironment.projectId,
878-
organizationId: authenticatedEnvironment.organizationId,
879-
});
880-
881-
// Verify first run is blocked
882-
const firstRunData = await engine.getRunExecutionData({ runId: firstRun.id });
883-
expect(firstRunData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
884-
885-
// Create and start second run on the same queue
886-
const secondRun = await engine.trigger(
887-
{
888-
number: 2,
889-
friendlyId: "run_second",
890-
environment: authenticatedEnvironment,
891-
taskIdentifier,
892-
payload: "{}",
893-
payloadType: "application/json",
894-
context: {},
895-
traceContext: {},
896-
traceId: "t12345-second",
897-
spanId: "s12345-second",
898-
workerQueue: "main",
899-
queue: "task/test-task",
900-
isTest: false,
901-
tags: [],
902-
},
903-
prisma
904-
);
905-
906-
await setTimeout(500);
907-
908-
// Dequeue and start the second run
909-
const dequeuedSecond = await engine.dequeueFromWorkerQueue({
910-
consumerId: "test_12345",
911-
workerQueue: "main",
912-
});
913-
expect(dequeuedSecond.length).toBe(1);
914-
assertNonNullable(dequeuedSecond[0]);
915-
916-
const secondAttempt = await engine.startRunAttempt({
917-
runId: dequeuedSecond[0].run.id,
918-
snapshotId: dequeuedSecond[0].snapshot.id,
919-
});
920-
expect(secondAttempt.snapshot.executionStatus).toBe("EXECUTING");
921-
922-
// Now complete the waitpoint for the first run
923-
await engine.completeWaitpoint({
924-
id: waitpoint.waitpoint.id,
925-
});
926-
927-
// Wait for the continueRunIfUnblocked to process
928-
await setTimeout(500);
929-
930-
// Verify the first run is now in QUEUED_EXECUTING state
931-
const executionDataAfter = await engine.getRunExecutionData({ runId: firstRun.id });
932-
expect(executionDataAfter?.snapshot.executionStatus).toBe("QUEUED_EXECUTING");
933-
expect(executionDataAfter?.snapshot.description).toBe(
934-
"Run can continue, but is waiting for concurrency"
935-
);
936-
937-
// Verify the waitpoint is no longer blocking the first run
938-
const runWaitpoint = await prisma.taskRunWaitpoint.findFirst({
939-
where: {
940-
taskRunId: firstRun.id,
941-
},
942-
include: {
943-
waitpoint: true,
944-
},
945-
});
946-
expect(runWaitpoint).toBeNull();
947-
948-
// Verify the waitpoint itself is completed
949-
const completedWaitpoint = await prisma.waitpoint.findUnique({
950-
where: {
951-
id: waitpoint.waitpoint.id,
952-
},
953-
});
954-
assertNonNullable(completedWaitpoint);
955-
expect(completedWaitpoint.status).toBe("COMPLETED");
956-
957-
// Create checkpoint after waitpoint completion
958-
const checkpointResult = await engine.createCheckpoint({
959-
runId: firstRun.id,
960-
snapshotId: firstRunData?.snapshot.id!,
961-
checkpoint: {
962-
type: "DOCKER",
963-
reason: "TEST_CHECKPOINT",
964-
location: "test-location",
965-
imageRef: "test-image-ref",
966-
},
967-
});
968-
969-
expect(checkpointResult.ok).toBe(true);
970-
const checkpoint = checkpointResult.ok ? checkpointResult.snapshot : null;
971-
assertNonNullable(checkpoint);
972-
expect(checkpoint.executionStatus).toBe("QUEUED");
973-
974-
// Complete the second run so the first run can be dequeued
975-
const result = await engine.completeRunAttempt({
976-
runId: dequeuedSecond[0].run.id,
977-
snapshotId: secondAttempt.snapshot.id,
978-
completion: {
979-
ok: true,
980-
id: dequeuedSecond[0].run.id,
981-
output: `{"foo":"bar"}`,
982-
outputType: "application/json",
983-
},
984-
});
985-
986-
await setTimeout(1000);
987-
988-
// Verify the first run is back in the queue
989-
const queuedRun = await engine.dequeueFromWorkerQueue({
990-
consumerId: "test_12345",
991-
workerQueue: "main",
992-
});
993-
expect(queuedRun.length).toBe(1);
994-
assertNonNullable(queuedRun[0]);
995-
996-
// Now we can continue the run
997-
const continueResult = await engine.continueRunExecution({
998-
runId: firstRun.id,
999-
snapshotId: queuedRun[0].snapshot.id,
1000-
});
1001-
1002-
expect(continueResult.snapshot.executionStatus).toBe("EXECUTING");
1003-
} finally {
1004-
await engine.quit();
1005-
}
1006-
}
1007-
);
1008-
1009778
containerTest("batchTriggerAndWait resume after checkpoint", async ({ prisma, redisOptions }) => {
1010779
//create environment
1011780
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -952,33 +952,6 @@ describe("RunQueue", () => {
952952
0
953953
);
954954
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
955-
956-
//reacquire the concurrency
957-
await queue.reacquireConcurrency(authenticatedEnvProd.organization.id, message.messageId);
958-
959-
//check concurrencies are back to what they were before
960-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
961-
1
962-
);
963-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
964-
965-
//release the concurrency (with the queue this time)
966-
await queue.releaseAllConcurrency(authenticatedEnvProd.organization.id, message.messageId);
967-
968-
//concurrencies
969-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
970-
0
971-
);
972-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
973-
974-
//reacquire the concurrency
975-
await queue.reacquireConcurrency(authenticatedEnvProd.organization.id, message.messageId);
976-
977-
//check concurrencies are back to what they were before
978-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
979-
1
980-
);
981-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
982955
} finally {
983956
try {
984957
await queue.quit();

0 commit comments

Comments
 (0)