Skip to content

Commit b06c645

Browse files
committed
v4: fix batchTriggerAndWait completion issues by processing batch chunks sequentially
1 parent 575413c commit b06c645

File tree

2 files changed

+51
-8
lines changed

2 files changed

+51
-8
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
5858
) {
5959
super({ prisma });
6060

61-
this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
61+
// Eric note: We need to force sequential processing because when doing parallel, we end up with high-contention on the parent run lock
62+
// becuase we are triggering a lot of runs at once, and each one is trying to lock the parent run.
63+
// by forcing sequential, we are only ever locking the parent run for a single run at a time.
64+
this._batchProcessingStrategy = "sequential";
6265
}
6366

6467
public async call(
@@ -316,6 +319,14 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
316319
}
317320
}
318321

322+
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
323+
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
324+
tx,
325+
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
326+
});
327+
}
328+
329+
// This is the function that the worker will call
319330
async processBatchTaskRun(options: BatchProcessingOptions) {
320331
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch", {
321332
options,
@@ -648,13 +659,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
648659
: undefined;
649660
}
650661

651-
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
652-
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
653-
tx,
654-
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
655-
});
656-
}
657-
658662
async #handlePayloadPacket(
659663
payload: any,
660664
pathPrefix: string,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { task } from "@trigger.dev/sdk/v3";
2+
import { setTimeout } from "timers/promises";
3+
4+
export const batchTriggerAndWait = task({
5+
id: "batch-trigger-and-wait",
6+
maxDuration: 60,
7+
run: async (payload: { count: number }, { ctx }) => {
8+
const payloads = Array.from({ length: payload.count }, (_, i) => ({
9+
payload: { waitSeconds: 1, output: `test${i}` },
10+
}));
11+
12+
// First batch triggerAndWait with idempotency keys
13+
const firstResults = await fixedLengthTask.batchTriggerAndWait(payloads);
14+
},
15+
});
16+
17+
type Payload = {
18+
waitSeconds: number;
19+
error?: string;
20+
output?: any;
21+
};
22+
23+
export const fixedLengthTask = task({
24+
id: "fixed-length-lask",
25+
retry: {
26+
maxAttempts: 2,
27+
maxTimeoutInMs: 100,
28+
},
29+
machine: "micro",
30+
run: async ({ waitSeconds = 1, error, output }: Payload) => {
31+
await setTimeout(waitSeconds * 1000);
32+
33+
if (error) {
34+
throw new Error(error);
35+
}
36+
37+
return output;
38+
},
39+
});

0 commit comments

Comments
 (0)