Skip to content

Commit 523d8a9

Browse files
committed
improvement(kb): optimize processes, add more robust fallbacks for large file ops
1 parent 0977ed2 commit 523d8a9

File tree

17 files changed

+509
-224
lines changed

17 files changed

+509
-224
lines changed

apps/sim/app/api/knowledge/utils.test.ts

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -136,6 +136,16 @@ vi.mock('@sim/db', () => {
136136
},
137137
}),
138138
}),
139+
delete: () => ({
140+
where: () => Promise.resolve(),
141+
}),
142+
insert: (table: any) => ({
143+
values: (records: any) => {
144+
dbOps.order.push('insert')
145+
dbOps.insertRecords.push(records)
146+
return Promise.resolve()
147+
},
148+
}),
139149
transaction: vi.fn(async (fn: any) => {
140150
await fn({
141151
insert: (table: any) => ({

apps/sim/app/workspace/[workspaceId]/knowledge/[id]/base.tsx

Lines changed: 9 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -453,6 +453,8 @@ export function KnowledgeBase({
453453
error: knowledgeBaseError,
454454
refresh: refreshKnowledgeBase,
455455
} = useKnowledgeBase(id)
456+
const [hasProcessingDocuments, setHasProcessingDocuments] = useState(false)
457+
456458
const {
457459
documents,
458460
pagination,
@@ -468,6 +470,7 @@ export function KnowledgeBase({
468470
offset: (currentPage - 1) * DOCUMENTS_PER_PAGE,
469471
sortBy,
470472
sortOrder,
473+
refetchInterval: hasProcessingDocuments && !isDeleting ? 3000 : false,
471474
})
472475

473476
const { tagDefinitions } = useKnowledgeBaseTagDefinitions(id)
@@ -534,25 +537,15 @@ export function KnowledgeBase({
534537
)
535538

536539
useEffect(() => {
537-
const hasProcessingDocuments = documents.some(
540+
const processing = documents.some(
538541
(doc) => doc.processingStatus === 'pending' || doc.processingStatus === 'processing'
539542
)
543+
setHasProcessingDocuments(processing)
540544

541-
if (!hasProcessingDocuments) return
542-
543-
const refreshInterval = setInterval(async () => {
544-
try {
545-
if (!isDeleting) {
546-
await checkForDeadProcesses()
547-
await refreshDocuments()
548-
}
549-
} catch (error) {
550-
logger.error('Error refreshing documents:', error)
551-
}
552-
}, 3000)
553-
554-
return () => clearInterval(refreshInterval)
555-
}, [documents, refreshDocuments, isDeleting])
545+
if (processing) {
546+
checkForDeadProcesses()
547+
}
548+
}, [documents])
556549

557550
/**
558551
* Checks for documents with stale processing states and marks them as failed
@@ -672,25 +665,6 @@ export function KnowledgeBase({
672665

673666
await refreshDocuments()
674667

675-
let refreshAttempts = 0
676-
const maxRefreshAttempts = 3
677-
const refreshInterval = setInterval(async () => {
678-
try {
679-
refreshAttempts++
680-
await refreshDocuments()
681-
if (refreshAttempts >= maxRefreshAttempts) {
682-
clearInterval(refreshInterval)
683-
}
684-
} catch (error) {
685-
logger.error('Error refreshing documents after retry:', error)
686-
clearInterval(refreshInterval)
687-
}
688-
}, 1000)
689-
690-
setTimeout(() => {
691-
clearInterval(refreshInterval)
692-
}, 4000)
693-
694668
logger.info(`Document retry initiated successfully for: ${docId}`)
695669
} catch (err) {
696670
logger.error('Error retrying document:', err)

apps/sim/background/knowledge-processing.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ export type DocumentProcessingPayload = {
2727
export const processDocument = task({
2828
id: 'knowledge-process-document',
2929
maxDuration: env.KB_CONFIG_MAX_DURATION || 600,
30+
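machine: 'large-1x', // larger machine preset needed for large PDF processing (NOTE(review): Trigger.dev documents large-1x as 4 vCPU / 8 GB RAM, not 2 vCPU / 2 GB — confirm)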
3031
retry: {
3132
maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS || 3,
3233
factor: env.KB_CONFIG_RETRY_FACTOR || 2,

apps/sim/hooks/queries/knowledge.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -228,6 +228,7 @@ export function useKnowledgeDocumentsQuery(
228228
params: KnowledgeDocumentsParams,
229229
options?: {
230230
enabled?: boolean
231+
refetchInterval?: number | false
231232
}
232233
) {
233234
const paramsKey = serializeDocumentParams(params)
@@ -237,6 +238,7 @@ export function useKnowledgeDocumentsQuery(
237238
enabled: (options?.enabled ?? true) && Boolean(params.knowledgeBaseId),
238239
staleTime: 60 * 1000,
239240
placeholderData: keepPreviousData,
241+
refetchInterval: options?.refetchInterval ?? false,
240242
})
241243
}
242244

apps/sim/hooks/use-knowledge.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ export function useKnowledgeBaseDocuments(
6767
sortBy?: string
6868
sortOrder?: string
6969
enabled?: boolean
70+
refetchInterval?: number | false
7071
}
7172
) {
7273
const queryClient = useQueryClient()
@@ -92,6 +93,7 @@ export function useKnowledgeBaseDocuments(
9293
},
9394
{
9495
enabled: (options?.enabled ?? true) && Boolean(knowledgeBaseId),
96+
refetchInterval: options?.refetchInterval,
9597
}
9698
)
9799

apps/sim/lib/chunkers/text-chunker.ts

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -110,10 +110,12 @@ export class TextChunker {
110110
chunks.push(currentChunk.trim())
111111
}
112112

113-
// Start new chunk with current part
114113
// If part itself is too large, split it further
115114
if (this.estimateTokens(part) > this.chunkSize) {
116-
chunks.push(...(await this.splitRecursively(part, separatorIndex + 1)))
115+
const subChunks = await this.splitRecursively(part, separatorIndex + 1)
116+
for (const subChunk of subChunks) {
117+
chunks.push(subChunk)
118+
}
117119
currentChunk = ''
118120
} else {
119121
currentChunk = part

apps/sim/lib/core/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -178,6 +178,7 @@ export const env = createEnv({
178178
KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
179179
KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
180180
KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
181+
KB_CONFIG_CHUNK_CONCURRENCY: z.number().optional().default(10), // Concurrent PDF chunk OCR processing
181182

182183
// Real-time Communication
183184
SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features

apps/sim/lib/file-parsers/doc-parser.ts

Lines changed: 57 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@ export class DocParser implements FileParser {
1717
throw new Error(`File not found: ${filePath}`)
1818
}
1919

20-
logger.info(`Parsing DOC file: ${filePath}`)
21-
2220
const buffer = await readFile(filePath)
2321
return this.parseBuffer(buffer)
2422
} catch (error) {
@@ -29,53 +27,80 @@ export class DocParser implements FileParser {
2927

3028
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
3129
try {
32-
logger.info('Parsing DOC buffer, size:', buffer.length)
33-
3430
if (!buffer || buffer.length === 0) {
3531
throw new Error('Empty buffer provided')
3632
}
3733

38-
let parseOfficeAsync
3934
try {
4035
const officeParser = await import('officeparser')
41-
parseOfficeAsync = officeParser.parseOfficeAsync
42-
} catch (importError) {
43-
logger.warn('officeparser not available, using fallback extraction')
44-
return this.fallbackExtraction(buffer)
36+
const result = await officeParser.parseOfficeAsync(buffer)
37+
38+
if (result) {
39+
const resultString = typeof result === 'string' ? result : String(result)
40+
const content = sanitizeTextForUTF8(resultString.trim())
41+
42+
if (content.length > 0) {
43+
return {
44+
content,
45+
metadata: {
46+
characterCount: content.length,
47+
extractionMethod: 'officeparser',
48+
},
49+
}
50+
}
51+
}
52+
} catch (officeError) {
53+
logger.warn('officeparser failed, trying mammoth:', officeError)
4554
}
4655

4756
try {
48-
const result = await parseOfficeAsync(buffer)
49-
50-
if (!result) {
51-
throw new Error('officeparser returned no result')
57+
const mammoth = await import('mammoth')
58+
const result = await mammoth.extractRawText({ buffer })
59+
60+
if (result.value && result.value.trim().length > 0) {
61+
const content = sanitizeTextForUTF8(result.value.trim())
62+
return {
63+
content,
64+
metadata: {
65+
characterCount: content.length,
66+
extractionMethod: 'mammoth',
67+
messages: result.messages,
68+
},
69+
}
5270
}
53-
54-
const resultString = typeof result === 'string' ? result : String(result)
55-
56-
const content = sanitizeTextForUTF8(resultString.trim())
57-
58-
logger.info('DOC parsing completed successfully with officeparser')
59-
60-
return {
61-
content: content,
62-
metadata: {
63-
characterCount: content.length,
64-
extractionMethod: 'officeparser',
65-
},
66-
}
67-
} catch (extractError) {
68-
logger.warn('officeparser failed, using fallback:', extractError)
69-
return this.fallbackExtraction(buffer)
71+
} catch (mammothError) {
72+
logger.warn('mammoth failed:', mammothError)
7073
}
74+
75+
return this.fallbackExtraction(buffer)
7176
} catch (error) {
72-
logger.error('DOC buffer parsing error:', error)
77+
logger.error('DOC parsing error:', error)
7378
throw new Error(`Failed to parse DOC buffer: ${(error as Error).message}`)
7479
}
7580
}
7681

7782
private fallbackExtraction(buffer: Buffer): FileParseResult {
78-
logger.info('Using fallback text extraction for DOC file')
83+
const isBinaryDoc = buffer.length >= 2 && buffer[0] === 0xd0 && buffer[1] === 0xcf
84+
85+
if (!isBinaryDoc) {
86+
const textContent = buffer.toString('utf8').trim()
87+
88+
if (textContent.length > 0) {
89+
const printableChars = textContent.match(/[\x20-\x7E\n\r\t]/g)?.length || 0
90+
const isProbablyText = printableChars / textContent.length > 0.9
91+
92+
if (isProbablyText) {
93+
return {
94+
content: sanitizeTextForUTF8(textContent),
95+
metadata: {
96+
extractionMethod: 'plaintext-fallback',
97+
characterCount: textContent.length,
98+
warning: 'File is not a valid DOC format, extracted as plain text',
99+
},
100+
}
101+
}
102+
}
103+
}
79104

80105
const text = buffer.toString('utf8', 0, Math.min(buffer.length, 100000))
81106

apps/sim/lib/file-parsers/docx-parser.ts

Lines changed: 62 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -2,10 +2,10 @@ import { readFile } from 'fs/promises'
22
import { createLogger } from '@sim/logger'
33
import mammoth from 'mammoth'
44
import type { FileParseResult, FileParser } from '@/lib/file-parsers/types'
5+
import { sanitizeTextForUTF8 } from '@/lib/file-parsers/utils'
56

67
const logger = createLogger('DocxParser')
78

8-
// Define interface for mammoth result
99
interface MammothResult {
1010
value: string
1111
messages: any[]
@@ -19,7 +19,6 @@ export class DocxParser implements FileParser {
1919
}
2020

2121
const buffer = await readFile(filePath)
22-
2322
return this.parseBuffer(buffer)
2423
} catch (error) {
2524
logger.error('DOCX file error:', error)
@@ -29,26 +28,74 @@ export class DocxParser implements FileParser {
2928

3029
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
3130
try {
32-
logger.info('Parsing buffer, size:', buffer.length)
31+
if (!buffer || buffer.length === 0) {
32+
throw new Error('Empty buffer provided')
33+
}
3334

34-
const result = await mammoth.extractRawText({ buffer })
35+
try {
36+
const result = await mammoth.extractRawText({ buffer })
37+
38+
if (result.value && result.value.trim().length > 0) {
39+
let htmlResult: MammothResult = { value: '', messages: [] }
40+
try {
41+
htmlResult = await mammoth.convertToHtml({ buffer })
42+
} catch {
43+
// HTML conversion is optional
44+
}
45+
46+
return {
47+
content: sanitizeTextForUTF8(result.value),
48+
metadata: {
49+
extractionMethod: 'mammoth',
50+
messages: [...result.messages, ...htmlResult.messages],
51+
html: htmlResult.value,
52+
},
53+
}
54+
}
55+
} catch (mammothError) {
56+
logger.warn('mammoth failed, trying officeparser:', mammothError)
57+
}
3558

36-
let htmlResult: MammothResult = { value: '', messages: [] }
3759
try {
38-
htmlResult = await mammoth.convertToHtml({ buffer })
39-
} catch (htmlError) {
40-
logger.warn('HTML conversion warning:', htmlError)
60+
const officeParser = await import('officeparser')
61+
const result = await officeParser.parseOfficeAsync(buffer)
62+
63+
if (result) {
64+
const resultString = typeof result === 'string' ? result : String(result)
65+
const content = sanitizeTextForUTF8(resultString.trim())
66+
67+
if (content.length > 0) {
68+
return {
69+
content,
70+
metadata: {
71+
extractionMethod: 'officeparser',
72+
characterCount: content.length,
73+
},
74+
}
75+
}
76+
}
77+
} catch (officeError) {
78+
logger.warn('officeparser failed:', officeError)
4179
}
4280

43-
return {
44-
content: result.value,
45-
metadata: {
46-
messages: [...result.messages, ...htmlResult.messages],
47-
html: htmlResult.value,
48-
},
81+
const isZipFile = buffer.length >= 2 && buffer[0] === 0x50 && buffer[1] === 0x4b
82+
if (!isZipFile) {
83+
const textContent = buffer.toString('utf8').trim()
84+
if (textContent.length > 0) {
85+
return {
86+
content: sanitizeTextForUTF8(textContent),
87+
metadata: {
88+
extractionMethod: 'plaintext-fallback',
89+
characterCount: textContent.length,
90+
warning: 'File is not a valid DOCX format, extracted as plain text',
91+
},
92+
}
93+
}
4994
}
95+
96+
throw new Error('Failed to extract text from DOCX file')
5097
} catch (error) {
51-
logger.error('DOCX buffer parsing error:', error)
98+
logger.error('DOCX parsing error:', error)
5299
throw new Error(`Failed to parse DOCX buffer: ${(error as Error).message}`)
53100
}
54101
}

0 commit comments

Comments
 (0)