Skip to content

Commit ed83fbe

Browse files
committed
Added RESUME_AFTER_DEPENDENCY_WITH_ACK
1 parent 7d9d653 commit ed83fbe

File tree

3 files changed

+90
-22
lines changed

3 files changed

+90
-22
lines changed

apps/coordinator/src/index.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,33 @@ class TaskCoordinator {
162162

163163
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
164164
},
165+
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
166+
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167+
168+
if (!taskSocket) {
169+
logger.log("Socket for attempt not found", {
170+
attemptFriendlyId: message.attemptFriendlyId,
171+
});
172+
return {
173+
success: false,
174+
error: {
175+
name: "SocketNotFoundError",
176+
message: "Socket for attempt not found",
177+
},
178+
};
179+
}
180+
181+
await chaosMonkey.call();
182+
183+
// In case the task resumed faster than we could checkpoint
184+
this.#cancelCheckpoint(message.runId);
185+
186+
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
187+
188+
return {
189+
success: true,
190+
};
191+
},
165192
RESUME_AFTER_DURATION: async (message) => {
166193
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167194

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -725,20 +725,47 @@ export class SharedQueueConsumer {
725725
}
726726

727727
try {
728-
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
729-
runId: resumableAttempt.taskRunId,
730-
attemptId: resumableAttempt.id,
731-
});
732-
733-
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
734-
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
735-
version: "v1",
728+
const resumeMessage = {
729+
version: "v1" as const,
736730
runId: resumableAttempt.taskRunId,
737731
attemptId: resumableAttempt.id,
738732
attemptFriendlyId: resumableAttempt.friendlyId,
739733
completions,
740734
executions,
735+
};
736+
737+
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });
738+
739+
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
740+
const responses = await socketIo.coordinatorNamespace
741+
.timeout(10_000)
742+
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);
743+
744+
logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
745+
resumeMessage,
746+
responses,
747+
message,
741748
});
749+
750+
if (responses.length === 0) {
751+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
752+
resumeMessage,
753+
message,
754+
});
755+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000);
756+
return;
757+
}
758+
759+
const failed = responses.filter((response) => !response.success);
760+
if (failed.length > 0) {
761+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
762+
resumeMessage,
763+
failed,
764+
message,
765+
});
766+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 1_000);
767+
return;
768+
}
742769
} catch (e) {
743770
if (e instanceof Error) {
744771
this._currentSpan?.recordException(e);

packages/core/src/v3/schemas/messages.ts

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@ import {
1515
WaitReason,
1616
} from "./schemas.js";
1717

18+
const ackCallbackResult = z.discriminatedUnion("success", [
19+
z.object({
20+
success: z.literal(false),
21+
error: z.object({
22+
name: z.string(),
23+
message: z.string(),
24+
stack: z.string().optional(),
25+
stderr: z.string().optional(),
26+
}),
27+
}),
28+
z.object({
29+
success: z.literal(true),
30+
}),
31+
]);
32+
1833
export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
1934
z.object({
2035
type: z.literal("CANCEL_ATTEMPT"),
@@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
269284
projectId: z.string(),
270285
deploymentId: z.string(),
271286
}),
272-
callback: z.discriminatedUnion("success", [
273-
z.object({
274-
success: z.literal(false),
275-
error: z.object({
276-
name: z.string(),
277-
message: z.string(),
278-
stack: z.string().optional(),
279-
stderr: z.string().optional(),
280-
}),
281-
}),
282-
z.object({
283-
success: z.literal(true),
284-
}),
285-
]),
287+
callback: ackCallbackResult,
286288
},
287289
RESTORE: {
288290
message: z.object({
@@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
504506
};
505507

506508
export const PlatformToCoordinatorMessages = {
509+
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
507510
RESUME_AFTER_DEPENDENCY: {
508511
message: z.object({
509512
version: z.literal("v1").default("v1"),
@@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
514517
executions: TaskRunExecution.array(),
515518
}),
516519
},
520+
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
521+
message: z.object({
522+
version: z.literal("v1").default("v1"),
523+
runId: z.string(),
524+
attemptId: z.string(),
525+
attemptFriendlyId: z.string(),
526+
completions: TaskRunExecutionResult.array(),
527+
executions: TaskRunExecution.array(),
528+
}),
529+
callback: ackCallbackResult,
530+
},
517531
RESUME_AFTER_DURATION: {
518532
message: z.object({
519533
version: z.literal("v1").default("v1"),

0 commit comments

Comments
 (0)