
Commit 0977ed2

improvement(kb): add configurable concurrency to chunks processing, sped up 22x for large docs (#2681)
1 parent ed6b9c0 commit 0977ed2
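
Where the headline 22x comes from, roughly: the previous code generated embeddings one batch at a time and slept 100 ms between batches, while this change runs up to 50 batches in flight with no delay. As an illustrative back-of-envelope only (assuming, hypothetically, ~1 s per embedding API call and a large document that splits into 100 batches): sequential processing costs about 100 × 1.1 s ≈ 110 s, while a 50-way worker pool costs about ceil(100 / 50) × 1 s ≈ 2 s. The realized speedup depends on API latency, rate limits, and batch count.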

File tree

apps/sim/lib/core/config/env.ts
apps/sim/lib/knowledge/documents/service.ts
apps/sim/lib/knowledge/embeddings.ts

3 files changed: +52, -30 lines

apps/sim/lib/core/config/env.ts

Lines changed: 3 additions & 3 deletions

@@ -174,9 +174,9 @@ export const env = createEnv({
   KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
   KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
   KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
-  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
-  KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
-  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
+  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(50), // Concurrent embedding API calls
+  KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
+  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
   KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms

   // Real-time Communication
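
All three knobs are ordinary environment variables with defaults applied by the zod schema. A minimal sketch of reading the same defaults straight from process.env, as a hypothetical standalone equivalent rather than the project's createEnv wiring:

// Hypothetical standalone equivalent of the defaults above; the project
// itself validates these through createEnv/zod.
const kbConfig = {
  concurrencyLimit: Number(process.env.KB_CONFIG_CONCURRENCY_LIMIT ?? 50), // parallel embedding API calls
  batchSize: Number(process.env.KB_CONFIG_BATCH_SIZE ?? 2000), // chunks per embedding batch
  delayBetweenBatchesMs: Number(process.env.KB_CONFIG_DELAY_BETWEEN_BATCHES ?? 0), // 0 disables pacing
}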

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 4 additions & 4 deletions

@@ -29,10 +29,10 @@ const TIMEOUTS = {

 // Configuration for handling large documents
 const LARGE_DOC_CONFIG = {
-  MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
-  MAX_EMBEDDING_BATCH: 500, // Generate embeddings in batches of 500
-  MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
-  MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
+  MAX_CHUNKS_PER_BATCH: 500,
+  MAX_EMBEDDING_BATCH: env.KB_CONFIG_BATCH_SIZE || 2000,
+  MAX_FILE_SIZE: 100 * 1024 * 1024,
+  MAX_CHUNKS_PER_DOCUMENT: 100000,
 }

 /**
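
A nuance in MAX_EMBEDDING_BATCH: env.KB_CONFIG_BATCH_SIZE || 2000 above: || falls back on any falsy value, so an explicit 0 is also replaced by 2000, where ?? would keep it. Since a batch size of 0 would stall processing, that is arguably the safer operator here. A quick illustration:

// || falls back on any falsy value; ?? only on null/undefined.
const raw: number | undefined = 0
const viaOr = raw || 2000 // 2000: an explicit 0 is overridden
const viaNullish = raw ?? 2000 // 0: an explicit 0 is kept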

apps/sim/lib/knowledge/embeddings.ts

Lines changed: 45 additions & 23 deletions

@@ -7,6 +7,7 @@ import { batchByTokenLimit, getTotalTokenCount } from '@/lib/tokenization'
 const logger = createLogger('EmbeddingUtils')

 const MAX_TOKENS_PER_REQUEST = 8000
+const MAX_CONCURRENT_BATCHES = env.KB_CONFIG_CONCURRENCY_LIMIT || 50

 export class EmbeddingAPIError extends Error {
   public status: number
@@ -121,8 +122,29 @@ async function callEmbeddingAPI(inputs: string[], config: EmbeddingConfig): Prom
 }

 /**
- * Generate embeddings for multiple texts with token-aware batching
- * Uses tiktoken for token counting
+ * Process batches with controlled concurrency
+ */
+async function processWithConcurrency<T, R>(
+  items: T[],
+  concurrency: number,
+  processor: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let currentIndex = 0
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
+    while (currentIndex < items.length) {
+      const index = currentIndex++
+      results[index] = await processor(items[index], index)
+    }
+  })
+
+  await Promise.all(workers)
+  return results
+}
+
+/**
+ * Generate embeddings for multiple texts with token-aware batching and parallel processing
  */
 export async function generateEmbeddings(
   texts: string[],
@@ -138,35 +160,35 @@ export async function generateEmbeddings(
   const batches = batchByTokenLimit(texts, MAX_TOKENS_PER_REQUEST, embeddingModel)

   logger.info(
-    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch)`
+    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch, ${MAX_CONCURRENT_BATCHES} concurrent)`
   )

-  const allEmbeddings: number[][] = []
+  const batchResults = await processWithConcurrency(
+    batches,
+    MAX_CONCURRENT_BATCHES,
+    async (batch, i) => {
+      const batchTokenCount = getTotalTokenCount(batch, embeddingModel)

-  for (let i = 0; i < batches.length; i++) {
-    const batch = batches[i]
-    const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
+      logger.info(
+        `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
+      )

-    logger.info(
-      `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
-    )
+      try {
+        const batchEmbeddings = await callEmbeddingAPI(batch, config)

-    try {
-      const batchEmbeddings = await callEmbeddingAPI(batch, config)
-      allEmbeddings.push(...batchEmbeddings)
+        logger.info(
+          `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
+        )

-      logger.info(
-        `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
-      )
-    } catch (error) {
-      logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
-      throw error
+        return batchEmbeddings
+      } catch (error) {
+        logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
+        throw error
+      }
     }
+  )

-    if (i + 1 < batches.length) {
-      await new Promise((resolve) => setTimeout(resolve, 100))
-    }
-  }
+  const allEmbeddings = batchResults.flat()

   logger.info(`Successfully generated ${allEmbeddings.length} embeddings total`)

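A note on the worker-pool helper processWithConcurrency introduced above: the shared currentIndex cursor needs no locking, because const index = currentIndex++ runs synchronously between awaits on Node's single-threaded event loop, and writing results by index preserves input order in the output. If any batch throws, Promise.all rejects immediately; calls already in flight on other workers still complete in the background, but their results are discarded. A hedged usage sketch with a toy processor (not from the commit):

// Toy usage of processWithConcurrency: a 2-way pool over 5 items.
// Run inside an async context; the processor just simulates I/O.
const items = ['a', 'b', 'c', 'd', 'e']
const results = await processWithConcurrency(items, 2, async (item, i) => {
  await new Promise((resolve) => setTimeout(resolve, 10)) // stand-in for an API call
  return `embedding:${item}` // stored at results[i], matching items[i]
})
// results → ['embedding:a', 'embedding:b', 'embedding:c', 'embedding:d', 'embedding:e']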