Skip to content

Fix for heartbeat race condition (acking) #2380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
export class ServiceValidationError extends Error {
constructor(
message: string,
public status?: number
public status?: number,
public metadata?: Record<string, unknown>
) {
Comment on lines +62 to 64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Preserve Error prototype chain for reliable instanceof checks

When extending Error, set the prototype explicitly to avoid issues in some transpilation/runtime scenarios.

Apply this diff:

   constructor(
     message: string,
     public status?: number,
     public metadata?: Record<string, unknown>
   ) {
     super(message);
     this.name = "ServiceValidationError";
+    Object.setPrototypeOf(this, new.target.prototype);
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public status?: number,
public metadata?: Record<string, unknown>
) {
constructor(
message: string,
public status?: number,
public metadata?: Record<string, unknown>
) {
super(message);
this.name = "ServiceValidationError";
Object.setPrototypeOf(this, new.target.prototype);
}
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/engine/errors.ts around lines 62 to 64, the
custom Error subclass does not preserve the Error prototype chain which can
break instanceof checks in some transpilation/runtime environments; after
calling super(...) in the constructor, explicitly set the prototype (e.g.
Object.setPrototypeOf(this, EngineError.prototype)) and set this.name
appropriately (and optionally use Error.captureStackTrace if available) so the
instance properly inherits from Error and instanceof checks work reliably.

super(message);
this.name = "ServiceValidationError";
Expand Down
1 change: 0 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,6 @@ export class RunEngine {
}
);

await this.worker.ack(`heartbeatSnapshot.${runId}`);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,25 @@ export class CheckpointSystem {
const snapshot = await getLatestExecutionSnapshot(prisma, runId);

if (snapshot.id !== snapshotId) {
throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
throw new ServiceValidationError(
"Snapshot ID doesn't match the latest snapshot in continueRunExecution",
400,
{
snapshotId,
latestSnapshotId: snapshot.id,
}
);
}

if (!isPendingExecuting(snapshot.executionStatus)) {
throw new ServiceValidationError("Snapshot is not in a valid state to continue", 400);
throw new ServiceValidationError(
"Snapshot is not in a valid state to continue in continueRunExecution",
400,
{
snapshotId,
snapshotStatus: snapshot.executionStatus,
}
);
}

// Get the run and update the status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ export class ExecutionSnapshotSystem {
runnerId,
});

await this.$.worker.ack(`heartbeatSnapshot.${runId}`);
return executionResultFromSnapshot(latestSnapshot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,16 @@ export class RunAttemptSystem {
//if there is a big delay between the snapshot and the attempt, the snapshot might have changed
//we just want to log because elsewhere it should have been put back into a state where it can be attempted
this.$.logger.warn(
"RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring."
"RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring.",
{
snapshotId,
latestSnapshotId: latestSnapshot.id,
}
);
throw new ServiceValidationError("Snapshot changed", 409);
throw new ServiceValidationError("Snapshot changed inside startRunAttempt", 409, {
snapshotId,
latestSnapshotId: latestSnapshot.id,
});
}

const taskRun = await prisma.taskRun.findFirst({
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> |
message: error.message,
stack: error.stack,
name: error.name,
metadata: "metadata" in error ? error.metadata : undefined,
};
}

Expand All @@ -157,6 +158,7 @@ function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> |
message: structuredError.error.message,
stack: structuredError.error.stack,
name: structuredError.error.name,
metadata: "metadata" in structuredError.error ? structuredError.error.metadata : undefined,
};
}

Expand Down
Loading