diff --git a/.changeset/bright-lizards-wait.md b/.changeset/bright-lizards-wait.md new file mode 100644 index 0000000000..cb20871149 --- /dev/null +++ b/.changeset/bright-lizards-wait.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Add one-time use public tokens to trigger and batch trigger diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index f45e49ad10..e6e3398e69 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -3,7 +3,7 @@ import { generateJWT as internal_generateJWT, TriggerTaskRequestBody } from "@tr import { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; import { env } from "~/env.server"; -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; @@ -33,7 +33,7 @@ const { action, loader } = createActionApiRoute( allowJWT: true, maxContentLength: env.TASK_PAYLOAD_MAXIMUM_SIZE, authorization: { - action: "write", + action: "trigger", resource: (params) => ({ tasks: params.taskId }), superScopes: ["write:tasks", "admin"], }, @@ -59,6 +59,8 @@ const { action, loader } = createActionApiRoute( ? { traceparent, tracestate } : undefined; + const oneTimeUseToken = await getOneTimeUseToken(authentication); + logger.debug("Triggering task", { taskId: params.taskId, idempotencyKey, @@ -78,6 +80,7 @@ const { action, loader } = createActionApiRoute( triggerVersion: triggerVersion ?? undefined, traceContext, spanParentAsLink: spanParentAsLink === 1, + oneTimeUseToken, }); if (!run) { diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index fcd5edc39f..93dc6d1b1f 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -12,7 +12,7 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; const { action, loader } = createActionApiRoute( @@ -22,7 +22,7 @@ const { action, loader } = createActionApiRoute( allowJWT: true, maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE, authorization: { - action: "write", + action: "batchTrigger", resource: (_, __, ___, body) => ({ tasks: Array.from(new Set(body.items.map((i) => i.task))), }), @@ -56,6 +56,8 @@ const { action, loader } = createActionApiRoute( tracestate, } = headers; + const oneTimeUseToken = await getOneTimeUseToken(authentication); + logger.debug("Batch trigger request", { idempotencyKey, idempotencyKeyTTL, @@ -86,6 +88,7 @@ const { action, loader } = createActionApiRoute( triggerVersion: triggerVersion ?? undefined, traceContext, spanParentAsLink: spanParentAsLink === 1, + oneTimeUseToken, }); const $responseHeaders = await responseHeaders( diff --git a/apps/webapp/app/services/apiAuth.server.ts b/apps/webapp/app/services/apiAuth.server.ts index c3d6413999..dd973b257d 100644 --- a/apps/webapp/app/services/apiAuth.server.ts +++ b/apps/webapp/app/services/apiAuth.server.ts @@ -20,6 +20,8 @@ import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server"; const ClaimsSchema = z.object({ scopes: z.array(z.string()).optional(), + // One-time use token + otu: z.boolean().optional(), }); type Optional = Prettify & Partial>>; @@ -39,6 +41,7 @@ export type ApiAuthenticationResultSuccess = { type: "PUBLIC" | "PRIVATE" | "PUBLIC_JWT"; environment: AuthenticatedEnvironment; scopes?: string[]; + oneTimeUse?: boolean; }; export type ApiAuthenticationResultFailure = { @@ -146,6 +149,7 @@ export async function authenticateApiKey( ...result, environment: validationResults.environment, scopes: parsedClaims.success ? parsedClaims.data.scopes : [], + oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false, }; } } @@ -227,6 +231,7 @@ export async function authenticateApiKeyWithFailure( ...result, environment: validationResults.environment, scopes: parsedClaims.success ? parsedClaims.data.scopes : [], + oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false, }; } } @@ -531,3 +536,20 @@ function calculateJWTExpiration() { return (Date.now() + DEFAULT_JWT_EXPIRATION_IN_MS) / 1000; } + +export async function getOneTimeUseToken( + auth: ApiAuthenticationResultSuccess +): Promise { + if (auth.type !== "PUBLIC_JWT") { + return; + } + + if (!auth.oneTimeUse) { + return; + } + + // Hash the API key to make it unique + const hash = await crypto.subtle.digest("SHA-256", new TextEncoder().encode(auth.apiKey)); + + return Buffer.from(hash).toString("hex"); +} diff --git a/apps/webapp/app/services/authorization.server.ts b/apps/webapp/app/services/authorization.server.ts index fbb949db38..315b6094e0 100644 --- a/apps/webapp/app/services/authorization.server.ts +++ b/apps/webapp/app/services/authorization.server.ts @@ -1,4 +1,4 @@ -export type AuthorizationAction = "read" | "write"; // Add more actions as needed +export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed const ResourceTypes = ["tasks", "tags", "runs", "batch"] as const; diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index a1314145b5..82c2d48a7f 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -570,10 +570,25 @@ export function createActionApiRoute< scopes: authenticationResult.scopes, }); - if (!checkAuthorization(authenticationResult, action, $resource, superScopes)) { + const authorizationResult = checkAuthorization( + authenticationResult, + action, + $resource, + superScopes + ); + + if (!authorizationResult.authorized) { return await wrapResponse( request, - json({ error: "Unauthorized" }, { status: 403 }), + json( + { + error: `Unauthorized: ${authorizationResult.reason}`, + code: "unauthorized", + param: "access_token", + type: "authorization", + }, + { status: 403 } + ), corsStrategy !== "none" ); } diff --git a/apps/webapp/app/v3/services/batchTriggerV2.server.ts b/apps/webapp/app/v3/services/batchTriggerV2.server.ts index cbe2a5421c..733261f41a 100644 --- a/apps/webapp/app/v3/services/batchTriggerV2.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV2.server.ts @@ -5,7 +5,7 @@ import { packetRequiresOffloading, parsePacket, } from "@trigger.dev/core/v3"; -import { BatchTaskRun, TaskRunAttempt } from "@trigger.dev/database"; +import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; @@ -48,6 +48,7 @@ export type BatchTriggerTaskServiceOptions = { triggerVersion?: string; traceContext?: Record; spanParentAsLink?: boolean; + oneTimeUseToken?: string; }; export class BatchTriggerV2Service extends BaseService { @@ -56,255 +57,288 @@ export class BatchTriggerV2Service extends BaseService { body: BatchTriggerTaskV2RequestBody, options: BatchTriggerTaskServiceOptions = {} ): Promise { - return await this.traceWithEnv( - "call()", - environment, - async (span) => { - const existingBatch = options.idempotencyKey - ? await this._prisma.batchTaskRun.findUnique({ - where: { - runtimeEnvironmentId_idempotencyKey: { - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, + try { + return await this.traceWithEnv( + "call()", + environment, + async (span) => { + const existingBatch = options.idempotencyKey + ? await this._prisma.batchTaskRun.findUnique({ + where: { + runtimeEnvironmentId_idempotencyKey: { + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + }, }, - }, - }) - : undefined; + }) + : undefined; - if (existingBatch) { - if ( - existingBatch.idempotencyKeyExpiresAt && - existingBatch.idempotencyKeyExpiresAt < new Date() - ) { - logger.debug("[BatchTriggerV2][call] Idempotency key has expired", { - idempotencyKey: options.idempotencyKey, - batch: { - id: existingBatch.id, - friendlyId: existingBatch.friendlyId, - runCount: existingBatch.runCount, - idempotencyKeyExpiresAt: existingBatch.idempotencyKeyExpiresAt, - idempotencyKey: existingBatch.idempotencyKey, - }, - }); + if (existingBatch) { + if ( + existingBatch.idempotencyKeyExpiresAt && + existingBatch.idempotencyKeyExpiresAt < new Date() + ) { + logger.debug("[BatchTriggerV2][call] Idempotency key has expired", { + idempotencyKey: options.idempotencyKey, + batch: { + id: existingBatch.id, + friendlyId: existingBatch.friendlyId, + runCount: existingBatch.runCount, + idempotencyKeyExpiresAt: existingBatch.idempotencyKeyExpiresAt, + idempotencyKey: existingBatch.idempotencyKey, + }, + }); - // Update the existing batch to remove the idempotency key - await this._prisma.batchTaskRun.update({ - where: { id: existingBatch.id }, - data: { idempotencyKey: null }, - }); + // Update the existing batch to remove the idempotency key + await this._prisma.batchTaskRun.update({ + where: { id: existingBatch.id }, + data: { idempotencyKey: null }, + }); - // Don't return, just continue with the batch trigger - } else { - span.setAttribute("batchId", existingBatch.friendlyId); + // Don't return, just continue with the batch trigger + } else { + span.setAttribute("batchId", existingBatch.friendlyId); - return this.#respondWithExistingBatch(existingBatch, environment); + return this.#respondWithExistingBatch(existingBatch, environment); + } } - } - const batchId = generateFriendlyId("batch"); + const batchId = generateFriendlyId("batch"); - span.setAttribute("batchId", batchId); + span.setAttribute("batchId", batchId); - const dependentAttempt = body?.dependentAttempt - ? await this._prisma.taskRunAttempt.findUnique({ - where: { friendlyId: body.dependentAttempt }, - include: { - taskRun: { - select: { - id: true, - status: true, + const dependentAttempt = body?.dependentAttempt + ? await this._prisma.taskRunAttempt.findUnique({ + where: { friendlyId: body.dependentAttempt }, + include: { + taskRun: { + select: { + id: true, + status: true, + }, }, }, - }, - }) - : undefined; - - if ( - dependentAttempt && - (isFinalAttemptStatus(dependentAttempt.status) || - isFinalRunStatus(dependentAttempt.taskRun.status)) - ) { - logger.debug("[BatchTriggerV2][call] Dependent attempt or run is in a terminal state", { - dependentAttempt: dependentAttempt, - batchId, - }); + }) + : undefined; - throw new ServiceValidationError( - "Cannot process batch as the parent run is already in a terminal state" - ); - } + if ( + dependentAttempt && + (isFinalAttemptStatus(dependentAttempt.status) || + isFinalRunStatus(dependentAttempt.taskRun.status)) + ) { + logger.debug("[BatchTriggerV2][call] Dependent attempt or run is in a terminal state", { + dependentAttempt: dependentAttempt, + batchId, + }); - if (environment.type !== "DEVELOPMENT") { - const result = await getEntitlement(environment.organizationId); - if (result && result.hasAccess === false) { - throw new OutOfEntitlementError(); + throw new ServiceValidationError( + "Cannot process batch as the parent run is already in a terminal state" + ); } - } - const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); + if (environment.type !== "DEVELOPMENT") { + const result = await getEntitlement(environment.organizationId); + if (result && result.hasAccess === false) { + throw new OutOfEntitlementError(); + } + } - const cachedRuns = - idempotencyKeys.length > 0 - ? await this._prisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: environment.id, - idempotencyKey: { - in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), + const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); + + const cachedRuns = + idempotencyKeys.length > 0 + ? await this._prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: environment.id, + idempotencyKey: { + in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), + }, }, - }, - select: { - friendlyId: true, - idempotencyKey: true, - idempotencyKeyExpiresAt: true, - }, - }) - : []; + select: { + friendlyId: true, + idempotencyKey: true, + idempotencyKeyExpiresAt: true, + }, + }) + : []; - if (cachedRuns.length) { - logger.debug("[BatchTriggerV2][call] Found cached runs", { - cachedRuns, - batchId, - }); - } + if (cachedRuns.length) { + logger.debug("[BatchTriggerV2][call] Found cached runs", { + cachedRuns, + batchId, + }); + } - // Now we need to create an array of all the run IDs, in order - // If we have a cached run, that isn't expired, we should use that run ID - // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs - // If we don't have a cached run, we should generate a new run ID - const expiredRunIds = new Set(); - let cachedRunCount = 0; + // Now we need to create an array of all the run IDs, in order + // If we have a cached run, that isn't expired, we should use that run ID + // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs + // If we don't have a cached run, we should generate a new run ID + const expiredRunIds = new Set(); + let cachedRunCount = 0; - const runs = body.items.map((item) => { - const cachedRun = cachedRuns.find( - (r) => r.idempotencyKey === item.options?.idempotencyKey - ); + const runs = body.items.map((item) => { + const cachedRun = cachedRuns.find( + (r) => r.idempotencyKey === item.options?.idempotencyKey + ); - if (cachedRun) { - if ( - cachedRun.idempotencyKeyExpiresAt && - cachedRun.idempotencyKeyExpiresAt < new Date() - ) { - expiredRunIds.add(cachedRun.friendlyId); + if (cachedRun) { + if ( + cachedRun.idempotencyKeyExpiresAt && + cachedRun.idempotencyKeyExpiresAt < new Date() + ) { + expiredRunIds.add(cachedRun.friendlyId); + + return { + id: generateFriendlyId("run"), + isCached: false, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + } + + cachedRunCount++; return { - id: generateFriendlyId("run"), - isCached: false, + id: cachedRun.friendlyId, + isCached: true, idempotencyKey: item.options?.idempotencyKey ?? undefined, taskIdentifier: item.task, }; } - cachedRunCount++; - return { - id: cachedRun.friendlyId, - isCached: true, + id: generateFriendlyId("run"), + isCached: false, idempotencyKey: item.options?.idempotencyKey ?? undefined, taskIdentifier: item.task, }; - } + }); - return { - id: generateFriendlyId("run"), - isCached: false, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - }); + // Calculate how many new runs we need to create + const newRunCount = body.items.length - cachedRunCount; - // Calculate how many new runs we need to create - const newRunCount = body.items.length - cachedRunCount; + if (newRunCount === 0) { + logger.debug("[BatchTriggerV2][call] All runs are cached", { + batchId, + }); - if (newRunCount === 0) { - logger.debug("[BatchTriggerV2][call] All runs are cached", { - batchId, - }); + await this._prisma.batchTaskRun.create({ + data: { + friendlyId: batchId, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, + idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, + dependentTaskAttemptId: dependentAttempt?.id, + runCount: body.items.length, + runIds: runs.map((r) => r.id), + status: "COMPLETED", + batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, + }, + }); - await this._prisma.batchTaskRun.create({ - data: { - friendlyId: batchId, - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, - dependentTaskAttemptId: dependentAttempt?.id, - runCount: body.items.length, - runIds: runs.map((r) => r.id), - status: "COMPLETED", - batchVersion: "v2", + return { + id: batchId, + isCached: false, + idempotencyKey: options.idempotencyKey ?? undefined, + runs, + }; + } + + const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs, newRunCount); + + logger.debug("Queue size guard result", { + newRunCount, + queueSizeGuard, + environment: { + id: environment.id, + type: environment.type, + organization: environment.organization, + project: environment.project, }, }); - return { - id: batchId, - isCached: false, - idempotencyKey: options.idempotencyKey ?? undefined, - runs, - }; - } + if (!queueSizeGuard.isWithinLimits) { + throw new ServiceValidationError( + `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + ); + } - const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs, newRunCount); + // Expire the cached runs that are no longer valid + if (expiredRunIds.size) { + logger.debug("Expiring cached runs", { + expiredRunIds: Array.from(expiredRunIds), + batchId, + }); - logger.debug("Queue size guard result", { - newRunCount, - queueSizeGuard, - environment: { - id: environment.id, - type: environment.type, - organization: environment.organization, - project: environment.project, - }, - }); + // TODO: is there a limit to the number of items we can update in a single query? + await this._prisma.taskRun.updateMany({ + where: { friendlyId: { in: Array.from(expiredRunIds) } }, + data: { idempotencyKey: null }, + }); + } - if (!queueSizeGuard.isWithinLimits) { - throw new ServiceValidationError( - `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + // Upload to object store + const payloadPacket = await this.#handlePayloadPacket( + body.items, + `batch/${batchId}`, + environment ); - } - // Expire the cached runs that are no longer valid - if (expiredRunIds.size) { - logger.debug("Expiring cached runs", { - expiredRunIds: Array.from(expiredRunIds), + const batch = await this.#createAndProcessBatchTaskRun( batchId, - }); + runs, + payloadPacket, + newRunCount, + environment, + body, + options, + dependentAttempt ?? undefined + ); - // TODO: is there a limit to the number of items we can update in a single query? - await this._prisma.taskRun.updateMany({ - where: { friendlyId: { in: Array.from(expiredRunIds) } }, - data: { idempotencyKey: null }, - }); + if (!batch) { + throw new Error("Failed to create batch"); + } + + return { + id: batch.friendlyId, + isCached: false, + idempotencyKey: batch.idempotencyKey ?? undefined, + runs, + }; } + ); + } catch (error) { + // Detect a prisma transaction Unique constraint violation + if (error instanceof Prisma.PrismaClientKnownRequestError) { + logger.debug("BatchTriggerV2: Prisma transaction error", { + code: error.code, + message: error.message, + meta: error.meta, + }); - // Upload to object store - const payloadPacket = await this.#handlePayloadPacket( - body.items, - `batch/${batchId}`, - environment - ); - - const batch = await this.#createAndProcessBatchTaskRun( - batchId, - runs, - payloadPacket, - newRunCount, - environment, - body, - options, - dependentAttempt ?? undefined - ); + if (error.code === "P2002") { + const target = error.meta?.target; - if (!batch) { - throw new Error("Failed to create batch"); + if ( + Array.isArray(target) && + target.length > 0 && + typeof target[0] === "string" && + target[0].includes("oneTimeUseToken") + ) { + throw new ServiceValidationError( + "Cannot batch trigger with a one-time use token as it has already been used." + ); + } else { + throw new ServiceValidationError( + "Cannot batch trigger as it has already been triggered with the same idempotency key." + ); + } } - - return { - id: batch.friendlyId, - isCached: false, - idempotencyKey: batch.idempotencyKey ?? undefined, - runs, - }; } - ); + + throw error; + } } async #createAndProcessBatchTaskRun( @@ -336,6 +370,7 @@ export class BatchTriggerV2Service extends BaseService { payloadType: payloadPacket.dataType, options, batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, }, }); @@ -413,6 +448,7 @@ export class BatchTriggerV2Service extends BaseService { payloadType: payloadPacket.dataType, options, batchVersion: "v2", + oneTimeUseToken: options.oneTimeUseToken, }, }); diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 6501e1a717..5792360594 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -26,6 +26,7 @@ import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; import { clampMaxDuration } from "../utils/maxDuration"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { Prisma } from "@trigger.dev/database"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -38,6 +39,7 @@ export type TriggerTaskServiceOptions = { customIcon?: string; runId?: string; skipChecks?: boolean; + oneTimeUseToken?: string; }; export class OutOfEntitlementError extends Error { @@ -275,191 +277,217 @@ export class TriggerTaskService extends BaseService { }) : undefined; - return await eventRepository.traceEvent( - taskId, - { - context: options.traceContext, - spanParentAsLink: options.spanParentAsLink, - parentAsLinkType: options.parentAsLinkType, - kind: "SERVER", - environment, - taskSlug: taskId, - attributes: { - properties: { - [SemanticInternalAttributes.SHOW_ACTIONS]: true, - }, - style: { - icon: options.customIcon ?? "task", + try { + return await eventRepository.traceEvent( + taskId, + { + context: options.traceContext, + spanParentAsLink: options.spanParentAsLink, + parentAsLinkType: options.parentAsLinkType, + kind: "SERVER", + environment, + taskSlug: taskId, + attributes: { + properties: { + [SemanticInternalAttributes.SHOW_ACTIONS]: true, + }, + style: { + icon: options.customIcon ?? "task", + }, + runIsTest: body.options?.test ?? false, + batchId: options.batchId, + idempotencyKey, }, - runIsTest: body.options?.test ?? false, - batchId: options.batchId, - idempotencyKey, + incomplete: true, + immediate: true, }, - incomplete: true, - immediate: true, - }, - async (event, traceContext, traceparent) => { - const run = await autoIncrementCounter.incrementInTransaction( - `v3-run:${environment.id}:${taskId}`, - async (num, tx) => { - const lockedToBackgroundWorker = body.options?.lockToVersion - ? await tx.backgroundWorker.findUnique({ - where: { - projectId_runtimeEnvironmentId_version: { - projectId: environment.projectId, - runtimeEnvironmentId: environment.id, - version: body.options?.lockToVersion, + async (event, traceContext, traceparent) => { + const run = await autoIncrementCounter.incrementInTransaction( + `v3-run:${environment.id}:${taskId}`, + async (num, tx) => { + const lockedToBackgroundWorker = body.options?.lockToVersion + ? await tx.backgroundWorker.findUnique({ + where: { + projectId_runtimeEnvironmentId_version: { + projectId: environment.projectId, + runtimeEnvironmentId: environment.id, + version: body.options?.lockToVersion, + }, }, - }, - }) - : undefined; + }) + : undefined; - let queueName = sanitizeQueueName( - await this.#getQueueName(taskId, environment, body.options?.queue?.name) - ); + let queueName = sanitizeQueueName( + await this.#getQueueName(taskId, environment, body.options?.queue?.name) + ); - // Check that the queuename is not an empty string - if (!queueName) { - queueName = sanitizeQueueName(`task/${taskId}`); - } - - event.setAttribute("queueName", queueName); - span.setAttribute("queueName", queueName); - - //upsert tags - let tagIds: string[] = []; - const bodyTags = - typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; - if (bodyTags && bodyTags.length > 0) { - for (const tag of bodyTags) { - const tagRecord = await createTag({ - tag, - projectId: environment.projectId, - }); - if (tagRecord) { - tagIds.push(tagRecord.id); + // Check that the queuename is not an empty string + if (!queueName) { + queueName = sanitizeQueueName(`task/${taskId}`); + } + + event.setAttribute("queueName", queueName); + span.setAttribute("queueName", queueName); + + //upsert tags + let tagIds: string[] = []; + const bodyTags = + typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; + if (bodyTags && bodyTags.length > 0) { + for (const tag of bodyTags) { + const tagRecord = await createTag({ + tag, + projectId: environment.projectId, + }); + if (tagRecord) { + tagIds.push(tagRecord.id); + } } } - } - - const depth = dependentAttempt - ? dependentAttempt.taskRun.depth + 1 - : parentAttempt - ? parentAttempt.taskRun.depth + 1 - : dependentBatchRun?.dependentTaskAttempt - ? dependentBatchRun.dependentTaskAttempt.taskRun.depth + 1 - : 0; - - const taskRun = await tx.taskRun.create({ - data: { - status: delayUntil ? "DELAYED" : "PENDING", - number: num, - friendlyId: runFriendlyId, - runtimeEnvironmentId: environment.id, - projectId: environment.projectId, - idempotencyKey, - idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, - taskIdentifier: taskId, - payload: payloadPacket.data ?? "", - payloadType: payloadPacket.dataType, - context: body.context, - traceContext: traceContext, - traceId: event.traceId, - spanId: event.spanId, - parentSpanId: - options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, - lockedToVersionId: lockedToBackgroundWorker?.id, - taskVersion: lockedToBackgroundWorker?.version, - sdkVersion: lockedToBackgroundWorker?.sdkVersion, - cliVersion: lockedToBackgroundWorker?.cliVersion, - concurrencyKey: body.options?.concurrencyKey, - queue: queueName, - isTest: body.options?.test ?? false, - delayUntil, - queuedAt: delayUntil ? undefined : new Date(), - maxAttempts: body.options?.maxAttempts, - ttl, - tags: - tagIds.length === 0 - ? undefined - : { - connect: tagIds.map((id) => ({ id })), - }, - parentTaskRunId: - dependentAttempt?.taskRun.id ?? - parentAttempt?.taskRun.id ?? - dependentBatchRun?.dependentTaskAttempt?.taskRun.id, - parentTaskRunAttemptId: - dependentAttempt?.id ?? - parentAttempt?.id ?? - dependentBatchRun?.dependentTaskAttempt?.id, - rootTaskRunId: - dependentAttempt?.taskRun.rootTaskRunId ?? - dependentAttempt?.taskRun.id ?? - parentAttempt?.taskRun.rootTaskRunId ?? - parentAttempt?.taskRun.id ?? - dependentBatchRun?.dependentTaskAttempt?.taskRun.rootTaskRunId ?? - dependentBatchRun?.dependentTaskAttempt?.taskRun.id, - batchId: dependentBatchRun?.id ?? parentBatchRun?.id, - resumeParentOnCompletion: !!(dependentAttempt ?? dependentBatchRun), - depth, - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - seedMetadata: metadataPacket?.data, - seedMetadataType: metadataPacket?.dataType, - maxDurationInSeconds: body.options?.maxDuration - ? clampMaxDuration(body.options.maxDuration) - : undefined, - runTags: bodyTags, - }, - }); - event.setAttribute("runId", taskRun.friendlyId); - span.setAttribute("runId", taskRun.friendlyId); + const depth = dependentAttempt + ? dependentAttempt.taskRun.depth + 1 + : parentAttempt + ? parentAttempt.taskRun.depth + 1 + : dependentBatchRun?.dependentTaskAttempt + ? dependentBatchRun.dependentTaskAttempt.taskRun.depth + 1 + : 0; - if (dependentAttempt) { - await tx.taskRunDependency.create({ + const taskRun = await tx.taskRun.create({ data: { - taskRunId: taskRun.id, - dependentAttemptId: dependentAttempt.id, - }, - }); - } else if (dependentBatchRun) { - await tx.taskRunDependency.create({ - data: { - taskRunId: taskRun.id, - dependentBatchRunId: dependentBatchRun.id, + status: delayUntil ? "DELAYED" : "PENDING", + number: num, + friendlyId: runFriendlyId, + runtimeEnvironmentId: environment.id, + projectId: environment.projectId, + idempotencyKey, + idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined, + taskIdentifier: taskId, + payload: payloadPacket.data ?? "", + payloadType: payloadPacket.dataType, + context: body.context, + traceContext: traceContext, + traceId: event.traceId, + spanId: event.spanId, + parentSpanId: + options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId, + lockedToVersionId: lockedToBackgroundWorker?.id, + taskVersion: lockedToBackgroundWorker?.version, + sdkVersion: lockedToBackgroundWorker?.sdkVersion, + cliVersion: lockedToBackgroundWorker?.cliVersion, + concurrencyKey: body.options?.concurrencyKey, + queue: queueName, + isTest: body.options?.test ?? false, + delayUntil, + queuedAt: delayUntil ? undefined : new Date(), + maxAttempts: body.options?.maxAttempts, + ttl, + tags: + tagIds.length === 0 + ? undefined + : { + connect: tagIds.map((id) => ({ id })), + }, + parentTaskRunId: + dependentAttempt?.taskRun.id ?? + parentAttempt?.taskRun.id ?? + dependentBatchRun?.dependentTaskAttempt?.taskRun.id, + parentTaskRunAttemptId: + dependentAttempt?.id ?? + parentAttempt?.id ?? + dependentBatchRun?.dependentTaskAttempt?.id, + rootTaskRunId: + dependentAttempt?.taskRun.rootTaskRunId ?? + dependentAttempt?.taskRun.id ?? + parentAttempt?.taskRun.rootTaskRunId ?? + parentAttempt?.taskRun.id ?? + dependentBatchRun?.dependentTaskAttempt?.taskRun.rootTaskRunId ?? + dependentBatchRun?.dependentTaskAttempt?.taskRun.id, + batchId: dependentBatchRun?.id ?? parentBatchRun?.id, + resumeParentOnCompletion: !!(dependentAttempt ?? dependentBatchRun), + depth, + metadata: metadataPacket?.data, + metadataType: metadataPacket?.dataType, + seedMetadata: metadataPacket?.data, + seedMetadataType: metadataPacket?.dataType, + maxDurationInSeconds: body.options?.maxDuration + ? clampMaxDuration(body.options.maxDuration) + : undefined, + runTags: bodyTags, + oneTimeUseToken: options.oneTimeUseToken, }, }); - } - if (body.options?.queue) { - const concurrencyLimit = - typeof body.options.queue.concurrencyLimit === "number" - ? Math.max(0, body.options.queue.concurrencyLimit) - : undefined; + event.setAttribute("runId", taskRun.friendlyId); + span.setAttribute("runId", taskRun.friendlyId); - let taskQueue = await tx.taskQueue.findFirst({ - where: { - runtimeEnvironmentId: environment.id, - name: queueName, - }, - }); + if (dependentAttempt) { + await tx.taskRunDependency.create({ + data: { + taskRunId: taskRun.id, + dependentAttemptId: dependentAttempt.id, + }, + }); + } else if (dependentBatchRun) { + await tx.taskRunDependency.create({ + data: { + taskRunId: taskRun.id, + dependentBatchRunId: dependentBatchRun.id, + }, + }); + } - const existingConcurrencyLimit = - typeof taskQueue?.concurrencyLimit === "number" - ? taskQueue.concurrencyLimit - : undefined; + if (body.options?.queue) { + const concurrencyLimit = + typeof body.options.queue.concurrencyLimit === "number" + ? Math.max(0, body.options.queue.concurrencyLimit) + : undefined; - if (taskQueue) { - if (existingConcurrencyLimit !== concurrencyLimit) { - taskQueue = await tx.taskQueue.update({ - where: { - id: taskQueue.id, - }, + let taskQueue = await tx.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + name: queueName, + }, + }); + + const existingConcurrencyLimit = + typeof taskQueue?.concurrencyLimit === "number" + ? taskQueue.concurrencyLimit + : undefined; + + if (taskQueue) { + if (existingConcurrencyLimit !== concurrencyLimit) { + taskQueue = await tx.taskQueue.update({ + where: { + id: taskQueue.id, + }, + data: { + concurrencyLimit: + typeof concurrencyLimit === "number" ? concurrencyLimit : null, + }, + }); + + if (typeof taskQueue.concurrencyLimit === "number") { + await marqs?.updateQueueConcurrencyLimits( + environment, + taskQueue.name, + taskQueue.concurrencyLimit + ); + } else { + await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); + } + } + } else { + const queueId = generateFriendlyId("queue"); + + taskQueue = await tx.taskQueue.create({ data: { - concurrencyLimit: - typeof concurrencyLimit === "number" ? concurrencyLimit : null, + friendlyId: queueId, + name: queueName, + concurrencyLimit, + runtimeEnvironmentId: environment.id, + projectId: environment.projectId, + type: "NAMED", }, }); @@ -469,106 +497,113 @@ export class TriggerTaskService extends BaseService { taskQueue.name, taskQueue.concurrencyLimit ); - } else { - await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); } } - } else { - const queueId = generateFriendlyId("queue"); - - taskQueue = await tx.taskQueue.create({ - data: { - friendlyId: queueId, - name: queueName, - concurrencyLimit, - runtimeEnvironmentId: environment.id, - projectId: environment.projectId, - type: "NAMED", - }, - }); - - if (typeof taskQueue.concurrencyLimit === "number") { - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } } - } - if (taskRun.delayUntil) { - await workerQueue.enqueue( - "v3.enqueueDelayedRun", - { runId: taskRun.id }, - { tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` } - ); - } + if (taskRun.delayUntil) { + await workerQueue.enqueue( + "v3.enqueueDelayedRun", + { runId: taskRun.id }, + { tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` } + ); + } - if (!taskRun.delayUntil && taskRun.ttl) { - const expireAt = parseNaturalLanguageDuration(taskRun.ttl); + if (!taskRun.delayUntil && taskRun.ttl) { + const expireAt = parseNaturalLanguageDuration(taskRun.ttl); - if (expireAt) { - await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx); + if (expireAt) { + await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx); + } } - } - return taskRun; - }, - async (_, tx) => { - const counter = await tx.taskRunNumberCounter.findUnique({ - where: { - taskIdentifier_environmentId: { - taskIdentifier: taskId, - environmentId: environment.id, + return taskRun; + }, + async (_, tx) => { + const counter = await tx.taskRunNumberCounter.findUnique({ + where: { + taskIdentifier_environmentId: { + taskIdentifier: taskId, + environmentId: environment.id, + }, }, - }, - select: { lastNumber: true }, - }); - - return counter?.lastNumber; - }, - this._prisma - ); + select: { lastNumber: true }, + }); - //release the concurrency for the env and org, if part of a (batch)triggerAndWait - if (dependentAttempt) { - const isSameTask = dependentAttempt.taskRun.taskIdentifier === taskId; - await marqs?.releaseConcurrency(dependentAttempt.taskRun.id, isSameTask); - } - if (dependentBatchRun?.dependentTaskAttempt) { - const isSameTask = - dependentBatchRun.dependentTaskAttempt.taskRun.taskIdentifier === taskId; - await marqs?.releaseConcurrency( - dependentBatchRun.dependentTaskAttempt.taskRun.id, - isSameTask + return counter?.lastNumber; + }, + this._prisma ); - } - if (!run) { - return; - } + //release the concurrency for the env and org, if part of a (batch)triggerAndWait + if (dependentAttempt) { + const isSameTask = dependentAttempt.taskRun.taskIdentifier === taskId; + await marqs?.releaseConcurrency(dependentAttempt.taskRun.id, isSameTask); + } + if (dependentBatchRun?.dependentTaskAttempt) { + const isSameTask = + dependentBatchRun.dependentTaskAttempt.taskRun.taskIdentifier === taskId; + await marqs?.releaseConcurrency( + dependentBatchRun.dependentTaskAttempt.taskRun.id, + isSameTask + ); + } + + if (!run) { + return; + } + + // We need to enqueue the task run into the appropriate queue. This is done after the tx completes to prevent a race condition where the task run hasn't been created yet by the time we dequeue. + if (run.status === "PENDING") { + await marqs?.enqueueMessage( + environment, + run.queue, + run.id, + { + type: "EXECUTE", + taskIdentifier: taskId, + projectId: environment.projectId, + environmentId: environment.id, + environmentType: environment.type, + }, + body.options?.concurrencyKey + ); + } - // We need to enqueue the task run into the appropriate queue. This is done after the tx completes to prevent a race condition where the task run hasn't been created yet by the time we dequeue. - if (run.status === "PENDING") { - await marqs?.enqueueMessage( - environment, - run.queue, - run.id, - { - type: "EXECUTE", - taskIdentifier: taskId, - projectId: environment.projectId, - environmentId: environment.id, - environmentType: environment.type, - }, - body.options?.concurrencyKey - ); + return run; } + ); + } catch (error) { + // Detect a prisma transaction Unique constraint violation + if (error instanceof Prisma.PrismaClientKnownRequestError) { + logger.debug("TriggerTask: Prisma transaction error", { + code: error.code, + message: error.message, + meta: error.meta, + }); - return run; + if (error.code === "P2002") { + const target = error.meta?.target; + + if ( + Array.isArray(target) && + target.length > 0 && + typeof target[0] === "string" && + target[0].includes("oneTimeUseToken") + ) { + throw new ServiceValidationError( + `Cannot trigger ${taskId} with a one-time use token as it has already been used.` + ); + } else { + throw new ServiceValidationError( + `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` + ); + } + } } - ); + + throw error; + } }); } diff --git a/apps/webapp/test/authorization.test.ts b/apps/webapp/test/authorization.test.ts index ad49e6e60e..c2c782f8dc 100644 --- a/apps/webapp/test/authorization.test.ts +++ b/apps/webapp/test/authorization.test.ts @@ -10,6 +10,10 @@ describe("checkAuthorization", () => { scopes: ["read:runs:run_1234", "read:tasks", "read:tags:tag_5678"], }; const publicJwtEntityNoPermissions: AuthorizationEntity = { type: "PUBLIC_JWT" }; + const publicJwtEntityWithTaskWritePermissions: AuthorizationEntity = { + type: "PUBLIC_JWT", + scopes: ["write:tasks:task-1"], + }; describe("PRIVATE entity", () => { it("should always return authorized regardless of action or resource", () => { @@ -49,6 +53,28 @@ describe("checkAuthorization", () => { }); }); + describe("PUBLIC_JWT entity with task write scope", () => { + it("should return authorized for specific resource scope", () => { + const result = checkAuthorization(publicJwtEntityWithTaskWritePermissions, "write", { + tasks: "task-1", + }); + expect(result.authorized).toBe(true); + expect(result).not.toHaveProperty("reason"); + }); + + it("should return unauthorized with reason for unauthorized specific resources", () => { + const result = checkAuthorization(publicJwtEntityWithTaskWritePermissions, "write", { + tasks: "task-2", + }); + expect(result.authorized).toBe(false); + if (!result.authorized) { + expect(result.reason).toBe( + "Public Access Token is missing required permissions. Token has the following permissions: 'write:tasks:task-1'. See https://trigger.dev/docs/frontend/overview#authentication for more information." + ); + } + }); + }); + describe("PUBLIC_JWT entity with scope", () => { it("should return authorized for specific resource scope", () => { const result = checkAuthorization(publicJwtEntityWithPermissions, "read", { diff --git a/internal-packages/database/prisma/migrations/20241129141824_add_one_time_use_tokens/migration.sql b/internal-packages/database/prisma/migrations/20241129141824_add_one_time_use_tokens/migration.sql new file mode 100644 index 0000000000..774d2619c4 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20241129141824_add_one_time_use_tokens/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE "BatchTaskRun" ADD COLUMN "oneTimeUseToken" TEXT; + +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN "oneTimeUseToken" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 143f2203c3..dee8253c13 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1722,6 +1722,10 @@ model TaskRun { expiredAt DateTime? maxAttempts Int? + /// optional token that can be used to authenticate the task run + /// TODO: add a unique constraint on the token + oneTimeUseToken String? + batchItems BatchTaskRunItem[] dependency TaskRunDependency? CheckpointRestoreEvent CheckpointRestoreEvent[] @@ -1787,6 +1791,8 @@ model TaskRun { maxDurationInSeconds Int? + // TODO: uncomment this + // @@unique([oneTimeUseToken]) @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs @@index([parentTaskRunId]) @@ -2161,6 +2167,10 @@ model BatchTaskRun { options Json? batchVersion String @default("v1") + /// optional token that can be used to authenticate the task run + /// TODO: add a unique constraint on the token + oneTimeUseToken String? + ///all the below properties are engine v1 only items BatchTaskRunItem[] taskIdentifier String? @@ -2170,6 +2180,8 @@ model BatchTaskRun { dependentTaskAttemptId String? runDependencies TaskRunDependency[] @relation("dependentBatchRun") + // TODO: uncomment this + // @@unique([oneTimeUseToken]) ///this is used for all engine versions @@unique([runtimeEnvironmentId, idempotencyKey]) } diff --git a/packages/trigger-sdk/src/v3/auth.ts b/packages/trigger-sdk/src/v3/auth.ts index c871b3b664..8d45cbb180 100644 --- a/packages/trigger-sdk/src/v3/auth.ts +++ b/packages/trigger-sdk/src/v3/auth.ts @@ -25,11 +25,14 @@ export function configure(options: ApiClientConfiguration) { export const auth = { configure, createPublicToken, + createTriggerPublicToken, + createBatchTriggerPublicToken, withAuth, + withPublicToken, + withTriggerPublicToken, + withBatchTriggerPublicToken, }; -type PublicTokenPermissionAction = "read" | "write"; // Add more actions as needed - type PublicTokenPermissionProperties = { /** * Grant access to specific tasks @@ -53,7 +56,26 @@ type PublicTokenPermissionProperties = { }; export type PublicTokenPermissions = { - [key in PublicTokenPermissionAction]?: PublicTokenPermissionProperties; + read?: PublicTokenPermissionProperties; + + /** + * @deprecated use trigger instead + */ + write?: PublicTokenPermissionProperties; + + /** + * Use auth.createTriggerPublicToken + */ + trigger?: { + tasks: string | string[]; + }; + + /** + * Use auth.createBatchTriggerPublicToken + */ + batchTrigger?: { + tasks: string | string[]; + }; }; export type CreatePublicTokenOptions = { @@ -120,6 +142,180 @@ async function createPublicToken(options?: CreatePublicTokenOptions): Promise Promise) { + const token = await createPublicToken(options); + + await withAuth({ accessToken: token }, fn); +} + +export type CreateTriggerTokenOptions = { + /** + * The expiration time for the token. This can be a number representing the time in milliseconds, a `Date` object, or a string. + * + * @example + * + * ```typescript + * expirationTime: "1h" + * ``` + */ + expirationTime?: number | Date | string; + + /** + * Whether the token can be used multiple times. By default trigger tokens are one-time use. + * @default false + */ + multipleUse?: boolean; +}; + +/** + * Creates a one-time use token to trigger a specific task. + * + * @param task - The task ID or an array of task IDs that the token should allow triggering. + * @param options - Options for creating the one-time use token. + * @returns A promise that resolves to a string representing the generated one-time use token. + * + * @example + * Create a one-time use public token that allows triggering a specific task: + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createTriggerPublicToken("my-task"); + * ``` + * + * @example You can also create a one-time use token that allows triggering multiple tasks: + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createTriggerPublicToken(["task1", "task2"]); + * ``` + * + * @example You can also create a one-time use token that allows triggering a task with a specific expiration time: + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createTriggerPublicToken("my-task", { expirationTime: "1h" }); + * ``` + */ +async function createTriggerPublicToken( + task: string | string[], + options?: CreateTriggerTokenOptions +): Promise { + const apiClient = apiClientManager.clientOrThrow(); + + const claims = await apiClient.generateJWTClaims(); + + return await internal_generateJWT({ + secretKey: apiClient.accessToken, + payload: { + ...claims, + otu: typeof options?.multipleUse === "boolean" ? !options.multipleUse : true, + scopes: flattenScopes({ + trigger: { + tasks: task, + }, + }), + }, + expirationTime: options?.expirationTime, + }); +} + +/** + * Executes a function with a one-time use token that allows triggering a specific task. + * + * @param task - The task ID or an array of task IDs that the token should allow triggering. + * @param options - Options for creating the one-time use token. + * @param fn - The asynchronous function to be executed with the one-time use token. + */ +async function withTriggerPublicToken( + task: string | string[], + options: CreateTriggerTokenOptions = {}, + fn: () => Promise +) { + const token = await createTriggerPublicToken(task, options); + + await withAuth({ accessToken: token }, fn); +} + +/** + * Creates a one-time use token to batch trigger a specific task or tasks. + * + * @param task - The task ID or an array of task IDs that the token should allow triggering. + * @param options - Options for creating the one-time use token. + * @returns A promise that resolves to a string representing the generated one-time use token. + * + * @example + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createBatchTriggerPublicToken("my-task"); + * ``` + * + * @example You can also create a one-time use token that allows batch triggering multiple tasks: + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createBatchTriggerPublicToken(["task1", "task2"]); + * ``` + * + * @example You can also create a one-time use token that allows batch triggering a task with a specific expiration time: + * + * ```ts + * import { auth } from "@trigger.dev/sdk/v3"; + * + * const token = await auth.createBatchTriggerPublicToken("my-task", { expirationTime: "1h" }); + * ``` + */ +async function createBatchTriggerPublicToken( + task: string | string[], + options?: CreateTriggerTokenOptions +): Promise { + const apiClient = apiClientManager.clientOrThrow(); + + const claims = await apiClient.generateJWTClaims(); + + return await internal_generateJWT({ + secretKey: apiClient.accessToken, + payload: { + ...claims, + otu: typeof options?.multipleUse === "boolean" ? !options.multipleUse : true, + scopes: flattenScopes({ + batchTrigger: { + tasks: task, + }, + }), + }, + expirationTime: options?.expirationTime, + }); +} + +/** + * Executes a function with a one-time use token that allows triggering a specific task. + * + * @param task - The task ID or an array of task IDs that the token should allow triggering. + * @param options - Options for creating the one-time use token. + * @param fn - The asynchronous function to be executed with the one-time use token. + */ +async function withBatchTriggerPublicToken( + task: string | string[], + options: CreateTriggerTokenOptions = {}, + fn: () => Promise +) { + const token = await createBatchTriggerPublicToken(task, options); + + await withAuth({ accessToken: token }, fn); +} + /** * Executes a provided asynchronous function with a specified API client configuration. * diff --git a/references/v3-catalog/src/management.ts b/references/v3-catalog/src/management.ts index fc0694b382..efa486573a 100644 --- a/references/v3-catalog/src/management.ts +++ b/references/v3-catalog/src/management.ts @@ -1,4 +1,4 @@ -import { configure, envvars, runs, schedules, batch } from "@trigger.dev/sdk/v3"; +import { configure, envvars, runs, schedules, batch, auth } from "@trigger.dev/sdk/v3"; import dotenv from "dotenv"; import { unfriendlyIdTask } from "./trigger/other.js"; import { spamRateLimiter, taskThatErrors } from "./trigger/retries.js"; @@ -287,10 +287,138 @@ async function doRescheduleRun() { console.log("rescheduled run", rescheduledRun); } +async function doOneTimeUseTrigger() { + console.log("Testing with one-time use token"); + + try { + await auth.withTriggerPublicToken(simpleChildTask.id, {}, async () => { + const run1 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run1", run1); + + const run2 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run2", run2); + }); + } catch (error) { + console.error(error); + } + + console.log("Testing with deprecated public token"); + + try { + await auth.withPublicToken( + { + scopes: { + write: { + tasks: simpleChildTask.id, + }, + }, + }, + async () => { + const run1 = await simpleChildTask.trigger({ message: "Hello, World!" }); + } + ); + } catch (error) { + console.error(error); + } + + console.log("Testing with trigger public token"); + + try { + await auth.withPublicToken( + { + scopes: { + trigger: { + tasks: simpleChildTask.id, + }, + }, + }, + async () => { + const run1 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run1", run1); + + const run2 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run2", run2); + } + ); + } catch (error) { + console.error(error); + } + + console.log("Testing with a one-time use token for the wrong task"); + + try { + await auth.withTriggerPublicToken("wrong-task-id", {}, async () => { + const run1 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run1", run1); + }); + } catch (error) { + console.error(error); + } + + console.log("Testing with a public token for the wrong task"); + + try { + await auth.withPublicToken( + { + scopes: { + write: { + tasks: "wrong-task-id", + }, + }, + }, + async () => { + const run1 = await simpleChildTask.trigger({ message: "Hello, World!" }); + + console.log("run1", run1); + } + ); + } catch (error) { + console.error(error); + } + + console.log("Testing batch trigger with one-time use token"); + + try { + await auth.withBatchTriggerPublicToken(simpleChildTask.id, {}, async () => { + const batch1 = await batch.triggerByTask([ + { task: simpleChildTask, payload: { message: "Hello, World!" } }, + ]); + + console.log("batch1", batch1); + + const batch2 = await batch.triggerByTask([ + { task: simpleChildTask, payload: { message: "Hello, World!" } }, + ]); + }); + } catch (error) { + console.error(error); + } + + console.log("Testing batch trigger with a trigger token"); + + try { + await auth.withTriggerPublicToken(simpleChildTask.id, {}, async () => { + const batch1 = await batch.triggerByTask([ + { task: simpleChildTask, payload: { message: "Hello, World!" } }, + ]); + + console.log("batch1", batch1); + }); + } catch (error) { + console.error(error); + } +} + // doRuns().catch(console.error); // doListRuns().catch(console.error); // doScheduleLists().catch(console.error); // doBatchTrigger().catch(console.error); // doEnvVars().catch(console.error); // doTriggerUnfriendlyTaskId().catch(console.error); -doRescheduleRun().catch(console.error); +// doRescheduleRun().catch(console.error); +doOneTimeUseTrigger().catch(console.error);