diff --git a/.changeset/clean-beans-compete.md b/.changeset/clean-beans-compete.md new file mode 100644 index 0000000000..520ca16c81 --- /dev/null +++ b/.changeset/clean-beans-compete.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +New internal idempotency implementation for trigger and batch trigger to prevent request retries from duplicating work diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index e0f7859044..7934bf1a5e 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -932,6 +932,44 @@ const EnvironmentSchema = z.object({ TRIGGER_CLI_TAG: z.string().default("latest"), HEALTHCHECK_DATABASE_DISABLED: z.string().default("0"), + + REQUEST_IDEMPOTENCY_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + REQUEST_IDEMPOTENCY_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + REQUEST_IDEMPOTENCY_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + REQUEST_IDEMPOTENCY_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + REQUEST_IDEMPOTENCY_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + REQUEST_IDEMPOTENCY_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + REQUEST_IDEMPOTENCY_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), + + REQUEST_IDEMPOTENCY_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), + + REQUEST_IDEMPOTENCY_TTL_IN_MS: z.coerce + .number() + .int() + .default(60_000 * 60 * 24), }); export type Environment = z.infer; diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 744dbb3189..a5e4e16225 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -6,16 +6,17 @@ import { } from "@trigger.dev/core/v3"; import { TaskRun } from "@trigger.dev/database"; import { z } from "zod"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; import { EngineServiceValidationError } from "~/runEngine/concerns/errors"; -import { - ApiAuthenticationResultSuccess, - AuthenticatedEnvironment, - getOneTimeUseToken, -} from "~/services/apiAuth.server"; +import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { + handleRequestIdempotency, + saveRequestIdempotency, +} from "~/utils/requestIdempotency.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server"; @@ -31,6 +32,7 @@ export const HeadersSchema = z.object({ "x-trigger-worker": z.string().nullish(), "x-trigger-client": z.string().nullish(), "x-trigger-engine-version": RunEngineVersionSchema.nullish(), + "x-trigger-request-idempotency-key": z.string().nullish(), traceparent: z.string().optional(), tracestate: z.string().optional(), }); @@ -60,8 +62,34 @@ const { action, loader } = createActionApiRoute( "x-trigger-worker": isFromWorker, "x-trigger-client": triggerClient, "x-trigger-engine-version": engineVersion, + "x-trigger-request-idempotency-key": requestIdempotencyKey, } = headers; + const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, { + requestType: "trigger", + findCachedEntity: async (cachedRequestId) => { + return await prisma.taskRun.findFirst({ + where: { + id: cachedRequestId, + }, + select: { + friendlyId: true, + }, + }); + }, + buildResponse: (cachedRun) => ({ + id: cachedRun.friendlyId, + isCached: false, + }), + buildResponseHeaders: async (responseBody, cachedEntity) => { + return await responseHeaders(cachedEntity, authentication, triggerClient); + }, + }); + + if (cachedResponse) { + return cachedResponse; + } + const service = new TriggerTaskService(); try { @@ -104,6 +132,8 @@ const { action, loader } = createActionApiRoute( return json({ error: "Task not found" }, { status: 404 }); } + await saveRequestIdempotency(requestIdempotencyKey, "trigger", result.run.id); + const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient); return json( @@ -113,6 +143,7 @@ const { action, loader } = createActionApiRoute( }, { headers: $responseHeaders, + status: 200, } ); } catch (error) { @@ -132,7 +163,7 @@ const { action, loader } = createActionApiRoute( ); async function responseHeaders( - run: TaskRun, + run: Pick, authentication: ApiAuthenticationResultSuccess, triggerClient?: string | null ): Promise> { diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index 62732ff7ad..ba03919d49 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -4,15 +4,20 @@ import { BatchTriggerTaskV3Response, generateJWT, } from "@trigger.dev/core/v3"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; +import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { + handleRequestIdempotency, + saveRequestIdempotency, +} from "~/utils/requestIdempotency.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; -import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; const { action, loader } = createActionApiRoute( { @@ -53,6 +58,7 @@ const { action, loader } = createActionApiRoute( "x-trigger-client": triggerClient, "x-trigger-engine-version": engineVersion, "batch-processing-strategy": batchProcessingStrategy, + "x-trigger-request-idempotency-key": requestIdempotencyKey, traceparent, tracestate, } = headers; @@ -67,8 +73,36 @@ const { action, loader } = createActionApiRoute( traceparent, tracestate, batchProcessingStrategy, + requestIdempotencyKey, + }); + + const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, { + requestType: "batch-trigger", + findCachedEntity: async (cachedRequestId) => { + return await prisma.batchTaskRun.findFirst({ + where: { + id: cachedRequestId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + friendlyId: true, + runCount: true, + }, + }); + }, + buildResponse: (cachedBatch) => ({ + id: cachedBatch.friendlyId, + runCount: cachedBatch.runCount, + }), + buildResponseHeaders: async (responseBody, cachedEntity) => { + return await responseHeaders(responseBody, authentication.environment, triggerClient); + }, }); + if (cachedResponse) { + return cachedResponse; + } + const traceContext = traceparent && isFromWorker // If the request is from a worker, we should pass the trace context ? { traceparent, tracestate } @@ -76,6 +110,10 @@ const { action, loader } = createActionApiRoute( const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined); + service.onBatchTaskRunCreated.attachOnce(async (batch) => { + await saveRequestIdempotency(requestIdempotencyKey, "batch-trigger", batch.id); + }); + try { const batch = await service.call(authentication.environment, body, { triggerVersion: triggerVersion ?? undefined, @@ -90,7 +128,10 @@ const { action, loader } = createActionApiRoute( triggerClient ); - return json(batch, { status: 202, headers: $responseHeaders }); + return json(batch, { + status: 202, + headers: $responseHeaders, + }); } catch (error) { logger.error("Batch trigger error", { error: { diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 9eb6fed0bf..2a476e5a1e 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -8,8 +8,9 @@ import { } from "@trigger.dev/core/v3"; import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic"; import { BatchTaskRun, Prisma } from "@trigger.dev/database"; +import { Evt } from "evt"; import { z } from "zod"; -import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; +import { prisma, PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; @@ -51,6 +52,7 @@ export type BatchTriggerTaskServiceOptions = { */ export class RunEngineBatchTriggerService extends WithRunEngine { private _batchProcessingStrategy: BatchProcessingStrategy; + public onBatchTaskRunCreated: Evt = new Evt(); constructor( batchProcessingStrategy?: BatchProcessingStrategy, @@ -168,6 +170,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine { }, }); + this.onBatchTaskRunCreated.post(batch); + if (body.parentRunId && body.resumeParentOnCompletion) { await this._engine.blockRunWithCreatedBatch({ runId: RunId.fromFriendlyId(body.parentRunId), @@ -259,6 +263,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine { }, }); + this.onBatchTaskRunCreated.post(batch); + if (body.parentRunId && body.resumeParentOnCompletion) { await this._engine.blockRunWithCreatedBatch({ runId: RunId.fromFriendlyId(body.parentRunId), diff --git a/apps/webapp/app/services/requestIdempotency.server.ts b/apps/webapp/app/services/requestIdempotency.server.ts new file mode 100644 index 0000000000..64e839fbca --- /dev/null +++ b/apps/webapp/app/services/requestIdempotency.server.ts @@ -0,0 +1,124 @@ +import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache"; +import { MemoryStore } from "@unkey/cache/stores"; +import { RedisCacheStore } from "./unkey/redisCacheStore.server"; +import { RedisWithClusterOptions } from "~/redis.server"; +import { validate as uuidValidate, version as uuidVersion } from "uuid"; +import { startActiveSpan } from "~/v3/tracer.server"; + +export type RequestIdempotencyServiceOptions = { + types: TTypes[]; + redis: RedisWithClusterOptions; + logger?: Logger; + logLevel?: LogLevel; + ttlInMs?: number; +}; + +const DEFAULT_TTL_IN_MS = 60_000 * 60 * 24; + +type RequestIdempotencyCacheEntry = { + id: string; +}; + +export class RequestIdempotencyService { + private readonly logger: Logger; + private readonly cache: UnkeyCache<{ requests: RequestIdempotencyCacheEntry }>; + + constructor(private readonly options: RequestIdempotencyServiceOptions) { + this.logger = + options.logger ?? new Logger("RequestIdempotencyService", options.logLevel ?? "info"); + + const keyPrefix = options.redis.keyPrefix + ? `request-idempotency:${options.redis.keyPrefix}` + : "request-idempotency:"; + + const ctx = new DefaultStatefulContext(); + const memory = new MemoryStore({ persistentMap: new Map() }); + const redisCacheStore = new RedisCacheStore({ + name: "request-idempotency", + connection: { + keyPrefix: keyPrefix, + ...options.redis, + }, + }); + + // This cache holds the rate limit configuration for each org, so we don't have to fetch it every request + const cache = createCache({ + requests: new Namespace(ctx, { + stores: [memory, redisCacheStore], + fresh: options.ttlInMs ?? DEFAULT_TTL_IN_MS, + stale: options.ttlInMs ?? DEFAULT_TTL_IN_MS, + }), + }); + + this.cache = cache; + } + + async checkRequest(type: TTypes, requestIdempotencyKey: string) { + if (!this.#validateRequestId(requestIdempotencyKey)) { + this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", { + requestIdempotencyKey, + }); + + return undefined; + } + + return startActiveSpan("RequestIdempotency.checkRequest()", async (span) => { + span.setAttribute("request_id", requestIdempotencyKey); + span.setAttribute("type", type); + + const key = `${type}:${requestIdempotencyKey}`; + const result = await this.cache.requests.get(key); + + this.logger.debug("RequestIdempotency: checking request", { + type, + requestIdempotencyKey, + key, + result, + }); + + return result.val ? result.val : undefined; + }); + } + + async saveRequest( + type: TTypes, + requestIdempotencyKey: string, + value: RequestIdempotencyCacheEntry + ) { + if (!this.#validateRequestId(requestIdempotencyKey)) { + this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", { + requestIdempotencyKey, + }); + return undefined; + } + + const key = `${type}:${requestIdempotencyKey}`; + const result = await this.cache.requests.set(key, value); + + if (result.err) { + this.logger.error("RequestIdempotency: error saving request", { + key, + error: result.err, + }); + } else { + this.logger.debug("RequestIdempotency: saved request", { + type, + requestIdempotencyKey, + key, + value, + }); + } + + return result; + } + + // The requestIdempotencyKey should be a valid UUID + #validateRequestId(requestIdempotencyKey: string): boolean { + return isValidV4UUID(requestIdempotencyKey); + } +} + +function isValidV4UUID(uuid: string): boolean { + return uuidValidate(uuid) && uuidVersion(uuid) === 4; +} diff --git a/apps/webapp/app/services/requestIdempotencyInstance.server.ts b/apps/webapp/app/services/requestIdempotencyInstance.server.ts new file mode 100644 index 0000000000..1429ed50b0 --- /dev/null +++ b/apps/webapp/app/services/requestIdempotencyInstance.server.ts @@ -0,0 +1,22 @@ +import { singleton } from "~/utils/singleton"; +import { RequestIdempotencyService } from "./requestIdempotency.server"; +import { env } from "~/env.server"; + +export const requestIdempotency = singleton("requestIdempotency", createRequestIdempotencyInstance); + +function createRequestIdempotencyInstance() { + return new RequestIdempotencyService({ + redis: { + keyPrefix: "request-idempotency:", + port: env.REQUEST_IDEMPOTENCY_REDIS_PORT ?? undefined, + host: env.REQUEST_IDEMPOTENCY_REDIS_HOST ?? undefined, + username: env.REQUEST_IDEMPOTENCY_REDIS_USERNAME ?? undefined, + password: env.REQUEST_IDEMPOTENCY_REDIS_PASSWORD ?? undefined, + tlsDisabled: env.REQUEST_IDEMPOTENCY_REDIS_TLS_DISABLED === "true", + clusterMode: false, + }, + logLevel: env.REQUEST_IDEMPOTENCY_LOG_LEVEL, + ttlInMs: env.REQUEST_IDEMPOTENCY_TTL_IN_MS, + types: ["batch-trigger", "trigger"], + }); +} diff --git a/apps/webapp/app/services/unkey/redisCacheStore.server.ts b/apps/webapp/app/services/unkey/redisCacheStore.server.ts index 459b6d8e27..25bbb27634 100644 --- a/apps/webapp/app/services/unkey/redisCacheStore.server.ts +++ b/apps/webapp/app/services/unkey/redisCacheStore.server.ts @@ -5,6 +5,7 @@ import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis export type RedisCacheStoreConfig = { connection: RedisWithClusterOptions; + name?: string; }; export class RedisCacheStore @@ -14,7 +15,7 @@ export class RedisCacheStore private readonly redis: RedisClient; constructor(config: RedisCacheStoreConfig) { - this.redis = createRedisClient("trigger:cacheStore", config.connection); + this.redis = createRedisClient(config.name ?? "trigger:cacheStore", config.connection); } private buildCacheKey(namespace: TNamespace, key: string): string { diff --git a/apps/webapp/app/utils/requestIdempotency.server.ts b/apps/webapp/app/utils/requestIdempotency.server.ts new file mode 100644 index 0000000000..9f03c7adf2 --- /dev/null +++ b/apps/webapp/app/utils/requestIdempotency.server.ts @@ -0,0 +1,96 @@ +import { json } from "@remix-run/server-runtime"; +import { tryCatch } from "@trigger.dev/core/utils"; +import { logger } from "~/services/logger.server"; +import { requestIdempotency } from "~/services/requestIdempotencyInstance.server"; +import { startActiveSpan } from "~/v3/tracer.server"; + +type RequestIdempotencyType = "batch-trigger" | "trigger"; + +export type IdempotencyConfig = { + requestType: RequestIdempotencyType; + findCachedEntity: (cachedRequestId: string) => Promise; + buildResponse: (entity: T) => R; + buildResponseHeaders: (response: R, entity: T) => Promise>; +}; + +export async function handleRequestIdempotency( + requestIdempotencyKey: string | null | undefined, + config: IdempotencyConfig +): Promise { + if (!requestIdempotencyKey) { + return null; + } + + logger.debug(`request-idempotency: checking for cached ${config.requestType} request`, { + requestIdempotencyKey, + }); + + return startActiveSpan("RequestIdempotency.handle()", async (span) => { + span.setAttribute("request_idempotency_key", requestIdempotencyKey); + + const cachedRequest = await requestIdempotency.checkRequest( + config.requestType, + requestIdempotencyKey + ); + + if (!cachedRequest) { + span.setAttribute("cached_request", false); + + return null; + } + + span.setAttribute("cached_request", true); + span.setAttribute("cached_entity_id", cachedRequest.id); + + logger.info(`request-idempotency: found cached ${config.requestType} request`, { + requestIdempotencyKey, + cachedRequest, + }); + + const cachedEntity = await config.findCachedEntity(cachedRequest.id); + + if (!cachedEntity) { + span.setAttribute("cached_entity", false); + + return null; + } + + span.setAttribute("cached_entity", true); + + logger.info(`request-idempotency: found cached ${config.requestType} entity`, { + requestIdempotencyKey, + cachedRequest, + cachedEntity, + }); + + const responseBody = config.buildResponse(cachedEntity); + const responseHeaders = await config.buildResponseHeaders(responseBody, cachedEntity); + + return json(responseBody, { status: 200, headers: responseHeaders }); + }); +} + +export async function saveRequestIdempotency( + requestIdempotencyKey: string | null | undefined, + requestType: RequestIdempotencyType, + entityId: string +): Promise { + if (!requestIdempotencyKey) { + return; + } + + const [error] = await tryCatch( + requestIdempotency.saveRequest(requestType, requestIdempotencyKey, { + id: entityId, + }) + ); + + if (error) { + logger.error("request-idempotency: error saving request", { + error, + requestIdempotencyKey, + requestType, + entityId, + }); + } +} diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 954c271faa..292c580fc8 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -1,9 +1,12 @@ import { TriggerTaskRequestBody } from "@trigger.dev/core/v3"; import { RunEngineVersion, TaskRun } from "@trigger.dev/database"; +import { env } from "~/env.server"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultPayloadProcessor } from "~/runEngine/concerns/payloads.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; +import { DefaultRunChainStateManager } from "~/runEngine/concerns/runChainStates.server"; import { DefaultRunNumberIncrementer } from "~/runEngine/concerns/runNumbers.server"; +import { DefaultTraceEventsConcern } from "~/runEngine/concerns/traceEvents.server"; import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server"; import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskValidator"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -12,9 +15,6 @@ import { eventRepository } from "../eventRepository.server"; import { tracer } from "../tracer.server"; import { WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; -import { DefaultTraceEventsConcern } from "~/runEngine/concerns/traceEvents.server"; -import { DefaultRunChainStateManager } from "~/runEngine/concerns/runChainStates.server"; -import { env } from "~/env.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -113,6 +113,7 @@ export class TriggerTaskService extends WithRunEngine { ), tracer: tracer, }); + return await service.call({ taskId, environment, diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 371ad770cc..0dbfffc7e2 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -198,6 +198,7 @@ "tiny-invariant": "^1.2.0", "ulid": "^2.3.0", "ulidx": "^2.2.1", + "uuid": "^9.0.0", "ws": "^8.11.0", "zod": "3.23.8", "zod-error": "1.5.0", @@ -239,6 +240,7 @@ "@types/slug": "^5.0.3", "@types/supertest": "^6.0.2", "@types/tar": "^6.1.4", + "@types/uuid": "^9.0.0", "@types/ws": "^8.5.3", "@typescript-eslint/eslint-plugin": "^5.59.6", "@typescript-eslint/parser": "^5.59.6", diff --git a/packages/core/package.json b/packages/core/package.json index a4b6455d95..b58dcb4982 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -195,6 +195,7 @@ "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", + "uncrypto": "^0.1.3", "zod": "3.23.8", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index 5f1016cb6a..ec1fe8421e 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -18,6 +18,7 @@ import { OffsetLimitPageResponse, } from "./pagination.js"; import { EventSource, type ErrorEvent } from "eventsource"; +import { randomUUID } from "../utils/crypto.js"; export const defaultRetryOptions = { maxAttempts: 3, @@ -200,7 +201,10 @@ async function _doZodFetch( let $requestInit = await requestInit; return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => { + const requestIdempotencyKey = await randomUUID(); + $requestInit = injectPropagationHeadersIfInWorker($requestInit); + $requestInit = injectRequestIdempotencyKey(requestIdempotencyKey, $requestInit); const result = await _doZodFetchWithRetries(schema, url, $requestInit, options); @@ -627,6 +631,20 @@ function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestI }; } +function injectRequestIdempotencyKey( + requestIdempotencyKey: string, + requestInit?: RequestInit +): RequestInit | undefined { + const headers = new Headers(requestInit?.headers); + + headers.set("x-trigger-request-idempotency-key", requestIdempotencyKey); + + return { + ...requestInit, + headers, + }; +} + export type ZodFetchSSEMessageValueSchema< TDiscriminatedUnion extends z.ZodDiscriminatedUnion, > = z.ZodFirstPartySchemaTypes | TDiscriminatedUnion; diff --git a/packages/core/src/v3/idempotencyKeys.ts b/packages/core/src/v3/idempotencyKeys.ts index c3778d42a7..e19c1cfca0 100644 --- a/packages/core/src/v3/idempotencyKeys.ts +++ b/packages/core/src/v3/idempotencyKeys.ts @@ -1,5 +1,6 @@ import { taskContext } from "./task-context-api.js"; import { IdempotencyKey } from "./types/idempotencyKeys.js"; +import { digestSHA256 } from "./utils/crypto.js"; export function isIdempotencyKey( value: string | string[] | IdempotencyKey @@ -115,15 +116,7 @@ function injectScope(scope: "run" | "attempt" | "global"): string[] { } async function generateIdempotencyKey(keyMaterial: string[]) { - const hash = await crypto.subtle.digest( - "SHA-256", - new TextEncoder().encode(keyMaterial.join("-")) - ); - - // Return a hex string, using cross-runtime compatible methods - return Array.from(new Uint8Array(hash)) - .map((byte) => byte.toString(16).padStart(2, "0")) - .join(""); + return await digestSHA256(keyMaterial.join("-")); } type AttemptKeyMaterial = { diff --git a/packages/core/src/v3/utils/crypto.ts b/packages/core/src/v3/utils/crypto.ts new file mode 100644 index 0000000000..03e0b40773 --- /dev/null +++ b/packages/core/src/v3/utils/crypto.ts @@ -0,0 +1,16 @@ +export async function randomUUID(): Promise { + const { randomUUID } = await import("uncrypto"); + + return randomUUID(); +} + +export async function digestSHA256(data: string): Promise { + const { subtle } = await import("uncrypto"); + + const hash = await subtle.digest("SHA-256", new TextEncoder().encode(data)); + + // Return a hex string, using cross-runtime compatible methods + return Array.from(new Uint8Array(hash)) + .map((byte) => byte.toString(16).padStart(2, "0")) + .join(""); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 02e2b9d771..a5cdef7b3e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -689,6 +689,9 @@ importers: ulidx: specifier: ^2.2.1 version: 2.2.1 + uuid: + specifier: ^9.0.0 + version: 9.0.1 ws: specifier: ^8.11.0 version: 8.12.0 @@ -807,6 +810,9 @@ importers: '@types/tar': specifier: ^6.1.4 version: 6.1.4 + '@types/uuid': + specifier: ^9.0.0 + version: 9.0.0 '@types/ws': specifier: ^8.5.3 version: 8.5.4 @@ -1549,6 +1555,9 @@ importers: tinyexec: specifier: ^0.3.2 version: 0.3.2 + uncrypto: + specifier: ^0.1.3 + version: 0.1.3 zod: specifier: 3.23.8 version: 3.23.8 @@ -16878,7 +16887,7 @@ packages: dependencies: '@sentry/bundler-plugin-core': 2.22.2 unplugin: 1.0.1 - uuid: 9.0.0 + uuid: 9.0.1 transitivePeerDependencies: - encoding - supports-color @@ -35143,7 +35152,7 @@ packages: sqlite3: 5.1.7 ts-node: 10.9.2(@types/node@20.14.14)(typescript@5.5.4) tslib: 2.6.2 - uuid: 9.0.0 + uuid: 9.0.1 yargs: 17.7.2 transitivePeerDependencies: - supports-color