From 22262ed057f6c94e403ae4d7202d86cbf43f7274 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 15 Jan 2025 19:24:24 +0000 Subject: [PATCH 01/11] WIP preventing concurrent waits, throw an error --- packages/core/src/v3/runtime/index.ts | 39 +++++++++++++++++-- .../hello-world/src/trigger/parallel-waits.ts | 22 +++++++++++ 2 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 references/hello-world/src/trigger/parallel-waits.ts diff --git a/packages/core/src/v3/runtime/index.ts b/packages/core/src/v3/runtime/index.ts index 7eecb99296..31ef5fddb9 100644 --- a/packages/core/src/v3/runtime/index.ts +++ b/packages/core/src/v3/runtime/index.ts @@ -12,8 +12,16 @@ import { usage } from "../usage-api.js"; const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager(); +const concurrentWaitErrorMessage = + "Parallel waits are not supported, e.g. using Promise.all() around our wait functions."; + +/** + * All state must be inside the RuntimeManager, do NOT store it on this class. + * This is because of the "dual package hazard", this can be bundled multiple times. + */ export class RuntimeAPI { private static _instance?: RuntimeAPI; + private isExecutingWait = false; private constructor() {} @@ -26,15 +34,21 @@ export class RuntimeAPI { } public waitForDuration(ms: number): Promise { - return usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms)); + return this.#preventConcurrentWaits(() => + usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms)) + ); } public waitUntil(date: Date): Promise { - return usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date)); + return this.#preventConcurrentWaits(() => + usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date)) + ); } public waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - return usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params)); + return this.#preventConcurrentWaits(() => + usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params)) + ); } public waitForBatch(params: { @@ -42,7 +56,9 @@ export class RuntimeAPI { runs: string[]; ctx: TaskRunContext; }): Promise { - return usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params)); + return this.#preventConcurrentWaits(() => + usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params)) + ); } public setGlobalRuntimeManager(runtimeManager: RuntimeManager): boolean { @@ -57,4 +73,19 @@ export class RuntimeAPI { #getRuntimeManager(): RuntimeManager { return getGlobal(API_NAME) ?? NOOP_RUNTIME_MANAGER; } + + async #preventConcurrentWaits(cb: () => Promise): Promise { + if (this.isExecutingWait) { + console.error(concurrentWaitErrorMessage); + throw new Error(concurrentWaitErrorMessage); + } + + this.isExecutingWait = true; + + try { + return await cb(); + } finally { + this.isExecutingWait = false; + } + } } diff --git a/references/hello-world/src/trigger/parallel-waits.ts b/references/hello-world/src/trigger/parallel-waits.ts new file mode 100644 index 0000000000..eddfcfcc6b --- /dev/null +++ b/references/hello-world/src/trigger/parallel-waits.ts @@ -0,0 +1,22 @@ +import { logger, task, wait } from "@trigger.dev/sdk/v3"; +import { childTask } from "./example.js"; + +/* + * These aren't currently supported, and so should throw clear errors + */ +export const parallelWaits = task({ + id: "parallel-waits", + run: async (payload: any, { ctx }) => { + //parallel wait for 5/10 seconds + await Promise.all([ + wait.for({ seconds: 5 }), + wait.until({ date: new Date(Date.now() + 10_000) }), + ]); + + //parallel task call + await Promise.all([ + childTask.triggerAndWait({ message: "Hello, world!" }), + childTask.batchTriggerAndWait([{ payload: { message: "Hello, world!" } }]), + ]); + }, +}); From 119cedc936cb04dbf34e0762555f70212e97d6cd Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 09:47:16 +0000 Subject: [PATCH 02/11] Added ConcurrentWaitError (not retryable) --- packages/core/src/v3/errors.ts | 7 +++++++ packages/core/src/v3/workers/taskExecutor.ts | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 232bb8aa94..209bfec649 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -31,6 +31,13 @@ export class TaskPayloadParsedError extends Error { } } +export class ConcurrentWaitError extends Error { + constructor(message: string) { + super(message); + this.name = "ConcurrentWaitError"; + } +} + export function parseError(error: unknown): TaskRunError { if (error instanceof Error) { return { diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 894145878a..e115967605 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -536,7 +536,9 @@ export class TaskExecutor { if ( error instanceof Error && - (error.name === "AbortTaskRunError" || error.name === "TaskPayloadParsedError") + (error.name === "AbortTaskRunError" || + error.name === "TaskPayloadParsedError" || + error.name === "ConcurrentWaitError") ) { return { status: "skipped" }; } From e6d744cb85ce49ac98e7d66fad3c9348f96c118e Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 09:47:44 +0000 Subject: [PATCH 03/11] Move preventMultipleWaits out of the RuntimeAPI --- packages/core/src/v3/runtime/index.ts | 35 +++---------------- .../src/v3/runtime/preventMultipleWaits.ts | 23 ++++++++++++ 2 files changed, 27 insertions(+), 31 deletions(-) create mode 100644 packages/core/src/v3/runtime/preventMultipleWaits.ts diff --git a/packages/core/src/v3/runtime/index.ts b/packages/core/src/v3/runtime/index.ts index 31ef5fddb9..24a520e75e 100644 --- a/packages/core/src/v3/runtime/index.ts +++ b/packages/core/src/v3/runtime/index.ts @@ -12,16 +12,12 @@ import { usage } from "../usage-api.js"; const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager(); -const concurrentWaitErrorMessage = - "Parallel waits are not supported, e.g. using Promise.all() around our wait functions."; - /** * All state must be inside the RuntimeManager, do NOT store it on this class. * This is because of the "dual package hazard", this can be bundled multiple times. */ export class RuntimeAPI { private static _instance?: RuntimeAPI; - private isExecutingWait = false; private constructor() {} @@ -34,21 +30,15 @@ export class RuntimeAPI { } public waitForDuration(ms: number): Promise { - return this.#preventConcurrentWaits(() => - usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms)) - ); + return usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms)); } public waitUntil(date: Date): Promise { - return this.#preventConcurrentWaits(() => - usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date)) - ); + return usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date)); } public waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - return this.#preventConcurrentWaits(() => - usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params)) - ); + return usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params)); } public waitForBatch(params: { @@ -56,9 +46,7 @@ export class RuntimeAPI { runs: string[]; ctx: TaskRunContext; }): Promise { - return this.#preventConcurrentWaits(() => - usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params)) - ); + return usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params)); } public setGlobalRuntimeManager(runtimeManager: RuntimeManager): boolean { @@ -73,19 +61,4 @@ export class RuntimeAPI { #getRuntimeManager(): RuntimeManager { return getGlobal(API_NAME) ?? NOOP_RUNTIME_MANAGER; } - - async #preventConcurrentWaits(cb: () => Promise): Promise { - if (this.isExecutingWait) { - console.error(concurrentWaitErrorMessage); - throw new Error(concurrentWaitErrorMessage); - } - - this.isExecutingWait = true; - - try { - return await cb(); - } finally { - this.isExecutingWait = false; - } - } } diff --git a/packages/core/src/v3/runtime/preventMultipleWaits.ts b/packages/core/src/v3/runtime/preventMultipleWaits.ts new file mode 100644 index 0000000000..af2f10b880 --- /dev/null +++ b/packages/core/src/v3/runtime/preventMultipleWaits.ts @@ -0,0 +1,23 @@ +import { ConcurrentWaitError } from "../errors.js"; + +const concurrentWaitErrorMessage = + "Parallel waits are not supported, e.g. using Promise.all() around our wait functions."; + +export function preventMultipleWaits() { + let isExecutingWait = false; + + return async (cb: () => Promise): Promise => { + if (isExecutingWait) { + console.error(concurrentWaitErrorMessage); + throw new ConcurrentWaitError(concurrentWaitErrorMessage); + } + + isExecutingWait = true; + + try { + return await cb(); + } finally { + isExecutingWait = false; + } + }; +} From 4ce001ec6976e65117d7843b5943ea03f092caca Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 13:34:28 +0000 Subject: [PATCH 04/11] Added preventMultipleWaits to the devRuntimeManager --- .../core/src/v3/runtime/devRuntimeManager.ts | 73 ++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/packages/core/src/v3/runtime/devRuntimeManager.ts b/packages/core/src/v3/runtime/devRuntimeManager.ts index acad2c3d0f..05f1a353a7 100644 --- a/packages/core/src/v3/runtime/devRuntimeManager.ts +++ b/packages/core/src/v3/runtime/devRuntimeManager.ts @@ -6,6 +6,7 @@ import { } from "../schemas/index.js"; import { unboundedTimeout } from "../utils/timers.js"; import { RuntimeManager } from "./manager.js"; +import { preventMultipleWaits } from "./preventMultipleWaits.js"; export class DevRuntimeManager implements RuntimeManager { _taskWaits: Map void }> = new Map(); @@ -17,12 +18,14 @@ export class DevRuntimeManager implements RuntimeManager { _pendingCompletionNotifications: Map = new Map(); + _preventMultipleWaits = preventMultipleWaits(); + disable(): void { // do nothing } async waitForDuration(ms: number): Promise { - await unboundedTimeout(ms); + await this._preventMultipleWaits(() => unboundedTimeout(ms)); } async waitUntil(date: Date): Promise { @@ -30,21 +33,23 @@ export class DevRuntimeManager implements RuntimeManager { } async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - const pendingCompletion = this._pendingCompletionNotifications.get(params.id); + return this._preventMultipleWaits(async () => { + const pendingCompletion = this._pendingCompletionNotifications.get(params.id); - if (pendingCompletion) { - this._pendingCompletionNotifications.delete(params.id); + if (pendingCompletion) { + this._pendingCompletionNotifications.delete(params.id); - return pendingCompletion; - } + return pendingCompletion; + } - const promise = new Promise((resolve) => { - this._taskWaits.set(params.id, { resolve }); - }); + const promise = new Promise((resolve) => { + this._taskWaits.set(params.id, { resolve }); + }); - await this.#tryFlushMetadata(); + await this.#tryFlushMetadata(); - return await promise; + return await promise; + }); } async waitForBatch(params: { @@ -52,36 +57,38 @@ export class DevRuntimeManager implements RuntimeManager { runs: string[]; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } + return this._preventMultipleWaits(async () => { + if (!params.runs.length) { + return Promise.resolve({ id: params.id, items: [] }); + } - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - const pendingCompletion = this._pendingCompletionNotifications.get(runId); + const promise = Promise.all( + params.runs.map((runId) => { + return new Promise((resolve, reject) => { + const pendingCompletion = this._pendingCompletionNotifications.get(runId); - if (pendingCompletion) { - this._pendingCompletionNotifications.delete(runId); + if (pendingCompletion) { + this._pendingCompletionNotifications.delete(runId); - resolve(pendingCompletion); + resolve(pendingCompletion); - return; - } + return; + } - this._taskWaits.set(runId, { resolve }); - }); - }) - ); + this._taskWaits.set(runId, { resolve }); + }); + }) + ); - await this.#tryFlushMetadata(); + await this.#tryFlushMetadata(); - const results = await promise; + const results = await promise; - return { - id: params.id, - items: results, - }; + return { + id: params.id, + items: results, + }; + }); } resumeTask(completion: TaskRunExecutionResult, runId: string): void { From 86ca36356c9e170855d337ec75d2a48eddb10cef Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:18:56 +0000 Subject: [PATCH 05/11] Added throwable InternalError. Plus new TASK_DID_CONCURRENT_WAIT code --- packages/core/src/v3/errors.ts | 54 ++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 209bfec649..fef18f5251 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -12,6 +12,37 @@ import { links } from "./links.js"; import { ExceptionEventProperties } from "./schemas/openTelemetry.js"; import { assertExhaustive } from "../utils.js"; +/** + * If you throw this, it will get converted into an INTERNAL_ERROR + */ +export class InternalError extends Error { + public readonly code: TaskRunErrorCodes; + public readonly skipRetrying: boolean; + + constructor({ + code, + message, + showStackTrace = true, + skipRetrying = false, + }: { + code: TaskRunErrorCodes; + message?: string; + showStackTrace?: boolean; + skipRetrying?: boolean; + }) { + super(`${code}: ${message ?? "No message"}`); + this.name = "InternalError"; + this.code = code; + this.message = message ?? "InternalError"; + + if (!showStackTrace) { + this.stack = undefined; + } + + this.skipRetrying = skipRetrying; + } +} + export class AbortTaskRunError extends Error { constructor(message: string) { super(message); @@ -31,14 +62,16 @@ export class TaskPayloadParsedError extends Error { } } -export class ConcurrentWaitError extends Error { - constructor(message: string) { - super(message); - this.name = "ConcurrentWaitError"; +export function parseError(error: unknown): TaskRunError { + if (error instanceof InternalError) { + return { + type: "INTERNAL_ERROR", + code: error.code, + message: error.message, + stackTrace: error.stack ?? "", + }; } -} -export function parseError(error: unknown): TaskRunError { if (error instanceof Error) { return { type: "BUILT_IN_ERROR", @@ -175,6 +208,7 @@ export function shouldRetryError(error: TaskRunError): boolean { case "DISK_SPACE_EXCEEDED": case "TASK_RUN_HEARTBEAT_TIMEOUT": case "OUTDATED_SDK_VERSION": + case "TASK_DID_CONCURRENT_WAIT": return false; case "GRACEFUL_EXIT_TIMEOUT": @@ -444,6 +478,14 @@ const prettyInternalErrors: Partial< href: links.docs.upgrade.beta, }, }, + TASK_DID_CONCURRENT_WAIT: { + message: + "Parallel waits are not supported, e.g. using Promise.all() around our wait functions.", + link: { + name: "Read the docs for solutions", + href: links.docs.troubleshooting.concurrentWaits, + }, + }, }; const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => { From 1b3a6c095fcbeb44ed5a20f858ebc61cfc9d0ed4 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:19:12 +0000 Subject: [PATCH 06/11] Docs link for troubleshooting concurrent waits --- packages/core/src/v3/links.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/core/src/v3/links.ts b/packages/core/src/v3/links.ts index 39b5a89589..11a45fe023 100644 --- a/packages/core/src/v3/links.ts +++ b/packages/core/src/v3/links.ts @@ -12,6 +12,9 @@ export const links = { upgrade: { beta: "https://trigger.dev/docs/upgrading-beta", }, + troubleshooting: { + concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported", + }, }, site: { home: "https://trigger.dev", From da2e5e065700faf67f42a13a26d328ae1e29a2bd Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:19:20 +0000 Subject: [PATCH 07/11] Docs for troubleshooting concurrent waits --- docs/troubleshooting.mdx | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/troubleshooting.mdx b/docs/troubleshooting.mdx index 11aa4a6d6a..44e05f6c82 100644 --- a/docs/troubleshooting.mdx +++ b/docs/troubleshooting.mdx @@ -80,6 +80,21 @@ Your code is deployed separately from the rest of your app(s) so you need to mak Prisma uses code generation to create the client from your schema file. This means you need to add a bit of config so we can generate this file before your tasks run: [Read the guide](/config/config-file#prisma). +### `Parallel waits are not supported` + +In the current version, you can't perform more that one "wait" in parallel. + +Waits include: +- `wait.for()` +- `wait.until()` +- `task.triggerAndWait()` +- `task.batchTriggerAndWait()` +- And any of our functions with `wait` in the name. + +This restriction exists because we suspend the task server after a wait, and resume it when the wait is done. At the moment, if you do more than one wait, the run will never continue when deployed, so we throw this error instead. + +The most common situation this happens is if you're using `Promise.all` around some of our wait functions. Instead of doing this use our built-in functions for [triggering tasks](/triggering#triggering-from-inside-another-task). We have functions that allow you to trigger different tasks in parallel. + ### When triggering subtasks the parent task finishes too soon Make sure that you always use `await` when you call `trigger`, `triggerAndWait`, `batchTrigger`, and `batchTriggerAndWait`. If you don't then it's likely the task(s) won't be triggered because the calling function process can be terminated before the networks calls are sent. From 1c19fdaafb2744fb712448667e96de2d11b70271 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:19:28 +0000 Subject: [PATCH 08/11] preventMultipleWaits function --- packages/core/src/v3/runtime/preventMultipleWaits.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/core/src/v3/runtime/preventMultipleWaits.ts b/packages/core/src/v3/runtime/preventMultipleWaits.ts index af2f10b880..35aa9bb949 100644 --- a/packages/core/src/v3/runtime/preventMultipleWaits.ts +++ b/packages/core/src/v3/runtime/preventMultipleWaits.ts @@ -1,4 +1,5 @@ -import { ConcurrentWaitError } from "../errors.js"; +import { InternalError } from "../errors.js"; +import { TaskRunErrorCodes } from "../schemas/common.js"; const concurrentWaitErrorMessage = "Parallel waits are not supported, e.g. using Promise.all() around our wait functions."; @@ -9,7 +10,12 @@ export function preventMultipleWaits() { return async (cb: () => Promise): Promise => { if (isExecutingWait) { console.error(concurrentWaitErrorMessage); - throw new ConcurrentWaitError(concurrentWaitErrorMessage); + throw new InternalError({ + code: TaskRunErrorCodes.TASK_DID_CONCURRENT_WAIT, + message: concurrentWaitErrorMessage, + skipRetrying: true, + showStackTrace: false, + }); } isExecutingWait = true; From 2926461827b69dd100b1e8bf9f3b0babc246ffe5 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:19:35 +0000 Subject: [PATCH 09/11] Added TASK_DID_CONCURRENT_WAIT code --- packages/core/src/v3/schemas/common.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index ca61333684..5b47d99c20 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -171,6 +171,7 @@ export const TaskRunInternalError = z.object({ "POD_EVICTED", "POD_UNKNOWN_ERROR", "OUTDATED_SDK_VERSION", + "TASK_DID_CONCURRENT_WAIT", ]), message: z.string().optional(), stackTrace: z.string().optional(), @@ -179,6 +180,7 @@ export const TaskRunInternalError = z.object({ export type TaskRunInternalError = z.infer; export const TaskRunErrorCodes = TaskRunInternalError.shape.code.enum; +export type TaskRunErrorCodes = TaskRunInternalError["code"]; export const TaskRunError = z.discriminatedUnion("type", [ TaskRunBuiltInError, From 60da979d3d0b58adcc831ec56b8ae3652eedab30 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:19:47 +0000 Subject: [PATCH 10/11] Deal with InternalErrors that skipRetrying --- packages/core/src/v3/workers/taskExecutor.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index e115967605..ef94282656 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -2,7 +2,7 @@ import { SpanKind } from "@opentelemetry/api"; import { VERSION } from "../../version.js"; import { ApiError, RateLimitError } from "../apiClient/errors.js"; import { ConsoleInterceptor } from "../consoleInterceptor.js"; -import { parseError, sanitizeError, TaskPayloadParsedError } from "../errors.js"; +import { InternalError, parseError, sanitizeError, TaskPayloadParsedError } from "../errors.js"; import { runMetadata, TriggerConfig, waitUntil } from "../index.js"; import { recordSpanException, TracingSDK } from "../otel/index.js"; import { @@ -538,9 +538,9 @@ export class TaskExecutor { error instanceof Error && (error.name === "AbortTaskRunError" || error.name === "TaskPayloadParsedError" || - error.name === "ConcurrentWaitError") + ("skipRetrying" in error && error.skipRetrying === true)) ) { - return { status: "skipped" }; + return { status: "skipped", error: error instanceof InternalError ? error : undefined }; } if (execution.run.maxAttempts) { From 7a250369329a27cd8fd1297645cc0b63d0f0c5ed Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 16 Jan 2025 14:24:02 +0000 Subject: [PATCH 11/11] Added preventMultipleWaits to prod --- .../core/src/v3/runtime/prodRuntimeManager.ts | 95 ++++++++++--------- 1 file changed, 52 insertions(+), 43 deletions(-) diff --git a/packages/core/src/v3/runtime/prodRuntimeManager.ts b/packages/core/src/v3/runtime/prodRuntimeManager.ts index 4fd63e04bd..a8ded95790 100644 --- a/packages/core/src/v3/runtime/prodRuntimeManager.ts +++ b/packages/core/src/v3/runtime/prodRuntimeManager.ts @@ -6,6 +6,7 @@ import { } from "../schemas/index.js"; import { ExecutorToWorkerProcessConnection } from "../zodIpc.js"; import { RuntimeManager } from "./manager.js"; +import { preventMultipleWaits } from "./preventMultipleWaits.js"; export type ProdRuntimeManagerOptions = { waitThresholdInMs?: number; @@ -21,6 +22,8 @@ export class ProdRuntimeManager implements RuntimeManager { _waitForDuration: { resolve: (value: void) => void; reject: (err?: any) => void } | undefined; + _preventMultipleWaits = preventMultipleWaits(); + constructor( private ipc: ExecutorToWorkerProcessConnection, private options: ProdRuntimeManagerOptions = {} @@ -31,19 +34,21 @@ export class ProdRuntimeManager implements RuntimeManager { } async waitForDuration(ms: number): Promise { - const now = Date.now(); + return this._preventMultipleWaits(async () => { + const now = Date.now(); - const resume = new Promise((resolve, reject) => { - this._waitForDuration = { resolve, reject }; - }); + const resume = new Promise((resolve, reject) => { + this._waitForDuration = { resolve, reject }; + }); - await this.ipc.send("WAIT_FOR_DURATION", { - ms, - now, - waitThresholdInMs: this.waitThresholdInMs, - }); + await this.ipc.send("WAIT_FOR_DURATION", { + ms, + now, + waitThresholdInMs: this.waitThresholdInMs, + }); - await resume; + await resume; + }); } resumeAfterDuration(): void { @@ -63,19 +68,21 @@ export class ProdRuntimeManager implements RuntimeManager { } async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - const promise = new Promise((resolve) => { - this._taskWaits.set(params.id, { resolve }); - }); + return this._preventMultipleWaits(async () => { + const promise = new Promise((resolve) => { + this._taskWaits.set(params.id, { resolve }); + }); - await this.ipc.send("WAIT_FOR_TASK", { - friendlyId: params.id, - }); + await this.ipc.send("WAIT_FOR_TASK", { + friendlyId: params.id, + }); - const result = await promise; + const result = await promise; - clock.reset(); + clock.reset(); - return result; + return result; + }); } async waitForBatch(params: { @@ -83,31 +90,33 @@ export class ProdRuntimeManager implements RuntimeManager { runs: string[]; ctx: TaskRunContext; }): Promise { - if (!params.runs.length) { - return Promise.resolve({ id: params.id, items: [] }); - } - - const promise = Promise.all( - params.runs.map((runId) => { - return new Promise((resolve, reject) => { - this._taskWaits.set(runId, { resolve }); - }); - }) - ); - - await this.ipc.send("WAIT_FOR_BATCH", { - batchFriendlyId: params.id, - runFriendlyIds: params.runs, + return this._preventMultipleWaits(async () => { + if (!params.runs.length) { + return Promise.resolve({ id: params.id, items: [] }); + } + + const promise = Promise.all( + params.runs.map((runId) => { + return new Promise((resolve, reject) => { + this._taskWaits.set(runId, { resolve }); + }); + }) + ); + + await this.ipc.send("WAIT_FOR_BATCH", { + batchFriendlyId: params.id, + runFriendlyIds: params.runs, + }); + + const results = await promise; + + clock.reset(); + + return { + id: params.id, + items: results, + }; }); - - const results = await promise; - - clock.reset(); - - return { - id: params.id, - items: results, - }; } resumeTask(completion: TaskRunExecutionResult): void {