11import { containerTest } from "@internal/testcontainers" ;
22import { trace } from "@internal/tracing" ;
3+ import { DequeuedMessage } from "@trigger.dev/core/v3" ;
34import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic" ;
45import { PrismaClientOrTransaction } from "@trigger.dev/database" ;
56import { expect } from "vitest" ;
67import { MinimalAuthenticatedEnvironment } from "../../shared/index.js" ;
78import { RunEngine } from "../index.js" ;
89import { setupAuthenticatedEnvironment , setupBackgroundWorker } from "./setup.js" ;
9- import { DequeuedMessage } from "@trigger.dev/core/v3" ;
10- import { setTimeout } from "timers/promises" ;
1110
1211vi . setConfig ( { testTimeout : 60_000 } ) ;
1312
@@ -62,12 +61,9 @@ describe("RunEngine dequeuing", () => {
6261 } ) ;
6362 expect ( runs . length ) . toBe ( 10 ) ;
6463
65- //check the queue length
66- const queueLength = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
67- expect ( queueLength ) . toBe ( 10 ) ;
68-
6964 //dequeue
70- await setTimeout ( 500 ) ;
65+ await engine . runQueue . processMasterQueueForEnvironment ( authenticatedEnvironment . id , 5 ) ;
66+
7167 const dequeued : DequeuedMessage [ ] = [ ] ;
7268 for ( let i = 0 ; i < 5 ; i ++ ) {
7369 dequeued . push (
@@ -83,93 +79,6 @@ describe("RunEngine dequeuing", () => {
8379 await engine . quit ( ) ;
8480 }
8581 } ) ;
86-
87- //This will fail until we support dequeuing multiple runs from a single environment
88- containerTest . fails (
89- "Dequeues runs within machine constraints" ,
90- async ( { prisma, redisOptions } ) => {
91- const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
92-
93- const engine = new RunEngine ( {
94- prisma,
95- worker : {
96- redis : redisOptions ,
97- workers : 1 ,
98- tasksPerWorker : 10 ,
99- pollIntervalMs : 100 ,
100- } ,
101- queue : {
102- redis : redisOptions ,
103- masterQueueConsumersDisabled : true ,
104- processWorkerQueueDebounceMs : 50 ,
105- } ,
106- runLock : {
107- redis : redisOptions ,
108- } ,
109- machines : {
110- defaultMachine : "small-1x" ,
111- machines : {
112- "small-1x" : {
113- name : "small-1x" as const ,
114- cpu : 0.5 ,
115- memory : 0.5 ,
116- centsPerMs : 0.0001 ,
117- } ,
118- } ,
119- baseCostInCents : 0.0005 ,
120- } ,
121- tracer : trace . getTracer ( "test" , "0.0.0" ) ,
122- } ) ;
123-
124- try {
125- const taskIdentifier = "test-task" ;
126-
127- //create background worker
128- await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier , {
129- preset : "small-1x" ,
130- } ) ;
131-
132- //trigger the runs
133- const runs = await triggerRuns ( {
134- engine,
135- environment : authenticatedEnvironment ,
136- taskIdentifier,
137- prisma,
138- count : 20 ,
139- } ) ;
140- expect ( runs . length ) . toBe ( 20 ) ;
141-
142- //check the queue length
143- const queueLength = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
144- expect ( queueLength ) . toBe ( 20 ) ;
145-
146- //dequeue
147- await setTimeout ( 500 ) ;
148- const dequeued = await engine . dequeueFromWorkerQueue ( {
149- consumerId : "test_12345" ,
150- workerQueue : "main" ,
151- } ) ;
152- expect ( dequeued . length ) . toBe ( 2 ) ;
153-
154- //check the queue length
155- const queueLength2 = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
156- expect ( queueLength2 ) . toBe ( 18 ) ;
157-
158- await setTimeout ( 500 ) ;
159- const dequeued2 = await engine . dequeueFromWorkerQueue ( {
160- consumerId : "test_12345" ,
161- workerQueue : "main" ,
162- } ) ;
163- expect ( dequeued2 . length ) . toBe ( 6 ) ;
164-
165- //check the queue length
166- const queueLength3 = await engine . runQueue . lengthOfEnvQueue ( authenticatedEnvironment ) ;
167- expect ( queueLength3 ) . toBe ( 12 ) ;
168- } finally {
169- await engine . quit ( ) ;
170- }
171- }
172- ) ;
17382} ) ;
17483
17584async function triggerRuns ( {
0 commit comments