Skip to content

Commit a383830

Browse files
authored
feat(kb): add adjustable concurrency and batching to uploads and embeddings (#1198)
1 parent 4310dd6 commit a383830

File tree

3 files changed

+30
-18
lines changed

3 files changed

+30
-18
lines changed

apps/sim/background/knowledge-processing.ts

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import { task } from '@trigger.dev/sdk'
2+
import { env } from '@/lib/env'
23
import { processDocumentAsync } from '@/lib/knowledge/documents/service'
34
import { createLogger } from '@/lib/logs/console/logger'
45

@@ -25,15 +26,15 @@ export type DocumentProcessingPayload = {
2526

2627
export const processDocument = task({
2728
id: 'knowledge-process-document',
28-
maxDuration: 300,
29+
maxDuration: env.KB_CONFIG_MAX_DURATION,
2930
retry: {
30-
maxAttempts: 3,
31-
factor: 2,
32-
minTimeoutInMs: 1000,
33-
maxTimeoutInMs: 10000,
31+
maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS,
32+
factor: env.KB_CONFIG_RETRY_FACTOR,
33+
minTimeoutInMs: env.KB_CONFIG_MIN_TIMEOUT,
34+
maxTimeoutInMs: env.KB_CONFIG_MAX_TIMEOUT,
3435
},
3536
queue: {
36-
concurrencyLimit: 20,
37+
concurrencyLimit: env.KB_CONFIG_CONCURRENCY_LIMIT,
3738
name: 'document-processing-queue',
3839
},
3940
run: async (payload: DocumentProcessingPayload) => {

apps/sim/lib/env.ts

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -139,6 +139,17 @@ export const env = createEnv({
139139
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('150'), // Enterprise tier sync API executions per minute
140140
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('1000'), // Enterprise tier async API executions per minute
141141

142+
// Knowledge Base Processing Configuration - Shared across all processing methods
143+
KB_CONFIG_MAX_DURATION: z.number().optional().default(300), // Max processing duration in s
144+
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
145+
KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
146+
KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
147+
KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
148+
KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
149+
KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
150+
KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
151+
KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
152+
142153
// Real-time Communication
143154
SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features
144155
SOCKET_PORT: z.number().optional(), // Port for WebSocket server

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,8 @@ import type { DocumentSortField, SortOrder } from './types'
1717
const logger = createLogger('DocumentService')
1818

1919
const TIMEOUTS = {
20-
OVERALL_PROCESSING: 600000,
21-
EMBEDDINGS_API: 180000,
20+
OVERALL_PROCESSING: env.KB_CONFIG_MAX_DURATION * 1000,
21+
EMBEDDINGS_API: env.KB_CONFIG_MAX_TIMEOUT * 18,
2222
} as const
2323

2424
/**
@@ -38,17 +38,17 @@ function withTimeout<T>(
3838
}
3939

4040
const PROCESSING_CONFIG = {
41-
maxConcurrentDocuments: 4,
42-
batchSize: 10,
43-
delayBetweenBatches: 200,
44-
delayBetweenDocuments: 100,
41+
maxConcurrentDocuments: Math.max(1, Math.floor(env.KB_CONFIG_CONCURRENCY_LIMIT / 5)) || 4,
42+
batchSize: Math.max(1, Math.floor(env.KB_CONFIG_BATCH_SIZE / 2)) || 10,
43+
delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES * 2,
44+
delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS * 2,
4545
}
4646

4747
const REDIS_PROCESSING_CONFIG = {
48-
maxConcurrentDocuments: 12,
49-
batchSize: 20,
50-
delayBetweenBatches: 100,
51-
delayBetweenDocuments: 50,
48+
maxConcurrentDocuments: env.KB_CONFIG_CONCURRENCY_LIMIT,
49+
batchSize: env.KB_CONFIG_BATCH_SIZE,
50+
delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES,
51+
delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS,
5252
}
5353

5454
let documentQueue: DocumentProcessingQueue | null = null
@@ -59,8 +59,8 @@ export function getDocumentQueue(): DocumentProcessingQueue {
5959
const config = redisClient ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
6060
documentQueue = new DocumentProcessingQueue({
6161
maxConcurrent: config.maxConcurrentDocuments,
62-
retryDelay: 2000,
63-
maxRetries: 5,
62+
retryDelay: env.KB_CONFIG_MIN_TIMEOUT,
63+
maxRetries: env.KB_CONFIG_MAX_ATTEMPTS,
6464
})
6565
}
6666
return documentQueue

0 commit comments

Comments
 (0)