Skip to content

Commit 75aca00

Browse files
authored
improvement(kb): optimize processes, add more robust fallbacks for large file ops (#2684)
* improvement(kb): optimize processes, add more robust fallbacks for large file ops * stronger typing * comments cleanup * ack PR comments * upgraded turborepo * ack more PR comments * fix failing test * moved doc update inside tx for embeddings chunks upload * ack more PR comments
1 parent d25084e commit 75aca00

File tree

22 files changed

+589
-258
lines changed

22 files changed

+589
-258
lines changed

apps/sim/app/api/knowledge/utils.test.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,29 @@ vi.mock('@sim/db', () => {
136136
},
137137
}),
138138
}),
139+
delete: () => ({
140+
where: () => Promise.resolve(),
141+
}),
142+
insert: () => ({
143+
values: (records: any) => {
144+
dbOps.order.push('insert')
145+
dbOps.insertRecords.push(records)
146+
return Promise.resolve()
147+
},
148+
}),
139149
transaction: vi.fn(async (fn: any) => {
140150
await fn({
141-
insert: (table: any) => ({
151+
delete: () => ({
152+
where: () => Promise.resolve(),
153+
}),
154+
insert: () => ({
142155
values: (records: any) => {
143156
dbOps.order.push('insert')
144157
dbOps.insertRecords.push(records)
145158
return Promise.resolve()
146159
},
147160
}),
148-
update: (table: any) => ({
161+
update: () => ({
149162
set: (payload: any) => ({
150163
where: () => {
151164
dbOps.updatePayloads.push(payload)

apps/sim/app/workspace/[workspaceId]/knowledge/[id]/base.tsx

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,8 @@ export function KnowledgeBase({
453453
error: knowledgeBaseError,
454454
refresh: refreshKnowledgeBase,
455455
} = useKnowledgeBase(id)
456+
const [hasProcessingDocuments, setHasProcessingDocuments] = useState(false)
457+
456458
const {
457459
documents,
458460
pagination,
@@ -468,6 +470,7 @@ export function KnowledgeBase({
468470
offset: (currentPage - 1) * DOCUMENTS_PER_PAGE,
469471
sortBy,
470472
sortOrder,
473+
refetchInterval: hasProcessingDocuments && !isDeleting ? 3000 : false,
471474
})
472475

473476
const { tagDefinitions } = useKnowledgeBaseTagDefinitions(id)
@@ -534,25 +537,15 @@ export function KnowledgeBase({
534537
)
535538

536539
useEffect(() => {
537-
const hasProcessingDocuments = documents.some(
540+
const processing = documents.some(
538541
(doc) => doc.processingStatus === 'pending' || doc.processingStatus === 'processing'
539542
)
543+
setHasProcessingDocuments(processing)
540544

541-
if (!hasProcessingDocuments) return
542-
543-
const refreshInterval = setInterval(async () => {
544-
try {
545-
if (!isDeleting) {
546-
await checkForDeadProcesses()
547-
await refreshDocuments()
548-
}
549-
} catch (error) {
550-
logger.error('Error refreshing documents:', error)
551-
}
552-
}, 3000)
553-
554-
return () => clearInterval(refreshInterval)
555-
}, [documents, refreshDocuments, isDeleting])
545+
if (processing) {
546+
checkForDeadProcesses()
547+
}
548+
}, [documents])
556549

557550
/**
558551
* Checks for documents with stale processing states and marks them as failed
@@ -672,25 +665,6 @@ export function KnowledgeBase({
672665

673666
await refreshDocuments()
674667

675-
let refreshAttempts = 0
676-
const maxRefreshAttempts = 3
677-
const refreshInterval = setInterval(async () => {
678-
try {
679-
refreshAttempts++
680-
await refreshDocuments()
681-
if (refreshAttempts >= maxRefreshAttempts) {
682-
clearInterval(refreshInterval)
683-
}
684-
} catch (error) {
685-
logger.error('Error refreshing documents after retry:', error)
686-
clearInterval(refreshInterval)
687-
}
688-
}, 1000)
689-
690-
setTimeout(() => {
691-
clearInterval(refreshInterval)
692-
}, 4000)
693-
694668
logger.info(`Document retry initiated successfully for: ${docId}`)
695669
} catch (err) {
696670
logger.error('Error retrying document:', err)

apps/sim/background/knowledge-processing.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export type DocumentProcessingPayload = {
2727
export const processDocument = task({
2828
id: 'knowledge-process-document',
2929
maxDuration: env.KB_CONFIG_MAX_DURATION || 600,
30+
machine: 'large-1x', // 2 vCPU, 2GB RAM - needed for large PDF processing
3031
retry: {
3132
maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS || 3,
3233
factor: env.KB_CONFIG_RETRY_FACTOR || 2,

apps/sim/hooks/queries/knowledge.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ export function useKnowledgeDocumentsQuery(
228228
params: KnowledgeDocumentsParams,
229229
options?: {
230230
enabled?: boolean
231+
refetchInterval?: number | false
231232
}
232233
) {
233234
const paramsKey = serializeDocumentParams(params)
@@ -237,6 +238,7 @@ export function useKnowledgeDocumentsQuery(
237238
enabled: (options?.enabled ?? true) && Boolean(params.knowledgeBaseId),
238239
staleTime: 60 * 1000,
239240
placeholderData: keepPreviousData,
241+
refetchInterval: options?.refetchInterval ?? false,
240242
})
241243
}
242244

apps/sim/hooks/use-knowledge.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ export function useKnowledgeBaseDocuments(
6767
sortBy?: string
6868
sortOrder?: string
6969
enabled?: boolean
70+
refetchInterval?: number | false
7071
}
7172
) {
7273
const queryClient = useQueryClient()
@@ -92,6 +93,7 @@ export function useKnowledgeBaseDocuments(
9293
},
9394
{
9495
enabled: (options?.enabled ?? true) && Boolean(knowledgeBaseId),
96+
refetchInterval: options?.refetchInterval,
9597
}
9698
)
9799

apps/sim/lib/chunkers/docs-chunker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ interface HeaderInfo {
1616
interface Frontmatter {
1717
title?: string
1818
description?: string
19-
[key: string]: any
19+
[key: string]: unknown
2020
}
2121

2222
const logger = createLogger('DocsChunker')

apps/sim/lib/chunkers/json-yaml-chunker.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import { estimateTokenCount } from '@/lib/tokenization/estimators'
66

77
const logger = createLogger('JsonYamlChunker')
88

9+
type JsonPrimitive = string | number | boolean | null
10+
type JsonValue = JsonPrimitive | JsonObject | JsonArray
11+
type JsonObject = { [key: string]: JsonValue }
12+
type JsonArray = JsonValue[]
13+
914
function getTokenCount(text: string): number {
1015
try {
1116
return getAccurateTokenCount(text, 'text-embedding-3-small')
@@ -59,11 +64,11 @@ export class JsonYamlChunker {
5964
*/
6065
async chunk(content: string): Promise<Chunk[]> {
6166
try {
62-
let data: any
67+
let data: JsonValue
6368
try {
64-
data = JSON.parse(content)
69+
data = JSON.parse(content) as JsonValue
6570
} catch {
66-
data = yaml.load(content)
71+
data = yaml.load(content) as JsonValue
6772
}
6873
const chunks = this.chunkStructuredData(data)
6974

@@ -86,15 +91,15 @@ export class JsonYamlChunker {
8691
/**
8792
* Chunk structured data based on its structure
8893
*/
89-
private chunkStructuredData(data: any, path: string[] = []): Chunk[] {
94+
private chunkStructuredData(data: JsonValue, path: string[] = []): Chunk[] {
9095
const chunks: Chunk[] = []
9196

9297
if (Array.isArray(data)) {
9398
return this.chunkArray(data, path)
9499
}
95100

96101
if (typeof data === 'object' && data !== null) {
97-
return this.chunkObject(data, path)
102+
return this.chunkObject(data as JsonObject, path)
98103
}
99104

100105
const content = JSON.stringify(data, null, 2)
@@ -118,9 +123,9 @@ export class JsonYamlChunker {
118123
/**
119124
* Chunk an array intelligently
120125
*/
121-
private chunkArray(arr: any[], path: string[]): Chunk[] {
126+
private chunkArray(arr: JsonArray, path: string[]): Chunk[] {
122127
const chunks: Chunk[] = []
123-
let currentBatch: any[] = []
128+
let currentBatch: JsonValue[] = []
124129
let currentTokens = 0
125130

126131
const contextHeader = path.length > 0 ? `// ${path.join('.')}\n` : ''
@@ -194,7 +199,7 @@ export class JsonYamlChunker {
194199
/**
195200
* Chunk an object intelligently
196201
*/
197-
private chunkObject(obj: Record<string, any>, path: string[]): Chunk[] {
202+
private chunkObject(obj: JsonObject, path: string[]): Chunk[] {
198203
const chunks: Chunk[] = []
199204
const entries = Object.entries(obj)
200205

@@ -213,7 +218,7 @@ export class JsonYamlChunker {
213218
return chunks
214219
}
215220

216-
let currentObj: Record<string, any> = {}
221+
let currentObj: JsonObject = {}
217222
let currentTokens = 0
218223
let currentKeys: string[] = []
219224

apps/sim/lib/chunkers/text-chunker.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ export class TextChunker {
110110
chunks.push(currentChunk.trim())
111111
}
112112

113-
// Start new chunk with current part
114113
// If part itself is too large, split it further
115114
if (this.estimateTokens(part) > this.chunkSize) {
116-
chunks.push(...(await this.splitRecursively(part, separatorIndex + 1)))
115+
const subChunks = await this.splitRecursively(part, separatorIndex + 1)
116+
for (const subChunk of subChunks) {
117+
chunks.push(subChunk)
118+
}
117119
currentChunk = ''
118120
} else {
119121
currentChunk = part

apps/sim/lib/core/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ export const env = createEnv({
178178
KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
179179
KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
180180
KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
181+
KB_CONFIG_CHUNK_CONCURRENCY: z.number().optional().default(10), // Concurrent PDF chunk OCR processing
181182

182183
// Real-time Communication
183184
SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features

apps/sim/lib/file-parsers/doc-parser.ts

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ export class DocParser implements FileParser {
1717
throw new Error(`File not found: ${filePath}`)
1818
}
1919

20-
logger.info(`Parsing DOC file: ${filePath}`)
21-
2220
const buffer = await readFile(filePath)
2321
return this.parseBuffer(buffer)
2422
} catch (error) {
@@ -29,53 +27,80 @@ export class DocParser implements FileParser {
2927

3028
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
3129
try {
32-
logger.info('Parsing DOC buffer, size:', buffer.length)
33-
3430
if (!buffer || buffer.length === 0) {
3531
throw new Error('Empty buffer provided')
3632
}
3733

38-
let parseOfficeAsync
3934
try {
4035
const officeParser = await import('officeparser')
41-
parseOfficeAsync = officeParser.parseOfficeAsync
42-
} catch (importError) {
43-
logger.warn('officeparser not available, using fallback extraction')
44-
return this.fallbackExtraction(buffer)
36+
const result = await officeParser.parseOfficeAsync(buffer)
37+
38+
if (result) {
39+
const resultString = typeof result === 'string' ? result : String(result)
40+
const content = sanitizeTextForUTF8(resultString.trim())
41+
42+
if (content.length > 0) {
43+
return {
44+
content,
45+
metadata: {
46+
characterCount: content.length,
47+
extractionMethod: 'officeparser',
48+
},
49+
}
50+
}
51+
}
52+
} catch (officeError) {
53+
logger.warn('officeparser failed, trying mammoth:', officeError)
4554
}
4655

4756
try {
48-
const result = await parseOfficeAsync(buffer)
49-
50-
if (!result) {
51-
throw new Error('officeparser returned no result')
57+
const mammoth = await import('mammoth')
58+
const result = await mammoth.extractRawText({ buffer })
59+
60+
if (result.value && result.value.trim().length > 0) {
61+
const content = sanitizeTextForUTF8(result.value.trim())
62+
return {
63+
content,
64+
metadata: {
65+
characterCount: content.length,
66+
extractionMethod: 'mammoth',
67+
messages: result.messages,
68+
},
69+
}
5270
}
53-
54-
const resultString = typeof result === 'string' ? result : String(result)
55-
56-
const content = sanitizeTextForUTF8(resultString.trim())
57-
58-
logger.info('DOC parsing completed successfully with officeparser')
59-
60-
return {
61-
content: content,
62-
metadata: {
63-
characterCount: content.length,
64-
extractionMethod: 'officeparser',
65-
},
66-
}
67-
} catch (extractError) {
68-
logger.warn('officeparser failed, using fallback:', extractError)
69-
return this.fallbackExtraction(buffer)
71+
} catch (mammothError) {
72+
logger.warn('mammoth failed:', mammothError)
7073
}
74+
75+
return this.fallbackExtraction(buffer)
7176
} catch (error) {
72-
logger.error('DOC buffer parsing error:', error)
77+
logger.error('DOC parsing error:', error)
7378
throw new Error(`Failed to parse DOC buffer: ${(error as Error).message}`)
7479
}
7580
}
7681

7782
private fallbackExtraction(buffer: Buffer): FileParseResult {
78-
logger.info('Using fallback text extraction for DOC file')
83+
const isBinaryDoc = buffer.length >= 2 && buffer[0] === 0xd0 && buffer[1] === 0xcf
84+
85+
if (!isBinaryDoc) {
86+
const textContent = buffer.toString('utf8').trim()
87+
88+
if (textContent.length > 0) {
89+
const printableChars = textContent.match(/[\x20-\x7E\n\r\t]/g)?.length || 0
90+
const isProbablyText = printableChars / textContent.length > 0.9
91+
92+
if (isProbablyText) {
93+
return {
94+
content: sanitizeTextForUTF8(textContent),
95+
metadata: {
96+
extractionMethod: 'plaintext-fallback',
97+
characterCount: textContent.length,
98+
warning: 'File is not a valid DOC format, extracted as plain text',
99+
},
100+
}
101+
}
102+
}
103+
}
79104

80105
const text = buffer.toString('utf8', 0, Math.min(buffer.length, 100000))
81106

0 commit comments

Comments (0)