@@ -20,7 +20,7 @@ import {
2020 ValidationResult ,
2121} from "~/runEngine/types" ;
2222import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server" ;
23- import { TaskRun } from "@trigger.dev/database" ;
23+ import { TaskRun , TaskRunStatus } from "@trigger.dev/database" ;
2424
2525vi . setConfig ( { testTimeout : 30_000 } ) ; // 30 seconds timeout
2626
@@ -183,4 +183,277 @@ describe("RunEngineTriggerTaskService", () => {
183183
184184 await engine . quit ( ) ;
185185 } ) ;
186+
187+ containerTest ( "should handle idempotency keys correctly" , async ( { prisma, redisOptions } ) => {
188+ const engine = new RunEngine ( {
189+ prisma,
190+ worker : {
191+ redis : redisOptions ,
192+ workers : 1 ,
193+ tasksPerWorker : 10 ,
194+ pollIntervalMs : 100 ,
195+ } ,
196+ queue : {
197+ redis : redisOptions ,
198+ } ,
199+ runLock : {
200+ redis : redisOptions ,
201+ } ,
202+ machines : {
203+ defaultMachine : "small-1x" ,
204+ machines : {
205+ "small-1x" : {
206+ name : "small-1x" as const ,
207+ cpu : 0.5 ,
208+ memory : 0.5 ,
209+ centsPerMs : 0.0001 ,
210+ } ,
211+ } ,
212+ baseCostInCents : 0.0005 ,
213+ } ,
214+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
215+ } ) ;
216+
217+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
218+
219+ const taskIdentifier = "test-task" ;
220+
221+ //create background worker
222+ await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier ) ;
223+
224+ const queuesManager = new DefaultQueueManager ( prisma , engine ) ;
225+
226+ const idempotencyKeyConcern = new IdempotencyKeyConcern (
227+ prisma ,
228+ engine ,
229+ new MockTraceEventConcern ( )
230+ ) ;
231+
232+ const triggerTaskService = new RunEngineTriggerTaskService ( {
233+ engine,
234+ prisma,
235+ runNumberIncrementer : new MockRunNumberIncrementer ( ) ,
236+ payloadProcessor : new MockPayloadProcessor ( ) ,
237+ queueConcern : queuesManager ,
238+ idempotencyKeyConcern,
239+ validator : new MockTriggerTaskValidator ( ) ,
240+ traceEventConcern : new MockTraceEventConcern ( ) ,
241+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
242+ } ) ;
243+
244+ const result = await triggerTaskService . call ( {
245+ taskId : taskIdentifier ,
246+ environment : authenticatedEnvironment ,
247+ body : {
248+ payload : { test : "test" } ,
249+ options : {
250+ idempotencyKey : "test-idempotency-key" ,
251+ } ,
252+ } ,
253+ } ) ;
254+
255+ expect ( result ) . toBeDefined ( ) ;
256+ expect ( result ?. run . friendlyId ) . toBeDefined ( ) ;
257+ expect ( result ?. run . status ) . toBe ( "PENDING" ) ;
258+ expect ( result ?. isCached ) . toBe ( false ) ;
259+
260+ const run = await prisma . taskRun . findUnique ( {
261+ where : {
262+ id : result ?. run . id ,
263+ } ,
264+ } ) ;
265+
266+ expect ( run ) . toBeDefined ( ) ;
267+ expect ( run ?. friendlyId ) . toBe ( result ?. run . friendlyId ) ;
268+ expect ( run ?. engine ) . toBe ( "V2" ) ;
269+ expect ( run ?. queuedAt ) . toBeDefined ( ) ;
270+ expect ( run ?. queue ) . toBe ( `task/${ taskIdentifier } ` ) ;
271+
272+ // Lets make sure the task is in the queue
273+ const queueLength = await engine . runQueue . lengthOfQueue (
274+ authenticatedEnvironment ,
275+ `task/${ taskIdentifier } `
276+ ) ;
277+ expect ( queueLength ) . toBe ( 1 ) ;
278+
279+ // Now lets try to trigger the same task with the same idempotency key
280+ const cachedResult = await triggerTaskService . call ( {
281+ taskId : taskIdentifier ,
282+ environment : authenticatedEnvironment ,
283+ body : {
284+ payload : { test : "test" } ,
285+ options : {
286+ idempotencyKey : "test-idempotency-key" ,
287+ } ,
288+ } ,
289+ } ) ;
290+
291+ expect ( cachedResult ) . toBeDefined ( ) ;
292+ expect ( cachedResult ?. run . friendlyId ) . toBe ( result ?. run . friendlyId ) ;
293+ expect ( cachedResult ?. isCached ) . toBe ( true ) ;
294+
295+ await engine . quit ( ) ;
296+ } ) ;
297+
298+ containerTest (
299+ "should resolve queue names correctly when locked to version" ,
300+ async ( { prisma, redisOptions } ) => {
301+ const engine = new RunEngine ( {
302+ prisma,
303+ worker : {
304+ redis : redisOptions ,
305+ workers : 1 ,
306+ tasksPerWorker : 10 ,
307+ pollIntervalMs : 100 ,
308+ } ,
309+ queue : {
310+ redis : redisOptions ,
311+ } ,
312+ runLock : {
313+ redis : redisOptions ,
314+ } ,
315+ machines : {
316+ defaultMachine : "small-1x" ,
317+ machines : {
318+ "small-1x" : {
319+ name : "small-1x" as const ,
320+ cpu : 0.5 ,
321+ memory : 0.5 ,
322+ centsPerMs : 0.0001 ,
323+ } ,
324+ } ,
325+ baseCostInCents : 0.0005 ,
326+ } ,
327+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
328+ } ) ;
329+
330+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
331+ const taskIdentifier = "test-task" ;
332+
333+ // Create a background worker with a specific version
334+ const worker = await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier , {
335+ preset : "small-1x" ,
336+ } ) ;
337+
338+ // Create a specific queue for this worker
339+ const specificQueue = await prisma . taskQueue . create ( {
340+ data : {
341+ name : "specific-queue" ,
342+ friendlyId : "specific-queue" ,
343+ projectId : authenticatedEnvironment . projectId ,
344+ runtimeEnvironmentId : authenticatedEnvironment . id ,
345+ workers : {
346+ connect : {
347+ id : worker . worker . id ,
348+ } ,
349+ } ,
350+ } ,
351+ } ) ;
352+
353+ // Associate the task with the queue
354+ await prisma . backgroundWorkerTask . update ( {
355+ where : {
356+ workerId_slug : {
357+ workerId : worker . worker . id ,
358+ slug : taskIdentifier ,
359+ } ,
360+ } ,
361+ data : {
362+ queueId : specificQueue . id ,
363+ } ,
364+ } ) ;
365+
366+ const queuesManager = new DefaultQueueManager ( prisma , engine ) ;
367+ const idempotencyKeyConcern = new IdempotencyKeyConcern (
368+ prisma ,
369+ engine ,
370+ new MockTraceEventConcern ( )
371+ ) ;
372+
373+ const triggerTaskService = new RunEngineTriggerTaskService ( {
374+ engine,
375+ prisma,
376+ runNumberIncrementer : new MockRunNumberIncrementer ( ) ,
377+ payloadProcessor : new MockPayloadProcessor ( ) ,
378+ queueConcern : queuesManager ,
379+ idempotencyKeyConcern,
380+ validator : new MockTriggerTaskValidator ( ) ,
381+ traceEventConcern : new MockTraceEventConcern ( ) ,
382+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
383+ } ) ;
384+
385+ // Test case 1: Trigger with lockToVersion but no specific queue
386+ const result1 = await triggerTaskService . call ( {
387+ taskId : taskIdentifier ,
388+ environment : authenticatedEnvironment ,
389+ body : {
390+ payload : { test : "test" } ,
391+ options : {
392+ lockToVersion : worker . worker . version ,
393+ } ,
394+ } ,
395+ } ) ;
396+
397+ expect ( result1 ) . toBeDefined ( ) ;
398+ expect ( result1 ?. run . queue ) . toBe ( "specific-queue" ) ;
399+
400+ // Test case 2: Trigger with lockToVersion and specific queue
401+ const result2 = await triggerTaskService . call ( {
402+ taskId : taskIdentifier ,
403+ environment : authenticatedEnvironment ,
404+ body : {
405+ payload : { test : "test" } ,
406+ options : {
407+ lockToVersion : worker . worker . version ,
408+ queue : {
409+ name : "specific-queue" ,
410+ } ,
411+ } ,
412+ } ,
413+ } ) ;
414+
415+ expect ( result2 ) . toBeDefined ( ) ;
416+ expect ( result2 ?. run . queue ) . toBe ( "specific-queue" ) ;
417+ expect ( result2 ?. run . lockedQueueId ) . toBe ( specificQueue . id ) ;
418+
419+ // Test case 3: Try to use non-existent queue with locked version (should throw)
420+ await expect (
421+ triggerTaskService . call ( {
422+ taskId : taskIdentifier ,
423+ environment : authenticatedEnvironment ,
424+ body : {
425+ payload : { test : "test" } ,
426+ options : {
427+ lockToVersion : worker . worker . version ,
428+ queue : {
429+ name : "non-existent-queue" ,
430+ } ,
431+ } ,
432+ } ,
433+ } )
434+ ) . rejects . toThrow (
435+ `Specified queue 'non-existent-queue' not found or not associated with locked version '${ worker . worker . version } '`
436+ ) ;
437+
438+ // Test case 4: Trigger with a non-existent queue without a locked version
439+ const result4 = await triggerTaskService . call ( {
440+ taskId : taskIdentifier ,
441+ environment : authenticatedEnvironment ,
442+ body : {
443+ payload : { test : "test" } ,
444+ options : {
445+ queue : {
446+ name : "non-existent-queue" ,
447+ } ,
448+ } ,
449+ } ,
450+ } ) ;
451+
452+ expect ( result4 ) . toBeDefined ( ) ;
453+ expect ( result4 ?. run . queue ) . toBe ( "non-existent-queue" ) ;
454+ expect ( result4 ?. run . status ) . toBe ( "PENDING" ) ;
455+
456+ await engine . quit ( ) ;
457+ }
458+ ) ;
186459} ) ;
0 commit comments