diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 74f16395e4..5e8c48cb7b 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -571,17 +571,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine { //triggered all the runs if (updatedBatch.runIds.length === updatedBatch.runCount) { - //unblock the parent run from the batch - //this prevents the parent continuing before all the runs are created - if (parentRunId && resumeParentOnCompletion) { - await this._engine.unblockRunForCreatedBatch({ - runId: RunId.fromFriendlyId(parentRunId), - batchId: batch.id, - environmentId: environment.id, - projectId: environment.projectId, - }); - } - //if all the runs were idempotent, it's possible the batch is already completed await this._engine.tryCompleteBatch({ batchId: batch.id }); } diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 9420c67247..91f3d12dfd 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -290,6 +290,7 @@ export class RunEngine { this.batchSystem = new BatchSystem({ resources, + waitpointSystem: this.waitpointSystem, }); this.runAttemptSystem = new RunAttemptSystem({ @@ -905,43 +906,6 @@ export class RunEngine { } } - /** - * This is called when all the runs for a batch have been created. - * This does NOT mean that all the runs for the batch are completed. - */ - async unblockRunForCreatedBatch({ - runId, - batchId, - tx, - }: { - runId: string; - batchId: string; - environmentId: string; - projectId: string; - tx?: PrismaClientOrTransaction; - }): Promise { - const prisma = tx ?? this.prisma; - - const waitpoint = await prisma.waitpoint.findFirst({ - where: { - completedByBatchId: batchId, - }, - }); - - if (!waitpoint) { - this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", { - runId, - batchId, - }); - throw new ServiceValidationError("Waitpoint not found for batch", 404); - } - - await this.completeWaitpoint({ - id: waitpoint.id, - output: { value: "Batch waitpoint completed", isError: false }, - }); - } - async tryCompleteBatch({ batchId }: { batchId: string }): Promise { return this.batchSystem.scheduleCompleteBatch({ batchId }); } diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index 8f0a14f4e3..0a17d1dc59 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -1,16 +1,20 @@ import { startSpan } from "@internal/tracing"; import { isFinalRunStatus } from "../statuses.js"; import { SystemResources } from "./systems.js"; +import { WaitpointSystem } from "./waitpointSystem.js"; export type BatchSystemOptions = { resources: SystemResources; + waitpointSystem: WaitpointSystem; }; export class BatchSystem { private readonly $: SystemResources; + private readonly waitpointSystem: WaitpointSystem; constructor(private readonly options: BatchSystemOptions) { this.$ = options.resources; + this.waitpointSystem = options.waitpointSystem; } public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise { @@ -19,8 +23,8 @@ export class BatchSystem { id: `tryCompleteBatch:${batchId}`, job: "tryCompleteBatch", payload: { batchId: batchId }, - //2s in the future - availableAt: new Date(Date.now() + 2_000), + //200ms in the future + availableAt: new Date(Date.now() + 200), }); } @@ -75,6 +79,28 @@ export class BatchSystem { status: "COMPLETED", }, }); + + //get waitpoint (if there is one) + const waitpoint = await this.$.prisma.waitpoint.findFirst({ + where: { + completedByBatchId: batchId, + }, + }); + + if (!waitpoint) { + this.$.logger.debug( + "RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints", + { + batchId, + } + ); + return; + } + + await this.waitpointSystem.completeWaitpoint({ + id: waitpoint.id, + output: { value: "Batch waitpoint completed", isError: false }, + }); } else { this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId }); } diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts index 072785d6df..36deab4698 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -191,13 +191,6 @@ describe("RunEngine batchTriggerAndWait", () => { expect(batchWaitpoint?.waitpoint.type).toBe("BATCH"); expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - //dequeue and start the 1st child const dequeuedChild = await engine.dequeueFromMasterQueue({ consumerId: "test_12345", @@ -303,7 +296,7 @@ describe("RunEngine batchTriggerAndWait", () => { expect(child2WaitpointAfter?.status).toBe("COMPLETED"); expect(child2WaitpointAfter?.output).toBe('{"baz":"qux"}'); - await setTimeout(500); + await setTimeout(1_000); const runWaitpointsAfterSecondChild = await prisma.taskRunWaitpoint.findMany({ where: { @@ -497,13 +490,6 @@ describe("RunEngine batchTriggerAndWait", () => { expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS"); expect(parentAfterBatchChild.batch?.id).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - //dequeue and start the batch child const dequeuedBatchChild = await engine.dequeueFromMasterQueue({ consumerId: "test_12345", diff --git a/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts b/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts index 01d46986f6..88d116b0b2 100644 --- a/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts +++ b/internal-packages/run-engine/src/engine/tests/checkpoints.test.ts @@ -1166,13 +1166,6 @@ describe("RunEngine checkpoints", () => { expect(batchWaitpoint?.waitpoint.type).toBe("BATCH"); expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id); - await engine.unblockRunForCreatedBatch({ - runId: parentRun.id, - batchId: batch.id, - environmentId: authenticatedEnvironment.id, - projectId: authenticatedEnvironment.projectId, - }); - // Create a checkpoint const checkpointResult = await engine.createCheckpoint({ runId: parentRun.id,