@@ -490,4 +490,123 @@ describe("RunEngine heartbeats", () => {
490490 await engine . quit ( ) ;
491491 }
492492 } ) ;
493+
494+ containerTest (
495+ "Heartbeat keeps run alive" ,
496+ { timeout : 15_000 } ,
497+ async ( { prisma, redisOptions } ) => {
498+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
499+
500+ const executingTimeout = 100 ;
501+
502+ const engine = new RunEngine ( {
503+ prisma,
504+ worker : {
505+ redis : redisOptions ,
506+ workers : 1 ,
507+ tasksPerWorker : 10 ,
508+ pollIntervalMs : 100 ,
509+ } ,
510+ queue : {
511+ redis : redisOptions ,
512+ } ,
513+ runLock : {
514+ redis : redisOptions ,
515+ } ,
516+ machines : {
517+ defaultMachine : "small-1x" ,
518+ machines : {
519+ "small-1x" : {
520+ name : "small-1x" as const ,
521+ cpu : 0.5 ,
522+ memory : 0.5 ,
523+ centsPerMs : 0.0001 ,
524+ } ,
525+ } ,
526+ baseCostInCents : 0.0001 ,
527+ } ,
528+ heartbeatTimeoutsMs : {
529+ EXECUTING : executingTimeout ,
530+ } ,
531+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
532+ } ) ;
533+
534+ try {
535+ const taskIdentifier = "test-task" ;
536+
537+ //create background worker
538+ const backgroundWorker = await setupBackgroundWorker (
539+ prisma ,
540+ authenticatedEnvironment ,
541+ taskIdentifier
542+ ) ;
543+
544+ //trigger the run
545+ const run = await engine . trigger (
546+ {
547+ number : 1 ,
548+ friendlyId : "run_1234" ,
549+ environment : authenticatedEnvironment ,
550+ taskIdentifier,
551+ payload : "{}" ,
552+ payloadType : "application/json" ,
553+ context : { } ,
554+ traceContext : { } ,
555+ traceId : "t12345" ,
556+ spanId : "s12345" ,
557+ masterQueue : "main" ,
558+ queueName : "task/test-task" ,
559+ isTest : false ,
560+ tags : [ ] ,
561+ } ,
562+ prisma
563+ ) ;
564+
565+ //dequeue the run
566+ const dequeued = await engine . dequeueFromMasterQueue ( {
567+ consumerId : "test_12345" ,
568+ masterQueue : run . masterQueue ,
569+ maxRunCount : 10 ,
570+ } ) ;
571+
572+ //create an attempt
573+ const attempt = await engine . startRunAttempt ( {
574+ runId : dequeued [ 0 ] . run . id ,
575+ snapshotId : dequeued [ 0 ] . snapshot . id ,
576+ } ) ;
577+
578+ //should be executing
579+ const executionData = await engine . getRunExecutionData ( { runId : run . id } ) ;
580+ assertNonNullable ( executionData ) ;
581+ expect ( executionData . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
582+ expect ( executionData . run . status ) . toBe ( "EXECUTING" ) ;
583+
584+ // Send heartbeats every 50ms (half the timeout)
585+ for ( let i = 0 ; i < 6 ; i ++ ) {
586+ await setTimeout ( 50 ) ;
587+ await engine . heartbeatRun ( {
588+ runId : run . id ,
589+ snapshotId : attempt . snapshot . id ,
590+ } ) ;
591+ }
592+
593+ // After 300ms (3x the timeout) the run should still be executing
594+ // because we've been sending heartbeats
595+ const executionData2 = await engine . getRunExecutionData ( { runId : run . id } ) ;
596+ assertNonNullable ( executionData2 ) ;
597+ expect ( executionData2 . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
598+ expect ( executionData2 . run . status ) . toBe ( "EXECUTING" ) ;
599+
600+ // Stop sending heartbeats and wait for timeout
601+ await setTimeout ( executingTimeout * 2 ) ;
602+
603+ // Now it should have timed out and be queued
604+ const executionData3 = await engine . getRunExecutionData ( { runId : run . id } ) ;
605+ assertNonNullable ( executionData3 ) ;
606+ expect ( executionData3 . snapshot . executionStatus ) . toBe ( "QUEUED" ) ;
607+ } finally {
608+ await engine . quit ( ) ;
609+ }
610+ }
611+ ) ;
493612} ) ;
0 commit comments