From b2f2689187d671cdfe059634ffa13adbc6ed8d61 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 17 Apr 2025 20:26:12 +0100 Subject: [PATCH 1/3] Completed batch waitpoints when we completed the BatchTaskRun --- .../runEngine/services/batchTrigger.server.ts | 11 ------ .../run-engine/src/engine/index.ts | 38 +------------------ .../src/engine/systems/batchSystem.ts | 26 +++++++++++++ 3 files changed, 27 insertions(+), 48 deletions(-) diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 74f16395e4..5e8c48cb7b 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -571,17 +571,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine { //triggered all the runs if (updatedBatch.runIds.length === updatedBatch.runCount) { - //unblock the parent run from the batch - //this prevents the parent continuing before all the runs are created - if (parentRunId && resumeParentOnCompletion) { - await this._engine.unblockRunForCreatedBatch({ - runId: RunId.fromFriendlyId(parentRunId), - batchId: batch.id, - environmentId: environment.id, - projectId: environment.projectId, - }); - } - //if all the runs were idempotent, it's possible the batch is already completed await this._engine.tryCompleteBatch({ batchId: batch.id }); } diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 9420c67247..91f3d12dfd 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -290,6 +290,7 @@ export class RunEngine { this.batchSystem = new BatchSystem({ resources, + waitpointSystem: this.waitpointSystem, }); this.runAttemptSystem = new RunAttemptSystem({ @@ -905,43 +906,6 @@ export class RunEngine { } } - /** - * This is called when all the runs for a batch have been created. - * This does NOT mean that all the runs for the batch are completed. - */ - async unblockRunForCreatedBatch({ - runId, - batchId, - tx, - }: { - runId: string; - batchId: string; - environmentId: string; - projectId: string; - tx?: PrismaClientOrTransaction; - }): Promise { - const prisma = tx ?? this.prisma; - - const waitpoint = await prisma.waitpoint.findFirst({ - where: { - completedByBatchId: batchId, - }, - }); - - if (!waitpoint) { - this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", { - runId, - batchId, - }); - throw new ServiceValidationError("Waitpoint not found for batch", 404); - } - - await this.completeWaitpoint({ - id: waitpoint.id, - output: { value: "Batch waitpoint completed", isError: false }, - }); - } - async tryCompleteBatch({ batchId }: { batchId: string }): Promise { return this.batchSystem.scheduleCompleteBatch({ batchId }); } diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index 8f0a14f4e3..6c0160068c 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -1,16 +1,20 @@ import { startSpan } from "@internal/tracing"; import { isFinalRunStatus } from "../statuses.js"; import { SystemResources } from "./systems.js"; +import { WaitpointSystem } from "./waitpointSystem.js"; export type BatchSystemOptions = { resources: SystemResources; + waitpointSystem: WaitpointSystem; }; export class BatchSystem { private readonly $: SystemResources; + private readonly waitpointSystem: WaitpointSystem; constructor(private readonly options: BatchSystemOptions) { this.$ = options.resources; + this.waitpointSystem = options.waitpointSystem; } public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise { @@ -75,6 +79,28 @@ export class BatchSystem { status: "COMPLETED", }, }); + + //get waitpoint (if there is one) + const waitpoint = await this.$.prisma.waitpoint.findFirst({ + where: { + completedByBatchId: batchId, + }, + }); + + if (!waitpoint) { + this.$.logger.debug( + "RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints", + { + batchId, + } + ); + return; + } + + await this.waitpointSystem.completeWaitpoint({ + id: waitpoint.id, + output: { value: "Batch waitpoint completed", isError: false }, + }); } else { this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId }); } From 30644f5dfad6e5128db98465e9dd65d68f6f2310 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 17 Apr 2025 20:43:40 +0100 Subject: [PATCH 2/3] =?UTF-8?q?Try=20complete=20the=20batch=20faster=20now?= =?UTF-8?q?=20it=E2=80=99s=20being=20used=20operationally?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../run-engine/src/engine/systems/batchSystem.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index 6c0160068c..0a17d1dc59 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -23,8 +23,8 @@ export class BatchSystem { id: `tryCompleteBatch:${batchId}`, job: "tryCompleteBatch", payload: { batchId: batchId }, - //2s in the future - availableAt: new Date(Date.now() + 2_000), + //200ms in the future + availableAt: new Date(Date.now() + 200), }); } From b28fd423ce2e2017931a2483913e6f4811c69707 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 17 Apr 2025 20:44:27 +0100 Subject: [PATCH 3/3] Fix for tests that were using the old engine.unblockRunForCreatedBatch() function --- .../src/engine/tests/batchTriggerAndWait.test.ts | 16 +--------------- .../src/engine/tests/checkpoints.test.ts | 7 ------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts index 072785d6df..36deab4698 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -191,13 +191,6 @@ describe("RunEngine batchTriggerAndWait", () => { expect(batchWaitpoint?.waitpoint.type).toBe("BATCH"); expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - //dequeue and start the 1st child const dequeuedChild = await engine.dequeueFromMasterQueue({ consumerId: "test_12345", @@ -303,7 +296,7 @@ describe("RunEngine batchTriggerAndWait", () => { expect(child2WaitpointAfter?.status).toBe("COMPLETED"); expect(child2WaitpointAfter?.output).toBe('{"baz":"qux"}'); - await setTimeout(500); + await setTimeout(1_000); const runWaitpointsAfterSecondChild = await prisma.taskRunWaitpoint.findMany({ where: { @@ -497,13 +490,6 @@ describe("RunEngine batchTriggerAndWait", () => { expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); expect(parentAfterBatchChild.batch?.id).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - //dequeue and start the batch child const dequeuedBatchChild = await engine.dequeueFromMasterQueue({ consumerId: "test_12345", diff --git a/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts b/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts index 01d46986f6..88d116b0b2 100644 --- a/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts +++ b/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts @@ -1166,13 +1166,6 @@ describe("RunEngine checkpoints", () => { expect(batchWaitpoint?.waitpoint.type).toBe("BATCH"); expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - // Create a checkpoint const checkpointResult = await engine.createCheckpoint({ runId: parentRun.id,