Skip to content
Merged
27 changes: 27 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,33 @@ class TaskCoordinator {

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
},
/**
 * Resumes an attempt that was waiting on a dependency and returns a result
 * object describing the outcome.
 *
 * Returns `{ success: false, error }` when no socket is found for the attempt;
 * otherwise emits `RESUME_AFTER_DEPENDENCY` on that socket and returns
 * `{ success: true }`.
 */
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
  const { attemptFriendlyId, runId } = message;
  const attemptSocket = await this.#getAttemptSocket(attemptFriendlyId);

  if (attemptSocket) {
    await chaosMonkey.call();

    // The task may have resumed faster than we could checkpoint
    this.#cancelCheckpoint(runId);

    attemptSocket.emit("RESUME_AFTER_DEPENDENCY", message);

    return { success: true };
  }

  // No socket for this attempt: log it and report the failure to the caller
  const notFoundMessage = "Socket for attempt not found";
  logger.log(notFoundMessage, { attemptFriendlyId });

  return {
    success: false,
    error: { name: "SocketNotFoundError", message: notFoundMessage },
  };
},
RESUME_AFTER_DURATION: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/hooks/useSearchParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export function useSearchParams() {
}

if (typeof value === "string") {
search.set(param, encodeURIComponent(value));
search.set(param, value);
continue;
}

search.delete(param);
for (const v of value) {
search.append(param, encodeURIComponent(v));
search.append(param, v);
}
}
},
Expand Down
43 changes: 35 additions & 8 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -725,20 +725,47 @@ export class SharedQueueConsumer {
}

try {
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
});

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
const resumeMessage = {
version: "v1" as const,
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
attemptFriendlyId: resumableAttempt.friendlyId,
completions,
executions,
};

logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
const responses = await socketIo.coordinatorNamespace
.timeout(10_000)
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);

logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
resumeMessage,
responses,
message,
});

if (responses.length === 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
resumeMessage,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}

const failed = responses.filter((response) => !response.success);
if (failed.length > 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
resumeMessage,
failed,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}
} catch (e) {
if (e instanceof Error) {
this._currentSpan?.recordException(e);
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService {
if (wasUpdated) {
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
batchRunId: batchRun.id,
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
dependentTaskAttempt: batchRun.dependentTaskAttempt,
checkpointEventId: batchRun.checkpointEventId,
hasCheckpointEvent: !!batchRun.checkpointEventId,
});
await marqs?.replaceMessage(dependentRun.id, {
type: "RESUME",
Expand Down
42 changes: 28 additions & 14 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ import {
WaitReason,
} from "./schemas.js";

/** Failed acknowledgement: `success` is `false` and the error details are attached. */
const ackCallbackFailure = z.object({
  success: z.literal(false),
  error: z.object({
    name: z.string(),
    message: z.string(),
    stack: z.string().optional(),
    stderr: z.string().optional(),
  }),
});

/** Successful acknowledgement: only the `success: true` flag. */
const ackCallbackSuccess = z.object({
  success: z.literal(true),
});

/** Result schema for ack callbacks, discriminated on the `success` field. */
const ackCallbackResult = z.discriminatedUnion("success", [
  ackCallbackFailure,
  ackCallbackSuccess,
]);

export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
z.object({
type: z.literal("CANCEL_ATTEMPT"),
Expand Down Expand Up @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
projectId: z.string(),
deploymentId: z.string(),
}),
callback: z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]),
callback: ackCallbackResult,
},
RESTORE: {
message: z.object({
Expand Down Expand Up @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
};

export const PlatformToCoordinatorMessages = {
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
RESUME_AFTER_DEPENDENCY: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand All @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
executions: TaskRunExecution.array(),
}),
},
/**
 * Resume message for an attempt waiting on a dependency, with an acknowledgement.
 * The message fields match RESUME_AFTER_DEPENDENCY; the `callback` schema
 * describes the success/failure result the receiver sends back.
 */
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
message: z.object({
version: z.literal("v1").default("v1"),
runId: z.string(),
attemptId: z.string(),
attemptFriendlyId: z.string(),
completions: TaskRunExecutionResult.array(),
executions: TaskRunExecution.array(),
}),
callback: ackCallbackResult,
},
RESUME_AFTER_DURATION: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand Down
Loading