Skip to content

Commit 38902d3

Browse files
committed
coordinator will retry completion submission
1 parent 4b4c139 commit 38902d3

File tree

3 files changed

+131
-14
lines changed

3 files changed

+131
-14
lines changed

apps/coordinator/src/index.ts

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from "@trigger.dev/core/v3";
1212
import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace";
1313
import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
14-
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
14+
import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
1515
import { ChaosMonkey } from "./chaosMonkey";
1616
import { Checkpointer } from "./checkpointer";
1717
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";
@@ -30,6 +30,11 @@ const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030;
3030
const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
3131
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");
3232

33+
const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS =
34+
parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS || "") || 30_000;
35+
const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES =
36+
parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 5;
37+
3338
const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME });
3439
const chaosMonkey = new ChaosMonkey(
3540
!!process.env.CHAOS_MONKEY_ENABLED,
@@ -720,37 +725,97 @@ class TaskCoordinator {
720725

721726
await chaosMonkey.call({ throwErrors: false });
722727

723-
const completeWithoutCheckpoint = (shouldExit: boolean) => {
728+
const sendCompletionWithAck = async (): Promise<boolean> => {
729+
try {
730+
const response = await this.#platformSocket?.sendWithAck(
731+
"TASK_RUN_COMPLETED_WITH_ACK",
732+
{
733+
version: "v2",
734+
execution,
735+
completion,
736+
},
737+
TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS
738+
);
739+
740+
if (!response) {
741+
log.error("TASK_RUN_COMPLETED_WITH_ACK: no response");
742+
return false;
743+
}
744+
745+
if (!response.success) {
746+
log.error("TASK_RUN_COMPLETED_WITH_ACK: error response", {
747+
error: response.error,
748+
});
749+
return false;
750+
}
751+
752+
log.log("TASK_RUN_COMPLETED_WITH_ACK: successful response");
753+
return true;
754+
} catch (error) {
755+
log.error("TASK_RUN_COMPLETED_WITH_ACK: threw error", { error });
756+
return false;
757+
}
758+
};
759+
760+
const completeWithoutCheckpoint = async (shouldExit: boolean) => {
724761
const supportsRetryCheckpoints = message.version === "v1";
725762

726-
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
727-
version: supportsRetryCheckpoints ? "v1" : "v2",
728-
execution,
729-
completion,
730-
});
731763
callback({ willCheckpointAndRestore: false, shouldExit });
764+
765+
if (supportsRetryCheckpoints) {
766+
// This is only here for backwards compat
767+
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
768+
version: "v1",
769+
execution,
770+
completion,
771+
});
772+
} else {
773+
// 99.99% of runs should end up here
774+
775+
const completedWithAckBackoff = new ExponentialBackoff("FullJitter").maxRetries(
776+
TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES
777+
);
778+
779+
const result = await completedWithAckBackoff.execute(
780+
async ({ retry, delay, elapsedMs }) => {
781+
logger.log("TASK_RUN_COMPLETED_WITH_ACK: sending with backoff", {
782+
retry,
783+
delay,
784+
elapsedMs,
785+
});
786+
return await sendCompletionWithAck();
787+
}
788+
);
789+
790+
if (!result.success) {
791+
logger.error("TASK_RUN_COMPLETED_WITH_ACK: failed to send with backoff", result);
792+
return;
793+
}
794+
795+
logger.log("TASK_RUN_COMPLETED_WITH_ACK: sent with backoff", result);
796+
}
732797
};
733798

734799
if (completion.ok) {
735-
completeWithoutCheckpoint(true);
800+
await completeWithoutCheckpoint(true);
736801
return;
737802
}
738803

739804
if (
740805
completion.error.type === "INTERNAL_ERROR" &&
741806
completion.error.code === "TASK_RUN_CANCELLED"
742807
) {
743-
completeWithoutCheckpoint(true);
808+
await completeWithoutCheckpoint(true);
744809
return;
745810
}
746811

747812
if (completion.retry === undefined) {
748-
completeWithoutCheckpoint(true);
813+
await completeWithoutCheckpoint(true);
749814
return;
750815
}
751816

752817
if (completion.retry.delay < this.#delayThresholdInMs) {
753-
completeWithoutCheckpoint(false);
818+
await completeWithoutCheckpoint(false);
754819

755820
// Prevents runs that fail fast from never sending a heartbeat
756821
this.#sendRunHeartbeat(socket.data.runId);
@@ -759,7 +824,7 @@ class TaskCoordinator {
759824
}
760825

761826
if (message.version === "v2") {
762-
completeWithoutCheckpoint(true);
827+
await completeWithoutCheckpoint(true);
763828
return;
764829
}
765830

@@ -768,7 +833,7 @@ class TaskCoordinator {
768833
const willCheckpointAndRestore = canCheckpoint || willSimulate;
769834

770835
if (!willCheckpointAndRestore) {
771-
completeWithoutCheckpoint(false);
836+
await completeWithoutCheckpoint(false);
772837
return;
773838
}
774839

@@ -792,7 +857,7 @@ class TaskCoordinator {
792857

793858
if (!checkpoint) {
794859
log.error("Failed to checkpoint");
795-
completeWithoutCheckpoint(false);
860+
await completeWithoutCheckpoint(false);
796861
return;
797862
}
798863

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,44 @@ function createCoordinatorNamespace(io: Server) {
136136
checkpoint: message.checkpoint,
137137
});
138138
},
139+
TASK_RUN_COMPLETED_WITH_ACK: async (message) => {
140+
try {
141+
const completeAttempt = new CompleteAttemptService({
142+
supportsRetryCheckpoints: message.version === "v1",
143+
});
144+
await completeAttempt.call({
145+
completion: message.completion,
146+
execution: message.execution,
147+
checkpoint: message.checkpoint,
148+
});
149+
150+
return {
151+
success: true,
152+
};
153+
} catch (error) {
154+
const friendlyError =
155+
error instanceof Error
156+
? {
157+
name: error.name,
158+
message: error.message,
159+
stack: error.stack,
160+
}
161+
: {
162+
name: "UnknownError",
163+
message: String(error),
164+
};
165+
166+
logger.error("Error while completing attempt with ack", {
167+
error: friendlyError,
168+
message,
169+
});
170+
171+
return {
172+
success: false,
173+
error: friendlyError,
174+
};
175+
}
176+
},
139177
TASK_RUN_FAILED_TO_RUN: async (message) => {
140178
await sharedQueueTasks.taskRunFailed(message.completion);
141179
},

packages/core/src/v3/schemas/messages.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,20 @@ export const CoordinatorToPlatformMessages = {
435435
.optional(),
436436
}),
437437
},
438+
TASK_RUN_COMPLETED_WITH_ACK: {
439+
message: z.object({
440+
version: z.enum(["v1", "v2"]).default("v2"),
441+
execution: ProdTaskRunExecution,
442+
completion: TaskRunExecutionResult,
443+
checkpoint: z
444+
.object({
445+
docker: z.boolean(),
446+
location: z.string(),
447+
})
448+
.optional(),
449+
}),
450+
callback: AckCallbackResult,
451+
},
438452
TASK_RUN_FAILED_TO_RUN: {
439453
message: z.object({
440454
version: z.literal("v1").default("v1"),

0 commit comments

Comments
 (0)