diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index d5984f0671..742f3dd1ba 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -58,7 +58,10 @@ export class RunEngineBatchTriggerService extends WithRunEngine { ) { super({ prisma }); - this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel"; + // Eric note: We need to force sequential processing because when doing parallel, we end up with high-contention on the parent run lock + // becuase we are triggering a lot of runs at once, and each one is trying to lock the parent run. + // by forcing sequential, we are only ever locking the parent run for a single run at a time. + this._batchProcessingStrategy = "sequential"; } public async call( @@ -316,6 +319,14 @@ export class RunEngineBatchTriggerService extends WithRunEngine { } } + async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { + await workerQueue.enqueue("runengine.processBatchTaskRun", options, { + tx, + jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, + }); + } + + // This is the function that the worker will call async processBatchTaskRun(options: BatchProcessingOptions) { logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch", { options, @@ -648,13 +659,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine { : undefined; } - async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { - await workerQueue.enqueue("runengine.processBatchTaskRun", options, { - tx, - jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, - }); - } - async #handlePayloadPacket( payload: any, pathPrefix: string, diff --git a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts index 49541a9a68..74073566e7 100644 --- a/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts +++ b/internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts @@ -680,40 +680,6 @@ describe("ReleaseConcurrencyQueue", () => { } }); - redisTest( - "Should retrieve metrics for all queues via getQueueMetrics", - async ({ redisContainer }) => { - const { queue } = createReleaseConcurrencyQueue(redisContainer, 1); - - // Set up multiple queues with different states - await queue.attemptToRelease({ name: "metrics-queue1" }, "run1"); // Consume 1 token from queue1 - - // Add more items to queue1 that will be queued due to no tokens - await queue.attemptToRelease({ name: "metrics-queue1" }, "run2"); // This will be queued - await queue.attemptToRelease({ name: "metrics-queue1" }, "run3"); // This will be queued - await queue.attemptToRelease({ name: "metrics-queue1" }, "run4"); // This will be queued - - const metrics = await queue.getQueueMetrics(); - - expect(metrics).toHaveLength(1); - expect(metrics[0].releaseQueue).toBe("metrics-queue1"); - expect(metrics[0].currentTokens).toBe(0); - expect(metrics[0].queueLength).toBe(3); - - // Now add 10 items to 100 different queues - for (let i = 0; i < 100; i++) { - for (let j = 0; j < 10; j++) { - await queue.attemptToRelease({ name: `metrics-queue2-${i}` }, `run${i}-${j}`); - } - } - - const metrics2 = await queue.getQueueMetrics(); - expect(metrics2.length).toBeGreaterThan(90); - - await queue.quit(); - } - ); - redisTest( "refillTokenIfInReleasings should refill token when releaserId is in the releasings set", async ({ redisContainer }) => { diff --git a/references/hello-world/src/trigger/batches.ts b/references/hello-world/src/trigger/batches.ts new file mode 100644 index 0000000000..e5220fe9f1 --- /dev/null +++ b/references/hello-world/src/trigger/batches.ts @@ -0,0 +1,39 @@ +import { task } from "@trigger.dev/sdk/v3"; +import { setTimeout } from "timers/promises"; + +export const batchTriggerAndWait = task({ + id: "batch-trigger-and-wait", + maxDuration: 60, + run: async (payload: { count: number }, { ctx }) => { + const payloads = Array.from({ length: payload.count }, (_, i) => ({ + payload: { waitSeconds: 1, output: `test${i}` }, + })); + + // First batch triggerAndWait with idempotency keys + const firstResults = await fixedLengthTask.batchTriggerAndWait(payloads); + }, +}); + +type Payload = { + waitSeconds: number; + error?: string; + output?: any; +}; + +export const fixedLengthTask = task({ + id: "fixed-length-lask", + retry: { + maxAttempts: 2, + maxTimeoutInMs: 100, + }, + machine: "micro", + run: async ({ waitSeconds = 1, error, output }: Payload) => { + await setTimeout(waitSeconds * 1000); + + if (error) { + throw new Error(error); + } + + return output; + }, +});