Skip to content

Commit 0419fe8

Browse files
committed
Introduce request idempotency to prevent duplicate triggers
1 parent 1a64013 commit 0419fe8

File tree

11 files changed

+261
-12
lines changed

11 files changed

+261
-12
lines changed

apps/webapp/app/env.server.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,44 @@ const EnvironmentSchema = z.object({
932932
TRIGGER_CLI_TAG: z.string().default("latest"),
933933

934934
HEALTHCHECK_DATABASE_DISABLED: z.string().default("0"),
935+
936+
REQUEST_IDEMPOTENCY_REDIS_HOST: z
937+
.string()
938+
.optional()
939+
.transform((v) => v ?? process.env.REDIS_HOST),
940+
REQUEST_IDEMPOTENCY_REDIS_READER_HOST: z
941+
.string()
942+
.optional()
943+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
944+
REQUEST_IDEMPOTENCY_REDIS_READER_PORT: z.coerce
945+
.number()
946+
.optional()
947+
.transform(
948+
(v) =>
949+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
950+
),
951+
REQUEST_IDEMPOTENCY_REDIS_PORT: z.coerce
952+
.number()
953+
.optional()
954+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
955+
REQUEST_IDEMPOTENCY_REDIS_USERNAME: z
956+
.string()
957+
.optional()
958+
.transform((v) => v ?? process.env.REDIS_USERNAME),
959+
REQUEST_IDEMPOTENCY_REDIS_PASSWORD: z
960+
.string()
961+
.optional()
962+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
963+
REQUEST_IDEMPOTENCY_REDIS_TLS_DISABLED: z
964+
.string()
965+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
966+
967+
REQUEST_IDEMPOTENCY_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
968+
969+
REQUEST_IDEMPOTENCY_TTL_IN_MS: z.coerce
970+
.number()
971+
.int()
972+
.default(60_000 * 60 * 24),
935973
});
936974

937975
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@ import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
1313
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1414
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1515
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
16+
import { z } from "zod";
17+
import { requestIdempotency } from "~/services/requestIdempotencyInstance.server";
18+
import { prisma } from "~/db.server";
1619

1720
const { action, loader } = createActionApiRoute(
1821
{
1922
headers: HeadersSchema.extend({
2023
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
24+
"x-trigger-request-id": z.string().nullish(),
2125
}),
2226
body: BatchTriggerTaskV3RequestBody,
2327
allowJWT: true,
@@ -53,6 +57,7 @@ const { action, loader } = createActionApiRoute(
5357
"x-trigger-client": triggerClient,
5458
"x-trigger-engine-version": engineVersion,
5559
"batch-processing-strategy": batchProcessingStrategy,
60+
"x-trigger-request-id": requestId,
5661
traceparent,
5762
tracestate,
5863
} = headers;
@@ -67,15 +72,72 @@ const { action, loader } = createActionApiRoute(
6772
traceparent,
6873
tracestate,
6974
batchProcessingStrategy,
75+
requestId,
7076
});
7177

78+
if (requestId) {
79+
logger.debug("request-idempotency: checking for cached batch-trigger request", {
80+
requestId,
81+
});
82+
83+
const cachedRequest = await requestIdempotency.checkRequest("batch-trigger", requestId);
84+
85+
if (cachedRequest) {
86+
logger.info("request-idempotency: found cached batch-trigger request", {
87+
requestId,
88+
cachedRequest,
89+
});
90+
91+
// Find the batch and return it as a 200 response
92+
const cachedBatch = await prisma.batchTaskRun.findFirst({
93+
where: {
94+
id: cachedRequest.id,
95+
runtimeEnvironmentId: authentication.environment.id,
96+
},
97+
select: {
98+
friendlyId: true,
99+
runCount: true,
100+
},
101+
});
102+
103+
if (cachedBatch) {
104+
logger.info("request-idempotency: found cached batch-trigger request", {
105+
requestId,
106+
cachedRequest,
107+
cachedBatch,
108+
});
109+
110+
const responseBody = {
111+
id: cachedBatch.friendlyId,
112+
runCount: cachedBatch.runCount,
113+
};
114+
115+
const $responseHeaders = await responseHeaders(
116+
responseBody,
117+
authentication.environment,
118+
triggerClient
119+
);
120+
121+
return json(responseBody, { status: 200, headers: $responseHeaders });
122+
}
123+
}
124+
}
125+
72126
const traceContext =
73127
traceparent && isFromWorker // If the request is from a worker, we should pass the trace context
74128
? { traceparent, tracestate }
75129
: undefined;
76130

77131
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);
78132

133+
service.onBatchTaskRunCreated.attachOnce(async (batch) => {
134+
if (requestId) {
135+
await requestIdempotency.saveRequest("batch-trigger", requestId, {
136+
id: batch.id,
137+
});
138+
}
139+
});
140+
79141
try {
80142
const batch = await service.call(authentication.environment, body, {
81143
triggerVersion: triggerVersion ?? undefined,
@@ -90,7 +152,10 @@ const { action, loader } = createActionApiRoute(
90152
triggerClient
91153
);
92154

93-
return json(batch, { status: 202, headers: $responseHeaders });
155+
return json(batch, {
156+
status: 202,
157+
headers: $responseHeaders,
158+
});
94159
} catch (error) {
95160
logger.error("Batch trigger error", {
96161
error: {

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
} from "@trigger.dev/core/v3";
99
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
1010
import { BatchTaskRun, Prisma } from "@trigger.dev/database";
11+
import { Evt } from "evt";
1112
import { z } from "zod";
12-
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
13+
import { prisma, PrismaClientOrTransaction } from "~/db.server";
1314
import { env } from "~/env.server";
1415
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1516
import { logger } from "~/services/logger.server";
@@ -51,6 +52,7 @@ export type BatchTriggerTaskServiceOptions = {
5152
*/
5253
export class RunEngineBatchTriggerService extends WithRunEngine {
5354
private _batchProcessingStrategy: BatchProcessingStrategy;
55+
public onBatchTaskRunCreated: Evt<BatchTaskRun> = new Evt();
5456

5557
constructor(
5658
batchProcessingStrategy?: BatchProcessingStrategy,
@@ -168,6 +170,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
168170
},
169171
});
170172

173+
this.onBatchTaskRunCreated.post(batch);
174+
171175
if (body.parentRunId && body.resumeParentOnCompletion) {
172176
await this._engine.blockRunWithCreatedBatch({
173177
runId: RunId.fromFriendlyId(body.parentRunId),
@@ -259,6 +263,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
259263
},
260264
});
261265

266+
this.onBatchTaskRunCreated.post(batch);
267+
262268
if (body.parentRunId && body.resumeParentOnCompletion) {
263269
await this._engine.blockRunWithCreatedBatch({
264270
runId: RunId.fromFriendlyId(body.parentRunId),
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { Logger, LogLevel } from "@trigger.dev/core/logger";
2+
import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache";
3+
import { MemoryStore } from "@unkey/cache/stores";
4+
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
5+
import { RedisWithClusterOptions } from "~/redis.server";
6+
7+
export type RequestIdempotencyServiceOptions<TTypes extends string> = {
8+
types: TTypes[];
9+
redis: RedisWithClusterOptions;
10+
logger?: Logger;
11+
logLevel?: LogLevel;
12+
ttlInMs?: number;
13+
};
14+
15+
const DEFAULT_TTL_IN_MS = 60_000 * 60 * 24;
16+
17+
type RequestIdempotencyCacheEntry = {
18+
id: string;
19+
};
20+
21+
export class RequestIdempotencyService<TTypes extends string> {
22+
private readonly logger: Logger;
23+
private readonly cache: UnkeyCache<{ requests: RequestIdempotencyCacheEntry }>;
24+
25+
constructor(private readonly options: RequestIdempotencyServiceOptions<TTypes>) {
26+
this.logger =
27+
options.logger ?? new Logger("RequestIdempotencyService", options.logLevel ?? "info");
28+
29+
const keyPrefix = options.redis.keyPrefix
30+
? `request-idempotency:${options.redis.keyPrefix}`
31+
: "request-idempotency:";
32+
33+
const ctx = new DefaultStatefulContext();
34+
const memory = new MemoryStore({ persistentMap: new Map() });
35+
const redisCacheStore = new RedisCacheStore({
36+
name: "request-idempotency",
37+
connection: {
38+
keyPrefix: keyPrefix,
39+
...options.redis,
40+
},
41+
});
42+
43+
// This cache holds the rate limit configuration for each org, so we don't have to fetch it every request
44+
const cache = createCache({
45+
requests: new Namespace<RequestIdempotencyCacheEntry>(ctx, {
46+
stores: [memory, redisCacheStore],
47+
fresh: options.ttlInMs ?? DEFAULT_TTL_IN_MS,
48+
stale: options.ttlInMs ?? DEFAULT_TTL_IN_MS,
49+
}),
50+
});
51+
52+
this.cache = cache;
53+
}
54+
55+
async checkRequest(type: TTypes, requestId: string) {
56+
const key = `${type}:${requestId}`;
57+
const result = await this.cache.requests.get(key);
58+
59+
this.logger.debug("RequestIdempotency: checking request", {
60+
type,
61+
requestId,
62+
key,
63+
result,
64+
});
65+
66+
return result.val ? result.val : undefined;
67+
}
68+
69+
async saveRequest(type: TTypes, requestId: string, value: RequestIdempotencyCacheEntry) {
70+
const key = `${type}:${requestId}`;
71+
const result = await this.cache.requests.set(key, value);
72+
73+
if (result.err) {
74+
this.logger.error("RequestIdempotency: error saving request", {
75+
key,
76+
error: result.err,
77+
});
78+
} else {
79+
this.logger.debug("RequestIdempotency: saved request", {
80+
type,
81+
requestId,
82+
key,
83+
value,
84+
});
85+
}
86+
87+
return result;
88+
}
89+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { singleton } from "~/utils/singleton";
2+
import { RequestIdempotencyService } from "./requestIdempotency.server";
3+
import { env } from "~/env.server";
4+
5+
export const requestIdempotency = singleton("requestIdempotency", createRequestIdempotencyInstance);
6+
7+
function createRequestIdempotencyInstance() {
8+
return new RequestIdempotencyService({
9+
redis: {
10+
keyPrefix: "request-idempotency:",
11+
port: env.REQUEST_IDEMPOTENCY_REDIS_PORT ?? undefined,
12+
host: env.REQUEST_IDEMPOTENCY_REDIS_HOST ?? undefined,
13+
username: env.REQUEST_IDEMPOTENCY_REDIS_USERNAME ?? undefined,
14+
password: env.REQUEST_IDEMPOTENCY_REDIS_PASSWORD ?? undefined,
15+
tlsDisabled: env.REQUEST_IDEMPOTENCY_REDIS_TLS_DISABLED === "true",
16+
clusterMode: false,
17+
},
18+
logLevel: env.REQUEST_IDEMPOTENCY_LOG_LEVEL,
19+
ttlInMs: env.REQUEST_IDEMPOTENCY_TTL_IN_MS,
20+
types: ["batch-trigger", "trigger"],
21+
});
22+
}

apps/webapp/app/services/unkey/redisCacheStore.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis
55

66
export type RedisCacheStoreConfig = {
77
connection: RedisWithClusterOptions;
8+
name?: string;
89
};
910

1011
export class RedisCacheStore<TNamespace extends string, TValue = any>
@@ -14,7 +15,7 @@ export class RedisCacheStore<TNamespace extends string, TValue = any>
1415
private readonly redis: RedisClient;
1516

1617
constructor(config: RedisCacheStoreConfig) {
17-
this.redis = createRedisClient("trigger:cacheStore", config.connection);
18+
this.redis = createRedisClient(config.name ?? "trigger:cacheStore", config.connection);
1819
}
1920

2021
private buildCacheKey(namespace: TNamespace, key: string): string {

packages/core/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@
195195
"std-env": "^3.8.1",
196196
"superjson": "^2.2.1",
197197
"tinyexec": "^0.3.2",
198+
"uncrypto": "^0.1.3",
198199
"zod": "3.23.8",
199200
"zod-error": "1.5.0",
200201
"zod-validation-error": "^1.5.0"

packages/core/src/v3/apiClient/core.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
OffsetLimitPageResponse,
1919
} from "./pagination.js";
2020
import { EventSource, type ErrorEvent } from "eventsource";
21+
import { randomUUID } from "../utils/crypto.js";
2122

2223
export const defaultRetryOptions = {
2324
maxAttempts: 3,
@@ -200,7 +201,10 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
200201
let $requestInit = await requestInit;
201202

202203
return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => {
204+
const requestId = await randomUUID();
205+
203206
$requestInit = injectPropagationHeadersIfInWorker($requestInit);
207+
$requestInit = injectRequestId(requestId, $requestInit);
204208

205209
const result = await _doZodFetchWithRetries(schema, url, $requestInit, options);
206210

@@ -627,6 +631,17 @@ function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestI
627631
};
628632
}
629633

634+
function injectRequestId(requestId: string, requestInit?: RequestInit): RequestInit | undefined {
635+
const headers = new Headers(requestInit?.headers);
636+
637+
headers.set("x-trigger-request-id", requestId);
638+
639+
return {
640+
...requestInit,
641+
headers,
642+
};
643+
}
644+
630645
export type ZodFetchSSEMessageValueSchema<
631646
TDiscriminatedUnion extends z.ZodDiscriminatedUnion<any, any>,
632647
> = z.ZodFirstPartySchemaTypes | TDiscriminatedUnion;

packages/core/src/v3/idempotencyKeys.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { taskContext } from "./task-context-api.js";
22
import { IdempotencyKey } from "./types/idempotencyKeys.js";
3+
import { digestSHA256 } from "./utils/crypto.js";
34

45
export function isIdempotencyKey(
56
value: string | string[] | IdempotencyKey
@@ -115,15 +116,7 @@ function injectScope(scope: "run" | "attempt" | "global"): string[] {
115116
}
116117

117118
async function generateIdempotencyKey(keyMaterial: string[]) {
118-
const hash = await crypto.subtle.digest(
119-
"SHA-256",
120-
new TextEncoder().encode(keyMaterial.join("-"))
121-
);
122-
123-
// Return a hex string, using cross-runtime compatible methods
124-
return Array.from(new Uint8Array(hash))
125-
.map((byte) => byte.toString(16).padStart(2, "0"))
126-
.join("");
119+
return await digestSHA256(keyMaterial.join("-"));
127120
}
128121

129122
type AttemptKeyMaterial = {

0 commit comments

Comments
 (0)