@@ -1129,219 +1129,6 @@ describe("RunEngine Waitpoints", () => {
11291129 }
11301130 } ) ;
11311131
1132- containerTest (
1133- "continueRunIfUnblocked enqueues run when cannot reacquire concurrency" ,
1134- async ( { prisma, redisOptions } ) => {
1135- const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
1136-
1137- const engine = new RunEngine ( {
1138- prisma,
1139- worker : {
1140- redis : redisOptions ,
1141- workers : 1 ,
1142- tasksPerWorker : 10 ,
1143- pollIntervalMs : 100 ,
1144- } ,
1145- queue : {
1146- redis : redisOptions ,
1147- } ,
1148- runLock : {
1149- redis : redisOptions ,
1150- } ,
1151- machines : {
1152- defaultMachine : "small-1x" ,
1153- machines : {
1154- "small-1x" : {
1155- name : "small-1x" as const ,
1156- cpu : 0.5 ,
1157- memory : 0.5 ,
1158- centsPerMs : 0.0001 ,
1159- } ,
1160- } ,
1161- baseCostInCents : 0.0001 ,
1162- } ,
1163- tracer : trace . getTracer ( "test" , "0.0.0" ) ,
1164- } ) ;
1165-
1166- try {
1167- const taskIdentifier = "test-task" ;
1168-
1169- // Create background worker
1170- await setupBackgroundWorker (
1171- engine ,
1172- authenticatedEnvironment ,
1173- taskIdentifier ,
1174- undefined ,
1175- undefined ,
1176- {
1177- concurrencyLimit : 1 ,
1178- }
1179- ) ;
1180-
1181- // Create first run with queue concurrency limit of 1
1182- const firstRun = await engine . trigger (
1183- {
1184- number : 1 ,
1185- friendlyId : "run_first" ,
1186- environment : authenticatedEnvironment ,
1187- taskIdentifier,
1188- payload : "{}" ,
1189- payloadType : "application/json" ,
1190- context : { } ,
1191- traceContext : { } ,
1192- traceId : "t12345-first" ,
1193- spanId : "s12345-first" ,
1194- workerQueue : "main" ,
1195- queue : "task/test-task" ,
1196- isTest : false ,
1197- tags : [ ] ,
1198- } ,
1199- prisma
1200- ) ;
1201-
1202- // Dequeue and start the first run
1203- await setTimeout ( 500 ) ;
1204- const dequeuedFirst = await engine . dequeueFromWorkerQueue ( {
1205- consumerId : "test_12345" ,
1206- workerQueue : "main" ,
1207- } ) ;
1208-
1209- const firstAttempt = await engine . startRunAttempt ( {
1210- runId : dequeuedFirst [ 0 ] . run . id ,
1211- snapshotId : dequeuedFirst [ 0 ] . snapshot . id ,
1212- } ) ;
1213- expect ( firstAttempt . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
1214-
1215- // Create a manual waitpoint for the first run
1216- const waitpoint = await engine . createManualWaitpoint ( {
1217- environmentId : authenticatedEnvironment . id ,
1218- projectId : authenticatedEnvironment . projectId ,
1219- } ) ;
1220- expect ( waitpoint . waitpoint . status ) . toBe ( "PENDING" ) ;
1221-
1222- // Block the first run
1223- const blockedResult = await engine . blockRunWithWaitpoint ( {
1224- runId : firstRun . id ,
1225- waitpoints : waitpoint . waitpoint . id ,
1226- projectId : authenticatedEnvironment . projectId ,
1227- organizationId : authenticatedEnvironment . organizationId ,
1228- } ) ;
1229-
1230- // Verify first run is blocked
1231- const firstRunData = await engine . getRunExecutionData ( { runId : firstRun . id } ) ;
1232- expect ( firstRunData ?. snapshot . executionStatus ) . toBe ( "EXECUTING_WITH_WAITPOINTS" ) ;
1233-
1234- // Create and start second run on the same queue
1235- const secondRun = await engine . trigger (
1236- {
1237- number : 2 ,
1238- friendlyId : "run_second" ,
1239- environment : authenticatedEnvironment ,
1240- taskIdentifier,
1241- payload : "{}" ,
1242- payloadType : "application/json" ,
1243- context : { } ,
1244- traceContext : { } ,
1245- traceId : "t12345-second" ,
1246- spanId : "s12345-second" ,
1247- workerQueue : "main" ,
1248- queue : "task/test-task" ,
1249- isTest : false ,
1250- tags : [ ] ,
1251- } ,
1252- prisma
1253- ) ;
1254-
1255- // Dequeue and start the second run
1256- await setTimeout ( 500 ) ;
1257- const dequeuedSecond = await engine . dequeueFromWorkerQueue ( {
1258- consumerId : "test_12345" ,
1259- workerQueue : "main" ,
1260- } ) ;
1261-
1262- const secondAttempt = await engine . startRunAttempt ( {
1263- runId : dequeuedSecond [ 0 ] . run . id ,
1264- snapshotId : dequeuedSecond [ 0 ] . snapshot . id ,
1265- } ) ;
1266- expect ( secondAttempt . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
1267-
1268- // Now complete the waitpoint for the first run
1269- await engine . completeWaitpoint ( {
1270- id : waitpoint . waitpoint . id ,
1271- } ) ;
1272-
1273- // Wait for the continueRunIfUnblocked to process
1274- await setTimeout ( 500 ) ;
1275-
1276- // Verify the first run is now in QUEUED_EXECUTING state
1277- const executionDataAfter = await engine . getRunExecutionData ( { runId : firstRun . id } ) ;
1278- expect ( executionDataAfter ?. snapshot . executionStatus ) . toBe ( "QUEUED_EXECUTING" ) ;
1279- expect ( executionDataAfter ?. snapshot . description ) . toBe (
1280- "Run can continue, but is waiting for concurrency"
1281- ) ;
1282-
1283- // Verify the waitpoint is no longer blocking the first run
1284- const runWaitpoint = await prisma . taskRunWaitpoint . findFirst ( {
1285- where : {
1286- taskRunId : firstRun . id ,
1287- } ,
1288- include : {
1289- waitpoint : true ,
1290- } ,
1291- } ) ;
1292- expect ( runWaitpoint ) . toBeNull ( ) ;
1293-
1294- // Verify the waitpoint itself is completed
1295- const completedWaitpoint = await prisma . waitpoint . findUnique ( {
1296- where : {
1297- id : waitpoint . waitpoint . id ,
1298- } ,
1299- } ) ;
1300- assertNonNullable ( completedWaitpoint ) ;
1301- expect ( completedWaitpoint . status ) . toBe ( "COMPLETED" ) ;
1302-
1303- // Complete the second run so the first run can be dequeued
1304- const result = await engine . completeRunAttempt ( {
1305- runId : dequeuedSecond [ 0 ] . run . id ,
1306- snapshotId : secondAttempt . snapshot . id ,
1307- completion : {
1308- ok : true ,
1309- id : dequeuedSecond [ 0 ] . run . id ,
1310- output : `{"foo":"bar"}` ,
1311- outputType : "application/json" ,
1312- } ,
1313- } ) ;
1314-
1315- await setTimeout ( 500 ) ;
1316-
1317- let event : EventBusEventArgs < "workerNotification" > [ 0 ] | undefined = undefined ;
1318- engine . eventBus . on ( "workerNotification" , ( result ) => {
1319- event = result ;
1320- } ) ;
1321-
1322- // Verify the first run is back in the queue
1323- const queuedRun = await engine . dequeueFromWorkerQueue ( {
1324- consumerId : "test_12345" ,
1325- workerQueue : "main" ,
1326- } ) ;
1327-
1328- expect ( queuedRun . length ) . toBe ( 0 ) ;
1329-
1330- // Get the latest execution snapshot and make sure it's EXECUTING
1331- const executionData = await engine . getRunExecutionData ( { runId : firstRun . id } ) ;
1332- assertNonNullable ( executionData ) ;
1333- expect ( executionData . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
1334-
1335- assertNonNullable ( event ) ;
1336- const notificationEvent = event as EventBusEventArgs < "workerNotification" > [ 0 ] ;
1337- expect ( notificationEvent . run . id ) . toBe ( firstRun . id ) ;
1338- expect ( notificationEvent . snapshot . executionStatus ) . toBe ( "EXECUTING" ) ;
1339- } finally {
1340- await engine . quit ( ) ;
1341- }
1342- }
1343- ) ;
1344-
13451132 containerTest (
13461133 "getSnapshotsSince returns correct snapshots and handles errors" ,
13471134 async ( { prisma, redisOptions } ) => {
0 commit comments