@@ -14,7 +14,7 @@ import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
1414import { HttpReply , getTextBody } from "@trigger.dev/core/v3/apps" ;
1515import { ChaosMonkey } from "./chaosMonkey" ;
1616import { Checkpointer } from "./checkpointer" ;
17- import { boolFromEnv , numFromEnv } from "./util" ;
17+ import { boolFromEnv , numFromEnv , safeJsonParse } from "./util" ;
1818
1919import { collectDefaultMetrics , register , Gauge } from "prom-client" ;
2020import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger" ;
@@ -42,6 +42,8 @@ class CheckpointCancelError extends Error {}
4242
4343class TaskCoordinator {
4444 #httpServer: ReturnType < typeof createServer > ;
45+ #internalHttpServer: ReturnType < typeof createServer > ;
46+
4547 #checkpointer = new Checkpointer ( {
4648 dockerMode : ! process . env . KUBERNETES_PORT ,
4749 forceSimulate : boolFromEnv ( "FORCE_CHECKPOINT_SIMULATION" , false ) ,
@@ -79,6 +81,8 @@ class TaskCoordinator {
7981 private host = "0.0.0.0"
8082 ) {
8183 this . #httpServer = this . #createHttpServer( ) ;
84+ this . #internalHttpServer = this . #createInternalHttpServer( ) ;
85+
8286 this . #checkpointer. init ( ) ;
8387 this . #platformSocket = this . #createPlatformSocket( ) ;
8488
@@ -653,11 +657,11 @@ class TaskCoordinator {
653657
654658 log . error ( "READY_FOR_LAZY_ATTEMPT error" , { error } ) ;
655659
656- await crashRun ( {
657- name : "ReadyForLazyAttemptError" ,
658- message :
659- error instanceof Error ? `Unexpected error: ${ error . message } ` : "Unexpected error" ,
660- } ) ;
660+ // await crashRun({
661+ // name: "ReadyForLazyAttemptError",
662+ // message:
663+ // error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
664+ // });
661665
662666 return ;
663667 }
@@ -1368,13 +1372,236 @@ class TaskCoordinator {
13681372 case "/metrics" : {
13691373 return reply . text ( await register . metrics ( ) , 200 , register . contentType ) ;
13701374 }
1375+ default : {
1376+ return reply . empty ( 404 ) ;
1377+ }
1378+ }
1379+ } ) ;
1380+
1381+ httpServer . on ( "clientError" , ( err , socket ) => {
1382+ socket . end ( "HTTP/1.1 400 Bad Request\r\n\r\n" ) ;
1383+ } ) ;
1384+
1385+ httpServer . on ( "listening" , ( ) => {
1386+ logger . log ( "server listening on port" , { port : HTTP_SERVER_PORT } ) ;
1387+ } ) ;
1388+
1389+ return httpServer ;
1390+ }
1391+
1392+ #createInternalHttpServer( ) {
1393+ const httpServer = createServer ( async ( req , res ) => {
1394+ logger . log ( `[${ req . method } ]` , { url : req . url } ) ;
1395+
1396+ const reply = new HttpReply ( res ) ;
1397+
1398+ switch ( req . url ) {
13711399 case "/whoami" : {
13721400 return reply . text ( NODE_NAME ) ;
13731401 }
1374- case "/checkpoint" : {
1375- const body = await getTextBody ( req ) ;
1376- // await this.#checkpointer.checkpointAndPush(body);
1377- return reply . text ( `sent restore request: ${ body } ` ) ;
case "/checkpoint/duration": {
  // Internal endpoint: checkpoints a run and reports it to the platform as a
  // WAIT_FOR_DURATION checkpoint.
  //
  // JSON body: { runId: string, now: number, ms: number,
  //              keepRunAlive?: boolean, async?: boolean }
  //
  // With `async: true` the caller receives a 202 immediately and the outcome
  // of the checkpoint is only logged.

  // True once the 202 has been written in async mode. The response is
  // finished at that point, so nothing else may be written to it.
  let acknowledged = false;

  try {
    const body = await getTextBody(req);
    const json = safeJsonParse(body);

    if (typeof json !== "object" || !json) {
      return reply.text("Invalid body", 400);
    }

    if (!("runId" in json) || typeof json.runId !== "string") {
      return reply.text("Missing or invalid: runId", 400);
    }

    if (!("now" in json) || typeof json.now !== "number") {
      return reply.text("Missing or invalid: now", 400);
    }

    if (!("ms" in json) || typeof json.ms !== "number") {
      return reply.text("Missing or invalid: ms", 400);
    }

    let keepRunAlive = false;
    if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
      keepRunAlive = json.keepRunAlive;
    }

    let async = false;
    if ("async" in json && typeof json.async === "boolean") {
      async = json.async;
    }

    const { runId, now, ms } = json;

    // Rejects the empty string, which passes the typeof check above.
    if (!runId) {
      return reply.text("Missing runId", 400);
    }

    const runSocket = await this.#getRunSocket(runId);
    if (!runSocket) {
      return reply.text("Run socket not found", 404);
    }

    const { data } = runSocket;

    console.log("Manual duration checkpoint", data);

    // Validate before checkpointing (and before the async 202): without an
    // attemptFriendlyId the checkpoint could never be reported to the
    // platform, so there is no point in creating it.
    const attemptFriendlyId = data.attemptFriendlyId;
    if (!attemptFriendlyId) {
      return reply.text("Socket data missing attemptFriendlyId", 500);
    }

    if (async) {
      reply.text("Creating checkpoint in the background", 202);
      acknowledged = true;
    }

    const checkpoint = await this.#checkpointer.checkpointAndPush({
      runId: data.runId,
      projectRef: data.projectRef,
      deploymentVersion: data.deploymentVersion,
      attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber, 10) : undefined,
    });

    if (!checkpoint) {
      if (acknowledged) {
        logger.error("Background duration checkpoint failed", { runId });
        return;
      }
      return reply.text("Failed to checkpoint", 500);
    }

    const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
      version: "v1",
      runId,
      attemptFriendlyId,
      docker: checkpoint.docker,
      location: checkpoint.location,
      reason: {
        type: "WAIT_FOR_DURATION",
        ms,
        now,
      },
    });

    // Either the platform or the caller can ask for the run to keep going.
    const keepAlive = ack?.keepRunAlive || keepRunAlive;

    if (!keepAlive) {
      runSocket.emit("REQUEST_EXIT", {
        version: "v1",
      });
    }

    const message = keepAlive
      ? `keeping run ${runId} alive after checkpoint`
      : `checkpoint created for run ${runId}`;

    if (acknowledged) {
      // The caller already got its 202; record the outcome instead.
      logger.log(message, { checkpoint, platformAck: ack });
      return;
    }

    return reply.json({
      message,
      checkpoint,
      requestJson: json,
      platformAck: ack,
    });
  } catch (error) {
    // Error instances have no enumerable own properties by default, so
    // serializing one directly yields `{}`. Send the message text instead.
    const detail = error instanceof Error ? error.message : String(error);

    if (acknowledged) {
      logger.error("Background duration checkpoint errored", { error: detail });
      return;
    }

    return reply.text(
      JSON.stringify({ message: "error", error: detail }),
      500,
      "application/json"
    );
  }
}
case "/checkpoint/manual": {
  // Internal endpoint: checkpoints a run and reports it to the platform as a
  // MANUAL checkpoint.
  //
  // JSON body: { runId: string, restoreAtUnixTimeMs?: number,
  //              keepRunAlive?: boolean, async?: boolean }
  //
  // With `async: true` the caller receives a 202 immediately and the outcome
  // of the checkpoint is only logged.

  // True once the 202 has been written in async mode. The response is
  // finished at that point, so nothing else may be written to it.
  let acknowledged = false;

  try {
    const body = await getTextBody(req);
    const json = safeJsonParse(body);

    if (typeof json !== "object" || !json) {
      return reply.text("Invalid body", 400);
    }

    if (!("runId" in json) || typeof json.runId !== "string") {
      return reply.text("Missing or invalid: runId", 400);
    }

    let restoreAtUnixTimeMs: number | undefined;
    if ("restoreAtUnixTimeMs" in json && typeof json.restoreAtUnixTimeMs === "number") {
      restoreAtUnixTimeMs = json.restoreAtUnixTimeMs;
    }

    let keepRunAlive = false;
    if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
      keepRunAlive = json.keepRunAlive;
    }

    let async = false;
    if ("async" in json && typeof json.async === "boolean") {
      async = json.async;
    }

    const { runId } = json;

    // Rejects the empty string, which passes the typeof check above.
    if (!runId) {
      return reply.text("Missing runId", 400);
    }

    const runSocket = await this.#getRunSocket(runId);
    if (!runSocket) {
      return reply.text("Run socket not found", 404);
    }

    const { data } = runSocket;

    console.log("Manual checkpoint", data);

    // Validate before checkpointing (and before the async 202): without an
    // attemptFriendlyId the checkpoint could never be reported to the
    // platform, so there is no point in creating it.
    const attemptFriendlyId = data.attemptFriendlyId;
    if (!attemptFriendlyId) {
      return reply.text("Socket data missing attemptFriendlyId", 500);
    }

    if (async) {
      reply.text("Creating checkpoint in the background", 202);
      acknowledged = true;
    }

    const checkpoint = await this.#checkpointer.checkpointAndPush({
      runId: data.runId,
      projectRef: data.projectRef,
      deploymentVersion: data.deploymentVersion,
      attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber, 10) : undefined,
    });

    if (!checkpoint) {
      if (acknowledged) {
        logger.error("Background manual checkpoint failed", { runId });
        return;
      }
      return reply.text("Failed to checkpoint", 500);
    }

    const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
      version: "v1",
      runId,
      attemptFriendlyId,
      docker: checkpoint.docker,
      location: checkpoint.location,
      reason: {
        type: "MANUAL",
        restoreAtUnixTimeMs,
      },
    });

    // Either the platform or the caller can ask for the run to keep going.
    const keepAlive = ack?.keepRunAlive || keepRunAlive;

    if (!keepAlive) {
      runSocket.emit("REQUEST_EXIT", {
        version: "v1",
      });
    }

    const message = keepAlive
      ? `keeping run ${runId} alive after checkpoint`
      : `checkpoint created for run ${runId}`;

    if (acknowledged) {
      // The caller already got its 202; record the outcome instead.
      logger.log(message, { checkpoint, platformAck: ack });
      return;
    }

    return reply.json({
      message,
      checkpoint,
      requestJson: json,
      platformAck: ack,
    });
  } catch (error) {
    // Error instances have no enumerable own properties by default, so
    // serializing one directly yields `{}`. Send the message text instead.
    const detail = error instanceof Error ? error.message : String(error);

    if (acknowledged) {
      logger.error("Background manual checkpoint errored", { error: detail });
      return;
    }

    return reply.text(
      JSON.stringify({ message: "error", error: detail }),
      500,
      "application/json"
    );
  }
}
13791606 default : {
13801607 return reply . empty ( 404 ) ;
@@ -1387,14 +1614,15 @@ class TaskCoordinator {
13871614 } ) ;
13881615
// Log the port the internal server is actually bound to. The bind port is
// chosen by the caller of listen(), so it is read back from the server rather
// than recomputed here; HTTP_SERVER_PORT + 100 is only a fallback for the
// cases where address() does not return an address object.
httpServer.on("listening", () => {
  const address = httpServer.address();
  const port =
    typeof address === "object" && address !== null ? address.port : HTTP_SERVER_PORT + 100;

  logger.log("internal server listening on port", { port });
});
13921619
13931620 return httpServer ;
13941621 }
13951622
/**
 * Starts both HTTP servers.
 *
 * The main server binds to the configured host and port. The internal server
 * binds to 127.0.0.1 only, on the configured port plus 100.
 */
listen() {
  const { port, host } = this;
  const internalPort = port + 100;

  this.#httpServer.listen(port, host);
  this.#internalHttpServer.listen(internalPort, "127.0.0.1");
}
13991627}
14001628
0 commit comments