From 5eaa2e23eb4bc082c7270a1da1a529e70ee50490 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 11:55:00 +0100 Subject: [PATCH 1/3] Increase v4 visibility timeouts --- .../run-engine/src/engine/workerCatalog.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 92eddce19e..f3900d0f8f 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -6,20 +6,20 @@ export const workerCatalog = { waitpointId: z.string(), error: z.string().optional(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, heartbeatSnapshot: { schema: z.object({ runId: z.string(), snapshotId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, expireRun: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, cancelRun: { schema: z.object({ @@ -27,30 +27,30 @@ export const workerCatalog = { completedAt: z.coerce.date(), reason: z.string().optional(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 30_000, }, queueRunsPendingVersion: { schema: z.object({ backgroundWorkerId: z.string(), }), - visibilityTimeoutMs: 5000, + visibilityTimeoutMs: 60_000, }, tryCompleteBatch: { schema: z.object({ batchId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, continueRunIfUnblocked: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, enqueueDelayedRun: { schema: z.object({ runId: z.string(), }), - visibilityTimeoutMs: 10_000, + visibilityTimeoutMs: 30_000, }, }; From ee34b1584851ca6d18818ce713364f94a581d93c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 11:56:47 +0100 Subject: [PATCH 2/3] Added some logging for #continueRunIfUnblocked: run has no checkpoint --- .../run-engine/src/engine/systems/waitpointSystem.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 425ae8262d..2905d697f3 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -573,6 +573,11 @@ export class WaitpointSystem { } else { if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) { // TODO: We're screwed, should probably fail the run immediately + this.$.logger.error(`#continueRunIfUnblocked: run has no checkpoint`, { + runId: run.id, + snapshot, + blockingWaitpoints, + }); throw new Error(`#continueRunIfUnblocked: run has no checkpoint: ${run.id}`); } From 7e4c896cba21aae708bb3e72fc71075a2e28d6e3 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 12:28:54 +0100 Subject: [PATCH 3/3] =?UTF-8?q?If=20a=20run=20is=20finished=20or=20pending?= =?UTF-8?q?=20cancel,=20don=E2=80=99t=20try=20and=20continue=20it?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal-packages/run-engine/src/engine/statuses.ts | 5 +++++ .../run-engine/src/engine/systems/waitpointSystem.ts | 10 +++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/statuses.ts b/internal-packages/run-engine/src/engine/statuses.ts index 5eb923fa3d..93a4428cac 100644 --- a/internal-packages/run-engine/src/engine/statuses.ts +++ b/internal-packages/run-engine/src/engine/statuses.ts @@ -31,6 +31,11 @@ export function isCheckpointable(status: TaskRunExecutionStatus): boolean { return checkpointableStatuses.includes(status); } +export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boolean { + const finishedStatuses: TaskRunExecutionStatus[] = ["FINISHED", "PENDING_CANCEL"]; + return finishedStatuses.includes(status); +} + export function isFinalRunStatus(status: TaskRunStatus): boolean { const finalStatuses: TaskRunStatus[] = [ "CANCELED", diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 2905d697f3..9dfb9659fc 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -10,7 +10,7 @@ import { } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import { sendNotificationToWorker } from "../eventBus.js"; -import { isExecuting } from "../statuses.js"; +import { isExecuting, isFinishedOrPendingFinished } from "../statuses.js"; import { EnqueueSystem } from "./enqueueSystem.js"; import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; @@ -512,6 +512,14 @@ export class WaitpointSystem { await this.$.runLock.lock([runId], 5000, async () => { const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); + if (isFinishedOrPendingFinished(snapshot.executionStatus)) { + this.$.logger.debug(`#continueRunIfUnblocked: run is finished, skipping`, { + runId, + snapshot, + }); + return; + } + //run is still executing, send a message to the worker if (isExecuting(snapshot.executionStatus)) { const result = await this.$.runQueue.reacquireConcurrency(