diff --git a/.changeset/red-rings-marry.md b/.changeset/red-rings-marry.md new file mode 100644 index 0000000000..87fb25647a --- /dev/null +++ b/.changeset/red-rings-marry.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Provide realtime skipColumns option via untamperable public access tokens diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 3d7a4c1a20..744dbb3189 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -8,7 +8,11 @@ import { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; import { env } from "~/env.server"; import { EngineServiceValidationError } from "~/runEngine/concerns/errors"; -import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; +import { + ApiAuthenticationResultSuccess, + AuthenticatedEnvironment, + getOneTimeUseToken, +} from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; @@ -100,11 +104,7 @@ const { action, loader } = createActionApiRoute( return json({ error: "Task not found" }, { status: 404 }); } - const $responseHeaders = await responseHeaders( - result.run, - authentication.environment, - triggerClient - ); + const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient); return json( { @@ -133,12 +133,15 @@ const { action, loader } = createActionApiRoute( async function responseHeaders( run: TaskRun, - environment: AuthenticatedEnvironment, + authentication: ApiAuthenticationResultSuccess, triggerClient?: string | null ): Promise> { + const { environment, realtime } = authentication; + const claimsHeader = JSON.stringify({ sub: environment.id, pub: true, + realtime, }); if (triggerClient === "browser") { @@ -146,6 +149,7 @@ async function responseHeaders( sub: environment.id, pub: true, scopes: [`read:runs:${run.friendlyId}`], + realtime, }; const jwt = await internal_generateJWT({ diff --git a/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts b/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts index cb7a891721..8cbc4d2b7e 100644 --- a/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts +++ b/apps/webapp/app/routes/realtime.v1.batches.$batchId.ts @@ -31,6 +31,7 @@ export const loader = createLoaderApiRoute( request.url, authentication.environment, batchRun.id, + authentication.realtime, request.headers.get("x-trigger-electric-version") ?? undefined ); } diff --git a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts index 412a1ac85b..935b4b5b05 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts @@ -44,6 +44,7 @@ export const loader = createLoaderApiRoute( request.url, authentication.environment, run.id, + authentication.realtime, request.headers.get("x-trigger-electric-version") ?? undefined ); } diff --git a/apps/webapp/app/routes/realtime.v1.runs.ts b/apps/webapp/app/routes/realtime.v1.runs.ts index b1d4fba17b..51fb48f8dd 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.ts @@ -29,6 +29,7 @@ export const loader = createLoaderApiRoute( request.url, authentication.environment, searchParams, + authentication.realtime, request.headers.get("x-trigger-electric-version") ?? undefined ); } diff --git a/apps/webapp/app/services/apiAuth.server.ts b/apps/webapp/app/services/apiAuth.server.ts index bed78809b6..3d839ba440 100644 --- a/apps/webapp/app/services/apiAuth.server.ts +++ b/apps/webapp/app/services/apiAuth.server.ts @@ -23,6 +23,11 @@ const ClaimsSchema = z.object({ scopes: z.array(z.string()).optional(), // One-time use token otu: z.boolean().optional(), + realtime: z + .object({ + skipColumns: z.array(z.string()).optional(), + }) + .optional(), }); type Optional = Prettify & Partial>>; @@ -43,6 +48,9 @@ export type ApiAuthenticationResultSuccess = { environment: AuthenticatedEnvironment; scopes?: string[]; oneTimeUse?: boolean; + realtime?: { + skipColumns?: string[]; + }; }; export type ApiAuthenticationResultFailure = { @@ -151,6 +159,7 @@ export async function authenticateApiKey( environment: validationResults.environment, scopes: parsedClaims.success ? parsedClaims.data.scopes : [], oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false, + realtime: parsedClaims.success ? parsedClaims.data.realtime : undefined, }; } } @@ -233,6 +242,7 @@ async function authenticateApiKeyWithFailure( environment: validationResults.environment, scopes: parsedClaims.success ? parsedClaims.data.scopes : [], oneTimeUse: parsedClaims.success ? parsedClaims.data.otu : false, + realtime: parsedClaims.success ? parsedClaims.data.realtime : undefined, }; } } diff --git a/apps/webapp/app/services/realtimeClient.server.ts b/apps/webapp/app/services/realtimeClient.server.ts index c1e2ee59ed..f6df277189 100644 --- a/apps/webapp/app/services/realtimeClient.server.ts +++ b/apps/webapp/app/services/realtimeClient.server.ts @@ -65,6 +65,10 @@ export type RealtimeRunsParams = { createdAt?: string; }; +export type RealtimeRequestOptions = { + skipColumns?: string[]; +}; + export class RealtimeClient { private redis: RedisClient; private expiryTimeInSeconds: number; @@ -124,15 +128,17 @@ export class RealtimeClient { url: URL | string, environment: RealtimeEnvironment, runId: string, + requestOptions?: RealtimeRequestOptions, clientVersion?: string ) { - return this.#streamRunsWhere(url, environment, `id='${runId}'`, clientVersion); + return this.#streamRunsWhere(url, environment, `id='${runId}'`, requestOptions, clientVersion); } async streamBatch( url: URL | string, environment: RealtimeEnvironment, batchId: string, + requestOptions?: RealtimeRequestOptions, clientVersion?: string ) { const whereClauses: string[] = [ @@ -142,13 +148,14 @@ export class RealtimeClient { const whereClause = whereClauses.join(" AND "); - return this.#streamRunsWhere(url, environment, whereClause, clientVersion); + return this.#streamRunsWhere(url, environment, whereClause, requestOptions, clientVersion); } async streamRuns( url: URL | string, environment: RealtimeEnvironment, params: RealtimeRunsParams, + requestOptions?: RealtimeRequestOptions, clientVersion?: string ) { const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`]; @@ -165,7 +172,13 @@ export class RealtimeClient { const whereClause = whereClauses.join(" AND "); - const response = await this.#streamRunsWhere(url, environment, whereClause, clientVersion); + const response = await this.#streamRunsWhere( + url, + environment, + whereClause, + requestOptions, + clientVersion + ); if (createdAtFilter) { const [setCreatedAtFilterError] = await tryCatch( @@ -256,12 +269,14 @@ export class RealtimeClient { url: URL | string, environment: RealtimeEnvironment, whereClause: string, + requestOptions?: RealtimeRequestOptions, clientVersion?: string ) { const electricUrl = this.#constructRunsElectricUrl( url, environment, whereClause, + requestOptions, clientVersion ); @@ -272,6 +287,7 @@ export class RealtimeClient { url: URL | string, environment: RealtimeEnvironment, whereClause: string, + requestOptions?: RealtimeRequestOptions, clientVersion?: string ): URL { const $url = new URL(url.toString()); @@ -297,13 +313,10 @@ export class RealtimeClient { electricUrl.searchParams.set("handle", electricUrl.searchParams.get("shape_id") ?? ""); } - const skipColumnsRaw = $url.searchParams.get("skipColumns"); + let skipColumns = getSkipColumns($url.searchParams, requestOptions); - if (skipColumnsRaw) { - const skipColumns = skipColumnsRaw - .split(",") - .map((c) => c.trim()) - .filter((c) => c !== "" && !RESERVED_COLUMNS.includes(c)); + if (skipColumns.length > 0) { + skipColumns = skipColumns.filter((c) => c !== "" && !RESERVED_COLUMNS.includes(c)); electricUrl.searchParams.set( "columns", @@ -543,3 +556,17 @@ declare module "ioredis" { ): Result; } } + +function getSkipColumns(searchParams: URLSearchParams, requestOptions?: RealtimeRequestOptions) { + if (requestOptions?.skipColumns) { + return requestOptions.skipColumns; + } + + const skipColumnsRaw = searchParams.get("skipColumns"); + + if (skipColumnsRaw) { + return skipColumnsRaw.split(",").map((c) => c.trim()); + } + + return []; +} diff --git a/apps/webapp/test/realtimeClient.test.ts b/apps/webapp/test/realtimeClient.test.ts index 5cfa8c39d9..6a94a31dee 100644 --- a/apps/webapp/test/realtimeClient.test.ts +++ b/apps/webapp/test/realtimeClient.test.ts @@ -72,6 +72,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { "http://localhost:3000?offset=-1", environment, run.id, + {}, "0.8.1" ); @@ -81,6 +82,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { "http://localhost:3000?offset=-1", environment, run.id, + {}, "0.8.1" ); @@ -108,6 +110,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { `http://localhost:3000?offset=0_0&live=true&handle=${shapeId}`, environment, run.id, + {}, "0.8.1" ); @@ -117,6 +120,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { `http://localhost:3000?offset=0_0&live=true&handle=${shapeId}`, environment, run.id, + {}, "0.8.1" ); @@ -217,6 +221,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { { tags: ["test:tag:1234"], }, + {}, "0.8.1" ); @@ -307,6 +312,7 @@ describe.skipIf(process.env.GITHUB_ACTIONS)("RealtimeClient", () => { "http://localhost:3000?offset=-1", environment, run.id, + {}, "0.8.1" ); diff --git a/packages/trigger-sdk/src/v3/auth.ts b/packages/trigger-sdk/src/v3/auth.ts index 1cfcb08741..ddcf92569a 100644 --- a/packages/trigger-sdk/src/v3/auth.ts +++ b/packages/trigger-sdk/src/v3/auth.ts @@ -1,4 +1,8 @@ -import { type ApiClientConfiguration, apiClientManager } from "@trigger.dev/core/v3"; +import { + type ApiClientConfiguration, + apiClientManager, + RealtimeRunSkipColumns, +} from "@trigger.dev/core/v3"; import { generateJWT as internal_generateJWT } from "@trigger.dev/core/v3"; /** @@ -106,6 +110,24 @@ export type CreatePublicTokenOptions = { * ``` */ expirationTime?: number | Date | string; + + realtime?: { + /** + * Skip columns from the subscription. + * + * @default [] + * + * @example + * ```ts + * auth.createPublicToken({ + * realtime: { + * skipColumns: ["payload", "output"] + * } + * }); + * ``` + */ + skipColumns?: RealtimeRunSkipColumns; + }; }; /** @@ -114,6 +136,8 @@ export type CreatePublicTokenOptions = { * @param options - Optional parameters for creating the public token. * @param options.scopes - An array of permission scopes to be included in the token. * @param options.expirationTime - The expiration time for the token. + * @param options.realtime - Options for realtime subscriptions. + * @param options.realtime.skipColumns - Skip columns from the subscription. * @returns A promise that resolves to a string representing the generated public token. * * @example @@ -139,6 +163,7 @@ async function createPublicToken(options?: CreatePublicTokenOptions): Promise