Skip to content

Commit 84eeeae

Browse files
committed
Implement request idempotency on trigger
1 parent 0419fe8 commit 84eeeae

File tree

6 files changed

+93
-4
lines changed

6 files changed

+93
-4
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@trigger.dev/core/v3";
77
import { TaskRun } from "@trigger.dev/database";
88
import { z } from "zod";
9+
import { prisma } from "~/db.server";
910
import { env } from "~/env.server";
1011
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
1112
import {
@@ -14,6 +15,7 @@ import {
1415
getOneTimeUseToken,
1516
} from "~/services/apiAuth.server";
1617
import { logger } from "~/services/logger.server";
18+
import { requestIdempotency } from "~/services/requestIdempotencyInstance.server";
1719
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1820
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1921
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -31,6 +33,7 @@ export const HeadersSchema = z.object({
3133
"x-trigger-worker": z.string().nullish(),
3234
"x-trigger-client": z.string().nullish(),
3335
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
36+
"x-trigger-request-id": z.string().nullish(),
3437
traceparent: z.string().optional(),
3538
tracestate: z.string().optional(),
3639
});
@@ -60,8 +63,53 @@ const { action, loader } = createActionApiRoute(
6063
"x-trigger-worker": isFromWorker,
6164
"x-trigger-client": triggerClient,
6265
"x-trigger-engine-version": engineVersion,
66+
"x-trigger-request-id": requestId,
6367
} = headers;
6468

69+
if (requestId) {
70+
logger.debug("request-idempotency: checking for cached trigger request", {
71+
requestId,
72+
});
73+
74+
const cachedRequest = await requestIdempotency.checkRequest("trigger", requestId);
75+
76+
if (cachedRequest) {
77+
logger.info("request-idempotency: found cached trigger request", {
78+
requestId,
79+
cachedRequest,
80+
});
81+
82+
const cachedRun = await prisma.taskRun.findFirst({
83+
where: {
84+
id: cachedRequest.id,
85+
},
86+
select: {
87+
friendlyId: true,
88+
},
89+
});
90+
91+
if (cachedRun) {
92+
logger.info("request-idempotency: found cached trigger run", {
93+
requestId,
94+
cachedRun,
95+
});
96+
97+
const $responseHeaders = await responseHeaders(cachedRun, authentication, triggerClient);
98+
99+
return json(
100+
{
101+
id: cachedRun.friendlyId,
102+
isCached: false,
103+
},
104+
{
105+
headers: $responseHeaders,
106+
status: 200,
107+
}
108+
);
109+
}
110+
}
111+
}
112+
65113
const service = new TriggerTaskService();
66114

67115
try {
@@ -104,6 +152,12 @@ const { action, loader } = createActionApiRoute(
104152
return json({ error: "Task not found" }, { status: 404 });
105153
}
106154

155+
if (requestId) {
156+
await requestIdempotency.saveRequest("trigger", requestId, {
157+
id: result.run.id,
158+
});
159+
}
160+
107161
const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient);
108162

109163
return json(
@@ -113,6 +167,7 @@ const { action, loader } = createActionApiRoute(
113167
},
114168
{
115169
headers: $responseHeaders,
170+
status: 200,
116171
}
117172
);
118173
} catch (error) {
@@ -132,7 +187,7 @@ const { action, loader } = createActionApiRoute(
132187
);
133188

134189
async function responseHeaders(
135-
run: TaskRun,
190+
run: Pick<TaskRun, "friendlyId">,
136191
authentication: ApiAuthenticationResultSuccess,
137192
triggerClient?: string | null
138193
): Promise<Record<string, string>> {

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ const { action, loader } = createActionApiRoute(
2121
{
2222
headers: HeadersSchema.extend({
2323
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
24-
"x-trigger-request-id": z.string().nullish(),
2524
}),
2625
body: BatchTriggerTaskV3RequestBody,
2726
allowJWT: true,

apps/webapp/app/services/requestIdempotency.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } f
33
import { MemoryStore } from "@unkey/cache/stores";
44
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
55
import { RedisWithClusterOptions } from "~/redis.server";
6+
import { validate as uuidValidate, version as uuidVersion } from "uuid";
67

78
export type RequestIdempotencyServiceOptions<TTypes extends string> = {
89
types: TTypes[];
@@ -53,6 +54,14 @@ export class RequestIdempotencyService<TTypes extends string> {
5354
}
5455

5556
async checkRequest(type: TTypes, requestId: string) {
57+
if (!this.#validateRequestId(requestId)) {
58+
this.logger.warn("RequestIdempotency: invalid requestId", {
59+
requestId,
60+
});
61+
62+
return undefined;
63+
}
64+
5665
const key = `${type}:${requestId}`;
5766
const result = await this.cache.requests.get(key);
5867

@@ -67,6 +76,13 @@ export class RequestIdempotencyService<TTypes extends string> {
6776
}
6877

6978
async saveRequest(type: TTypes, requestId: string, value: RequestIdempotencyCacheEntry) {
79+
if (!this.#validateRequestId(requestId)) {
80+
this.logger.warn("RequestIdempotency: invalid requestId", {
81+
requestId,
82+
});
83+
return undefined;
84+
}
85+
7086
const key = `${type}:${requestId}`;
7187
const result = await this.cache.requests.set(key, value);
7288

@@ -86,4 +102,13 @@ export class RequestIdempotencyService<TTypes extends string> {
86102

87103
return result;
88104
}
105+
106+
// The requestId should be a valid UUID
107+
#validateRequestId(requestId: string): boolean {
108+
return isValidV4UUID(requestId);
109+
}
110+
}
111+
112+
function isValidV4UUID(uuid: string): boolean {
113+
return uuidValidate(uuid) && uuidVersion(uuid) === 4;
89114
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
1515
import { DefaultTraceEventsConcern } from "~/runEngine/concerns/traceEvents.server";
1616
import { DefaultRunChainStateManager } from "~/runEngine/concerns/runChainStates.server";
1717
import { env } from "~/env.server";
18+
import { Evt } from "evt";
1819

1920
export type TriggerTaskServiceOptions = {
2021
idempotencyKey?: string;
@@ -113,6 +114,7 @@ export class TriggerTaskService extends WithRunEngine {
113114
),
114115
tracer: tracer,
115116
});
117+
116118
return await service.call({
117119
taskId,
118120
environment,

apps/webapp/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@
198198
"tiny-invariant": "^1.2.0",
199199
"ulid": "^2.3.0",
200200
"ulidx": "^2.2.1",
201+
"uuid": "^9.0.0",
201202
"ws": "^8.11.0",
202203
"zod": "3.23.8",
203204
"zod-error": "1.5.0",
@@ -239,6 +240,7 @@
239240
"@types/slug": "^5.0.3",
240241
"@types/supertest": "^6.0.2",
241242
"@types/tar": "^6.1.4",
243+
"@types/uuid": "^9.0.0",
242244
"@types/ws": "^8.5.3",
243245
"@typescript-eslint/eslint-plugin": "^5.59.6",
244246
"@typescript-eslint/parser": "^5.59.6",

pnpm-lock.yaml

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)