Skip to content

Commit ca65e2b

Browse files
committed
ensure skipChecks optimization validates at batch level
1 parent fae98c0 commit ca65e2b

File tree

3 files changed

+163
-18
lines changed

3 files changed

+163
-18
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ const EnvironmentSchema = z.object({
761761
.int()
762762
.default(60_000 * 5), // 5 minutes
763763

764+
BATCH_TRIGGER_CACHED_RUNS_CHECK_ENABLED: BoolEnv.default("false"),
765+
764766
BATCH_TRIGGER_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
765767
BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
766768
BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,11 @@ export class DefaultQueueManager implements QueueManager {
177177
return task.queue.name ?? defaultQueueName;
178178
}
179179

180-
async validateQueueLimits(environment: AuthenticatedEnvironment): Promise<QueueValidationResult> {
181-
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment);
180+
async validateQueueLimits(
181+
environment: AuthenticatedEnvironment,
182+
itemsToAdd?: number
183+
): Promise<QueueValidationResult> {
184+
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment, itemsToAdd);
182185

183186
logger.debug("Queue size guard result", {
184187
queueSizeGuard,

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 156 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
import {
2-
BatchTriggerTaskV2RequestBody,
3-
BatchTriggerTaskV3RequestBody,
4-
BatchTriggerTaskV3Response,
5-
IOPacket,
2+
type BatchTriggerTaskV2RequestBody,
3+
type BatchTriggerTaskV3RequestBody,
4+
type BatchTriggerTaskV3Response,
5+
type IOPacket,
66
packetRequiresOffloading,
77
parsePacket,
88
} from "@trigger.dev/core/v3";
99
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
10-
import { BatchTaskRun, Prisma } from "@trigger.dev/database";
10+
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
1111
import { Evt } from "evt";
1212
import { z } from "zod";
13-
import { prisma, PrismaClientOrTransaction } from "~/db.server";
13+
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
1414
import { env } from "~/env.server";
15-
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
15+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1616
import { logger } from "~/services/logger.server";
17-
import { getEntitlement } from "~/services/platform.v3.server";
1817
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
18+
import { DefaultQueueManager } from "../concerns/queues.server";
19+
import { DefaultTriggerTaskValidator } from "../validators/triggerTaskValidator";
1920
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
2021
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
21-
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
22+
import { TriggerTaskService } from "../../v3/services/triggerTask.server";
2223
import { startActiveSpan } from "../../v3/tracer.server";
2324

2425
const PROCESSING_BATCH_SIZE = 50;
@@ -36,6 +37,7 @@ export const BatchProcessingOptions = z.object({
3637
strategy: BatchProcessingStrategy,
3738
parentRunId: z.string().optional(),
3839
resumeParentOnCompletion: z.boolean().optional(),
40+
planType: z.string().optional(),
3941
});
4042

4143
export type BatchProcessingOptions = z.infer<typeof BatchProcessingOptions>;
@@ -53,13 +55,18 @@ export type BatchTriggerTaskServiceOptions = {
5355
export class RunEngineBatchTriggerService extends WithRunEngine {
5456
private _batchProcessingStrategy: BatchProcessingStrategy;
5557
public onBatchTaskRunCreated: Evt<BatchTaskRun> = new Evt();
58+
private readonly queueConcern: DefaultQueueManager;
59+
private readonly validator: DefaultTriggerTaskValidator;
5660

5761
constructor(
5862
batchProcessingStrategy?: BatchProcessingStrategy,
5963
protected readonly _prisma: PrismaClientOrTransaction = prisma
6064
) {
6165
super({ prisma });
6266

67+
this.queueConcern = new DefaultQueueManager(this._prisma, this._engine);
68+
this.validator = new DefaultTriggerTaskValidator();
69+
6370
// Eric note: We need to force sequential processing because when doing parallel, we end up with high-contention on the parent run lock
6471
// becuase we are triggering a lot of runs at once, and each one is trying to lock the parent run.
6572
// by forcing sequential, we are only ever locking the parent run for a single run at a time.
@@ -80,13 +87,18 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
8087

8188
span.setAttribute("batchId", friendlyId);
8289

83-
if (environment.type !== "DEVELOPMENT") {
84-
const result = await getEntitlement(environment.organizationId);
85-
if (result && result.hasAccess === false) {
86-
throw new OutOfEntitlementError();
87-
}
90+
// Validate entitlement and extract planType for batch runs
91+
const entitlementValidation = await this.validator.validateEntitlement({
92+
environment,
93+
});
94+
95+
if (!entitlementValidation.ok) {
96+
throw entitlementValidation.error;
8897
}
8998

99+
// Extract plan type from entitlement response
100+
const planType = entitlementValidation.plan?.type;
101+
90102
// Upload to object store
91103
const payloadPacket = await this.#handlePayloadPacket(
92104
body.items,
@@ -99,7 +111,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
99111
payloadPacket,
100112
environment,
101113
body,
102-
options
114+
options,
115+
planType
103116
);
104117

105118
if (!batch) {
@@ -152,7 +165,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
152165
payloadPacket: IOPacket,
153166
environment: AuthenticatedEnvironment,
154167
body: BatchTriggerTaskV2RequestBody,
155-
options: BatchTriggerTaskServiceOptions = {}
168+
options: BatchTriggerTaskServiceOptions = {},
169+
planType?: string
156170
) {
157171
if (body.items.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) {
158172
const batch = await this._prisma.batchTaskRun.create({
@@ -191,6 +205,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
191205
options,
192206
parentRunId: body.parentRunId,
193207
resumeParentOnCompletion: body.resumeParentOnCompletion,
208+
planType,
194209
});
195210

196211
switch (result.status) {
@@ -220,6 +235,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
220235
strategy: "sequential",
221236
parentRunId: body.parentRunId,
222237
resumeParentOnCompletion: body.resumeParentOnCompletion,
238+
planType,
223239
});
224240

225241
return batch;
@@ -242,6 +258,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
242258
strategy: "sequential",
243259
parentRunId: body.parentRunId,
244260
resumeParentOnCompletion: body.resumeParentOnCompletion,
261+
planType,
245262
});
246263

247264
return batch;
@@ -285,6 +302,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
285302
strategy: this._batchProcessingStrategy,
286303
parentRunId: body.parentRunId,
287304
resumeParentOnCompletion: body.resumeParentOnCompletion,
305+
planType,
288306
});
289307

290308
break;
@@ -307,6 +325,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
307325
strategy: this._batchProcessingStrategy,
308326
parentRunId: body.parentRunId,
309327
resumeParentOnCompletion: body.resumeParentOnCompletion,
328+
planType,
310329
})
311330
)
312331
);
@@ -410,6 +429,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
410429
options: $options,
411430
parentRunId: options.parentRunId,
412431
resumeParentOnCompletion: options.resumeParentOnCompletion,
432+
planType: options.planType,
413433
});
414434

415435
switch (result.status) {
@@ -443,6 +463,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
443463
strategy: options.strategy,
444464
parentRunId: options.parentRunId,
445465
resumeParentOnCompletion: options.resumeParentOnCompletion,
466+
planType: options.planType,
446467
});
447468
}
448469

@@ -470,6 +491,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
470491
strategy: options.strategy,
471492
parentRunId: options.parentRunId,
472493
resumeParentOnCompletion: options.resumeParentOnCompletion,
494+
planType: options.planType,
473495
});
474496
} else {
475497
await this.#enqueueBatchTaskRun({
@@ -486,6 +508,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
486508
strategy: options.strategy,
487509
parentRunId: options.parentRunId,
488510
resumeParentOnCompletion: options.resumeParentOnCompletion,
511+
planType: options.planType,
489512
});
490513
}
491514

@@ -503,6 +526,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
503526
options,
504527
parentRunId,
505528
resumeParentOnCompletion,
529+
planType,
506530
}: {
507531
batch: BatchTaskRun;
508532
environment: AuthenticatedEnvironment;
@@ -512,6 +536,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
512536
options?: BatchTriggerTaskServiceOptions;
513537
parentRunId?: string | undefined;
514538
resumeParentOnCompletion?: boolean | undefined;
539+
planType?: string;
515540
}): Promise<
516541
| { status: "COMPLETE" }
517542
| { status: "INCOMPLETE"; workingIndex: number }
@@ -520,6 +545,35 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
520545
// Grab the next PROCESSING_BATCH_SIZE items
521546
const itemsToProcess = items.slice(currentIndex, currentIndex + batchSize);
522547

548+
const newRunCount = await this.#countNewRuns(environment, itemsToProcess);
549+
550+
// Only validate queue size if we have new runs to create, i.e. they're not all cached
551+
if (newRunCount > 0) {
552+
const queueSizeGuard = await this.queueConcern.validateQueueLimits(environment, newRunCount);
553+
554+
logger.debug("Queue size guard result for chunk", {
555+
batchId: batch.friendlyId,
556+
currentIndex,
557+
runCount: batch.runCount,
558+
newRunCount,
559+
queueSizeGuard,
560+
});
561+
562+
if (!queueSizeGuard.ok) {
563+
return {
564+
status: "ERROR",
565+
error: `Cannot trigger ${newRunCount} new tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
566+
workingIndex: currentIndex,
567+
};
568+
}
569+
} else {
570+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] All runs are cached", {
571+
batchId: batch.friendlyId,
572+
currentIndex,
573+
runCount: batch.runCount,
574+
});
575+
}
576+
523577
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch items", {
524578
batchId: batch.friendlyId,
525579
currentIndex,
@@ -540,6 +594,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
540594
options,
541595
parentRunId,
542596
resumeParentOnCompletion,
597+
planType,
543598
});
544599

545600
if (!run) {
@@ -615,6 +670,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
615670
options,
616671
parentRunId,
617672
resumeParentOnCompletion,
673+
planType,
618674
}: {
619675
batch: BatchTaskRun;
620676
environment: AuthenticatedEnvironment;
@@ -623,6 +679,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
623679
options?: BatchTriggerTaskServiceOptions;
624680
parentRunId: string | undefined;
625681
resumeParentOnCompletion: boolean | undefined;
682+
planType?: string;
626683
}) {
627684
logger.debug("[RunEngineBatchTrigger][processBatchTaskRunItem] Processing item", {
628685
batchId: batch.friendlyId,
@@ -649,6 +706,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
649706
spanParentAsLink: options?.spanParentAsLink,
650707
batchId: batch.id,
651708
batchIndex: currentIndex,
709+
skipChecks: true, // Skip entitlement and queue checks since we already validated at batch/chunk level
710+
planType, // Pass planType from batch-level entitlement check
652711
},
653712
"V2"
654713
);
@@ -691,4 +750,85 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
691750
};
692751
});
693752
}
753+
754+
#groupItemsByTaskIdentifier(
755+
items: BatchTriggerTaskV2RequestBody["items"]
756+
): Record<string, BatchTriggerTaskV2RequestBody["items"]> {
757+
return items.reduce((acc, item) => {
758+
if (!item.options?.idempotencyKey) return acc;
759+
760+
if (!acc[item.task]) {
761+
acc[item.task] = [];
762+
}
763+
acc[item.task].push(item);
764+
return acc;
765+
}, {} as Record<string, BatchTriggerTaskV2RequestBody["items"]>);
766+
}
767+
768+
async #countNewRuns(
769+
environment: AuthenticatedEnvironment,
770+
items: BatchTriggerTaskV2RequestBody["items"]
771+
): Promise<number> {
772+
// If cached runs check is disabled, return the total number of items
773+
if (!env.BATCH_TRIGGER_CACHED_RUNS_CHECK_ENABLED) {
774+
return items.length;
775+
}
776+
777+
// Group items by taskIdentifier for efficient lookup
778+
const itemsByTask = this.#groupItemsByTaskIdentifier(items);
779+
780+
// If no items have idempotency keys, all are new runs
781+
if (Object.keys(itemsByTask).length === 0) {
782+
return items.length;
783+
}
784+
785+
// Fetch cached runs for each task identifier separately to make use of the index
786+
const cachedRuns = await Promise.all(
787+
Object.entries(itemsByTask).map(([taskIdentifier, taskItems]) =>
788+
this._prisma.taskRun.findMany({
789+
where: {
790+
runtimeEnvironmentId: environment.id,
791+
taskIdentifier,
792+
idempotencyKey: {
793+
in: taskItems.map((i) => i.options?.idempotencyKey).filter(Boolean),
794+
},
795+
},
796+
select: {
797+
idempotencyKey: true,
798+
idempotencyKeyExpiresAt: true,
799+
},
800+
})
801+
)
802+
).then((results) => results.flat());
803+
804+
// Create a Map for O(1) lookups instead of O(m) find operations
805+
const cachedRunsMap = new Map(cachedRuns.map((run) => [run.idempotencyKey, run]));
806+
807+
// Count items that are NOT cached (or have expired cache)
808+
let newRunCount = 0;
809+
const now = new Date();
810+
811+
for (const item of items) {
812+
const idempotencyKey = item.options?.idempotencyKey;
813+
814+
if (!idempotencyKey) {
815+
// No idempotency key = always a new run
816+
newRunCount++;
817+
continue;
818+
}
819+
820+
const cachedRun = cachedRunsMap.get(idempotencyKey);
821+
822+
if (!cachedRun) {
823+
// No cached run = new run
824+
newRunCount++;
825+
} else if (cachedRun.idempotencyKeyExpiresAt && cachedRun.idempotencyKeyExpiresAt < now) {
826+
// Expired cached run = new run
827+
newRunCount++;
828+
}
829+
// else: valid cached run = not a new run
830+
}
831+
832+
return newRunCount;
833+
}
694834
}

0 commit comments

Comments
 (0)