diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index d146edb084..f78c2c3a0c 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -66,18 +66,7 @@ export class WaitpointSystem { isError: boolean; }; }): Promise { - // 1. Find the TaskRuns blocked by this waitpoint - const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({ - where: { waitpointId: id }, - select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, - }); - - if (affectedTaskRuns.length === 0) { - this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, { - waitpointId: id, - }); - } - + // 1. Complete the Waitpoint (if not completed) let [waitpointError, waitpoint] = await tryCatch( this.$.prisma.waitpoint.update({ where: { id, status: "PENDING" }, @@ -109,15 +98,44 @@ export class WaitpointSystem { throw new Error(`Waitpoint ${id} not found`); } - //schedule trying to continue the runs + if (waitpoint.status !== "COMPLETED") { + this.$.logger.error(`completeWaitpoint: waitpoint is not completed`, { + waitpointId: id, + }); + throw new Error(`Waitpoint ${id} is not completed`); + } + + // 2. Find the TaskRuns blocked by this waitpoint + const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({ + where: { waitpointId: id }, + select: { taskRunId: true, spanIdToComplete: true, createdAt: true }, + }); + + if (affectedTaskRuns.length === 0) { + this.$.logger.debug(`completeWaitpoint: no TaskRunWaitpoints found for waitpoint`, { + waitpointId: id, + }); + } + + // 3. Schedule trying to continue the runs for (const run of affectedTaskRuns) { + const jobId = `continueRunIfUnblocked:${run.taskRunId}`; + //50ms in the future + const availableAt = new Date(Date.now() + 50); + + this.$.logger.debug(`completeWaitpoint: enqueueing continueRunIfUnblocked`, { + waitpointId: id, + runId: run.taskRunId, + jobId, + availableAt, + }); + await this.$.worker.enqueue({ //this will debounce the call - id: `continueRunIfUnblocked:${run.taskRunId}`, + id: jobId, job: "continueRunIfUnblocked", payload: { runId: run.taskRunId }, - //50ms in the future - availableAt: new Date(Date.now() + 50), + availableAt, }); // emit an event to complete associated cached runs @@ -469,6 +487,10 @@ export class WaitpointSystem { } public async continueRunIfUnblocked({ runId }: { runId: string }) { + this.$.logger.debug(`continueRunIfUnblocked: start`, { + runId, + }); + // 1. Get the any blocking waitpoints const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({ where: { taskRunId: runId }, @@ -483,6 +505,10 @@ export class WaitpointSystem { // 2. There are blockers still, so do nothing if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) { + this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, { + runId, + blockingWaitpoints, + }); return; } @@ -505,7 +531,10 @@ export class WaitpointSystem { }); if (!run) { - throw new Error(`#continueRunIfUnblocked: run not found: ${runId}`); + this.$.logger.error(`continueRunIfUnblocked: run not found`, { + runId, + }); + throw new Error(`continueRunIfUnblocked: run not found: ${runId}`); } //4. Continue the run whether it's executing or not @@ -513,7 +542,7 @@ export class WaitpointSystem { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); if (isFinishedOrPendingFinished(snapshot.executionStatus)) { - this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, { + this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, { runId, snapshot, }); @@ -555,6 +584,15 @@ export class WaitpointSystem { await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot); + this.$.logger.debug( + `continueRunIfUnblocked: run was still executing, sending notification`, + { + runId, + snapshot, + newSnapshot, + } + ); + await sendNotificationToWorker({ runId, snapshot: newSnapshot, @@ -563,7 +601,7 @@ export class WaitpointSystem { } else { // Because we cannot reacquire the concurrency, we need to enqueue the run again // and because the run is still executing, we need to set the status to QUEUED_EXECUTING - await this.enqueueSystem.enqueueRun({ + const newSnapshot = await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, snapshot: { @@ -577,21 +615,27 @@ export class WaitpointSystem { index: b.batchIndex ?? undefined, })), }); + + this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, { + runId, + snapshot, + newSnapshot, + }); } } else { if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) { // TODO: We're screwed, should probably fail the run immediately - this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, { + this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, { runId: run.id, snapshot, blockingWaitpoints, }); - throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`); + throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`); } //put it back in the queue, with the original timestamp (w/ priority) //this prioritizes dequeuing waiting runs over new runs - await this.enqueueSystem.enqueueRun({ + const newSnapshot = await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, snapshot: { @@ -604,6 +648,12 @@ export class WaitpointSystem { })), checkpointId: snapshot.checkpointId ?? undefined, }); + + this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, { + runId, + snapshot, + newSnapshot, + }); } }); @@ -613,6 +663,10 @@ export class WaitpointSystem { taskRunId: runId, }, }); + + this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { + runId, + }); } public async createRunAssociatedWaitpoint(