diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts
index 5b1b89d2d9..c7534acee4 100644
--- a/apps/webapp/app/routes/api.v1.tasks.batch.ts
+++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts
@@ -1,23 +1,23 @@
 import { json } from "@remix-run/server-runtime";
 import {
-  BatchTriggerTaskResponse,
   BatchTriggerTaskV2RequestBody,
   BatchTriggerTaskV2Response,
   generateJWT,
 } from "@trigger.dev/core/v3";
 import { env } from "~/env.server";
+import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
+import { logger } from "~/services/logger.server";
 import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
-import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
 import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
+import { determineEngineVersion } from "~/v3/engineVersion.server";
+import { ServiceValidationError } from "~/v3/services/baseService.server";
 import {
   BatchProcessingStrategy,
   BatchTriggerV2Service,
 } from "~/v3/services/batchTriggerV2.server";
-import { ServiceValidationError } from "~/v3/services/baseService.server";
+import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
 import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
-import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
-import { logger } from "~/services/logger.server";
-import { z } from "zod";
+import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
 
 const { action, loader } = createActionApiRoute(
   {
@@ -87,7 +87,11 @@ const { action, loader } = createActionApiRoute(
       resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
       new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
 
-    const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
+    const version = await determineEngineVersion({ environment: authentication.environment });
+    const service =
+      version === "V1"
+        ? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
+        : new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
 
     try {
       const batch = await service.call(authentication.environment, body, {
diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts
index c2409cf8c5..3eea668a6e 100644
--- a/apps/webapp/app/services/worker.server.ts
+++ b/apps/webapp/app/services/worker.server.ts
@@ -56,6 +56,10 @@ import {
 } from "~/v3/services/cancelDevSessionRuns.server";
 import { logger } from "./logger.server";
 import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
+import {
+  BatchProcessingOptions as BatchProcessingOptionsV3,
+  BatchTriggerV3Service,
+} from "~/v3/services/batchTriggerV3.server";
 
 const workerCatalog = {
   indexEndpoint: z.object({
@@ -199,6 +203,7 @@ const workerCatalog = {
   }),
   "v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions,
   "v3.processBatchTaskRun": BatchProcessingOptions,
+  "v3.processBatchTaskRunV3": BatchProcessingOptionsV3,
 };
 
 const executionWorkerCatalog = {
@@ -735,6 +740,15 @@ function getWorkerQueue() {
       handler: async (payload, job) => {
         const service = new BatchTriggerV2Service(payload.strategy);
 
+        await service.processBatchTaskRun(payload);
+      },
+    },
+    "v3.processBatchTaskRunV3": {
+      priority: 0,
+      maxAttempts: 5,
+      handler: async (payload, job) => {
+        const service = new BatchTriggerV3Service(payload.strategy);
+
         await service.processBatchTaskRun(payload);
       },
     },
diff --git a/apps/webapp/app/v3/services/batchTriggerV2.server.ts b/apps/webapp/app/v3/services/batchTriggerV2.server.ts
index ff39679fc5..8c7bbf411f 100644
--- a/apps/webapp/app/v3/services/batchTriggerV2.server.ts
+++ b/apps/webapp/app/v3/services/batchTriggerV2.server.ts
@@ -49,6 +49,9 @@ export type BatchTriggerTaskServiceOptions = {
   oneTimeUseToken?: string;
 };
 
+/**
+ * Larger batches, used in Run Engine v1
+ */
 export class BatchTriggerV2Service extends BaseService {
   private _batchProcessingStrategy: BatchProcessingStrategy;
 
@@ -787,7 +790,8 @@ export class BatchTriggerV2Service extends BaseService {
             batchId: batch.friendlyId,
             skipChecks: true,
             runFriendlyId: task.runFriendlyId,
-          }
+          },
+          "V1"
         );
 
         if (!run) {
diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts
new file mode 100644
index 0000000000..66c259f83a
--- /dev/null
+++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts
@@ -0,0 +1,914 @@
+import {
+  BatchTriggerTaskV2RequestBody,
+  BatchTriggerTaskV2Response,
+  IOPacket,
+  packetRequiresOffloading,
+  parsePacket,
+} from "@trigger.dev/core/v3";
+import { BatchId, RunId } from "@trigger.dev/core/v3/apps";
+import { BatchTaskRun, Prisma } from "@trigger.dev/database";
+import { z } from "zod";
+import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
+import { env } from "~/env.server";
+import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { logger } from "~/services/logger.server";
+import { getEntitlement } from "~/services/platform.v3.server";
+import { workerQueue } from "~/services/worker.server";
+import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server";
+import { startActiveSpan } from "../tracer.server";
+import { ServiceValidationError, WithRunEngine } from "./baseService.server";
+import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
+import { guardQueueSizeLimitsForEnv } from "./triggerTaskV2.server";
+
+const PROCESSING_BATCH_SIZE = 50;
+const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;
+const MAX_ATTEMPTS = 10;
+
+export
const BatchProcessingStrategy = z.enum(["sequential", "parallel"]); +export type BatchProcessingStrategy = z.infer; + +export const BatchProcessingOptions = z.object({ + batchId: z.string(), + processingId: z.string(), + range: z.object({ start: z.number().int(), count: z.number().int() }), + attemptCount: z.number().int(), + strategy: BatchProcessingStrategy, + parentRunId: z.string().optional(), + resumeParentOnCompletion: z.boolean().optional(), +}); + +export type BatchProcessingOptions = z.infer; + +export type BatchTriggerTaskServiceOptions = { + idempotencyKey?: string; + idempotencyKeyExpiresAt?: Date; + triggerVersion?: string; + traceContext?: Record; + spanParentAsLink?: boolean; + oneTimeUseToken?: string; +}; + +/** + * Larger batches, used in Run Engine v2 + */ +export class BatchTriggerV3Service extends WithRunEngine { + private _batchProcessingStrategy: BatchProcessingStrategy; + + constructor( + batchProcessingStrategy?: BatchProcessingStrategy, + protected readonly _prisma: PrismaClientOrTransaction = prisma + ) { + super({ prisma }); + + this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel"; + } + + public async call( + environment: AuthenticatedEnvironment, + body: BatchTriggerTaskV2RequestBody, + options: BatchTriggerTaskServiceOptions = {} + ): Promise { + try { + return await this.traceWithEnv( + "call()", + environment, + async (span) => { + const existingBatch = options.idempotencyKey + ? await this._prisma.batchTaskRun.findUnique({ + where: { + runtimeEnvironmentId_idempotencyKey: { + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + }, + }, + }) + : undefined; + + if (existingBatch) { + if ( + existingBatch.idempotencyKeyExpiresAt && + existingBatch.idempotencyKeyExpiresAt < new Date() + ) { + logger.debug("[BatchTriggerV3][call] Idempotency key has expired", { + idempotencyKey: options.idempotencyKey, + batch: { + id: existingBatch.id, + friendlyId: existingBatch.friendlyId, + runCount: existingBatch.runCount, + idempotencyKeyExpiresAt: existingBatch.idempotencyKeyExpiresAt, + idempotencyKey: existingBatch.idempotencyKey, + }, + }); + + // Update the existing batch to remove the idempotency key + await this._prisma.batchTaskRun.update({ + where: { id: existingBatch.id }, + data: { idempotencyKey: null }, + }); + + // Don't return, just continue with the batch trigger + } else { + span.setAttribute("batchId", existingBatch.friendlyId); + + return this.#respondWithExistingBatch( + existingBatch, + environment, + body.resumeParentOnCompletion ? body.parentRunId : undefined + ); + } + } + + const { id, friendlyId } = BatchId.generate(); + + span.setAttribute("batchId", friendlyId); + + if (environment.type !== "DEVELOPMENT") { + const result = await getEntitlement(environment.organizationId); + if (result && result.hasAccess === false) { + throw new OutOfEntitlementError(); + } + } + + const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); + + const cachedRuns = + idempotencyKeys.length > 0 + ? 
await this._prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: environment.id, + idempotencyKey: { + in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), + }, + }, + select: { + friendlyId: true, + idempotencyKey: true, + idempotencyKeyExpiresAt: true, + }, + }) + : []; + + if (cachedRuns.length) { + logger.debug("[BatchTriggerV3][call] Found cached runs", { + cachedRuns, + batchId: friendlyId, + }); + } + + // Now we need to create an array of all the run IDs, in order + // If we have a cached run, that isn't expired, we should use that run ID + // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs + // If we don't have a cached run, we should generate a new run ID + const expiredRunIds = new Set(); + let cachedRunCount = 0; + + const runs = body.items.map((item) => { + const cachedRun = cachedRuns.find( + (r) => r.idempotencyKey === item.options?.idempotencyKey + ); + + const runId = RunId.generate(); + + if (cachedRun) { + if ( + cachedRun.idempotencyKeyExpiresAt && + cachedRun.idempotencyKeyExpiresAt < new Date() + ) { + expiredRunIds.add(cachedRun.friendlyId); + + return { + id: runId.friendlyId, + isCached: false, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + } + + cachedRunCount++; + + return { + id: cachedRun.friendlyId, + isCached: true, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + } + + return { + id: runId.friendlyId, + isCached: false, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + }); + + //block the parent with any existing children + if (body.resumeParentOnCompletion && body.parentRunId) { + const existingChildFriendlyIds = runs.flatMap((r) => (r.isCached ? [r.id] : [])); + + if (existingChildFriendlyIds.length > 0) { + await this.#blockParentRun({ + parentRunId: body.parentRunId, + childFriendlyIds: existingChildFriendlyIds, + environment, + }); + } + } + + // Calculate how many new runs we need to create + const newRunCount = body.items.length - cachedRunCount; + + if (newRunCount === 0) { + logger.debug("[BatchTriggerV3][call] All runs are cached", { + batchId: friendlyId, + }); + + await this._prisma.batchTaskRun.create({ + data: { + friendlyId, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, + runCount: body.items.length, + runIds: runs.map((r) => r.id), + //todo is this correct? Surely some of the runs could still be in progress? + status: "COMPLETED", + batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); + + return { + id: friendlyId, + isCached: false, + idempotencyKey: options.idempotencyKey ?? undefined, + runs, + }; + } + + const queueSizeGuard = await guardQueueSizeLimitsForEnv( + this._engine, + environment, + newRunCount + ); + + logger.debug("Queue size guard result", { + newRunCount, + queueSizeGuard, + environment: { + id: environment.id, + type: environment.type, + organization: environment.organization, + project: environment.project, + }, + }); + + if (!queueSizeGuard.isWithinLimits) { + throw new ServiceValidationError( + `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. 
The maximum size is ${queueSizeGuard.maximumSize}` + ); + } + + // Expire the cached runs that are no longer valid + if (expiredRunIds.size) { + logger.debug("Expiring cached runs", { + expiredRunIds: Array.from(expiredRunIds), + batchId: friendlyId, + }); + + // TODO: is there a limit to the number of items we can update in a single query? + await this._prisma.taskRun.updateMany({ + where: { friendlyId: { in: Array.from(expiredRunIds) } }, + data: { idempotencyKey: null }, + }); + } + + // Upload to object store + const payloadPacket = await this.#handlePayloadPacket( + body.items, + `batch/${friendlyId}`, + environment + ); + + const batch = await this.#createAndProcessBatchTaskRun( + friendlyId, + runs, + payloadPacket, + newRunCount, + environment, + body, + options + ); + + if (!batch) { + throw new Error("Failed to create batch"); + } + + return { + id: batch.friendlyId, + isCached: false, + idempotencyKey: batch.idempotencyKey ?? undefined, + runs, + }; + } + ); + } catch (error) { + // Detect a prisma transaction Unique constraint violation + if (error instanceof Prisma.PrismaClientKnownRequestError) { + logger.debug("BatchTriggerV3: Prisma transaction error", { + code: error.code, + message: error.message, + meta: error.meta, + }); + + if (error.code === "P2002") { + const target = error.meta?.target; + + if ( + Array.isArray(target) && + target.length > 0 && + typeof target[0] === "string" && + target[0].includes("oneTimeUseToken") + ) { + throw new ServiceValidationError( + "Cannot batch trigger with a one-time use token as it has already been used." + ); + } else { + throw new ServiceValidationError( + "Cannot batch trigger as it has already been triggered with the same idempotency key." + ); + } + } + } + + throw error; + } + } + + async #createAndProcessBatchTaskRun( + batchId: string, + runs: Array<{ + id: string; + isCached: boolean; + idempotencyKey: string | undefined; + taskIdentifier: string; + }>, + payloadPacket: IOPacket, + newRunCount: number, + environment: AuthenticatedEnvironment, + body: BatchTriggerTaskV2RequestBody, + options: BatchTriggerTaskServiceOptions = {} + ) { + if (newRunCount <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { + const batch = await this._prisma.batchTaskRun.create({ + data: { + friendlyId: batchId, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, + runCount: newRunCount, + runIds: runs.map((r) => r.id), + payload: payloadPacket.data, + payloadType: payloadPacket.dataType, + options, + batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); + + const result = await this.#processBatchTaskRunItems({ + batch, + environment, + currentIndex: 0, + batchSize: PROCESSING_BATCH_SIZE, + items: body.items, + options, + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, + }); + + switch (result.status) { + case "COMPLETE": { + logger.debug("[BatchTriggerV3][call] Batch inline processing complete", { + batchId: batch.friendlyId, + currentIndex: 0, + }); + + return batch; + } + case "INCOMPLETE": { + logger.debug("[BatchTriggerV3][call] Batch inline processing incomplete", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + }); + + // If processing inline does not finish for some reason, enqueue processing the rest of the batch + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: "0", + range: { + start: result.workingIndex, + count: PROCESSING_BATCH_SIZE, + }, + 
attemptCount: 0, + strategy: "sequential", + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, + }); + + return batch; + } + case "ERROR": { + logger.error("[BatchTriggerV3][call] Batch inline processing error", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + error: result.error, + }); + + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: "0", + range: { + start: result.workingIndex, + count: PROCESSING_BATCH_SIZE, + }, + attemptCount: 0, + strategy: "sequential", + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, + }); + + return batch; + } + } + } else { + return await $transaction(this._prisma, async (tx) => { + const batch = await tx.batchTaskRun.create({ + data: { + friendlyId: batchId, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, + runCount: body.items.length, + runIds: runs.map((r) => r.id), + payload: payloadPacket.data, + payloadType: payloadPacket.dataType, + options, + batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); + + switch (this._batchProcessingStrategy) { + case "sequential": { + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: batchId, + range: { start: 0, count: PROCESSING_BATCH_SIZE }, + attemptCount: 0, + strategy: this._batchProcessingStrategy, + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, + }); + + break; + } + case "parallel": { + const ranges = Array.from({ + length: Math.ceil(newRunCount / PROCESSING_BATCH_SIZE), + }).map((_, index) => ({ + start: index * PROCESSING_BATCH_SIZE, + count: PROCESSING_BATCH_SIZE, + })); + + await Promise.all( + ranges.map((range, index) => + this.#enqueueBatchTaskRun( + { + batchId: batch.id, + processingId: `${index}`, + range, + attemptCount: 0, + strategy: this._batchProcessingStrategy, + parentRunId: body.parentRunId, + resumeParentOnCompletion: body.resumeParentOnCompletion, + }, + tx + ) + ) + ); + + break; + } + } + + return batch; + }); + } + } + + async #respondWithExistingBatch( + batch: BatchTaskRun, + environment: AuthenticatedEnvironment, + blockParentRunId: string | undefined + ): Promise { + // Resolve the payload + const payloadPacket = await downloadPacketFromObjectStore( + { + data: batch.payload ?? undefined, + dataType: batch.payloadType, + }, + environment + ); + + const payload = await parsePacket(payloadPacket).then( + (p) => p as BatchTriggerTaskV2RequestBody["items"] + ); + + const runs = batch.runIds.map((id, index) => { + const item = payload[index]; + + return { + id, + taskIdentifier: item.task, + isCached: true, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + }; + }); + + //block the parent with all of the children + if (blockParentRunId) { + await this.#blockParentRun({ + parentRunId: blockParentRunId, + childFriendlyIds: batch.runIds, + environment, + }); + } + + return { + id: batch.friendlyId, + idempotencyKey: batch.idempotencyKey ?? 
undefined, + isCached: true, + runs, + }; + } + + async processBatchTaskRun(options: BatchProcessingOptions) { + logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch", { + options, + }); + + const $attemptCount = options.attemptCount + 1; + + // Add early return if max attempts reached + if ($attemptCount > MAX_ATTEMPTS) { + logger.error("[BatchTriggerV3][processBatchTaskRun] Max attempts reached", { + options, + attemptCount: $attemptCount, + }); + // You might want to update the batch status to failed here + return; + } + + const batch = await this._prisma.batchTaskRun.findFirst({ + where: { id: options.batchId }, + include: { + runtimeEnvironment: { + include: { + project: true, + organization: true, + }, + }, + }, + }); + + if (!batch) { + return; + } + + // Check to make sure the currentIndex is not greater than the runCount + if (options.range.start >= batch.runCount) { + logger.debug("[BatchTriggerV3][processBatchTaskRun] currentIndex is greater than runCount", { + options, + batchId: batch.friendlyId, + runCount: batch.runCount, + attemptCount: $attemptCount, + }); + + return; + } + + // Resolve the payload + const payloadPacket = await downloadPacketFromObjectStore( + { + data: batch.payload ?? undefined, + dataType: batch.payloadType, + }, + batch.runtimeEnvironment + ); + + const payload = await parsePacket(payloadPacket); + + if (!payload) { + logger.debug("[BatchTriggerV3][processBatchTaskRun] Failed to parse payload", { + options, + batchId: batch.friendlyId, + attemptCount: $attemptCount, + }); + + throw new Error("Failed to parse payload"); + } + + // Skip zod parsing + const $payload = payload as BatchTriggerTaskV2RequestBody["items"]; + const $options = batch.options as BatchTriggerTaskServiceOptions; + + const result = await this.#processBatchTaskRunItems({ + batch, + environment: batch.runtimeEnvironment, + currentIndex: options.range.start, + batchSize: options.range.count, + items: $payload, + options: $options, + }); + + switch (result.status) { + case "COMPLETE": { + logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing complete", { + options, + batchId: batch.friendlyId, + attemptCount: $attemptCount, + }); + + return; + } + case "INCOMPLETE": { + logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing incomplete", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + attemptCount: $attemptCount, + }); + + // Only enqueue the next batch task run if the strategy is sequential + // if the strategy is parallel, we will already have enqueued the next batch task run + if (options.strategy === "sequential") { + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: options.processingId, + range: { + start: result.workingIndex, + count: options.range.count, + }, + attemptCount: 0, + strategy: options.strategy, + parentRunId: options.parentRunId, + resumeParentOnCompletion: options.resumeParentOnCompletion, + }); + } + + return; + } + case "ERROR": { + logger.error("[BatchTriggerV3][processBatchTaskRun] Batch processing error", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + error: result.error, + attemptCount: $attemptCount, + }); + + // if the strategy is sequential, we will requeue processing with a count of the PROCESSING_BATCH_SIZE + // if the strategy is parallel, we will requeue processing with a range starting at the workingIndex and a count that is the remainder of this "slice" of the batch + if (options.strategy === "sequential") { + await this.#enqueueBatchTaskRun({ 
+ batchId: batch.id, + processingId: options.processingId, + range: { + start: result.workingIndex, + count: options.range.count, // This will be the same as the original count + }, + attemptCount: $attemptCount, + strategy: options.strategy, + parentRunId: options.parentRunId, + resumeParentOnCompletion: options.resumeParentOnCompletion, + }); + } else { + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: options.processingId, + range: { + start: result.workingIndex, + // This will be the remainder of the slice + // for example if the original range was 0-50 and the workingIndex is 25, the new range will be 25-25 + // if the original range was 51-100 and the workingIndex is 75, the new range will be 75-25 + count: options.range.count - result.workingIndex - options.range.start, + }, + attemptCount: $attemptCount, + strategy: options.strategy, + parentRunId: options.parentRunId, + resumeParentOnCompletion: options.resumeParentOnCompletion, + }); + } + + return; + } + } + } + + async #processBatchTaskRunItems({ + batch, + environment, + currentIndex, + batchSize, + items, + options, + parentRunId, + resumeParentOnCompletion, + }: { + batch: BatchTaskRun; + environment: AuthenticatedEnvironment; + currentIndex: number; + batchSize: number; + items: BatchTriggerTaskV2RequestBody["items"]; + options?: BatchTriggerTaskServiceOptions; + parentRunId?: string | undefined; + resumeParentOnCompletion?: boolean | undefined; + }): Promise< + | { status: "COMPLETE" } + | { status: "INCOMPLETE"; workingIndex: number } + | { status: "ERROR"; error: string; workingIndex: number } + > { + // Grab the next PROCESSING_BATCH_SIZE runIds + const runFriendlyIds = batch.runIds.slice(currentIndex, currentIndex + batchSize); + + logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch items", { + batchId: batch.friendlyId, + currentIndex, + runIds: runFriendlyIds, + runCount: batch.runCount, + }); + + // Combine the "window" between currentIndex and currentIndex + PROCESSING_BATCH_SIZE with the runId and the item in the payload which is an array + const itemsToProcess = runFriendlyIds.map((runFriendlyId, index) => ({ + runFriendlyId, + item: items[index + currentIndex], + })); + + let workingIndex = currentIndex; + + for (const item of itemsToProcess) { + try { + await this.#processBatchTaskRunItem({ + batch, + environment, + task: item, + currentIndex: workingIndex, + options, + parentRunId, + resumeParentOnCompletion, + }); + + workingIndex++; + } catch (error) { + logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", { + batchId: batch.friendlyId, + currentIndex: workingIndex, + error, + }); + + return { + status: "ERROR", + error: error instanceof Error ? 
error.message : String(error), + workingIndex, + }; + } + } + + // if there are more items to process, requeue the batch + if (workingIndex < batch.runCount) { + return { status: "INCOMPLETE", workingIndex }; + } + + return { status: "COMPLETE" }; + } + + async #processBatchTaskRunItem({ + batch, + environment, + task, + currentIndex, + options, + parentRunId, + resumeParentOnCompletion, + }: { + batch: BatchTaskRun; + environment: AuthenticatedEnvironment; + task: { runFriendlyId: string; item: BatchTriggerTaskV2RequestBody["items"][number] }; + currentIndex: number; + options?: BatchTriggerTaskServiceOptions; + parentRunId: string | undefined; + resumeParentOnCompletion: boolean | undefined; + }) { + logger.debug("[BatchTriggerV3][processBatchTaskRunItem] Processing item", { + batchId: batch.friendlyId, + runId: task.runFriendlyId, + currentIndex, + }); + + const triggerTaskService = new TriggerTaskService(); + + await triggerTaskService.call( + task.item.task, + environment, + { + ...task.item, + options: { + ...task.item.options, + parentRunId, + resumeParentOnCompletion, + }, + }, + { + triggerVersion: options?.triggerVersion, + traceContext: options?.traceContext, + spanParentAsLink: options?.spanParentAsLink, + batchId: batch.friendlyId, + skipChecks: true, + runFriendlyId: task.runFriendlyId, + }, + "V2" + ); + } + + async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { + await workerQueue.enqueue("v3.processBatchTaskRunV3", options, { + tx, + jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`, + }); + } + + async #handlePayloadPacket( + payload: any, + pathPrefix: string, + environment: AuthenticatedEnvironment + ) { + return await startActiveSpan("handlePayloadPacket()", async (span) => { + const packet = { data: JSON.stringify(payload), dataType: "application/json" }; + + if (!packet.data) { + return packet; + } + + const { needsOffloading } = packetRequiresOffloading( + packet, + env.TASK_PAYLOAD_OFFLOAD_THRESHOLD + ); + + if (!needsOffloading) { + return packet; + } + + const filename = `${pathPrefix}/payload.json`; + + await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment); + + return { + data: filename, + dataType: "application/store", + }; + }); + } + + async #blockParentRun({ + parentRunId, + childFriendlyIds, + environment, + }: { + parentRunId: string; + childFriendlyIds: string[]; + environment: AuthenticatedEnvironment; + }) { + const runsWithAssociatedWaitpoints = await this._prisma.taskRun.findMany({ + where: { + id: { + in: childFriendlyIds.map((r) => RunId.fromFriendlyId(r)), + }, + }, + select: { + associatedWaitpoint: { + select: { + id: true, + }, + }, + }, + }); + + await this._engine.blockRunWithWaitpoint({ + runId: RunId.fromFriendlyId(parentRunId), + waitpointId: runsWithAssociatedWaitpoints.flatMap((r) => + r.associatedWaitpoint ? 
[r.associatedWaitpoint.id] : [] + ), + environmentId: environment.id, + projectId: environment.projectId, + }); + } +} diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 5c7f1ad753..55ed259b8d 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -31,14 +31,15 @@ export class TriggerTaskService extends WithRunEngine { taskId: string, environment: AuthenticatedEnvironment, body: TriggerTaskRequestBody, - options: TriggerTaskServiceOptions = {} + options: TriggerTaskServiceOptions = {}, + version?: RunEngineVersion ) { return await this.traceWithEnv("call()", environment, async (span) => { span.setAttribute("taskId", taskId); - const version = await determineEngineVersion({ environment }); + const v = await determineEngineVersion({ environment, version }); - switch (version) { + switch (v) { case "V1": { return await this.callV1(taskId, environment, body, options); } diff --git a/apps/webapp/app/v3/services/triggerTaskV2.server.ts b/apps/webapp/app/v3/services/triggerTaskV2.server.ts index 781c45cec7..cc00491933 100644 --- a/apps/webapp/app/v3/services/triggerTaskV2.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV2.server.ts @@ -99,7 +99,6 @@ export class TriggerTaskServiceV2 extends WithRunEngine { waitpointId: existingRun.associatedWaitpoint.id, environmentId: environment.id, projectId: environment.projectId, - checkWaitpointIsPending: true, tx: this._prisma, }); } @@ -164,7 +163,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { //todo we will pass in the `parentRun` and `resumeParentOnCompletion` const parentRun = body.options?.parentRunId ? await this._prisma.taskRun.findFirst({ - where: { friendlyId: body.options.parentRunId }, + where: { id: RunId.fromFriendlyId(body.options.parentRunId) }, }) : undefined; @@ -299,7 +298,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { tags, oneTimeUseToken: options.oneTimeUseToken, parentTaskRunId: parentRun?.id, - rootTaskRunId: parentRun?.rootTaskRunId ?? undefined, + rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id, batchId: body.options?.parentBatch ?? undefined, resumeParentOnCompletion: body.options?.resumeParentOnCompletion, depth, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index d4358b7786..f54fe3435f 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1577,20 +1577,19 @@ export class RunEngine { waitpointId, projectId, failAfter, - checkWaitpointIsPending = false, tx, }: { runId: string; - waitpointId: string; + waitpointId: string | string[]; environmentId: string; projectId: string; failAfter?: Date; - /** If the waitpoint could be completed, i.e. not inside a run lock and not new */ - checkWaitpointIsPending?: boolean; tx?: PrismaClientOrTransaction; }): Promise { const prisma = tx ?? this.prisma; + let waitpointIds = typeof waitpointId === "string" ? 
[waitpointId] : waitpointId; + return await this.runLock.lock([runId], 5000, async (signal) => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); @@ -1602,29 +1601,25 @@ export class RunEngine { newStatus = "EXECUTING_WITH_WAITPOINTS"; } - if (checkWaitpointIsPending) { - const waitpoint = await prisma.waitpoint.findUnique({ - where: { id: waitpointId }, - }); - - if (!waitpoint) { - throw new ServiceValidationError("Waitpoint not found", 404); - } - - //the waitpoint has been completed since it was retrieved - if (waitpoint.status !== "PENDING") { - return snapshot; - } + const insertedBlockers = await prisma.$executeRaw` + INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt") + SELECT + gen_random_uuid(), + ${runId}, + w.id, + ${projectId}, + NOW(), + NOW() + FROM "Waitpoint" w + WHERE w.id IN (${Prisma.join(waitpointIds)}) + AND w.status = 'PENDING' + ON CONFLICT DO NOTHING;`; + + //if no runs were blocked we don't want to do anything more + if (insertedBlockers === 0) { + return snapshot; } - const taskWaitpoint = await prisma.taskRunWaitpoint.create({ - data: { - taskRunId: runId, - waitpointId: waitpointId, - projectId: projectId, - }, - }); - //if the state has changed, create a new snapshot if (newStatus !== snapshot.executionStatus) { snapshot = await this.#createExecutionSnapshot(prisma, { @@ -1641,12 +1636,14 @@ export class RunEngine { } if (failAfter) { - await this.worker.enqueue({ - id: `finishWaitpoint.${waitpointId}`, - job: "finishWaitpoint", - payload: { waitpointId, error: "Waitpoint timed out" }, - availableAt: failAfter, - }); + for (const waitpointId of waitpointIds) { + await this.worker.enqueue({ + id: `finishWaitpoint.${waitpointId}`, + job: "finishWaitpoint", + payload: { waitpointId, error: "Waitpoint timed out" }, + availableAt: failAfter, + }); + } } return snapshot; diff --git a/packages/core/src/v3/apps/friendlyId.ts b/packages/core/src/v3/apps/friendlyId.ts index f982ec280d..c95813eb48 100644 --- a/packages/core/src/v3/apps/friendlyId.ts +++ b/packages/core/src/v3/apps/friendlyId.ts @@ -80,3 +80,4 @@ export const QueueId = new IdUtil("queue"); export const RunId = new IdUtil("run"); export const SnapshotId = new IdUtil("snapshot"); export const WaitpointId = new IdUtil("waitpoint"); +export const BatchId = new IdUtil("batch"); diff --git a/packages/core/src/v3/runtime/managedRuntimeManager.ts b/packages/core/src/v3/runtime/managedRuntimeManager.ts index e29e06b5c9..90eddcd5e7 100644 --- a/packages/core/src/v3/runtime/managedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/managedRuntimeManager.ts @@ -68,8 +68,6 @@ export class ManagedRuntimeManager implements RuntimeManager { runs: string[]; ctx: TaskRunContext; }): Promise { - console.log("waitForBatch", params); - if (!params.runs.length) { return Promise.resolve({ id: params.id, items: [] }); } diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 2c52f740a0..609f7c8e15 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -154,7 +154,18 @@ export type BatchTriggerTaskItem = z.infer; export const BatchTriggerTaskV2RequestBody = z.object({ items: BatchTriggerTaskItem.array(), + /** @deprecated engine v1 only */ dependentAttempt: z.string().optional(), + /** + * RunEngine v2 + * If triggered inside another run, the parentRunId is the friendly ID of the parent run. 
+ */ + parentRunId: z.string().optional(), + /** + * RunEngine v2 + * Should be `true` if `triggerAndWait` or `batchTriggerAndWait` + */ + resumeParentOnCompletion: z.boolean().optional(), }); export type BatchTriggerTaskV2RequestBody = z.infer; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 9bfca56ce4..86e0175662 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -617,6 +617,7 @@ export async function batchTriggerById( }; }) ), + parentRunId: taskContext.ctx?.run.id, }, { spanParentAsLink: true, @@ -791,6 +792,8 @@ export async function batchTriggerByIdAndWait( }) ), dependentAttempt: ctx.attempt.id, + parentRunId: ctx.run.id, + resumeParentOnCompletion: true, }, { processingStrategy: options?.triggerSequentially ? "sequential" : undefined, @@ -951,6 +954,7 @@ export async function batchTriggerTasks( }; }) ), + parentRunId: taskContext.ctx?.run.id, }, { spanParentAsLink: true, @@ -1127,6 +1131,8 @@ export async function batchTriggerAndWaitTasks( parentAttempt: taskContext.ctx?.attempt.id, metadata: options?.metadata, maxDuration: options?.maxDuration, + parentRunId: taskContext.ctx?.run.id, }, }, { @@ -1234,6 +1241,8 @@ async function batchTrigger_internal( ): Promise> { const apiClient = apiClientManager.clientOrThrow(); + const ctx = taskContext.ctx; + const response = await apiClient.batchTriggerV2( { items: await Promise.all( @@ -1259,6 +1268,7 @@ async function batchTrigger_internal( parentAttempt: taskContext.ctx?.attempt.id, metadata: item.options?.metadata, maxDuration: item.options?.maxDuration, + parentRunId: ctx?.run.id, }, }; }) @@ -1430,13 +1440,13 @@ async function batchTriggerAndWait_internal { + logger.log("Hello, world from the parent", { payload }); + + const results = await childTask.batchTriggerAndWait([ + { payload: { message: "Hello, world!" } }, + { payload: { message: "Hello, world 2!" } }, + ]); + logger.log("Results", { results }); + + const results2 = await batch.triggerAndWait([ + { id: "child", payload: { message: "Hello, world !" } }, + { id: "child", payload: { message: "Hello, world 2!" } }, + ]); + logger.log("Results 2", { results2 }); + + const results3 = await batch.triggerByTask([ + { task: childTask, payload: { message: "Hello, world !" } }, + { task: childTask, payload: { message: "Hello, world 2!" } }, + ]); + logger.log("Results 3", { results3 }); + + const results4 = await batch.triggerByTaskAndWait([ + { task: childTask, payload: { message: "Hello, world !" } }, + { task: childTask, payload: { message: "Hello, world 2!" } }, + ]); + logger.log("Results 4", { results4 }); + }, +}); + export const childTask = task({ id: "child", - run: async (payload: any, { ctx }) => { - logger.info("Hello, world from the child", { payload }); + run: async ( + { message, failureChance = 0.3 }: { message?: string; failureChance?: number }, + { ctx } + ) => { + logger.info("Hello, world from the child", { message, failureChance }); - if (Math.random() > 0.5) { + if (Math.random() < failureChance) { throw new Error("Random error at start"); } - await setTimeout(10000); + await setTimeout(3_000); - if (Math.random() > 0.5) { + if (Math.random() < failureChance) { throw new Error("Random error at end"); } + + return { + message, + }; }, });
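Note on the widened `blockRunWithWaitpoint` signature (this note and the sketch below are not part of the diff): the engine now accepts either a single waitpoint ID or an array, inserts the `TaskRunWaitpoint` blockers in one raw SQL statement, skips waitpoints that are no longer `PENDING`, and enqueues one `finishWaitpoint` timeout job per waitpoint when `failAfter` is set. A minimal sketch of how the batch services above use it for a parent run; the wrapper function, its variable names, and the narrowed `engine` type are illustrative assumptions — only the call shape comes from this PR:

```ts
import { RunId } from "@trigger.dev/core/v3/apps";

// Sketch only: `engine` stands in for a RunEngine instance; its type is
// narrowed here so the example is self-contained.
async function blockParentOnChildren(
  engine: { blockRunWithWaitpoint: (args: Record<string, unknown>) => Promise<unknown> },
  parentRunFriendlyId: string,
  childWaitpointIds: string[],
  environment: { id: string; projectId: string }
) {
  // One call now blocks the parent on every child waitpoint; before this PR
  // each waitpoint needed its own call plus a separate pending-status check.
  await engine.blockRunWithWaitpoint({
    runId: RunId.fromFriendlyId(parentRunFriendlyId), // engine expects internal IDs
    waitpointId: childWaitpointIds, // string | string[] after this change
    environmentId: environment.id,
    projectId: environment.projectId,
  });
}
```

This mirrors `BatchTriggerV3Service.#blockParentRun` above, which resolves the cached/created child runs' associated waitpoints and passes them as a single array.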