@@ -362,4 +362,230 @@ describe("RunEngine batchTriggerAndWait", () => {
362362 engine . quit ( ) ;
363363 }
364364 } ) ;
365+
366+ containerTest (
367+ "batch ID should not carry over to triggerAndWait" ,
368+ async ( { prisma, redisOptions } ) => {
369+ //create environment
370+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
371+
372+ const engine = new RunEngine ( {
373+ prisma,
374+ worker : {
375+ redis : redisOptions ,
376+ workers : 1 ,
377+ tasksPerWorker : 10 ,
378+ pollIntervalMs : 20 ,
379+ } ,
380+ queue : {
381+ redis : redisOptions ,
382+ } ,
383+ runLock : {
384+ redis : redisOptions ,
385+ } ,
386+ machines : {
387+ defaultMachine : "small-1x" ,
388+ machines : {
389+ "small-1x" : {
390+ name : "small-1x" as const ,
391+ cpu : 0.5 ,
392+ memory : 0.5 ,
393+ centsPerMs : 0.0001 ,
394+ } ,
395+ } ,
396+ baseCostInCents : 0.0001 ,
397+ } ,
398+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
399+ } ) ;
400+
401+ try {
402+ const parentTask = "parent-task" ;
403+ const batchChildTask = "batch-child-task" ;
404+ const triggerAndWaitChildTask = "trigger-and-wait-child-task" ;
405+
406+ //create background worker
407+ await setupBackgroundWorker ( engine , authenticatedEnvironment , [
408+ parentTask ,
409+ batchChildTask ,
410+ triggerAndWaitChildTask ,
411+ ] ) ;
412+
413+ //create a batch
414+ const batch = await prisma . batchTaskRun . create ( {
415+ data : {
416+ friendlyId : generateFriendlyId ( "batch" ) ,
417+ runtimeEnvironmentId : authenticatedEnvironment . id ,
418+ } ,
419+ } ) ;
420+
421+ //trigger the parent run
422+ const parentRun = await engine . trigger (
423+ {
424+ number : 1 ,
425+ friendlyId : "run_p1234" ,
426+ environment : authenticatedEnvironment ,
427+ taskIdentifier : parentTask ,
428+ payload : "{}" ,
429+ payloadType : "application/json" ,
430+ context : { } ,
431+ traceContext : { } ,
432+ traceId : "t12345" ,
433+ spanId : "s12345" ,
434+ masterQueue : "main" ,
435+ queue : `task/${ parentTask } ` ,
436+ isTest : false ,
437+ tags : [ ] ,
438+ } ,
439+ prisma
440+ ) ;
441+
442+ //dequeue parent
443+ const dequeued = await engine . dequeueFromMasterQueue ( {
444+ consumerId : "test_12345" ,
445+ masterQueue : parentRun . masterQueue ,
446+ maxRunCount : 10 ,
447+ } ) ;
448+
449+ //create an attempt
450+ const initialExecutionData = await engine . getRunExecutionData ( { runId : parentRun . id } ) ;
451+ assertNonNullable ( initialExecutionData ) ;
452+ const attemptResult = await engine . startRunAttempt ( {
453+ runId : parentRun . id ,
454+ snapshotId : initialExecutionData . snapshot . id ,
455+ } ) ;
456+
457+ //block using the batch
458+ await engine . blockRunWithCreatedBatch ( {
459+ runId : parentRun . id ,
460+ batchId : batch . id ,
461+ environmentId : authenticatedEnvironment . id ,
462+ projectId : authenticatedEnvironment . projectId ,
463+ organizationId : authenticatedEnvironment . organizationId ,
464+ } ) ;
465+
466+ const afterBlockedByBatch = await engine . getRunExecutionData ( { runId : parentRun . id } ) ;
467+ assertNonNullable ( afterBlockedByBatch ) ;
468+ expect ( afterBlockedByBatch . snapshot . executionStatus ) . toBe ( "EXECUTING_WITH_WAITPOINTS" ) ;
469+ expect ( afterBlockedByBatch . batch ?. id ) . toBe ( batch . id ) ;
470+
471+ //create a batch child
472+ const batchChild = await engine . trigger (
473+ {
474+ number : 1 ,
475+ friendlyId : "run_c1234" ,
476+ environment : authenticatedEnvironment ,
477+ taskIdentifier : batchChildTask ,
478+ payload : "{}" ,
479+ payloadType : "application/json" ,
480+ context : { } ,
481+ traceContext : { } ,
482+ traceId : "t12345" ,
483+ spanId : "s12345" ,
484+ masterQueue : "main" ,
485+ queue : `task/${ batchChildTask } ` ,
486+ isTest : false ,
487+ tags : [ ] ,
488+ resumeParentOnCompletion : true ,
489+ parentTaskRunId : parentRun . id ,
490+ batch : { id : batch . id , index : 0 } ,
491+ } ,
492+ prisma
493+ ) ;
494+
495+ const parentAfterBatchChild = await engine . getRunExecutionData ( { runId : parentRun . id } ) ;
496+ assertNonNullable ( parentAfterBatchChild ) ;
497+ expect ( parentAfterBatchChild . snapshot . executionStatus ) . toBe ( "EXECUTING_WITH_WAITPOINTS" ) ;
498+ expect ( parentAfterBatchChild . batch ?. id ) . toBe ( batch . id ) ;
499+
500+ await engine . unblockRunForCreatedBatch ( {
501+ runId : parentRun . id ,
502+ batchId : batch . id ,
503+ environmentId : authenticatedEnvironment . id ,
504+ projectId : authenticatedEnvironment . projectId ,
505+ } ) ;
506+
507+ //dequeue and start the batch child
508+ const dequeuedBatchChild = await engine . dequeueFromMasterQueue ( {
509+ consumerId : "test_12345" ,
510+ masterQueue : batchChild . masterQueue ,
511+ maxRunCount : 1 ,
512+ } ) ;
513+
514+ expect ( dequeuedBatchChild . length ) . toBe ( 1 ) ;
515+
516+ const batchChildAttempt = await engine . startRunAttempt ( {
517+ runId : batchChild . id ,
518+ snapshotId : dequeuedBatchChild [ 0 ] . snapshot . id ,
519+ } ) ;
520+
521+ //complete the batch child
522+ await engine . completeRunAttempt ( {
523+ runId : batchChildAttempt . run . id ,
524+ snapshotId : batchChildAttempt . snapshot . id ,
525+ completion : {
526+ id : batchChild . id ,
527+ ok : true ,
528+ output : '{"foo":"bar"}' ,
529+ outputType : "application/json" ,
530+ } ,
531+ } ) ;
532+
533+ await setTimeout ( 500 ) ;
534+
535+ const runWaitpointsAfterBatchChild = await prisma . taskRunWaitpoint . findMany ( {
536+ where : {
537+ taskRunId : parentRun . id ,
538+ } ,
539+ include : {
540+ waitpoint : true ,
541+ } ,
542+ } ) ;
543+ expect ( runWaitpointsAfterBatchChild . length ) . toBe ( 0 ) ;
544+
545+ //parent snapshot
546+ const parentExecutionDataAfterBatchChildComplete = await engine . getRunExecutionData ( {
547+ runId : parentRun . id ,
548+ } ) ;
549+ assertNonNullable ( parentExecutionDataAfterBatchChildComplete ) ;
550+ expect ( parentExecutionDataAfterBatchChildComplete . snapshot . executionStatus ) . toBe (
551+ "EXECUTING"
552+ ) ;
553+ expect ( parentExecutionDataAfterBatchChildComplete . batch ?. id ) . toBe ( batch . id ) ;
554+ expect ( parentExecutionDataAfterBatchChildComplete . completedWaitpoints . length ) . toBe ( 2 ) ;
555+
556+ //now triggerAndWait
557+ const triggerAndWaitChildRun = await engine . trigger (
558+ {
559+ number : 1 ,
560+ friendlyId : "run_c123456" ,
561+ environment : authenticatedEnvironment ,
562+ taskIdentifier : triggerAndWaitChildTask ,
563+ payload : "{}" ,
564+ payloadType : "application/json" ,
565+ context : { } ,
566+ traceContext : { } ,
567+ traceId : "t123456" ,
568+ spanId : "s123456" ,
569+ masterQueue : "main" ,
570+ queue : `task/${ triggerAndWaitChildTask } ` ,
571+ isTest : false ,
572+ tags : [ ] ,
573+ resumeParentOnCompletion : true ,
574+ parentTaskRunId : parentRun . id ,
575+ } ,
576+ prisma
577+ ) ;
578+
579+ //check that the parent's execution data doesn't have a batch ID
580+ const parentAfterTriggerAndWait = await engine . getRunExecutionData ( { runId : parentRun . id } ) ;
581+ assertNonNullable ( parentAfterTriggerAndWait ) ;
582+ expect ( parentAfterTriggerAndWait . snapshot . executionStatus ) . toBe (
583+ "EXECUTING_WITH_WAITPOINTS"
584+ ) ;
585+ expect ( parentAfterTriggerAndWait . batch ) . toBeUndefined ( ) ;
586+ } finally {
587+ engine . quit ( ) ;
588+ }
589+ }
590+ ) ;
365591} ) ;
0 commit comments