Skip to content

Commit 6ff0b92

Browse files
authored
New internal idempotency implementation for trigger and batch trigger (#2256)
* Introduce request idempotency to prevent duplicate triggers * Implement request idempotency on trigger * Use x-trigger-request-idempotency-key header instead Plus cleanup shared logic in the routes * Add changeset * Oops, lets not hardcode a 408 * A couple of improvements
1 parent 1a64013 commit 6ff0b92

16 files changed

+428
-24
lines changed

.changeset/clean-beans-compete.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
New internal idempotency implementation for trigger and batch trigger to prevent request retries from duplicating work

apps/webapp/app/env.server.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,44 @@ const EnvironmentSchema = z.object({
932932
TRIGGER_CLI_TAG: z.string().default("latest"),
933933

934934
HEALTHCHECK_DATABASE_DISABLED: z.string().default("0"),
935+
936+
REQUEST_IDEMPOTENCY_REDIS_HOST: z
937+
.string()
938+
.optional()
939+
.transform((v) => v ?? process.env.REDIS_HOST),
940+
REQUEST_IDEMPOTENCY_REDIS_READER_HOST: z
941+
.string()
942+
.optional()
943+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
944+
REQUEST_IDEMPOTENCY_REDIS_READER_PORT: z.coerce
945+
.number()
946+
.optional()
947+
.transform(
948+
(v) =>
949+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
950+
),
951+
REQUEST_IDEMPOTENCY_REDIS_PORT: z.coerce
952+
.number()
953+
.optional()
954+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
955+
REQUEST_IDEMPOTENCY_REDIS_USERNAME: z
956+
.string()
957+
.optional()
958+
.transform((v) => v ?? process.env.REDIS_USERNAME),
959+
REQUEST_IDEMPOTENCY_REDIS_PASSWORD: z
960+
.string()
961+
.optional()
962+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
963+
REQUEST_IDEMPOTENCY_REDIS_TLS_DISABLED: z
964+
.string()
965+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
966+
967+
REQUEST_IDEMPOTENCY_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
968+
969+
REQUEST_IDEMPOTENCY_TTL_IN_MS: z.coerce
970+
.number()
971+
.int()
972+
.default(60_000 * 60 * 24),
935973
});
936974

937975
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ import {
66
} from "@trigger.dev/core/v3";
77
import { TaskRun } from "@trigger.dev/database";
88
import { z } from "zod";
9+
import { prisma } from "~/db.server";
910
import { env } from "~/env.server";
1011
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
11-
import {
12-
ApiAuthenticationResultSuccess,
13-
AuthenticatedEnvironment,
14-
getOneTimeUseToken,
15-
} from "~/services/apiAuth.server";
12+
import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server";
1613
import { logger } from "~/services/logger.server";
1714
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1815
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
16+
import {
17+
handleRequestIdempotency,
18+
saveRequestIdempotency,
19+
} from "~/utils/requestIdempotency.server";
1920
import { ServiceValidationError } from "~/v3/services/baseService.server";
2021
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";
2122

@@ -31,6 +32,7 @@ export const HeadersSchema = z.object({
3132
"x-trigger-worker": z.string().nullish(),
3233
"x-trigger-client": z.string().nullish(),
3334
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
35+
"x-trigger-request-idempotency-key": z.string().nullish(),
3436
traceparent: z.string().optional(),
3537
tracestate: z.string().optional(),
3638
});
@@ -60,8 +62,34 @@ const { action, loader } = createActionApiRoute(
6062
"x-trigger-worker": isFromWorker,
6163
"x-trigger-client": triggerClient,
6264
"x-trigger-engine-version": engineVersion,
65+
"x-trigger-request-idempotency-key": requestIdempotencyKey,
6366
} = headers;
6467

68+
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
69+
requestType: "trigger",
70+
findCachedEntity: async (cachedRequestId) => {
71+
return await prisma.taskRun.findFirst({
72+
where: {
73+
id: cachedRequestId,
74+
},
75+
select: {
76+
friendlyId: true,
77+
},
78+
});
79+
},
80+
buildResponse: (cachedRun) => ({
81+
id: cachedRun.friendlyId,
82+
isCached: false,
83+
}),
84+
buildResponseHeaders: async (responseBody, cachedEntity) => {
85+
return await responseHeaders(cachedEntity, authentication, triggerClient);
86+
},
87+
});
88+
89+
if (cachedResponse) {
90+
return cachedResponse;
91+
}
92+
6593
const service = new TriggerTaskService();
6694

6795
try {
@@ -104,6 +132,8 @@ const { action, loader } = createActionApiRoute(
104132
return json({ error: "Task not found" }, { status: 404 });
105133
}
106134

135+
await saveRequestIdempotency(requestIdempotencyKey, "trigger", result.run.id);
136+
107137
const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient);
108138

109139
return json(
@@ -113,6 +143,7 @@ const { action, loader } = createActionApiRoute(
113143
},
114144
{
115145
headers: $responseHeaders,
146+
status: 200,
116147
}
117148
);
118149
} catch (error) {
@@ -132,7 +163,7 @@ const { action, loader } = createActionApiRoute(
132163
);
133164

134165
async function responseHeaders(
135-
run: TaskRun,
166+
run: Pick<TaskRun, "friendlyId">,
136167
authentication: ApiAuthenticationResultSuccess,
137168
triggerClient?: string | null
138169
): Promise<Record<string, string>> {

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@ import {
44
BatchTriggerTaskV3Response,
55
generateJWT,
66
} from "@trigger.dev/core/v3";
7+
import { prisma } from "~/db.server";
78
import { env } from "~/env.server";
9+
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
810
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
911
import { logger } from "~/services/logger.server";
1012
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
13+
import {
14+
handleRequestIdempotency,
15+
saveRequestIdempotency,
16+
} from "~/utils/requestIdempotency.server";
1117
import { ServiceValidationError } from "~/v3/services/baseService.server";
1218
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
1319
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1420
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
15-
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
1621

1722
const { action, loader } = createActionApiRoute(
1823
{
@@ -53,6 +58,7 @@ const { action, loader } = createActionApiRoute(
5358
"x-trigger-client": triggerClient,
5459
"x-trigger-engine-version": engineVersion,
5560
"batch-processing-strategy": batchProcessingStrategy,
61+
"x-trigger-request-idempotency-key": requestIdempotencyKey,
5662
traceparent,
5763
tracestate,
5864
} = headers;
@@ -67,15 +73,47 @@ const { action, loader } = createActionApiRoute(
6773
traceparent,
6874
tracestate,
6975
batchProcessingStrategy,
76+
requestIdempotencyKey,
77+
});
78+
79+
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
80+
requestType: "batch-trigger",
81+
findCachedEntity: async (cachedRequestId) => {
82+
return await prisma.batchTaskRun.findFirst({
83+
where: {
84+
id: cachedRequestId,
85+
runtimeEnvironmentId: authentication.environment.id,
86+
},
87+
select: {
88+
friendlyId: true,
89+
runCount: true,
90+
},
91+
});
92+
},
93+
buildResponse: (cachedBatch) => ({
94+
id: cachedBatch.friendlyId,
95+
runCount: cachedBatch.runCount,
96+
}),
97+
buildResponseHeaders: async (responseBody, cachedEntity) => {
98+
return await responseHeaders(responseBody, authentication.environment, triggerClient);
99+
},
70100
});
71101

102+
if (cachedResponse) {
103+
return cachedResponse;
104+
}
105+
72106
const traceContext =
73107
traceparent && isFromWorker // If the request is from a worker, we should pass the trace context
74108
? { traceparent, tracestate }
75109
: undefined;
76110

77111
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);
78112

113+
service.onBatchTaskRunCreated.attachOnce(async (batch) => {
114+
await saveRequestIdempotency(requestIdempotencyKey, "batch-trigger", batch.id);
115+
});
116+
79117
try {
80118
const batch = await service.call(authentication.environment, body, {
81119
triggerVersion: triggerVersion ?? undefined,
@@ -90,7 +128,10 @@ const { action, loader } = createActionApiRoute(
90128
triggerClient
91129
);
92130

93-
return json(batch, { status: 202, headers: $responseHeaders });
131+
return json(batch, {
132+
status: 202,
133+
headers: $responseHeaders,
134+
});
94135
} catch (error) {
95136
logger.error("Batch trigger error", {
96137
error: {

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import {
88
} from "@trigger.dev/core/v3";
99
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
1010
import { BatchTaskRun, Prisma } from "@trigger.dev/database";
11+
import { Evt } from "evt";
1112
import { z } from "zod";
12-
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
13+
import { prisma, PrismaClientOrTransaction } from "~/db.server";
1314
import { env } from "~/env.server";
1415
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1516
import { logger } from "~/services/logger.server";
@@ -51,6 +52,7 @@ export type BatchTriggerTaskServiceOptions = {
5152
*/
5253
export class RunEngineBatchTriggerService extends WithRunEngine {
5354
private _batchProcessingStrategy: BatchProcessingStrategy;
55+
public onBatchTaskRunCreated: Evt<BatchTaskRun> = new Evt();
5456

5557
constructor(
5658
batchProcessingStrategy?: BatchProcessingStrategy,
@@ -168,6 +170,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
168170
},
169171
});
170172

173+
this.onBatchTaskRunCreated.post(batch);
174+
171175
if (body.parentRunId && body.resumeParentOnCompletion) {
172176
await this._engine.blockRunWithCreatedBatch({
173177
runId: RunId.fromFriendlyId(body.parentRunId),
@@ -259,6 +263,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
259263
},
260264
});
261265

266+
this.onBatchTaskRunCreated.post(batch);
267+
262268
if (body.parentRunId && body.resumeParentOnCompletion) {
263269
await this._engine.blockRunWithCreatedBatch({
264270
runId: RunId.fromFriendlyId(body.parentRunId),
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import { Logger, LogLevel } from "@trigger.dev/core/logger";
2+
import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache";
3+
import { MemoryStore } from "@unkey/cache/stores";
4+
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
5+
import { RedisWithClusterOptions } from "~/redis.server";
6+
import { validate as uuidValidate, version as uuidVersion } from "uuid";
7+
import { startActiveSpan } from "~/v3/tracer.server";
8+
9+
export type RequestIdempotencyServiceOptions<TTypes extends string> = {
10+
types: TTypes[];
11+
redis: RedisWithClusterOptions;
12+
logger?: Logger;
13+
logLevel?: LogLevel;
14+
ttlInMs?: number;
15+
};
16+
17+
const DEFAULT_TTL_IN_MS = 60_000 * 60 * 24;
18+
19+
type RequestIdempotencyCacheEntry = {
20+
id: string;
21+
};
22+
23+
export class RequestIdempotencyService<TTypes extends string> {
24+
private readonly logger: Logger;
25+
private readonly cache: UnkeyCache<{ requests: RequestIdempotencyCacheEntry }>;
26+
27+
constructor(private readonly options: RequestIdempotencyServiceOptions<TTypes>) {
28+
this.logger =
29+
options.logger ?? new Logger("RequestIdempotencyService", options.logLevel ?? "info");
30+
31+
const keyPrefix = options.redis.keyPrefix
32+
? `request-idempotency:${options.redis.keyPrefix}`
33+
: "request-idempotency:";
34+
35+
const ctx = new DefaultStatefulContext();
36+
const memory = new MemoryStore({ persistentMap: new Map() });
37+
const redisCacheStore = new RedisCacheStore({
38+
name: "request-idempotency",
39+
connection: {
40+
keyPrefix: keyPrefix,
41+
...options.redis,
42+
},
43+
});
44+
45+
// This cache holds the rate limit configuration for each org, so we don't have to fetch it every request
46+
const cache = createCache({
47+
requests: new Namespace<RequestIdempotencyCacheEntry>(ctx, {
48+
stores: [memory, redisCacheStore],
49+
fresh: options.ttlInMs ?? DEFAULT_TTL_IN_MS,
50+
stale: options.ttlInMs ?? DEFAULT_TTL_IN_MS,
51+
}),
52+
});
53+
54+
this.cache = cache;
55+
}
56+
57+
async checkRequest(type: TTypes, requestIdempotencyKey: string) {
58+
if (!this.#validateRequestId(requestIdempotencyKey)) {
59+
this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", {
60+
requestIdempotencyKey,
61+
});
62+
63+
return undefined;
64+
}
65+
66+
return startActiveSpan("RequestIdempotency.checkRequest()", async (span) => {
67+
span.setAttribute("request_id", requestIdempotencyKey);
68+
span.setAttribute("type", type);
69+
70+
const key = `${type}:${requestIdempotencyKey}`;
71+
const result = await this.cache.requests.get(key);
72+
73+
this.logger.debug("RequestIdempotency: checking request", {
74+
type,
75+
requestIdempotencyKey,
76+
key,
77+
result,
78+
});
79+
80+
return result.val ? result.val : undefined;
81+
});
82+
}
83+
84+
async saveRequest(
85+
type: TTypes,
86+
requestIdempotencyKey: string,
87+
value: RequestIdempotencyCacheEntry
88+
) {
89+
if (!this.#validateRequestId(requestIdempotencyKey)) {
90+
this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", {
91+
requestIdempotencyKey,
92+
});
93+
return undefined;
94+
}
95+
96+
const key = `${type}:${requestIdempotencyKey}`;
97+
const result = await this.cache.requests.set(key, value);
98+
99+
if (result.err) {
100+
this.logger.error("RequestIdempotency: error saving request", {
101+
key,
102+
error: result.err,
103+
});
104+
} else {
105+
this.logger.debug("RequestIdempotency: saved request", {
106+
type,
107+
requestIdempotencyKey,
108+
key,
109+
value,
110+
});
111+
}
112+
113+
return result;
114+
}
115+
116+
// The requestIdempotencyKey should be a valid UUID
117+
#validateRequestId(requestIdempotencyKey: string): boolean {
118+
return isValidV4UUID(requestIdempotencyKey);
119+
}
120+
}
121+
122+
function isValidV4UUID(uuid: string): boolean {
123+
return uuidValidate(uuid) && uuidVersion(uuid) === 4;
124+
}

0 commit comments

Comments
 (0)