diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 74acc8dc82..3e8f64ab58 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -143,7 +143,21 @@ export class TaskExecutor { ); } - parsedPayload = await this.#parsePayload(payloadResult); + const [parsePayloadError, parsedPayloadResult] = await tryCatch( + this.#parsePayload(payloadResult) + ); + + if (parsePayloadError) { + recordSpanException(span, parsePayloadError); + return this.#internalErrorResult( + execution, + TaskRunErrorCodes.TASK_INPUT_ERROR, + parsePayloadError, + true + ); + } + + parsedPayload = parsedPayloadResult; lifecycleHooks.registerOnWaitHookListener(async (wait) => { await this.#callOnWaitFunctions(wait, parsedPayload, ctx, initOutput, signal); @@ -1369,7 +1383,12 @@ export class TaskExecutor { }); } - #internalErrorResult(execution: TaskRunExecution, code: TaskRunErrorCodes, error: unknown) { + #internalErrorResult( + execution: TaskRunExecution, + code: TaskRunErrorCodes, + error: unknown, + skippedRetrying?: boolean + ) { return { ok: false, id: execution.run.id, @@ -1384,6 +1403,7 @@ export class TaskExecutor { : undefined, stackTrace: error instanceof Error ? error.stack : undefined, }, + skippedRetrying, } satisfies TaskRunExecutionResult; } diff --git a/packages/core/test/taskExecutor.test.ts b/packages/core/test/taskExecutor.test.ts index 531a872e4f..d78cc57cd7 100644 --- a/packages/core/test/taskExecutor.test.ts +++ b/packages/core/test/taskExecutor.test.ts @@ -1809,6 +1809,37 @@ describe("TaskExecutor", () => { expect(delay).toBeGreaterThan(29900); // Allow for some time passing during test expect(delay).toBeLessThan(32000); // Account for max 2000ms jitter }); + + test("should return error and skip retrying if parsePayload throws", async () => { + const parseError = new Error("Parse failed"); + const task = { + id: "test-task", + fns: { + run: async () => { + throw new Error("Should not reach run"); + }, + parsePayload: async () => { + throw parseError; + }, + }, + }; + + const result = await executeTask(task, { foo: "bar" }, undefined); + + expect(result).toEqual({ + result: { + ok: false, + id: "test-run-id", + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.TASK_INPUT_ERROR, + message: "TaskPayloadParsedError: Parsing payload with schema failed: Parse failed", + stackTrace: expect.any(String), + }, + skippedRetrying: true, + }, + }); + }); }); function executeTask( @@ -1828,7 +1859,11 @@ function executeTask( logger: tracingSDK.getLogger("test-task"), }); - const consoleInterceptor = new ConsoleInterceptor(tracingSDK.getLogger("test-task"), false); + const consoleInterceptor = new ConsoleInterceptor( + tracingSDK.getLogger("test-task"), + false, + false + ); const executor = new TaskExecutor(task, { tracingSDK,