Skip to content

Commit f8bc209

Browse files
committed
implement and test handleError (now catchError)
1 parent ec4be99 commit f8bc209

File tree

2 files changed

+331
-85
lines changed

2 files changed

+331
-85
lines changed

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 121 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { ApiError, RateLimitError } from "../apiClient/errors.js";
44
import { ConsoleInterceptor } from "../consoleInterceptor.js";
55
import { isInternalError, parseError, sanitizeError, TaskPayloadParsedError } from "../errors.js";
66
import { flattenAttributes, lifecycleHooks, runMetadata, waitUntil } from "../index.js";
7-
import { TaskCompleteResult } from "../lifecycleHooks/types.js";
7+
import { TaskCompleteResult, TaskInitOutput } from "../lifecycleHooks/types.js";
88
import { recordSpanException, TracingSDK } from "../otel/index.js";
99
import { runTimelineMetrics } from "../run-timeline-metrics-api.js";
1010
import {
@@ -19,7 +19,11 @@ import {
1919
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
2020
import { taskContext } from "../task-context-api.js";
2121
import { TriggerTracer } from "../tracer.js";
22-
import { HandleErrorFunction, TaskMetadataWithFunctions } from "../types/index.js";
22+
import {
23+
HandleErrorFunction,
24+
HandleErrorModificationOptions,
25+
TaskMetadataWithFunctions,
26+
} from "../types/index.js";
2327
import {
2428
conditionallyExportPacket,
2529
conditionallyImportPacket,
@@ -670,21 +674,17 @@ export class TaskExecutor {
670674
error: unknown,
671675
payload: any,
672676
ctx: TaskRunContext,
673-
init: unknown,
677+
init: TaskInitOutput,
674678
signal?: AbortSignal
675679
): Promise<
676680
| { status: "retry"; retry: TaskRunExecutionRetry; error?: unknown }
677-
| { status: "skipped"; error?: unknown } // skipped is different than noop, it means that the task was skipped from retrying, instead of just not retrying
681+
| { status: "skipped"; error?: unknown }
678682
| { status: "noop"; error?: unknown }
679683
> {
680684
const retriesConfig = this._retries;
681-
682685
const retry = this.task.retry ?? retriesConfig?.default;
683686

684-
if (!retry) {
685-
return { status: "noop" };
686-
}
687-
687+
// Early exit conditions that prevent retrying
688688
if (isInternalError(error) && error.skipRetrying) {
689689
return { status: "skipped", error };
690690
}
@@ -696,23 +696,28 @@ export class TaskExecutor {
696696
return { status: "skipped" };
697697
}
698698

699-
if (execution.run.maxAttempts) {
700-
retry.maxAttempts = Math.max(execution.run.maxAttempts, 1);
701-
}
702-
703-
let delay = calculateNextRetryDelay(retry, execution.attempt.number);
704-
705-
if (
706-
delay &&
707-
error instanceof Error &&
708-
error.name === "TriggerApiError" &&
709-
(error as ApiError).status === 429
710-
) {
711-
const rateLimitError = error as RateLimitError;
699+
// Calculate default retry delay if retry config exists
700+
let defaultDelay: number | undefined;
701+
if (retry) {
702+
if (execution.run.maxAttempts) {
703+
retry.maxAttempts = Math.max(execution.run.maxAttempts, 1);
704+
}
712705

713-
delay = rateLimitError.millisecondsUntilReset;
706+
defaultDelay = calculateNextRetryDelay(retry, execution.attempt.number);
707+
708+
// Handle rate limit errors
709+
if (
710+
defaultDelay &&
711+
error instanceof Error &&
712+
error.name === "TriggerApiError" &&
713+
(error as ApiError).status === 429
714+
) {
715+
const rateLimitError = error as RateLimitError;
716+
defaultDelay = rateLimitError.millisecondsUntilReset;
717+
}
714718
}
715719

720+
// Check if retries are enabled in dev environment
716721
if (
717722
execution.environment.type === "DEVELOPMENT" &&
718723
typeof retriesConfig?.enabledInDev === "boolean" &&
@@ -724,72 +729,53 @@ export class TaskExecutor {
724729
return this._tracer.startActiveSpan(
725730
"handleError()",
726731
async (span) => {
727-
const handleErrorResult = this.task.fns.handleError
728-
? await this.task.fns.handleError(payload, error, {
729-
ctx,
730-
init,
731-
retry,
732-
retryDelayInMs: delay,
733-
retryAt: delay ? new Date(Date.now() + delay) : undefined,
734-
signal,
735-
})
736-
: this._handleErrorFn
737-
? await this._handleErrorFn(payload, error, {
738-
ctx,
739-
init,
740-
retry,
741-
retryDelayInMs: delay,
742-
retryAt: delay ? new Date(Date.now() + delay) : undefined,
743-
signal,
744-
})
745-
: undefined;
746-
747-
// If handleErrorResult
748-
if (!handleErrorResult) {
749-
return typeof delay === "undefined"
750-
? { status: "noop" }
751-
: { status: "retry", retry: { timestamp: Date.now() + delay, delay } };
752-
}
753-
754-
if (handleErrorResult.skipRetrying) {
755-
return { status: "skipped", error: handleErrorResult.error };
756-
}
757-
758-
if (typeof handleErrorResult.retryAt !== "undefined") {
759-
return {
760-
status: "retry",
761-
retry: {
762-
timestamp: handleErrorResult.retryAt.getTime(),
763-
delay: handleErrorResult.retryAt.getTime() - Date.now(),
764-
},
765-
error: handleErrorResult.error,
766-
};
767-
}
768-
769-
if (typeof handleErrorResult.retryDelayInMs === "number") {
770-
return {
771-
status: "retry",
772-
retry: {
773-
timestamp: Date.now() + handleErrorResult.retryDelayInMs,
774-
delay: handleErrorResult.retryDelayInMs,
775-
},
776-
error: handleErrorResult.error,
777-
};
732+
// Try task-specific catch error hook first
733+
const taskCatchErrorHook = lifecycleHooks.getTaskCatchErrorHook(this.task.id);
734+
if (taskCatchErrorHook) {
735+
const result = await taskCatchErrorHook({
736+
payload,
737+
error,
738+
ctx,
739+
init,
740+
retry,
741+
retryDelayInMs: defaultDelay,
742+
retryAt: defaultDelay ? new Date(Date.now() + defaultDelay) : undefined,
743+
signal,
744+
task: this.task.id,
745+
});
746+
747+
if (result) {
748+
return this.#processHandleErrorResult(result, execution.attempt.number, defaultDelay);
749+
}
778750
}
779751

780-
if (handleErrorResult.retry && typeof handleErrorResult.retry === "object") {
781-
const delay = calculateNextRetryDelay(handleErrorResult.retry, execution.attempt.number);
782-
783-
return typeof delay === "undefined"
784-
? { status: "noop", error: handleErrorResult.error }
785-
: {
786-
status: "retry",
787-
retry: { timestamp: Date.now() + delay, delay },
788-
error: handleErrorResult.error,
789-
};
752+
// Try global catch error hooks in order
753+
const globalCatchErrorHooks = lifecycleHooks.getGlobalCatchErrorHooks();
754+
for (const hook of globalCatchErrorHooks) {
755+
const result = await hook.fn({
756+
payload,
757+
error,
758+
ctx,
759+
init,
760+
retry,
761+
retryDelayInMs: defaultDelay,
762+
retryAt: defaultDelay ? new Date(Date.now() + defaultDelay) : undefined,
763+
signal,
764+
task: this.task.id,
765+
});
766+
767+
if (result) {
768+
return this.#processHandleErrorResult(result, execution.attempt.number, defaultDelay);
769+
}
790770
}
791771

792-
return { status: "noop", error: handleErrorResult.error };
772+
// If no hooks handled the error, use default retry behavior
773+
return typeof defaultDelay === "undefined"
774+
? { status: "noop" as const }
775+
: {
776+
status: "retry" as const,
777+
retry: { timestamp: Date.now() + defaultDelay, delay: defaultDelay },
778+
};
793779
},
794780
{
795781
attributes: {
@@ -799,6 +785,56 @@ export class TaskExecutor {
799785
);
800786
}
801787

788+
// Helper method to process handle error results
789+
#processHandleErrorResult(
790+
result: HandleErrorModificationOptions,
791+
attemptNumber: number,
792+
defaultDelay?: number
793+
):
794+
| { status: "retry"; retry: TaskRunExecutionRetry; error?: unknown }
795+
| { status: "skipped"; error?: unknown }
796+
| { status: "noop"; error?: unknown } {
797+
if (result.skipRetrying) {
798+
return { status: "skipped", error: result.error };
799+
}
800+
801+
if (typeof result.retryAt !== "undefined") {
802+
return {
803+
status: "retry",
804+
retry: {
805+
timestamp: result.retryAt.getTime(),
806+
delay: result.retryAt.getTime() - Date.now(),
807+
},
808+
error: result.error,
809+
};
810+
}
811+
812+
if (typeof result.retryDelayInMs === "number") {
813+
return {
814+
status: "retry",
815+
retry: {
816+
timestamp: Date.now() + result.retryDelayInMs,
817+
delay: result.retryDelayInMs,
818+
},
819+
error: result.error,
820+
};
821+
}
822+
823+
if (result.retry && typeof result.retry === "object") {
824+
const delay = calculateNextRetryDelay(result.retry, attemptNumber);
825+
826+
return typeof delay === "undefined"
827+
? { status: "noop", error: result.error }
828+
: {
829+
status: "retry",
830+
retry: { timestamp: Date.now() + delay, delay },
831+
error: result.error,
832+
};
833+
}
834+
835+
return { status: "noop", error: result.error };
836+
}
837+
802838
async #callOnCompleteFunctions(
803839
payload: unknown,
804840
result: TaskCompleteResult<unknown>,

0 commit comments

Comments
 (0)