Skip to content

Commit 83b4fdf

Browse files
committed
implement onComplete
1 parent fcb4992 commit 83b4fdf

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
stringifyIO,
3434
} from "../utils/ioSerialization.js";
3535
import { calculateNextRetryDelay } from "../utils/retries.js";
36+
import { TaskCompleteResult } from "../lifecycleHooks/types.js";
3637

3738
export type TaskExecutorOptions = {
3839
tracingSDK: TracingSDK;
@@ -154,6 +155,15 @@ export class TaskExecutor {
154155
span.setAttributes(attributes);
155156
}
156157

158+
// Call onComplete with success result
159+
await this.#callOnCompleteFunctions(
160+
parsedPayload,
161+
{ ok: true, data: output },
162+
ctx,
163+
initOutput,
164+
signal
165+
);
166+
157167
return {
158168
ok: true,
159169
id: execution.run.id,
@@ -163,6 +173,15 @@ export class TaskExecutor {
163173
} catch (outputError) {
164174
recordSpanException(span, outputError);
165175

176+
// Call onComplete with error result
177+
await this.#callOnCompleteFunctions(
178+
parsedPayload,
179+
{ ok: false, error: outputError },
180+
ctx,
181+
initOutput,
182+
signal
183+
);
184+
166185
return {
167186
ok: false,
168187
id: execution.run.id,
@@ -199,6 +218,15 @@ export class TaskExecutor {
199218
initOutput,
200219
signal
201220
);
221+
222+
// Call onComplete with error result
223+
await this.#callOnCompleteFunctions(
224+
parsedPayload,
225+
{ ok: false, error: handleErrorResult.error ?? runError },
226+
ctx,
227+
initOutput,
228+
signal
229+
);
202230
}
203231

204232
return {
@@ -215,6 +243,15 @@ export class TaskExecutor {
215243
} catch (handleErrorError) {
216244
recordSpanException(span, handleErrorError);
217245

246+
// Call onComplete with error result
247+
await this.#callOnCompleteFunctions(
248+
parsedPayload,
249+
{ ok: false, error: handleErrorError },
250+
ctx,
251+
initOutput,
252+
signal
253+
);
254+
218255
return {
219256
ok: false,
220257
id: execution.run.id,
@@ -767,4 +804,85 @@ export class TaskExecutor {
767804
}
768805
);
769806
}
807+
808+
async #callOnCompleteFunctions(
809+
payload: unknown,
810+
result: TaskCompleteResult<unknown>,
811+
ctx: TaskRunContext,
812+
initOutput: any,
813+
signal?: AbortSignal
814+
) {
815+
const globalCompleteHooks = lifecycleHooks.getGlobalCompleteHooks();
816+
const taskCompleteHook = lifecycleHooks.getTaskCompleteHook(this.task.id);
817+
818+
if (globalCompleteHooks.length === 0 && !taskCompleteHook) {
819+
return;
820+
}
821+
822+
return this._tracer.startActiveSpan(
823+
"hooks.complete",
824+
async (span) => {
825+
return await runTimelineMetrics.measureMetric(
826+
"trigger.dev/execution",
827+
"complete",
828+
async () => {
829+
for (const hook of globalCompleteHooks) {
830+
try {
831+
await this._tracer.startActiveSpan(
832+
hook.name ?? "global",
833+
async (span) => {
834+
await hook.fn({
835+
payload,
836+
result,
837+
ctx,
838+
signal,
839+
task: this.task.id,
840+
init: initOutput,
841+
});
842+
},
843+
{
844+
attributes: {
845+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
846+
},
847+
}
848+
);
849+
} catch {
850+
// Ignore errors from onComplete functions
851+
}
852+
}
853+
854+
if (taskCompleteHook) {
855+
try {
856+
await this._tracer.startActiveSpan(
857+
"task",
858+
async (span) => {
859+
await taskCompleteHook({
860+
payload,
861+
result,
862+
ctx,
863+
signal,
864+
task: this.task.id,
865+
init: initOutput,
866+
});
867+
},
868+
{
869+
attributes: {
870+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
871+
},
872+
}
873+
);
874+
} catch {
875+
// Ignore errors from onComplete functions
876+
}
877+
}
878+
}
879+
);
880+
},
881+
{
882+
attributes: {
883+
[SemanticInternalAttributes.STYLE_ICON]: "tabler-function",
884+
},
885+
}
886+
);
887+
}
770888
}

0 commit comments

Comments
 (0)