Skip to content

Commit d73a97f

Browse files
authored
feat(idempotency): added generalized idempotency service for all triggers/webhooks (#1330)
* update infra and remove railway * feat(webhooks): add idempotency service for all triggers/webhooks * Revert "update infra and remove railway" This reverts commit abfa2f8. * cleanup * ack PR comments
1 parent f2ec43e commit d73a97f

File tree

13 files changed

+7818
-328
lines changed

13 files changed

+7818
-328
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { verifyCronAuth } from '@/lib/auth/internal'
3+
import { cleanupExpiredIdempotencyKeys, getIdempotencyKeyStats } from '@/lib/idempotency/cleanup'
4+
import { createLogger } from '@/lib/logs/console/logger'
5+
import { generateRequestId } from '@/lib/utils'
6+
7+
const logger = createLogger('IdempotencyCleanupAPI')
8+
9+
export const dynamic = 'force-dynamic'
10+
export const maxDuration = 300 // Allow up to 5 minutes for cleanup
11+
12+
export async function GET(request: NextRequest) {
13+
const requestId = generateRequestId()
14+
logger.info(`Idempotency cleanup triggered (${requestId})`)
15+
16+
try {
17+
const authError = verifyCronAuth(request, 'Idempotency key cleanup')
18+
if (authError) {
19+
return authError
20+
}
21+
22+
const statsBefore = await getIdempotencyKeyStats()
23+
logger.info(
24+
`Pre-cleanup stats: ${statsBefore.totalKeys} keys across ${Object.keys(statsBefore.keysByNamespace).length} namespaces`
25+
)
26+
27+
const result = await cleanupExpiredIdempotencyKeys({
28+
maxAgeSeconds: 7 * 24 * 60 * 60, // 7 days
29+
batchSize: 1000,
30+
})
31+
32+
const statsAfter = await getIdempotencyKeyStats()
33+
logger.info(`Post-cleanup stats: ${statsAfter.totalKeys} keys remaining`)
34+
35+
return NextResponse.json({
36+
success: true,
37+
message: 'Idempotency key cleanup completed',
38+
requestId,
39+
result: {
40+
deleted: result.deleted,
41+
errors: result.errors,
42+
statsBefore: {
43+
totalKeys: statsBefore.totalKeys,
44+
keysByNamespace: statsBefore.keysByNamespace,
45+
},
46+
statsAfter: {
47+
totalKeys: statsAfter.totalKeys,
48+
keysByNamespace: statsAfter.keysByNamespace,
49+
},
50+
},
51+
})
52+
} catch (error) {
53+
logger.error(`Error during idempotency cleanup (${requestId}):`, error)
54+
return NextResponse.json(
55+
{
56+
success: false,
57+
message: 'Idempotency cleanup failed',
58+
error: error instanceof Error ? error.message : 'Unknown error',
59+
requestId,
60+
},
61+
{ status: 500 }
62+
)
63+
}
64+
}

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server'
44
import { checkServerSideUsageLimits } from '@/lib/billing'
55
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
66
import { env, isTruthy } from '@/lib/env'
7+
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency/service'
78
import { createLogger } from '@/lib/logs/console/logger'
89
import { generateRequestId } from '@/lib/utils'
910
import {
@@ -328,7 +329,7 @@ export async function POST(
328329
// Continue processing - better to risk usage limit bypass than fail webhook
329330
}
330331

331-
// --- PHASE 5: Queue webhook execution (trigger.dev or direct based on env) ---
332+
// --- PHASE 5: Idempotent webhook execution ---
332333
try {
333334
const payload = {
334335
webhookId: foundWebhook.id,
@@ -341,22 +342,44 @@ export async function POST(
341342
blockId: foundWebhook.blockId,
342343
}
343344

344-
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
345+
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
346+
foundWebhook.id,
347+
body,
348+
Object.fromEntries(request.headers.entries())
349+
)
345350

346-
if (useTrigger) {
347-
const handle = await tasks.trigger('webhook-execution', payload)
348-
logger.info(
349-
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
350-
)
351-
} else {
352-
// Fire-and-forget direct execution to avoid blocking webhook response
353-
void executeWebhookJob(payload).catch((error) => {
354-
logger.error(`[${requestId}] Direct webhook execution failed`, error)
355-
})
356-
logger.info(
357-
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
358-
)
359-
}
351+
const result = await webhookIdempotency.executeWithIdempotency(
352+
foundWebhook.provider,
353+
idempotencyKey,
354+
async () => {
355+
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
356+
357+
if (useTrigger) {
358+
const handle = await tasks.trigger('webhook-execution', payload)
359+
logger.info(
360+
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
361+
)
362+
return {
363+
method: 'trigger.dev',
364+
taskId: handle.id,
365+
status: 'queued',
366+
}
367+
}
368+
// Fire-and-forget direct execution to avoid blocking webhook response
369+
void executeWebhookJob(payload).catch((error) => {
370+
logger.error(`[${requestId}] Direct webhook execution failed`, error)
371+
})
372+
logger.info(
373+
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
374+
)
375+
return {
376+
method: 'direct',
377+
status: 'queued',
378+
}
379+
}
380+
)
381+
382+
logger.debug(`[${requestId}] Webhook execution result:`, result)
360383

361384
// Return immediate acknowledgment with provider-specific format
362385
if (foundWebhook.provider === 'microsoftteams') {

apps/sim/background/webhook-execution.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { eq, sql } from 'drizzle-orm'
33
import { v4 as uuidv4 } from 'uuid'
44
import { checkServerSideUsageLimits } from '@/lib/billing'
55
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
6+
import { IdempotencyService, webhookIdempotency } from '@/lib/idempotency'
67
import { createLogger } from '@/lib/logs/console/logger'
78
import { LoggingSession } from '@/lib/logs/execution/logging-session'
89
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -41,11 +42,29 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
4142
executionId,
4243
})
4344

44-
// Initialize logging session outside try block so it's available in catch
45+
const idempotencyKey = IdempotencyService.createWebhookIdempotencyKey(
46+
payload.webhookId,
47+
payload.body,
48+
payload.headers
49+
)
50+
51+
return await webhookIdempotency.executeWithIdempotency(
52+
payload.provider,
53+
idempotencyKey,
54+
async () => {
55+
return await executeWebhookJobInternal(payload, executionId, requestId)
56+
}
57+
)
58+
}
59+
60+
async function executeWebhookJobInternal(
61+
payload: WebhookExecutionPayload,
62+
executionId: string,
63+
requestId: string
64+
) {
4565
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
4666

4767
try {
48-
// Check usage limits first
4968
const usageCheck = await checkServerSideUsageLimits(payload.userId)
5069
if (usageCheck.isExceeded) {
5170
logger.warn(
@@ -62,15 +81,13 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
6281
)
6382
}
6483

65-
// Load workflow from normalized tables
6684
const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId)
6785
if (!workflowData) {
6886
throw new Error(`Workflow not found: ${payload.workflowId}`)
6987
}
7088

7189
const { blocks, edges, loops, parallels } = workflowData
7290

73-
// Get environment variables with workspace precedence
7491
const wfRows = await db
7592
.select({ workspaceId: workflowTable.workspaceId })
7693
.from(workflowTable)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE TABLE "idempotency_key" (
2+
"key" text NOT NULL,
3+
"namespace" text DEFAULT 'default' NOT NULL,
4+
"result" json NOT NULL,
5+
"created_at" timestamp DEFAULT now() NOT NULL
6+
);
7+
--> statement-breakpoint
8+
CREATE UNIQUE INDEX "idempotency_key_namespace_unique" ON "idempotency_key" USING btree ("key","namespace");--> statement-breakpoint
9+
CREATE INDEX "idempotency_key_created_at_idx" ON "idempotency_key" USING btree ("created_at");--> statement-breakpoint
10+
CREATE INDEX "idempotency_key_namespace_idx" ON "idempotency_key" USING btree ("namespace");

0 commit comments

Comments
 (0)