Skip to content

Commit 8202b92

Browse files
committed
Use x-trigger-request-idempotency-key header instead
Plus cleanup shared logic in the routes
1 parent 84eeeae commit 8202b92

File tree

5 files changed

+173
-123
lines changed

5 files changed

+173
-123
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@ import { z } from "zod";
99
import { prisma } from "~/db.server";
1010
import { env } from "~/env.server";
1111
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
12-
import {
13-
ApiAuthenticationResultSuccess,
14-
AuthenticatedEnvironment,
15-
getOneTimeUseToken,
16-
} from "~/services/apiAuth.server";
12+
import { ApiAuthenticationResultSuccess, getOneTimeUseToken } from "~/services/apiAuth.server";
1713
import { logger } from "~/services/logger.server";
18-
import { requestIdempotency } from "~/services/requestIdempotencyInstance.server";
1914
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
2015
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
16+
import {
17+
handleRequestIdempotency,
18+
saveRequestIdempotency,
19+
} from "~/utils/requestIdempotency.server";
2120
import { ServiceValidationError } from "~/v3/services/baseService.server";
2221
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";
2322

@@ -33,7 +32,7 @@ export const HeadersSchema = z.object({
3332
"x-trigger-worker": z.string().nullish(),
3433
"x-trigger-client": z.string().nullish(),
3534
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
36-
"x-trigger-request-id": z.string().nullish(),
35+
"x-trigger-request-idempotency-key": z.string().nullish(),
3736
traceparent: z.string().optional(),
3837
tracestate: z.string().optional(),
3938
});
@@ -63,51 +62,32 @@ const { action, loader } = createActionApiRoute(
6362
"x-trigger-worker": isFromWorker,
6463
"x-trigger-client": triggerClient,
6564
"x-trigger-engine-version": engineVersion,
66-
"x-trigger-request-id": requestId,
65+
"x-trigger-request-idempotency-key": requestIdempotencyKey,
6766
} = headers;
6867

69-
if (requestId) {
70-
logger.debug("request-idempotency: checking for cached trigger request", {
71-
requestId,
72-
});
73-
74-
const cachedRequest = await requestIdempotency.checkRequest("trigger", requestId);
75-
76-
if (cachedRequest) {
77-
logger.info("request-idempotency: found cached trigger request", {
78-
requestId,
79-
cachedRequest,
80-
});
81-
82-
const cachedRun = await prisma.taskRun.findFirst({
68+
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
69+
requestType: "trigger",
70+
findCachedEntity: async (cachedRequestId) => {
71+
return await prisma.taskRun.findFirst({
8372
where: {
84-
id: cachedRequest.id,
73+
id: cachedRequestId,
8574
},
8675
select: {
8776
friendlyId: true,
8877
},
8978
});
79+
},
80+
buildResponse: (cachedRun) => ({
81+
id: cachedRun.friendlyId,
82+
isCached: false,
83+
}),
84+
buildResponseHeaders: async (responseBody, cachedEntity) => {
85+
return await responseHeaders(cachedEntity, authentication, triggerClient);
86+
},
87+
});
9088

91-
if (cachedRun) {
92-
logger.info("request-idempotency: found cached trigger run", {
93-
requestId,
94-
cachedRun,
95-
});
96-
97-
const $responseHeaders = await responseHeaders(cachedRun, authentication, triggerClient);
98-
99-
return json(
100-
{
101-
id: cachedRun.friendlyId,
102-
isCached: false,
103-
},
104-
{
105-
headers: $responseHeaders,
106-
status: 200,
107-
}
108-
);
109-
}
110-
}
89+
if (cachedResponse) {
90+
return cachedResponse;
11191
}
11292

11393
const service = new TriggerTaskService();
@@ -152,11 +132,7 @@ const { action, loader } = createActionApiRoute(
152132
return json({ error: "Task not found" }, { status: 404 });
153133
}
154134

155-
if (requestId) {
156-
await requestIdempotency.saveRequest("trigger", requestId, {
157-
id: result.run.id,
158-
});
159-
}
135+
await saveRequestIdempotency(requestIdempotencyKey, "trigger", result.run.id);
160136

161137
const $responseHeaders = await responseHeaders(result.run, authentication, triggerClient);
162138

@@ -167,7 +143,7 @@ const { action, loader } = createActionApiRoute(
167143
},
168144
{
169145
headers: $responseHeaders,
170-
status: 200,
146+
status: 408,
171147
}
172148
);
173149
} catch (error) {

apps/webapp/app/routes/api.v2.tasks.batch.ts

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ import {
44
BatchTriggerTaskV3Response,
55
generateJWT,
66
} from "@trigger.dev/core/v3";
7+
import { prisma } from "~/db.server";
78
import { env } from "~/env.server";
9+
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
810
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
911
import { logger } from "~/services/logger.server";
1012
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
13+
import {
14+
handleRequestIdempotency,
15+
saveRequestIdempotency,
16+
} from "~/utils/requestIdempotency.server";
1117
import { ServiceValidationError } from "~/v3/services/baseService.server";
1218
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
1319
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1420
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
15-
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
16-
import { z } from "zod";
17-
import { requestIdempotency } from "~/services/requestIdempotencyInstance.server";
18-
import { prisma } from "~/db.server";
1921

2022
const { action, loader } = createActionApiRoute(
2123
{
@@ -56,7 +58,7 @@ const { action, loader } = createActionApiRoute(
5658
"x-trigger-client": triggerClient,
5759
"x-trigger-engine-version": engineVersion,
5860
"batch-processing-strategy": batchProcessingStrategy,
59-
"x-trigger-request-id": requestId,
61+
"x-trigger-request-idempotency-key": requestIdempotencyKey,
6062
traceparent,
6163
tracestate,
6264
} = headers;
@@ -71,55 +73,34 @@ const { action, loader } = createActionApiRoute(
7173
traceparent,
7274
tracestate,
7375
batchProcessingStrategy,
74-
requestId,
76+
requestIdempotencyKey,
7577
});
7678

77-
if (requestId) {
78-
logger.debug("request-idempotency: checking for cached batch-trigger request", {
79-
requestId,
80-
});
81-
82-
const cachedRequest = await requestIdempotency.checkRequest("batch-trigger", requestId);
83-
84-
if (cachedRequest) {
85-
logger.info("request-idempotency: found cached batch-trigger request", {
86-
requestId,
87-
cachedRequest,
88-
});
89-
90-
// Find the batch and return it as a 200 response
91-
const cachedBatch = await prisma.batchTaskRun.findFirst({
79+
const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
80+
requestType: "batch-trigger",
81+
findCachedEntity: async (cachedRequestId) => {
82+
return await prisma.batchTaskRun.findFirst({
9283
where: {
93-
id: cachedRequest.id,
84+
id: cachedRequestId,
9485
runtimeEnvironmentId: authentication.environment.id,
9586
},
9687
select: {
9788
friendlyId: true,
9889
runCount: true,
9990
},
10091
});
92+
},
93+
buildResponse: (cachedBatch) => ({
94+
id: cachedBatch.friendlyId,
95+
runCount: cachedBatch.runCount,
96+
}),
97+
buildResponseHeaders: async (responseBody, cachedEntity) => {
98+
return await responseHeaders(responseBody, authentication.environment, triggerClient);
99+
},
100+
});
101101

102-
if (cachedBatch) {
103-
logger.info("request-idempotency: found cached batch-trigger request", {
104-
requestId,
105-
cachedRequest,
106-
cachedBatch,
107-
});
108-
109-
const responseBody = {
110-
id: cachedBatch.friendlyId,
111-
runCount: cachedBatch.runCount,
112-
};
113-
114-
const $responseHeaders = await responseHeaders(
115-
responseBody,
116-
authentication.environment,
117-
triggerClient
118-
);
119-
120-
return json(responseBody, { status: 200, headers: $responseHeaders });
121-
}
122-
}
102+
if (cachedResponse) {
103+
return cachedResponse;
123104
}
124105

125106
const traceContext =
@@ -130,11 +111,7 @@ const { action, loader } = createActionApiRoute(
130111
const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined);
131112

132113
service.onBatchTaskRunCreated.attachOnce(async (batch) => {
133-
if (requestId) {
134-
await requestIdempotency.saveRequest("batch-trigger", requestId, {
135-
id: batch.id,
136-
});
137-
}
114+
await saveRequestIdempotency(requestIdempotencyKey, "batch-trigger", batch.id);
138115
});
139116

140117
try {

apps/webapp/app/services/requestIdempotency.server.ts

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MemoryStore } from "@unkey/cache/stores";
44
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
55
import { RedisWithClusterOptions } from "~/redis.server";
66
import { validate as uuidValidate, version as uuidVersion } from "uuid";
7+
import { startActiveSpan } from "~/v3/tracer.server";
78

89
export type RequestIdempotencyServiceOptions<TTypes extends string> = {
910
types: TTypes[];
@@ -53,37 +54,46 @@ export class RequestIdempotencyService<TTypes extends string> {
5354
this.cache = cache;
5455
}
5556

56-
async checkRequest(type: TTypes, requestId: string) {
57-
if (!this.#validateRequestId(requestId)) {
58-
this.logger.warn("RequestIdempotency: invalid requestId", {
59-
requestId,
57+
async checkRequest(type: TTypes, requestIdempotencyKey: string) {
58+
if (!this.#validateRequestId(requestIdempotencyKey)) {
59+
this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", {
60+
requestIdempotencyKey,
6061
});
6162

6263
return undefined;
6364
}
6465

65-
const key = `${type}:${requestId}`;
66-
const result = await this.cache.requests.get(key);
66+
return startActiveSpan("RequestIdempotency.checkRequest()", async (span) => {
67+
span.setAttribute("request_id", requestIdempotencyKey);
68+
span.setAttribute("type", type);
6769

68-
this.logger.debug("RequestIdempotency: checking request", {
69-
type,
70-
requestId,
71-
key,
72-
result,
73-
});
70+
const key = `${type}:${requestIdempotencyKey}`;
71+
const result = await this.cache.requests.get(key);
72+
73+
this.logger.debug("RequestIdempotency: checking request", {
74+
type,
75+
requestIdempotencyKey,
76+
key,
77+
result,
78+
});
7479

75-
return result.val ? result.val : undefined;
80+
return result.val ? result.val : undefined;
81+
});
7682
}
7783

78-
async saveRequest(type: TTypes, requestId: string, value: RequestIdempotencyCacheEntry) {
79-
if (!this.#validateRequestId(requestId)) {
80-
this.logger.warn("RequestIdempotency: invalid requestId", {
81-
requestId,
84+
async saveRequest(
85+
type: TTypes,
86+
requestIdempotencyKey: string,
87+
value: RequestIdempotencyCacheEntry
88+
) {
89+
if (!this.#validateRequestId(requestIdempotencyKey)) {
90+
this.logger.warn("RequestIdempotency: invalid requestIdempotencyKey", {
91+
requestIdempotencyKey,
8292
});
8393
return undefined;
8494
}
8595

86-
const key = `${type}:${requestId}`;
96+
const key = `${type}:${requestIdempotencyKey}`;
8797
const result = await this.cache.requests.set(key, value);
8898

8999
if (result.err) {
@@ -94,7 +104,7 @@ export class RequestIdempotencyService<TTypes extends string> {
94104
} else {
95105
this.logger.debug("RequestIdempotency: saved request", {
96106
type,
97-
requestId,
107+
requestIdempotencyKey,
98108
key,
99109
value,
100110
});
@@ -103,9 +113,9 @@ export class RequestIdempotencyService<TTypes extends string> {
103113
return result;
104114
}
105115

106-
// The requestId should be a valid UUID
107-
#validateRequestId(requestId: string): boolean {
108-
return isValidV4UUID(requestId);
116+
// The requestIdempotencyKey should be a valid UUID
117+
#validateRequestId(requestIdempotencyKey: string): boolean {
118+
return isValidV4UUID(requestIdempotencyKey);
109119
}
110120
}
111121

0 commit comments

Comments
 (0)