Skip to content

Commit cd6a0b3

Browse files
authored
fix: heartbeat race condition (acking) (#2380)
* The logger now supports metadata
* Added metadata to ServiceValidationError in some critical places
* Don't ack the heartbeat if there's a mismatch; it might prevent a brand new one
* Don't ack the heartbeat inside stalled. By returning, it will be acked IF the deduplication key matches
1 parent e9cc7de commit cd6a0b3

File tree

6 files changed

+29
-7
lines changed

6 files changed

+29
-7
lines changed

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
5959
export class ServiceValidationError extends Error {
6060
constructor(
6161
message: string,
62-
public status?: number
62+
public status?: number,
63+
public metadata?: Record<string, unknown>
6364
) {
6465
super(message);
6566
this.name = "ServiceValidationError";

internal-packages/run-engine/src/engine/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,6 @@ export class RunEngine {
11551155
}
11561156
);
11571157

1158-
await this.worker.ack(`heartbeatSnapshot.${runId}`);
11591158
return;
11601159
}
11611160

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,25 @@ export class CheckpointSystem {
270270
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
271271

272272
if (snapshot.id !== snapshotId) {
273-
throw new ServiceValidationError("Snapshot ID doesn't match the latest snapshot", 400);
273+
throw new ServiceValidationError(
274+
"Snapshot ID doesn't match the latest snapshot in continueRunExecution",
275+
400,
276+
{
277+
snapshotId,
278+
latestSnapshotId: snapshot.id,
279+
}
280+
);
274281
}
275282

276283
if (!isPendingExecuting(snapshot.executionStatus)) {
277-
throw new ServiceValidationError("Snapshot is not in a valid state to continue", 400);
284+
throw new ServiceValidationError(
285+
"Snapshot is not in a valid state to continue in continueRunExecution",
286+
400,
287+
{
288+
snapshotId,
289+
snapshotStatus: snapshot.executionStatus,
290+
}
291+
);
278292
}
279293

280294
// Get the run and update the status

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,6 @@ export class ExecutionSnapshotSystem {
361361
runnerId,
362362
});
363363

364-
await this.$.worker.ack(`heartbeatSnapshot.${runId}`);
365364
return executionResultFromSnapshot(latestSnapshot);
366365
}
367366

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,16 @@ export class RunAttemptSystem {
318318
//if there is a big delay between the snapshot and the attempt, the snapshot might have changed
319319
//we just want to log because elsewhere it should have been put back into a state where it can be attempted
320320
this.$.logger.warn(
321-
"RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring."
321+
"RunEngine.createRunAttempt(): snapshot has changed since the attempt was created, ignoring.",
322+
{
323+
snapshotId,
324+
latestSnapshotId: latestSnapshot.id,
325+
}
322326
);
323-
throw new ServiceValidationError("Snapshot changed", 409);
327+
throw new ServiceValidationError("Snapshot changed inside startRunAttempt", 409, {
328+
snapshotId,
329+
latestSnapshotId: latestSnapshot.id,
330+
});
324331
}
325332

326333
const taskRun = await prisma.taskRun.findFirst({

packages/core/src/logger.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> |
147147
message: error.message,
148148
stack: error.stack,
149149
name: error.name,
150+
metadata: "metadata" in error ? error.metadata : undefined,
150151
};
151152
}
152153

@@ -157,6 +158,7 @@ function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> |
157158
message: structuredError.error.message,
158159
stack: structuredError.error.stack,
159160
name: structuredError.error.name,
161+
metadata: "metadata" in structuredError.error ? structuredError.error.metadata : undefined,
160162
};
161163
}
162164

0 commit comments

Comments
 (0)