Skip to content

Commit 3f90094

Browse files
authored
improvement(kb): use trigger.dev for kb tasks (#1166)
1 parent bda8ee7 commit 3f90094

File tree

5 files changed

+171
-19
lines changed

5 files changed

+171
-19
lines changed

apps/sim/app/api/files/presigned/route.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,9 @@ async function handleS3PresignedUrl(
232232
)
233233
}
234234

235-
// For chat images, use direct S3 URLs since they need to be permanently accessible
236-
// For other files, use serve path for access control
235+
// For chat images and knowledge base files, use direct URLs since they need to be accessible by external services
237236
const finalPath =
238-
uploadType === 'chat'
237+
uploadType === 'chat' || uploadType === 'knowledge-base'
239238
? `https://${config.bucket}.s3.${config.region}.amazonaws.com/${uniqueKey}`
240239
: `/api/files/serve/s3/${encodeURIComponent(uniqueKey)}`
241240

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { task } from '@trigger.dev/sdk'
2+
import { processDocumentAsync } from '@/lib/knowledge/documents/service'
3+
import { createLogger } from '@/lib/logs/console/logger'
4+
5+
const logger = createLogger('TriggerKnowledgeProcessing')
6+
7+
export type DocumentProcessingPayload = {
8+
knowledgeBaseId: string
9+
documentId: string
10+
docData: {
11+
filename: string
12+
fileUrl: string
13+
fileSize: number
14+
mimeType: string
15+
}
16+
processingOptions: {
17+
chunkSize?: number
18+
minCharactersPerChunk?: number
19+
recipe?: string
20+
lang?: string
21+
chunkOverlap?: number
22+
}
23+
requestId: string
24+
}
25+
26+
export const processDocument = task({
27+
id: 'knowledge-process-document',
28+
maxDuration: 300,
29+
retry: {
30+
maxAttempts: 3,
31+
factor: 2,
32+
minTimeoutInMs: 1000,
33+
maxTimeoutInMs: 10000,
34+
},
35+
queue: {
36+
concurrencyLimit: 20,
37+
name: 'document-processing-queue',
38+
},
39+
run: async (payload: DocumentProcessingPayload) => {
40+
const { knowledgeBaseId, documentId, docData, processingOptions, requestId } = payload
41+
42+
logger.info(`[${requestId}] Starting Trigger.dev processing for document: ${docData.filename}`)
43+
44+
try {
45+
await processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions)
46+
47+
logger.info(`[${requestId}] Successfully processed document: ${docData.filename}`)
48+
49+
return {
50+
success: true,
51+
documentId,
52+
filename: docData.filename,
53+
processingTime: Date.now(),
54+
}
55+
} catch (error) {
56+
logger.error(`[${requestId}] Failed to process document: ${docData.filename}`, error)
57+
throw error
58+
}
59+
},
60+
})

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import crypto, { randomUUID } from 'crypto'
2+
import { tasks } from '@trigger.dev/sdk'
23
import { and, asc, desc, eq, inArray, isNull, sql } from 'drizzle-orm'
34
import { getSlotsForFieldType, type TAG_SLOT_CONFIG } from '@/lib/constants/knowledge'
45
import { generateEmbeddings } from '@/lib/embeddings/utils'
6+
import { env } from '@/lib/env'
57
import { processDocument } from '@/lib/knowledge/documents/document-processor'
68
import { getNextAvailableSlot } from '@/lib/knowledge/tags/service'
79
import { createLogger } from '@/lib/logs/console/logger'
810
import { getRedisClient } from '@/lib/redis'
11+
import type { DocumentProcessingPayload } from '@/background/knowledge-processing'
912
import { db } from '@/db'
1013
import { document, embedding, knowledgeBaseTagDefinitions } from '@/db/schema'
1114
import { DocumentProcessingQueue } from './queue'
@@ -181,14 +184,55 @@ export async function processDocumentTags(
181184
}
182185

183186
/**
184-
* Process documents with Redis queue when available, fallback to concurrency control
187+
* Process documents with best available method: Trigger.dev > Redis queue > in-memory concurrency control
185188
*/
186189
export async function processDocumentsWithQueue(
187190
createdDocuments: DocumentData[],
188191
knowledgeBaseId: string,
189192
processingOptions: ProcessingOptions,
190193
requestId: string
191194
): Promise<void> {
195+
// Priority 1: Trigger.dev
196+
if (isTriggerAvailable()) {
197+
try {
198+
logger.info(
199+
`[${requestId}] Using Trigger.dev background processing for ${createdDocuments.length} documents`
200+
)
201+
202+
const triggerPayloads = createdDocuments.map((doc) => ({
203+
knowledgeBaseId,
204+
documentId: doc.documentId,
205+
docData: {
206+
filename: doc.filename,
207+
fileUrl: doc.fileUrl,
208+
fileSize: doc.fileSize,
209+
mimeType: doc.mimeType,
210+
},
211+
processingOptions: {
212+
chunkSize: processingOptions.chunkSize || 1024,
213+
minCharactersPerChunk: processingOptions.minCharactersPerChunk || 1,
214+
recipe: processingOptions.recipe || 'default',
215+
lang: processingOptions.lang || 'en',
216+
chunkOverlap: processingOptions.chunkOverlap || 200,
217+
},
218+
requestId,
219+
}))
220+
221+
const result = await processDocumentsWithTrigger(triggerPayloads, requestId)
222+
223+
if (result.success) {
224+
logger.info(
225+
`[${requestId}] Successfully triggered background processing: ${result.message}`
226+
)
227+
return
228+
}
229+
logger.warn(`[${requestId}] Trigger.dev failed: ${result.message}, falling back to Redis`)
230+
} catch (error) {
231+
logger.warn(`[${requestId}] Trigger.dev processing failed, falling back to Redis:`, error)
232+
}
233+
}
234+
235+
// Priority 2: Redis queue
192236
const queue = getDocumentQueue()
193237
const redisClient = getRedisClient()
194238

@@ -213,6 +257,7 @@ export async function processDocumentsWithQueue(
213257

214258
await Promise.all(jobPromises)
215259

260+
// Start Redis background processing
216261
queue
217262
.processJobs(async (job) => {
218263
const data = job.data as DocumentJobData
@@ -221,7 +266,6 @@ export async function processDocumentsWithQueue(
221266
})
222267
.catch((error) => {
223268
logger.error(`[${requestId}] Error in Redis queue processing:`, error)
224-
// Don't throw here - let the processing continue with fallback if needed
225269
})
226270

227271
logger.info(`[${requestId}] All documents queued for Redis processing`)
@@ -231,7 +275,10 @@ export async function processDocumentsWithQueue(
231275
}
232276
}
233277

234-
logger.info(`[${requestId}] Using fallback in-memory processing (Redis not available or failed)`)
278+
// Priority 3: In-memory processing
279+
logger.info(
280+
`[${requestId}] Using fallback in-memory processing (neither Trigger.dev nor Redis available)`
281+
)
235282
await processDocumentsWithConcurrencyControl(
236283
createdDocuments,
237284
knowledgeBaseId,
@@ -500,6 +547,51 @@ export async function processDocumentAsync(
500547
}
501548
}
502549

550+
/**
551+
* Check if Trigger.dev is available and configured
552+
*/
553+
export function isTriggerAvailable(): boolean {
554+
return !!(env.TRIGGER_SECRET_KEY && env.TRIGGER_DEV_ENABLED !== false)
555+
}
556+
557+
/**
558+
* Process documents using Trigger.dev
559+
*/
560+
export async function processDocumentsWithTrigger(
561+
documents: DocumentProcessingPayload[],
562+
requestId: string
563+
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
564+
if (!isTriggerAvailable()) {
565+
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
566+
}
567+
568+
try {
569+
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
570+
571+
const jobPromises = documents.map(async (document) => {
572+
const job = await tasks.trigger('knowledge-process-document', document)
573+
return job.id
574+
})
575+
576+
const jobIds = await Promise.all(jobPromises)
577+
578+
logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
579+
580+
return {
581+
success: true,
582+
message: `${documents.length} document processing jobs triggered`,
583+
jobIds,
584+
}
585+
} catch (error) {
586+
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
587+
588+
return {
589+
success: false,
590+
message: error instanceof Error ? error.message : 'Failed to trigger background jobs',
591+
}
592+
}
593+
}
594+
503595
/**
504596
* Create document records in database with tags
505597
*/
@@ -644,8 +736,8 @@ export async function getDocuments(
644736
search,
645737
limit = 50,
646738
offset = 0,
647-
sortBy = 'uploadedAt',
648-
sortOrder = 'desc',
739+
sortBy = 'filename',
740+
sortOrder = 'asc',
649741
} = options
650742

651743
// Build where conditions
@@ -696,7 +788,10 @@ export async function getDocuments(
696788
}
697789
}
698790

699-
const orderByClause = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
791+
// Use stable secondary sort to prevent shifting when primary values are identical
792+
const primaryOrderBy = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
793+
const secondaryOrderBy =
794+
sortBy === 'filename' ? desc(document.uploadedAt) : asc(document.filename)
700795

701796
const documents = await db
702797
.select({
@@ -725,7 +820,7 @@ export async function getDocuments(
725820
})
726821
.from(document)
727822
.where(and(...whereConditions))
728-
.orderBy(orderByClause)
823+
.orderBy(primaryOrderBy, secondaryOrderBy)
729824
.limit(limit)
730825
.offset(offset)
731826

apps/sim/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
"@radix-ui/react-tooltip": "^1.1.6",
6969
"@react-email/components": "^0.0.34",
7070
"@sentry/nextjs": "^9.15.0",
71-
"@trigger.dev/sdk": "4.0.0",
71+
"@trigger.dev/sdk": "4.0.1",
7272
"@types/pg": "8.15.5",
7373
"@types/three": "0.177.0",
7474
"@vercel/og": "^0.6.5",
@@ -134,7 +134,7 @@
134134
"@testing-library/jest-dom": "^6.6.3",
135135
"@testing-library/react": "^16.3.0",
136136
"@testing-library/user-event": "^14.6.1",
137-
"@trigger.dev/build": "4.0.0",
137+
"@trigger.dev/build": "4.0.1",
138138
"@types/html-to-text": "^9.0.4",
139139
"@types/js-yaml": "4.0.9",
140140
"@types/jsdom": "21.1.7",

bun.lock

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
"@radix-ui/react-tooltip": "^1.1.6",
9999
"@react-email/components": "^0.0.34",
100100
"@sentry/nextjs": "^9.15.0",
101-
"@trigger.dev/sdk": "4.0.0",
101+
"@trigger.dev/sdk": "4.0.1",
102102
"@types/pg": "8.15.5",
103103
"@types/three": "0.177.0",
104104
"@vercel/og": "^0.6.5",
@@ -164,7 +164,7 @@
164164
"@testing-library/jest-dom": "^6.6.3",
165165
"@testing-library/react": "^16.3.0",
166166
"@testing-library/user-event": "^14.6.1",
167-
"@trigger.dev/build": "4.0.0",
167+
"@trigger.dev/build": "4.0.1",
168168
"@types/html-to-text": "^9.0.4",
169169
"@types/js-yaml": "4.0.9",
170170
"@types/jsdom": "21.1.7",
@@ -1276,11 +1276,11 @@
12761276

12771277
"@testing-library/user-event": ["@testing-library/[email protected]", "", { "peerDependencies": { "@testing-library/dom": ">=7.21.4" } }, "sha512-vq7fv0rnt+QTXgPxr5Hjc210p6YKq2kmdziLgnsZGgLJ9e6VAShx1pACLuRjd/AS/sr7phAR58OIIpf0LlmQNw=="],
12781278

1279-
"@trigger.dev/build": ["@trigger.dev/[email protected].0", "", { "dependencies": { "@trigger.dev/core": "4.0.0", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-OXTTS+pV6ZuqcCtWhiDoW/zB6lrnG1YtkGgYT+QRt+HYeYdOoVBfYfv0y8x3U4Yfiw9kznwQC/sDB1b6DiHtBA=="],
1279+
"@trigger.dev/build": ["@trigger.dev/[email protected].1", "", { "dependencies": { "@trigger.dev/core": "4.0.1", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-PGOnCPjVSKkj72xmJb6mdRbzDSP3Ti/C5/tfaBFdSZ7qcoVctSzDfS5iwEGsSoSWSIv+MVy12c4v7Ji/r7MO1A=="],
12801280

1281-
"@trigger.dev/core": ["@trigger.dev/[email protected]", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "lodash.get": "^4.4.2", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-VlRMN6RPeqU66e/j0fGmWTn97DY1b+ChsMDDBm62jZ3N9XtiOlDkrWNtggPoxPtyXsHuShllo/3gpiZDvhtKww=="],
1281+
"@trigger.dev/core": ["@trigger.dev/[email protected]", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-NTffiVPy/zFopujdptGGoy3lj3/CKV16JA8CobCfsEpDfu+K+wEys+9p8PFY8j5I0UI86aqlFpJu9/VRqUQ/yQ=="],
12821282

1283-
"@trigger.dev/sdk": ["@trigger.dev/[email protected].0", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.0", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-rq7XvY4jxCmWr6libN1egw8w0Bq0TWbbnAxCCXDScgWEszLauYmXy8WaVlJyxbwslVMHsvXP36JBFa3J3ay2yg=="],
1283+
"@trigger.dev/sdk": ["@trigger.dev/[email protected].1", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.1", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-cdEgrwIl2Kg2jd85dA4tdePPPe+iMjAGX0Q8QrO2CNo/iBcjl7jB7uzvmSjDKYmJoC+8a30fCWviYy6ljOs1oQ=="],
12841284

12851285
"@tweenjs/tween.js": ["@tweenjs/[email protected]", "", {}, "sha512-vJmvvwFxYuGnF2axRtPYocag6Clbb5YS7kLL+SO/TeVFzHqDIWrNKYtcsPMibjDx9O+bu+psAy9NKfWklassUA=="],
12861286

@@ -2304,8 +2304,6 @@
23042304

23052305
"lodash.defaults": ["[email protected]", "", {}, "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="],
23062306

2307-
"lodash.get": ["[email protected]", "", {}, "sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ=="],
2308-
23092307
"lodash.isarguments": ["[email protected]", "", {}, "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg=="],
23102308

23112309
"lodash.merge": ["[email protected]", "", {}, "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ=="],

0 commit comments

Comments
 (0)