diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 1e8ada42a7..f943a25a39 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -2431,8 +2431,11 @@ export class RunEngine {
       },
     });
 
-    //extending is the same as creating a new heartbeat
-    await this.#setHeartbeatDeadline({ runId, snapshotId, status: latestSnapshot.executionStatus });
+    //extend the heartbeat: reschedule the `heartbeatSnapshot.${runId}` job to now + interval (skipped when #getHeartbeatIntervalMs returns null)
+    const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
+    if (intervalMs !== null) {
+      await this.worker.reschedule(`heartbeatSnapshot.${runId}`, new Date(Date.now() + intervalMs));
+    }
 
     return executionResultFromSnapshot(latestSnapshot);
   }
@@ -3636,11 +3639,15 @@ export class RunEngine {
 
     if (!error) {
       //set heartbeat (if relevant)
-      await this.#setHeartbeatDeadline({
-        status: newSnapshot.executionStatus,
-        runId: run.id,
-        snapshotId: newSnapshot.id,
-      });
+      const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus); //null means no heartbeat job is enqueued for this status
+      if (intervalMs !== null) {
+        await this.worker.enqueue({
+          id: `heartbeatSnapshot.${run.id}`, //same id that is passed to worker.reschedule when the heartbeat is extended
+          job: "heartbeatSnapshot",
+          payload: { snapshotId: newSnapshot.id, runId: run.id },
+          availableAt: new Date(Date.now() + intervalMs), //job becomes available one interval from now
+        });
+      }
     }
 
     this.eventBus.emit("executionSnapshotCreated", {
@@ -3684,29 +3691,6 @@ export class RunEngine {
   //#endregion
 
   //#region Heartbeat
-  async #setHeartbeatDeadline({
-    runId,
-    snapshotId,
-    status,
-  }: {
-    runId: string;
-    snapshotId: string;
-    status: TaskRunExecutionStatus;
-  }) {
-    const intervalMs = this.#getHeartbeatIntervalMs(status);
-
-    if (intervalMs === null) {
-      return;
-    }
-
-    await this.worker.enqueue({
-      id: `heartbeatSnapshot.${runId}`,
-      job: "heartbeatSnapshot",
-      payload: { snapshotId, runId },
-      availableAt: new Date(Date.now() + intervalMs),
-    });
-  }
-
   async #handleStalledSnapshot({
     runId,
     snapshotId,
diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
index c1fefde687..1ff229cd7b 100644
--- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts
@@ -490,4 +490,123 @@ describe("RunEngine heartbeats", () => {
       await engine.quit();
     }
   });
+
+  containerTest(
+    "Heartbeat keeps run alive", //checks that regular heartbeats keep a run EXECUTING and that its snapshot becomes QUEUED once they stop
+    { timeout: 15_000 },
+    async ({ prisma, redisOptions }) => {
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const executingTimeout = 100; //heartbeat timeout in ms for the EXECUTING status (passed to heartbeatTimeoutsMs below)
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0001,
+        },
+        heartbeatTimeoutsMs: {
+          EXECUTING: executingTimeout,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        //create the background worker for this task identifier (the returned value is not used again in this test)
+        const backgroundWorker = await setupBackgroundWorker(
+          prisma,
+          authenticatedEnvironment,
+          taskIdentifier
+        );
+
+        //trigger the run on the "main" master queue
+        const run = await engine.trigger(
+          {
+            number: 1,
+            friendlyId: "run_1234",
+            environment: authenticatedEnvironment,
+            taskIdentifier,
+            payload: "{}",
+            payloadType: "application/json",
+            context: {},
+            traceContext: {},
+            traceId: "t12345",
+            spanId: "s12345",
+            masterQueue: "main",
+            queueName: "task/test-task",
+            isTest: false,
+            tags: [],
+          },
+          prisma
+        );
+
+        //dequeue the run from its master queue
+        const dequeued = await engine.dequeueFromMasterQueue({
+          consumerId: "test_12345",
+          masterQueue: run.masterQueue,
+          maxRunCount: 10,
+        });
+
+        //create an attempt from the first dequeued run and its snapshot
+        const attempt = await engine.startRunAttempt({
+          runId: dequeued[0].run.id,
+          snapshotId: dequeued[0].snapshot.id,
+        });
+
+        //both the snapshot and the run should now report EXECUTING
+        const executionData = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData);
+        expect(executionData.snapshot.executionStatus).toBe("EXECUTING");
+        expect(executionData.run.status).toBe("EXECUTING");
+
+        // Send 6 heartbeats, 50ms apart (half the 100ms timeout)
+        for (let i = 0; i < 6; i++) {
+          await setTimeout(50); //presumably the promise-based setTimeout (timers/promises); the import is not visible in this hunk
+          await engine.heartbeatRun({
+            runId: run.id,
+            snapshotId: attempt.snapshot.id, //snapshot returned by startRunAttempt
+          });
+        }
+
+        // At least 300ms (3x the timeout) have elapsed; the run should still be executing
+        // because we've been sending heartbeats
+        const executionData2 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData2);
+        expect(executionData2.snapshot.executionStatus).toBe("EXECUTING");
+        expect(executionData2.run.status).toBe("EXECUTING");
+
+        // Stop sending heartbeats and wait 2x the timeout (200ms)
+        await setTimeout(executingTimeout * 2);
+
+        // Now the heartbeat should have timed out and the latest snapshot should be QUEUED
+        const executionData3 = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionData3);
+        expect(executionData3.snapshot.executionStatus).toBe("QUEUED");
+      } finally {
+        await engine.quit();
+      }
+    }
+  );
 });