Skip to content

Commit 64c4a99

Browse files
committed
WIP batch triggering
1 parent a469365 commit 64c4a99

File tree

3 files changed

+41
-56
lines changed

3 files changed

+41
-56
lines changed

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 29 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
1414
import { getEntitlement } from "~/services/platform.v3.server";
1515
import { workerQueue } from "~/services/worker.server";
16-
import { generateFriendlyId } from "../friendlyIdentifiers";
1716
import { marqs } from "../marqs/index.server";
1817
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
1918
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
@@ -22,6 +21,7 @@ import { startActiveSpan } from "../tracer.server";
2221
import { BaseService, ServiceValidationError } from "./baseService.server";
2322
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
2423
import { z } from "zod";
24+
import { BatchId, RunId } from "@trigger.dev/core/v3/apps";
2525

2626
const PROCESSING_BATCH_SIZE = 50;
2727
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
@@ -52,7 +52,7 @@ export type BatchTriggerTaskServiceOptions = {
5252
/**
5353
* Larger batches, used in Run Engine v2
5454
*/
55-
export class BatchTriggerV2Service extends BaseService {
55+
export class BatchTriggerV3Service extends BaseService {
5656
private _batchProcessingStrategy: BatchProcessingStrategy;
5757

5858
constructor(
@@ -90,7 +90,7 @@ export class BatchTriggerV2Service extends BaseService {
9090
existingBatch.idempotencyKeyExpiresAt &&
9191
existingBatch.idempotencyKeyExpiresAt < new Date()
9292
) {
93-
logger.debug("[BatchTriggerV2][call] Idempotency key has expired", {
93+
logger.debug("[BatchTriggerV3][call] Idempotency key has expired", {
9494
idempotencyKey: options.idempotencyKey,
9595
batch: {
9696
id: existingBatch.id,
@@ -115,38 +115,9 @@ export class BatchTriggerV2Service extends BaseService {
115115
}
116116
}
117117

118-
const batchId = generateFriendlyId("batch");
118+
const { id, friendlyId } = BatchId.generate();
119119

120-
span.setAttribute("batchId", batchId);
121-
122-
const dependentAttempt = body?.dependentAttempt
123-
? await this._prisma.taskRunAttempt.findUnique({
124-
where: { friendlyId: body.dependentAttempt },
125-
include: {
126-
taskRun: {
127-
select: {
128-
id: true,
129-
status: true,
130-
},
131-
},
132-
},
133-
})
134-
: undefined;
135-
136-
if (
137-
dependentAttempt &&
138-
(isFinalAttemptStatus(dependentAttempt.status) ||
139-
isFinalRunStatus(dependentAttempt.taskRun.status))
140-
) {
141-
logger.debug("[BatchTriggerV2][call] Dependent attempt or run is in a terminal state", {
142-
dependentAttempt: dependentAttempt,
143-
batchId,
144-
});
145-
146-
throw new ServiceValidationError(
147-
"Cannot process batch as the parent run is already in a terminal state"
148-
);
149-
}
120+
span.setAttribute("batchId", friendlyId);
150121

151122
if (environment.type !== "DEVELOPMENT") {
152123
const result = await getEntitlement(environment.organizationId);
@@ -175,9 +146,9 @@ export class BatchTriggerV2Service extends BaseService {
175146
: [];
176147

177148
if (cachedRuns.length) {
178-
logger.debug("[BatchTriggerV2][call] Found cached runs", {
149+
logger.debug("[BatchTriggerV3][call] Found cached runs", {
179150
cachedRuns,
180-
batchId,
151+
batchId: friendlyId,
181152
});
182153
}
183154

@@ -193,6 +164,8 @@ export class BatchTriggerV2Service extends BaseService {
193164
(r) => r.idempotencyKey === item.options?.idempotencyKey
194165
);
195166

167+
const runId = RunId.generate();
168+
196169
if (cachedRun) {
197170
if (
198171
cachedRun.idempotencyKeyExpiresAt &&
@@ -201,7 +174,7 @@ export class BatchTriggerV2Service extends BaseService {
201174
expiredRunIds.add(cachedRun.friendlyId);
202175

203176
return {
204-
id: generateFriendlyId("run"),
177+
id: runId.friendlyId,
205178
isCached: false,
206179
idempotencyKey: item.options?.idempotencyKey ?? undefined,
207180
taskIdentifier: item.task,
@@ -219,7 +192,7 @@ export class BatchTriggerV2Service extends BaseService {
219192
}
220193

221194
return {
222-
id: generateFriendlyId("run"),
195+
id: runId.friendlyId,
223196
isCached: false,
224197
idempotencyKey: item.options?.idempotencyKey ?? undefined,
225198
taskIdentifier: item.task,
@@ -230,13 +203,13 @@ export class BatchTriggerV2Service extends BaseService {
230203
const newRunCount = body.items.length - cachedRunCount;
231204

232205
if (newRunCount === 0) {
233-
logger.debug("[BatchTriggerV2][call] All runs are cached", {
234-
batchId,
206+
logger.debug("[BatchTriggerV3][call] All runs are cached", {
207+
batchId: friendlyId,
235208
});
236209

237210
await this._prisma.batchTaskRun.create({
238211
data: {
239-
friendlyId: batchId,
212+
friendlyId,
240213
runtimeEnvironmentId: environment.id,
241214
idempotencyKey: options.idempotencyKey,
242215
idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt,
@@ -323,7 +296,7 @@ export class BatchTriggerV2Service extends BaseService {
323296
} catch (error) {
324297
// Detect a prisma transaction Unique constraint violation
325298
if (error instanceof Prisma.PrismaClientKnownRequestError) {
326-
logger.debug("BatchTriggerV2: Prisma transaction error", {
299+
logger.debug("BatchTriggerV3: Prisma transaction error", {
327300
code: error.code,
328301
message: error.message,
329302
meta: error.meta,
@@ -397,15 +370,15 @@ export class BatchTriggerV2Service extends BaseService {
397370

398371
switch (result.status) {
399372
case "COMPLETE": {
400-
logger.debug("[BatchTriggerV2][call] Batch inline processing complete", {
373+
logger.debug("[BatchTriggerV3][call] Batch inline processing complete", {
401374
batchId: batch.friendlyId,
402375
currentIndex: 0,
403376
});
404377

405378
return batch;
406379
}
407380
case "INCOMPLETE": {
408-
logger.debug("[BatchTriggerV2][call] Batch inline processing incomplete", {
381+
logger.debug("[BatchTriggerV3][call] Batch inline processing incomplete", {
409382
batchId: batch.friendlyId,
410383
currentIndex: result.workingIndex,
411384
});
@@ -425,7 +398,7 @@ export class BatchTriggerV2Service extends BaseService {
425398
return batch;
426399
}
427400
case "ERROR": {
428-
logger.error("[BatchTriggerV2][call] Batch inline processing error", {
401+
logger.error("[BatchTriggerV3][call] Batch inline processing error", {
429402
batchId: batch.friendlyId,
430403
currentIndex: result.workingIndex,
431404
error: result.error,
@@ -545,15 +518,15 @@ export class BatchTriggerV2Service extends BaseService {
545518
}
546519

547520
async processBatchTaskRun(options: BatchProcessingOptions) {
548-
logger.debug("[BatchTriggerV2][processBatchTaskRun] Processing batch", {
521+
logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch", {
549522
options,
550523
});
551524

552525
const $attemptCount = options.attemptCount + 1;
553526

554527
// Add early return if max attempts reached
555528
if ($attemptCount > MAX_ATTEMPTS) {
556-
logger.error("[BatchTriggerV2][processBatchTaskRun] Max attempts reached", {
529+
logger.error("[BatchTriggerV3][processBatchTaskRun] Max attempts reached", {
557530
options,
558531
attemptCount: $attemptCount,
559532
});
@@ -579,7 +552,7 @@ export class BatchTriggerV2Service extends BaseService {
579552

580553
// Check to make sure the currentIndex is not greater than the runCount
581554
if (options.range.start >= batch.runCount) {
582-
logger.debug("[BatchTriggerV2][processBatchTaskRun] currentIndex is greater than runCount", {
555+
logger.debug("[BatchTriggerV3][processBatchTaskRun] currentIndex is greater than runCount", {
583556
options,
584557
batchId: batch.friendlyId,
585558
runCount: batch.runCount,
@@ -601,7 +574,7 @@ export class BatchTriggerV2Service extends BaseService {
601574
const payload = await parsePacket(payloadPacket);
602575

603576
if (!payload) {
604-
logger.debug("[BatchTriggerV2][processBatchTaskRun] Failed to parse payload", {
577+
logger.debug("[BatchTriggerV3][processBatchTaskRun] Failed to parse payload", {
605578
options,
606579
batchId: batch.friendlyId,
607580
attemptCount: $attemptCount,
@@ -625,7 +598,7 @@ export class BatchTriggerV2Service extends BaseService {
625598

626599
switch (result.status) {
627600
case "COMPLETE": {
628-
logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing complete", {
601+
logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing complete", {
629602
options,
630603
batchId: batch.friendlyId,
631604
attemptCount: $attemptCount,
@@ -634,7 +607,7 @@ export class BatchTriggerV2Service extends BaseService {
634607
return;
635608
}
636609
case "INCOMPLETE": {
637-
logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing incomplete", {
610+
logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing incomplete", {
638611
batchId: batch.friendlyId,
639612
currentIndex: result.workingIndex,
640613
attemptCount: $attemptCount,
@@ -658,7 +631,7 @@ export class BatchTriggerV2Service extends BaseService {
658631
return;
659632
}
660633
case "ERROR": {
661-
logger.error("[BatchTriggerV2][processBatchTaskRun] Batch processing error", {
634+
logger.error("[BatchTriggerV3][processBatchTaskRun] Batch processing error", {
662635
batchId: batch.friendlyId,
663636
currentIndex: result.workingIndex,
664637
error: result.error,
@@ -714,7 +687,7 @@ export class BatchTriggerV2Service extends BaseService {
714687
// Grab the next PROCESSING_BATCH_SIZE runIds
715688
const runFriendlyIds = batch.runIds.slice(currentIndex, currentIndex + batchSize);
716689

717-
logger.debug("[BatchTriggerV2][processBatchTaskRun] Processing batch items", {
690+
logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch items", {
718691
batchId: batch.friendlyId,
719692
currentIndex,
720693
runIds: runFriendlyIds,
@@ -735,7 +708,7 @@ export class BatchTriggerV2Service extends BaseService {
735708

736709
workingIndex++;
737710
} catch (error) {
738-
logger.error("[BatchTriggerV2][processBatchTaskRun] Failed to process item", {
711+
logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", {
739712
batchId: batch.friendlyId,
740713
currentIndex: workingIndex,
741714
error,
@@ -764,7 +737,7 @@ export class BatchTriggerV2Service extends BaseService {
764737
currentIndex: number,
765738
options?: BatchTriggerTaskServiceOptions
766739
) {
767-
logger.debug("[BatchTriggerV2][processBatchTaskRunItem] Processing item", {
740+
logger.debug("[BatchTriggerV3][processBatchTaskRunItem] Processing item", {
768741
batchId: batch.friendlyId,
769742
runId: task.runFriendlyId,
770743
currentIndex,
@@ -809,7 +782,7 @@ export class BatchTriggerV2Service extends BaseService {
809782
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
810783
await workerQueue.enqueue("v3.processBatchTaskRun", options, {
811784
tx,
812-
jobKey: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`,
785+
jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`,
813786
});
814787
}
815788

packages/core/src/v3/apps/friendlyId.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,4 @@ export const QueueId = new IdUtil("queue");
8080
export const RunId = new IdUtil("run");
8181
export const SnapshotId = new IdUtil("snapshot");
8282
export const WaitpointId = new IdUtil("waitpoint");
83+
export const BatchId = new IdUtil("batch");

packages/core/src/v3/schemas/api.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,18 @@ export type BatchTriggerTaskItem = z.infer<typeof BatchTriggerTaskItem>;
154154

155155
export const BatchTriggerTaskV2RequestBody = z.object({
156156
items: BatchTriggerTaskItem.array(),
157+
/** @deprecated engine v1 only */
157158
dependentAttempt: z.string().optional(),
159+
/**
160+
* RunEngine v2
161+
* If triggered inside another run, the parentRunId is the friendly ID of the parent run.
162+
*/
163+
parentRunId: z.string().optional(),
164+
/**
165+
* RunEngine v2
166+
* Should be `true` if `triggerAndWait` or `batchTriggerAndWait`
167+
*/
168+
resumeParentOnCompletion: z.boolean().optional(),
158169
});
159170

160171
export type BatchTriggerTaskV2RequestBody = z.infer<typeof BatchTriggerTaskV2RequestBody>;

0 commit comments

Comments
 (0)