Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,14 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_RUN_LOCK_JITTER_FACTOR: z.coerce.number().default(0.15),
RUN_ENGINE_RUN_LOCK_MAX_TOTAL_WAIT_TIME: z.coerce.number().int().default(15000),

RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT: z.coerce.number().int().default(12),
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS: z.coerce
.number()
.int()
.default(60_000 * 60 * 6),
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS: z.coerce.number().int().default(60_000),
RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR: z.coerce.number().default(2),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
.optional()
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ function createRunEngine() {
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
},
suspendedHeartbeatRetriesConfig: {
maxCount: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_COUNT,
maxDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_MAX_DELAY_MS,
initialDelayMs: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_INITIAL_DELAY_MS,
factor: env.RUN_ENGINE_SUSPENDED_HEARTBEAT_RETRIES_FACTOR,
},
retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS,
billing: {
getCurrentPlan: async (orgId: string) => {
Expand Down
79 changes: 78 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1161,10 +1161,12 @@ export class RunEngine {
async #handleStalledSnapshot({
runId,
snapshotId,
restartAttempt,
tx,
}: {
runId: string;
snapshotId: string;
restartAttempt?: number;
tx?: PrismaClientOrTransaction;
}) {
const prisma = tx ?? this.prisma;
Expand Down Expand Up @@ -1297,11 +1299,86 @@ export class RunEngine {
snapshotId: latestSnapshot.id,
});

switch (result) {
switch (result.status) {
case "blocked": {
if (!this.options.suspendedHeartbeatRetriesConfig) {
break;
}

if (result.waitpoints.length === 0) {
this.logger.info("handleStalledSnapshot SUSPENDED blocked but no waitpoints", {
runId,
result,
snapshotId: latestSnapshot.id,
});
// If the run is blocked but there are no waitpoints, we don't restart the heartbeat
break;
}

const hasRunOrBatchWaitpoints = result.waitpoints.some(
(w) => w.type === "RUN" || w.type === "BATCH"
);

if (!hasRunOrBatchWaitpoints) {
this.logger.info(
"handleStalledSnapshot SUSPENDED blocked but no run or batch waitpoints",
{
runId,
result,
snapshotId: latestSnapshot.id,
}
);
// If the run is blocked by waitpoints that are not RUN or BATCH, we don't restart the heartbeat
break;
}

const initialDelayMs =
this.options.suspendedHeartbeatRetriesConfig.initialDelayMs ?? 60_000;
const $restartAttempt = (restartAttempt ?? 0) + 1; // Start at 1
const maxDelayMs =
this.options.suspendedHeartbeatRetriesConfig.maxDelayMs ?? 60_000 * 60 * 6; // 6 hours
const factor = this.options.suspendedHeartbeatRetriesConfig.factor ?? 2;
const maxCount = this.options.suspendedHeartbeatRetriesConfig.maxCount ?? 12;

if ($restartAttempt >= maxCount) {
this.logger.info(
"handleStalledSnapshot SUSPENDED blocked with waitpoints, max retries reached",
{
runId,
result,
snapshotId: latestSnapshot.id,
restartAttempt: $restartAttempt,
maxCount,
config: this.options.suspendedHeartbeatRetriesConfig,
}
);

break;
}

// Calculate the delay based on the retry attempt
const delayMs = Math.min(
initialDelayMs * Math.pow(factor, $restartAttempt - 1),
maxDelayMs
);

this.logger.info(
"handleStalledSnapshot SUSPENDED blocked with waitpoints, restarting heartbeat",
{
runId,
result,
snapshotId: latestSnapshot.id,
delayMs,
restartAttempt: $restartAttempt,
}
);

// Reschedule the heartbeat
await this.executionSnapshotSystem.restartHeartbeatForRun({
runId,
delayMs,
restartAttempt: $restartAttempt,
tx,
});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,32 +396,31 @@ export class ExecutionSnapshotSystem {

public async restartHeartbeatForRun({
runId,
delayMs,
restartAttempt,
tx,
}: {
runId: string;
delayMs: number;
restartAttempt: number;
tx?: PrismaClientOrTransaction;
}): Promise<ExecutionResult> {
const prisma = tx ?? this.$.prisma;

const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);

//extending the heartbeat
const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);

if (intervalMs !== null) {
this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
runId,
snapshotId: latestSnapshot.id,
intervalMs,
});
this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
runId,
snapshotId: latestSnapshot.id,
delayMs,
});

await this.$.worker.enqueue({
id: `heartbeatSnapshot.${runId}`,
job: "heartbeatSnapshot",
payload: { snapshotId: latestSnapshot.id, runId },
availableAt: new Date(Date.now() + intervalMs),
});
}
await this.$.worker.enqueue({
id: `heartbeatSnapshot.${runId}`,
job: "heartbeatSnapshot",
payload: { snapshotId: latestSnapshot.id, runId, restartAttempt },
availableAt: new Date(Date.now() + delayMs),
});

return executionResultFromSnapshot(latestSnapshot);
}
Expand Down
61 changes: 51 additions & 10 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,22 @@ export type WaitpointSystemOptions = {
enqueueSystem: EnqueueSystem;
};

type WaitpointContinuationWaitpoint = Pick<Waitpoint, "id" | "type" | "completedAfter" | "status">;

export type WaitpointContinuationResult =
| {
status: "unblocked";
waitpoints: Array<WaitpointContinuationWaitpoint>;
}
| {
status: "skipped";
reason: string;
}
| {
status: "blocked";
waitpoints: Array<WaitpointContinuationWaitpoint>;
};

export class WaitpointSystem {
private readonly $: SystemResources;
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
Expand Down Expand Up @@ -480,7 +496,7 @@ export class WaitpointSystem {
runId,
}: {
runId: string;
}): Promise<"blocked" | "unblocked" | "skipped"> {
}): Promise<WaitpointContinuationResult> {
this.$.logger.debug(`continueRunIfUnblocked: start`, {
runId,
});
Expand All @@ -496,7 +512,7 @@ export class WaitpointSystem {
batchId: true,
batchIndex: true,
waitpoint: {
select: { id: true, status: true },
select: { id: true, status: true, type: true, completedAfter: true },
},
},
});
Expand All @@ -507,7 +523,11 @@ export class WaitpointSystem {
runId,
blockingWaitpoints,
});
return "blocked";

return {
status: "blocked",
waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
};
}

// 3. Get the run with environment
Expand Down Expand Up @@ -547,7 +567,10 @@ export class WaitpointSystem {
executionStatus: snapshot.executionStatus,
});

return "skipped";
return {
status: "skipped",
reason: "run is already executing",
};
}
case "QUEUED": {
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
Expand All @@ -556,7 +579,10 @@ export class WaitpointSystem {
executionStatus: snapshot.executionStatus,
});

return "skipped";
return {
status: "skipped",
reason: "run is already queued",
};
}
case "PENDING_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
Expand All @@ -565,7 +591,10 @@ export class WaitpointSystem {
executionStatus: snapshot.executionStatus,
});

return "skipped";
return {
status: "skipped",
reason: "run is already pending executing",
};
}
case "QUEUED_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
Expand All @@ -574,7 +603,10 @@ export class WaitpointSystem {
executionStatus: snapshot.executionStatus,
});

return "skipped";
return {
status: "skipped",
reason: "run is already queued executing",
};
}
case "EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
Expand All @@ -583,7 +615,10 @@ export class WaitpointSystem {
executionStatus: snapshot.executionStatus,
});

return "skipped";
return {
status: "skipped",
reason: "run is already executing",
};
}
case "PENDING_CANCEL":
case "FINISHED": {
Expand All @@ -592,7 +627,10 @@ export class WaitpointSystem {
snapshot,
executionStatus: snapshot.executionStatus,
});
return "skipped";
return {
status: "skipped",
reason: "run is finished",
};
}
case "EXECUTING_WITH_WAITPOINTS": {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
Expand Down Expand Up @@ -693,7 +731,10 @@ export class WaitpointSystem {
});
}

return "unblocked";
return {
status: "unblocked",
waitpoints: blockingWaitpoints.map((w) => w.waitpoint),
};
}); // end of runlock
}

Expand Down
Loading
Loading