diff --git a/docs/troubleshooting.mdx b/docs/troubleshooting.mdx index 11aa4a6d6a..44e05f6c82 100644 --- a/docs/troubleshooting.mdx +++ b/docs/troubleshooting.mdx @@ -80,6 +80,21 @@ Your code is deployed separately from the rest of your app(s) so you need to mak Prisma uses code generation to create the client from your schema file. This means you need to add a bit of config so we can generate this file before your tasks run: [Read the guide](/config/config-file#prisma). +### `Parallel waits are not supported` + +In the current version, you can't perform more that one "wait" in parallel. + +Waits include: +- `wait.for()` +- `wait.until()` +- `task.triggerAndWait()` +- `task.batchTriggerAndWait()` +- And any of our functions with `wait` in the name. + +This restriction exists because we suspend the task server after a wait, and resume it when the wait is done. At the moment, if you do more than one wait, the run will never continue when deployed, so we throw this error instead. + +The most common situation this happens is if you're using `Promise.all` around some of our wait functions. Instead of doing this use our built-in functions for [triggering tasks](/triggering#triggering-from-inside-another-task). We have functions that allow you to trigger different tasks in parallel. + ### When triggering subtasks the parent task finishes too soon Make sure that you always use `await` when you call `trigger`, `triggerAndWait`, `batchTrigger`, and `batchTriggerAndWait`. If you don't then it's likely the task(s) won't be triggered because the calling function process can be terminated before the networks calls are sent. diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 232bb8aa94..fef18f5251 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -12,6 +12,37 @@ import { links } from "./links.js"; import { ExceptionEventProperties } from "./schemas/openTelemetry.js"; import { assertExhaustive } from "../utils.js"; +/** + * If you throw this, it will get converted into an INTERNAL_ERROR + */ +export class InternalError extends Error { + public readonly code: TaskRunErrorCodes; + public readonly skipRetrying: boolean; + + constructor({ + code, + message, + showStackTrace = true, + skipRetrying = false, + }: { + code: TaskRunErrorCodes; + message?: string; + showStackTrace?: boolean; + skipRetrying?: boolean; + }) { + super(`${code}: ${message ?? "No message"}`); + this.name = "InternalError"; + this.code = code; + this.message = message ?? "InternalError"; + + if (!showStackTrace) { + this.stack = undefined; + } + + this.skipRetrying = skipRetrying; + } +} + export class AbortTaskRunError extends Error { constructor(message: string) { super(message); @@ -32,6 +63,15 @@ export class TaskPayloadParsedError extends Error { } export function parseError(error: unknown): TaskRunError { + if (error instanceof InternalError) { + return { + type: "INTERNAL_ERROR", + code: error.code, + message: error.message, + stackTrace: error.stack ?? "", + }; + } + if (error instanceof Error) { return { type: "BUILT_IN_ERROR", @@ -168,6 +208,7 @@ export function shouldRetryError(error: TaskRunError): boolean { case "DISK_SPACE_EXCEEDED": case "TASK_RUN_HEARTBEAT_TIMEOUT": case "OUTDATED_SDK_VERSION": + case "TASK_DID_CONCURRENT_WAIT": return false; case "GRACEFUL_EXIT_TIMEOUT": @@ -437,6 +478,14 @@ const prettyInternalErrors: Partial< href: links.docs.upgrade.beta, }, }, + TASK_DID_CONCURRENT_WAIT: { + message: + "Parallel waits are not supported, e.g. using Promise.all() around our wait functions.", + link: { + name: "Read the docs for solutions", + href: links.docs.troubleshooting.concurrentWaits, + }, + }, }; const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => { diff --git a/packages/core/src/v3/links.ts b/packages/core/src/v3/links.ts index 39b5a89589..11a45fe023 100644 --- a/packages/core/src/v3/links.ts +++ b/packages/core/src/v3/links.ts @@ -12,6 +12,9 @@ export const links = { upgrade: { beta: "https://trigger.dev/docs/upgrading-beta", }, + troubleshooting: { + concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported", + }, }, site: { home: "https://trigger.dev", diff --git a/packages/core/src/v3/runtime/devRuntimeManager.ts b/packages/core/src/v3/runtime/devRuntimeManager.ts index acad2c3d0f..05f1a353a7 100644 --- a/packages/core/src/v3/runtime/devRuntimeManager.ts +++ b/packages/core/src/v3/runtime/devRuntimeManager.ts @@ -6,6 +6,7 @@ import { } from "../schemas/index.js"; import { unboundedTimeout } from "../utils/timers.js"; import { RuntimeManager } from "./manager.js"; +import { preventMultipleWaits } from "./preventMultipleWaits.js"; export class DevRuntimeManager implements RuntimeManager { _taskWaits: Map void }> = new Map(); @@ -17,12 +18,14 @@ export class DevRuntimeManager implements RuntimeManager { _pendingCompletionNotifications: Map = new Map(); + _preventMultipleWaits = preventMultipleWaits(); + disable(): void { // do nothing } async waitForDuration(ms: number): Promise { - await unboundedTimeout(ms); + await this._preventMultipleWaits(() => unboundedTimeout(ms)); } async waitUntil(date: Date): Promise { @@ -30,21 +33,23 @@ export class DevRuntimeManager implements RuntimeManager { } async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - const pendingCompletion = this._pendingCompletionNotifications.get(params.id); + return this._preventMultipleWaits(async () => { + const pendingCompletion = this._pendingCompletionNotifications.get(params.id); - if (pendingCompletion) { - this._pendingCompletionNotifications.delete(params.id); + if (pendingCompletion) { + this._pendingCompletionNotifications.delete(params.id); - return pendingCompletion; - } + return pendingCompletion; + } - const promise = new Promise((resolve) => { - this._taskWaits.set(params.id, { resolve }); - }); + const promise = new Promise((resolve) => { + this._taskWaits.set(params.id, { resolve }); + }); - await this.#tryFlushMetadata(); + await this.#tryFlushMetadata(); - return await promise; + return await promise; + }); } async waitForBatch(params: { @@ -52,36 +57,38 @@ export class DevRuntimeManager implements RuntimeManager { runs: string[]; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } + return this._preventMultipleWaits(async () => { + if (!params.runs.length) { + return Promise.resolve({ id: params.id, items: [] }); + } - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - const pendingCompletion = this._pendingCompletionNotifications.get(runId); + const promise = Promise.all( + params.runs.map((runId) => { + return new Promise((resolve, reject) => { + const pendingCompletion = this._pendingCompletionNotifications.get(runId); - if (pendingCompletion) { - this._pendingCompletionNotifications.delete(runId); + if (pendingCompletion) { + this._pendingCompletionNotifications.delete(runId); - resolve(pendingCompletion); + resolve(pendingCompletion); - return; - } + return; + } - this._taskWaits.set(runId, { resolve }); - }); - }) - ); + this._taskWaits.set(runId, { resolve }); + }); + }) + ); - await this.#tryFlushMetadata(); + await this.#tryFlushMetadata(); - const results = await promise; + const results = await promise; - return { - id: params.id, - items: results, - }; + return { + id: params.id, + items: results, + }; + }); } resumeTask(completion: TaskRunExecutionResult, runId: string): void { diff --git a/packages/core/src/v3/runtime/index.ts b/packages/core/src/v3/runtime/index.ts index 7eecb99296..24a520e75e 100644 --- a/packages/core/src/v3/runtime/index.ts +++ b/packages/core/src/v3/runtime/index.ts @@ -12,6 +12,10 @@ import { usage } from "../usage-api.js"; const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager(); +/** + * All state must be inside the RuntimeManager, do NOT store it on this class. + * This is because of the "dual package hazard", this can be bundled multiple times. + */ export class RuntimeAPI { private static _instance?: RuntimeAPI; diff --git a/packages/core/src/v3/runtime/preventMultipleWaits.ts b/packages/core/src/v3/runtime/preventMultipleWaits.ts new file mode 100644 index 0000000000..35aa9bb949 --- /dev/null +++ b/packages/core/src/v3/runtime/preventMultipleWaits.ts @@ -0,0 +1,29 @@ +import { InternalError } from "../errors.js"; +import { TaskRunErrorCodes } from "../schemas/common.js"; + +const concurrentWaitErrorMessage = + "Parallel waits are not supported, e.g. using Promise.all() around our wait functions."; + +export function preventMultipleWaits() { + let isExecutingWait = false; + + return async (cb: () => Promise): Promise => { + if (isExecutingWait) { + console.error(concurrentWaitErrorMessage); + throw new InternalError({ + code: TaskRunErrorCodes.TASK_DID_CONCURRENT_WAIT, + message: concurrentWaitErrorMessage, + skipRetrying: true, + showStackTrace: false, + }); + } + + isExecutingWait = true; + + try { + return await cb(); + } finally { + isExecutingWait = false; + } + }; +} diff --git a/packages/core/src/v3/runtime/prodRuntimeManager.ts b/packages/core/src/v3/runtime/prodRuntimeManager.ts index 4fd63e04bd..a8ded95790 100644 --- a/packages/core/src/v3/runtime/prodRuntimeManager.ts +++ b/packages/core/src/v3/runtime/prodRuntimeManager.ts @@ -6,6 +6,7 @@ import { } from "../schemas/index.js"; import { ExecutorToWorkerProcessConnection } from "../zodIpc.js"; import { RuntimeManager } from "./manager.js"; +import { preventMultipleWaits } from "./preventMultipleWaits.js"; export type ProdRuntimeManagerOptions = { waitThresholdInMs?: number; @@ -21,6 +22,8 @@ export class ProdRuntimeManager implements RuntimeManager { _waitForDuration: { resolve: (value: void) => void; reject: (err?: any) => void } | undefined; + _preventMultipleWaits = preventMultipleWaits(); + constructor( private ipc: ExecutorToWorkerProcessConnection, private options: ProdRuntimeManagerOptions = {} @@ -31,19 +34,21 @@ export class ProdRuntimeManager implements RuntimeManager { } async waitForDuration(ms: number): Promise { - const now = Date.now(); + return this._preventMultipleWaits(async () => { + const now = Date.now(); - const resume = new Promise((resolve, reject) => { - this._waitForDuration = { resolve, reject }; - }); + const resume = new Promise((resolve, reject) => { + this._waitForDuration = { resolve, reject }; + }); - await this.ipc.send("WAIT_FOR_DURATION", { - ms, - now, - waitThresholdInMs: this.waitThresholdInMs, - }); + await this.ipc.send("WAIT_FOR_DURATION", { + ms, + now, + waitThresholdInMs: this.waitThresholdInMs, + }); - await resume; + await resume; + }); } resumeAfterDuration(): void { @@ -63,19 +68,21 @@ export class ProdRuntimeManager implements RuntimeManager { } async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - const promise = new Promise((resolve) => { - this._taskWaits.set(params.id, { resolve }); - }); + return this._preventMultipleWaits(async () => { + const promise = new Promise((resolve) => { + this._taskWaits.set(params.id, { resolve }); + }); - await this.ipc.send("WAIT_FOR_TASK", { - friendlyId: params.id, - }); + await this.ipc.send("WAIT_FOR_TASK", { + friendlyId: params.id, + }); - const result = await promise; + const result = await promise; - clock.reset(); + clock.reset(); - return result; + return result; + }); } async waitForBatch(params: { @@ -83,31 +90,33 @@ export class ProdRuntimeManager implements RuntimeManager { runs: string[]; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } - - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - this._taskWaits.set(runId, { resolve }); - }); - }) - ); - - await this.ipc.send("WAIT_FOR_BATCH", { - batchFriendlyId: params.id, - runFriendlyIds: params.runs, + return this._preventMultipleWaits(async () => { + if (!params.runs.length) { + return Promise.resolve({ id: params.id, items: [] }); + } + + const promise = Promise.all( + params.runs.map((runId) => { + return new Promise((resolve, reject) => { + this._taskWaits.set(runId, { resolve }); + }); + }) + ); + + await this.ipc.send("WAIT_FOR_BATCH", { + batchFriendlyId: params.id, + runFriendlyIds: params.runs, + }); + + const results = await promise; + + clock.reset(); + + return { + id: params.id, + items: results, + }; }); - - const results = await promise; - - clock.reset(); - - return { - id: params.id, - items: results, - }; } resumeTask(completion: TaskRunExecutionResult): void { diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index ca61333684..5b47d99c20 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -171,6 +171,7 @@ export const TaskRunInternalError = z.object({ "POD_EVICTED", "POD_UNKNOWN_ERROR", "OUTDATED_SDK_VERSION", + "TASK_DID_CONCURRENT_WAIT", ]), message: z.string().optional(), stackTrace: z.string().optional(), @@ -179,6 +180,7 @@ export const TaskRunInternalError = z.object({ export type TaskRunInternalError = z.infer; export const TaskRunErrorCodes = TaskRunInternalError.shape.code.enum; +export type TaskRunErrorCodes = TaskRunInternalError["code"]; export const TaskRunError = z.discriminatedUnion("type", [ TaskRunBuiltInError, diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 894145878a..ef94282656 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -2,7 +2,7 @@ import { SpanKind } from "@opentelemetry/api"; import { VERSION } from "../../version.js"; import { ApiError, RateLimitError } from "../apiClient/errors.js"; import { ConsoleInterceptor } from "../consoleInterceptor.js"; -import { parseError, sanitizeError, TaskPayloadParsedError } from "../errors.js"; +import { InternalError, parseError, sanitizeError, TaskPayloadParsedError } from "../errors.js"; import { runMetadata, TriggerConfig, waitUntil } from "../index.js"; import { recordSpanException, TracingSDK } from "../otel/index.js"; import { @@ -536,9 +536,11 @@ export class TaskExecutor { if ( error instanceof Error && - (error.name === "AbortTaskRunError" || error.name === "TaskPayloadParsedError") + (error.name === "AbortTaskRunError" || + error.name === "TaskPayloadParsedError" || + ("skipRetrying" in error && error.skipRetrying === true)) ) { - return { status: "skipped" }; + return { status: "skipped", error: error instanceof InternalError ? error : undefined }; } if (execution.run.maxAttempts) { diff --git a/references/hello-world/src/trigger/parallel-waits.ts b/references/hello-world/src/trigger/parallel-waits.ts new file mode 100644 index 0000000000..eddfcfcc6b --- /dev/null +++ b/references/hello-world/src/trigger/parallel-waits.ts @@ -0,0 +1,22 @@ +import { logger, task, wait } from "@trigger.dev/sdk/v3"; +import { childTask } from "./example.js"; + +/* + * These aren't currently supported, and so should throw clear errors + */ +export const parallelWaits = task({ + id: "parallel-waits", + run: async (payload: any, { ctx }) => { + //parallel wait for 5/10 seconds + await Promise.all([ + wait.for({ seconds: 5 }), + wait.until({ date: new Date(Date.now() + 10_000) }), + ]); + + //parallel task call + await Promise.all([ + childTask.triggerAndWait({ message: "Hello, world!" }), + childTask.batchTriggerAndWait([{ payload: { message: "Hello, world!" } }]), + ]); + }, +});