@@ -11,7 +11,7 @@ import {
1111} from "@trigger.dev/core/v3" ;
1212import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace" ;
1313import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket" ;
14- import { HttpReply , getTextBody } from "@trigger.dev/core/v3/apps" ;
14+ import { ExponentialBackoff , HttpReply , getTextBody } from "@trigger.dev/core/v3/apps" ;
1515import { ChaosMonkey } from "./chaosMonkey" ;
1616import { Checkpointer } from "./checkpointer" ;
1717import { boolFromEnv , numFromEnv } from "./util" ;
@@ -37,6 +37,22 @@ const chaosMonkey = new ChaosMonkey(
3737 ! ! process . env . CHAOS_MONKEY_DISABLE_DELAYS
3838) ;
3939
40+ const backoff = new ExponentialBackoff ( "FullJitter" , { // Module-level retry policy used by the handlers below to wrap platform-socket sendWithAck calls via backoff.execute(...)
41+ maxRetries : 7 , // Retry limit for callers that do not override it; some call sites chain .maxRetries(5) before execute
42+ } ) ; // NOTE(review): call sites chain .max(...)/.maxRetries(...) on this shared instance — confirm those return a new instance rather than mutating this one
43+
44+ function serializeError ( error : unknown ) { // Prepares a caught value for logging: callers pass the result as the `error` field of log.error(...)
45+ if ( error instanceof Error ) { // Error instances are flattened into a plain object with exactly three fields
46+ return {
47+ name : error . name ,
48+ message : error . message ,
49+ stack : error . stack , // may be undefined; `stack` is optional on Error
50+ } ; // presumably done because the logger would not pick up Error's non-enumerable fields — TODO confirm against the logger implementation
51+ }
52+
53+ return error ; // Anything that is not an Error is returned unchanged
54+ }
55+
4056class CheckpointReadinessTimeoutError extends Error { } // Error subclass that adds no fields or methods of its own; presumably raised when waiting for checkpoint readiness times out — confirm at the throw site
4157class CheckpointCancelError extends Error { } // Error subclass that adds no fields or methods of its own; presumably signals a cancelled checkpoint — confirm at the throw site
4258
@@ -612,12 +628,36 @@ class TaskCoordinator {
612628 log . log ( "Handling READY_FOR_LAZY_ATTEMPT" ) ;
613629
614630 try {
615- const lazyAttempt = await this . #platformSocket?. sendWithAck ( "READY_FOR_LAZY_ATTEMPT" , {
616- ...message ,
617- envId : socket . data . envId ,
618- } ) ;
631+ const lazyAttempt = await backoff
632+ . max ( 2 ) // The run controller expects a reply in 10s so we can't have long retry delays
633+ . maxRetries ( 5 )
634+ . execute ( async ( ) => {
635+ return await this . #platformSocket?. sendWithAck ( "READY_FOR_LAZY_ATTEMPT" , {
636+ ...message ,
637+ envId : socket . data . envId ,
638+ } ) ;
639+ } ) ;
640+
641+ if ( ! lazyAttempt . success ) {
642+ log . error ( "Failed to send READY_FOR_LAZY_ATTEMPT" , {
643+ error : serializeError ( lazyAttempt . error ) ,
644+ cause : lazyAttempt . cause ,
645+ } ) ;
646+
647+ await crashRun ( {
648+ name : "ReadyForLazyAttemptError" ,
649+ message :
650+ lazyAttempt . error instanceof Error
651+ ? `[${ lazyAttempt . cause } ] ${ lazyAttempt . error . name } : ${ lazyAttempt . error . message } `
652+ : `[${ lazyAttempt . cause } ] ${ lazyAttempt . error } ` ,
653+ } ) ;
654+
655+ return ;
656+ }
657+
658+ const lazyAttemptResponse = lazyAttempt . result ;
619659
620- if ( ! lazyAttempt ) {
660+ if ( ! lazyAttemptResponse ) {
621661 log . error ( "no lazy attempt ack" ) ;
622662
623663 await crashRun ( {
@@ -628,8 +668,10 @@ class TaskCoordinator {
628668 return ;
629669 }
630670
631- if ( ! lazyAttempt . success ) {
632- log . error ( "failed to get lazy attempt payload" , { reason : lazyAttempt . reason } ) ;
671+ if ( ! lazyAttemptResponse . success ) {
672+ log . error ( "failed to get lazy attempt payload" , {
673+ reason : lazyAttemptResponse . reason ,
674+ } ) ;
633675
634676 await crashRun ( {
635677 name : "ReadyForLazyAttemptError" ,
@@ -643,7 +685,7 @@ class TaskCoordinator {
643685
644686 socket . emit ( "EXECUTE_TASK_RUN_LAZY_ATTEMPT" , {
645687 version : "v1" ,
646- lazyPayload : lazyAttempt . lazyPayload ,
688+ lazyPayload : lazyAttemptResponse . lazyPayload ,
647689 } ) ;
648690 } catch ( error ) {
649691 if ( error instanceof ChaosMonkey . Error ) {
@@ -949,20 +991,39 @@ class TaskCoordinator {
949991
950992 log . addFields ( { checkpoint } ) ;
951993
952- const ack = await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
953- version : "v1" ,
954- runId : socket . data . runId ,
955- attemptFriendlyId : message . attemptFriendlyId ,
956- docker : checkpoint . docker ,
957- location : checkpoint . location ,
958- reason : {
959- type : "WAIT_FOR_DURATION" ,
960- ms : message . ms ,
961- now : message . now ,
962- } ,
994+ const checkpointCreated = await backoff . execute ( async ( ) => {
995+ return await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
996+ version : "v1" ,
997+ runId : socket . data . runId ,
998+ attemptFriendlyId : message . attemptFriendlyId ,
999+ docker : checkpoint . docker ,
1000+ location : checkpoint . location ,
1001+ reason : {
1002+ type : "WAIT_FOR_DURATION" ,
1003+ ms : message . ms ,
1004+ now : message . now ,
1005+ } ,
1006+ } ) ;
9631007 } ) ;
9641008
965- if ( ack ?. keepRunAlive ) {
1009+ if ( ! checkpointCreated . success ) {
1010+ log . error ( "Failed to send CHECKPOINT_CREATED" , {
1011+ error : serializeError ( checkpointCreated . error ) ,
1012+ cause : checkpointCreated . cause ,
1013+ } ) ;
1014+
1015+ await crashRun ( {
1016+ name : "WaitForDurationCheckpointError" ,
1017+ message :
1018+ checkpointCreated . error instanceof Error
1019+ ? `[${ checkpointCreated . cause } ] ${ checkpointCreated . error . name } : ${ checkpointCreated . error . message } `
1020+ : `[${ checkpointCreated . cause } ] ${ checkpointCreated . error } ` ,
1021+ } ) ;
1022+
1023+ return ;
1024+ }
1025+
1026+ if ( checkpointCreated . result ?. keepRunAlive ) {
9661027 log . log ( "keeping run alive after duration checkpoint" ) ;
9671028 return ;
9681029 }
@@ -1042,19 +1103,38 @@ class TaskCoordinator {
10421103 socket . data . requiresCheckpointResumeWithMessage = `location:${ checkpoint . location } -docker:${ checkpoint . docker } ` ;
10431104 log . log ( "WAIT_FOR_TASK set requiresCheckpointResumeWithMessage" ) ;
10441105
1045- const ack = await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
1046- version : "v1" ,
1047- runId : socket . data . runId ,
1048- attemptFriendlyId : message . attemptFriendlyId ,
1049- docker : checkpoint . docker ,
1050- location : checkpoint . location ,
1051- reason : {
1052- type : "WAIT_FOR_TASK" ,
1053- friendlyId : message . friendlyId ,
1054- } ,
1106+ const checkpointCreated = await backoff . execute ( async ( ) => {
1107+ return await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
1108+ version : "v1" ,
1109+ runId : socket . data . runId ,
1110+ attemptFriendlyId : message . attemptFriendlyId ,
1111+ docker : checkpoint . docker ,
1112+ location : checkpoint . location ,
1113+ reason : {
1114+ type : "WAIT_FOR_TASK" ,
1115+ friendlyId : message . friendlyId ,
1116+ } ,
1117+ } ) ;
10551118 } ) ;
10561119
1057- if ( ack ?. keepRunAlive ) {
1120+ if ( ! checkpointCreated . success ) {
1121+ log . error ( "Failed to send CHECKPOINT_CREATED" , {
1122+ error : serializeError ( checkpointCreated . error ) ,
1123+ cause : checkpointCreated . cause ,
1124+ } ) ;
1125+
1126+ await crashRun ( {
1127+ name : "WaitForTaskCheckpointError" ,
1128+ message :
1129+ checkpointCreated . error instanceof Error
1130+ ? `[${ checkpointCreated . cause } ] ${ checkpointCreated . error . name } : ${ checkpointCreated . error . message } `
1131+ : `[${ checkpointCreated . cause } ] ${ checkpointCreated . error } ` ,
1132+ } ) ;
1133+
1134+ return ;
1135+ }
1136+
1137+ if ( checkpointCreated . result ?. keepRunAlive ) {
10581138 socket . data . requiresCheckpointResumeWithMessage = undefined ;
10591139 log . log ( "keeping run alive after task checkpoint" ) ;
10601140 return ;
@@ -1135,20 +1215,39 @@ class TaskCoordinator {
11351215 socket . data . requiresCheckpointResumeWithMessage = `location:${ checkpoint . location } -docker:${ checkpoint . docker } ` ;
11361216 log . log ( "WAIT_FOR_BATCH set checkpoint" ) ;
11371217
1138- const ack = await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
1139- version : "v1" ,
1140- runId : socket . data . runId ,
1141- attemptFriendlyId : message . attemptFriendlyId ,
1142- docker : checkpoint . docker ,
1143- location : checkpoint . location ,
1144- reason : {
1145- type : "WAIT_FOR_BATCH" ,
1146- batchFriendlyId : message . batchFriendlyId ,
1147- runFriendlyIds : message . runFriendlyIds ,
1148- } ,
1218+ const checkpointCreated = await backoff . execute ( async ( ) => {
1219+ return await this . #platformSocket?. sendWithAck ( "CHECKPOINT_CREATED" , {
1220+ version : "v1" ,
1221+ runId : socket . data . runId ,
1222+ attemptFriendlyId : message . attemptFriendlyId ,
1223+ docker : checkpoint . docker ,
1224+ location : checkpoint . location ,
1225+ reason : {
1226+ type : "WAIT_FOR_BATCH" ,
1227+ batchFriendlyId : message . batchFriendlyId ,
1228+ runFriendlyIds : message . runFriendlyIds ,
1229+ } ,
1230+ } ) ;
11491231 } ) ;
11501232
1151- if ( ack ?. keepRunAlive ) {
1233+ if ( ! checkpointCreated . success ) {
1234+ log . error ( "Failed to send CHECKPOINT_CREATED" , {
1235+ error : serializeError ( checkpointCreated . error ) ,
1236+ cause : checkpointCreated . cause ,
1237+ } ) ;
1238+
1239+ await crashRun ( {
1240+ name : "WaitForBatchCheckpointError" ,
1241+ message :
1242+ checkpointCreated . error instanceof Error
1243+ ? `[${ checkpointCreated . cause } ] ${ checkpointCreated . error . name } : ${ checkpointCreated . error . message } `
1244+ : `[${ checkpointCreated . cause } ] ${ checkpointCreated . error } ` ,
1245+ } ) ;
1246+
1247+ return ;
1248+ }
1249+
1250+ if ( checkpointCreated . result ?. keepRunAlive ) {
11521251 socket . data . requiresCheckpointResumeWithMessage = undefined ;
11531252 log . log ( "keeping run alive after batch checkpoint" ) ;
11541253 return ;
@@ -1239,26 +1338,49 @@ class TaskCoordinator {
12391338 try {
12401339 await chaosMonkey . call ( { throwErrors : false } ) ;
12411340
1242- const createAttempt = await this . #platformSocket?. sendWithAck (
1243- "CREATE_TASK_RUN_ATTEMPT" ,
1244- {
1245- runId : message . runId ,
1246- envId : socket . data . envId ,
1247- }
1248- ) ;
1341+ const createAttempt = await backoff
1342+ . max ( 3 ) // The run controller expects a reply in 15s so we can't have long retry delays
1343+ . maxRetries ( 5 )
1344+ . execute ( async ( ) => {
1345+ return await this . #platformSocket?. sendWithAck ( "CREATE_TASK_RUN_ATTEMPT" , {
1346+ runId : message . runId ,
1347+ envId : socket . data . envId ,
1348+ } ) ;
1349+ } ) ;
1350+
1351+ if ( ! createAttempt . success ) {
1352+ log . error ( "Failed to send CREATE_TASK_RUN_ATTEMPT" , {
1353+ error : serializeError ( createAttempt . error ) ,
1354+ cause : createAttempt . cause ,
1355+ } ) ;
1356+
1357+ callback ( {
1358+ success : false ,
1359+ reason :
1360+ createAttempt . error instanceof Error
1361+ ? `[${ createAttempt . cause } ] ${ createAttempt . error . name } : ${ createAttempt . error . message } `
1362+ : `[${ createAttempt . cause } ] ${ createAttempt . error } ` ,
1363+ } ) ;
12491364
1250- if ( ! createAttempt ?. success ) {
1251- log . debug ( "no ack while creating attempt" , { reason : createAttempt ?. reason } ) ;
1252- callback ( { success : false , reason : createAttempt ?. reason } ) ;
12531365 return ;
12541366 }
12551367
1256- updateAttemptFriendlyId ( createAttempt . executionPayload . execution . attempt . id ) ;
1257- updateAttemptNumber ( createAttempt . executionPayload . execution . attempt . number ) ;
1368+ const createAttemptResponse = createAttempt . result ;
1369+
1370+ if ( ! createAttemptResponse ?. success ) {
1371+ log . debug ( "no ack while creating attempt" , { reason : createAttemptResponse ?. reason } ) ;
1372+ callback ( { success : false , reason : createAttemptResponse ?. reason } ) ;
1373+ return ;
1374+ }
1375+
1376+ const { executionPayload } = createAttemptResponse ;
1377+
1378+ updateAttemptFriendlyId ( executionPayload . execution . attempt . id ) ;
1379+ updateAttemptNumber ( executionPayload . execution . attempt . number ) ;
12581380
12591381 callback ( {
12601382 success : true ,
1261- executionPayload : createAttempt . executionPayload ,
1383+ executionPayload,
12621384 } ) ;
12631385 } catch ( error ) {
12641386 log . error ( "CREATE_TASK_RUN_ATTEMPT error" , { error } ) ;
0 commit comments