Skip to content

Commit 96882fe

Browse files
refactor(jobs): optimize question chunking for parallel processing (#1825)
Co-authored-by: Tofik Hasanov <[email protected]>
1 parent 35c9aec commit 96882fe

File tree

5 files changed

+34
-43
lines changed

5 files changed

+34
-43
lines changed

apps/app/src/jobs/tasks/vendors/parse-questionnaire.ts

Lines changed: 22 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -456,10 +456,10 @@ ${chunk}`,
456456
* Optimized to handle large content by chunking and processing in parallel
457457
*/
458458
async function parseQuestionsAndAnswers(content: string): Promise<QuestionAnswer[]> {
459-
// GPT-5-mini can handle ~128k tokens. Chunk by question count + char limit for efficiency.
459+
// GPT-5-mini can handle ~128k tokens. Chunk by individual questions (1 question = 1 chunk) for parallel processing.
460460
const MAX_CHUNK_SIZE_CHARS = 80_000;
461461
const MIN_CHUNK_SIZE_CHARS = 5_000;
462-
const MAX_QUESTIONS_PER_CHUNK = 35;
462+
const MAX_QUESTIONS_PER_CHUNK = 1; // Each chunk contains exactly one question
463463

464464
const chunkInfos = buildQuestionAwareChunks(content, {
465465
maxChunkChars: MAX_CHUNK_SIZE_CHARS,
@@ -482,10 +482,10 @@ async function parseQuestionsAndAnswers(content: string): Promise<QuestionAnswer
482482

483483
const totalEstimatedQuestions = chunkInfos.reduce((sum, chunk) => sum + chunk.questionCount, 0);
484484

485-
logger.info('Chunking content by question count for parallel processing', {
485+
logger.info('Chunking content by individual questions (1 question per chunk) for parallel processing', {
486486
contentLength: content.length,
487487
totalChunks: chunkInfos.length,
488-
avgQuestionsPerChunk: Number((totalEstimatedQuestions / chunkInfos.length || 0).toFixed(2)),
488+
questionsPerChunk: 1, // Each chunk contains exactly one question
489489
});
490490

491491
// Process all chunks in parallel for maximum speed
@@ -547,60 +547,51 @@ function buildQuestionAwareChunks(
547547
return [];
548548
}
549549

550-
if (trimmedContent.length <= options.minChunkChars) {
551-
return [
552-
{
553-
content: trimmedContent,
554-
questionCount: estimateQuestionCount(trimmedContent),
555-
},
556-
];
557-
}
558-
559550
const chunks: ChunkInfo[] = [];
560551
const lines = trimmedContent.split(/\r?\n/);
561-
let buffer: string[] = [];
562-
let bufferCharCount = 0;
563-
let bufferQuestionCount = 0;
552+
let currentChunk: string[] = [];
553+
let currentQuestionFound = false;
564554

565555
const pushChunk = () => {
566-
const chunkText = buffer.join('\n').trim();
556+
const chunkText = currentChunk.join('\n').trim();
567557
if (!chunkText) {
568558
return;
569559
}
570560
chunks.push({
571561
content: chunkText,
572-
questionCount: bufferQuestionCount || estimateQuestionCount(chunkText),
562+
questionCount: 1, // Each chunk contains exactly one question
573563
});
574-
buffer = [];
575-
bufferCharCount = 0;
576-
bufferQuestionCount = 0;
564+
currentChunk = [];
565+
currentQuestionFound = false;
577566
};
578567

579568
for (const line of lines) {
580-
const originalLine = line;
581569
const trimmedLine = line.trim();
582570
const isEmpty = trimmedLine.length === 0;
583571
const looksLikeQuestion = !isEmpty && looksLikeQuestionLine(trimmedLine);
584572

585-
const exceedsCharBudget = bufferCharCount + originalLine.length > options.maxChunkChars;
586-
const exceedsQuestionBudget = bufferQuestionCount >= options.maxQuestionsPerChunk;
587-
588-
if ((exceedsCharBudget || (exceedsQuestionBudget && looksLikeQuestion)) && buffer.length) {
573+
// If we find a new question and we already have a question in the current chunk, start a new chunk
574+
if (looksLikeQuestion && currentQuestionFound && currentChunk.length > 0) {
589575
pushChunk();
590576
}
591577

592-
if (!isEmpty || buffer.length) {
593-
buffer.push(originalLine);
594-
bufferCharCount += originalLine.length + 1;
578+
// Add line to current chunk (including empty lines for context)
579+
if (!isEmpty || currentChunk.length > 0) {
580+
currentChunk.push(line);
595581
}
596582

583+
// Mark that we've found a question in this chunk
597584
if (looksLikeQuestion) {
598-
bufferQuestionCount += 1;
585+
currentQuestionFound = true;
599586
}
600587
}
601588

602-
pushChunk();
589+
// Push the last chunk if it has content
590+
if (currentChunk.length > 0) {
591+
pushChunk();
592+
}
603593

594+
// If no questions were detected, return the entire content as a single chunk
604595
return chunks.length > 0
605596
? chunks
606597
: [

apps/app/src/lib/vector/core/count-embeddings.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ export async function countEmbeddings(
3030

3131
const results = await vectorIndex.query({
3232
vector: queryEmbedding,
33-
topK: 1000, // Max allowed by Upstash Vector
33+
topK: 100, // Max allowed by Upstash Vector
3434
includeMetadata: true,
3535
});
3636

@@ -101,7 +101,7 @@ export async function listManualAnswerEmbeddings(
101101

102102
const results = await vectorIndex.query({
103103
vector: queryEmbedding,
104-
topK: 1000,
104+
topK: 100,
105105
includeMetadata: true,
106106
});
107107

apps/app/src/lib/vector/core/find-existing-embeddings.ts

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ export async function findEmbeddingsForSource(
4848
const orgQueryEmbedding = await generateEmbedding(organizationId);
4949
const orgResults = await vectorIndex.query({
5050
vector: orgQueryEmbedding,
51-
topK: 1000,
51+
topK: 100,
5252
includeMetadata: true,
5353
});
5454

@@ -84,7 +84,7 @@ export async function findEmbeddingsForSource(
8484
const sourceQueryEmbedding = await generateEmbedding(sourceId);
8585
const sourceResults = await vectorIndex.query({
8686
vector: sourceQueryEmbedding,
87-
topK: 1000,
87+
topK: 100,
8888
includeMetadata: true,
8989
});
9090

@@ -122,7 +122,7 @@ export async function findEmbeddingsForSource(
122122
const combinedQueryEmbedding = await generateEmbedding(combinedQuery);
123123
const combinedResults = await vectorIndex.query({
124124
vector: combinedQueryEmbedding,
125-
topK: 1000,
125+
topK: 100,
126126
includeMetadata: true,
127127
});
128128

@@ -160,7 +160,7 @@ export async function findEmbeddingsForSource(
160160
const docNameQueryEmbedding = await generateEmbedding(documentName);
161161
const docNameResults = await vectorIndex.query({
162162
vector: docNameQueryEmbedding,
163-
topK: 1000,
163+
topK: 100,
164164
includeMetadata: true,
165165
});
166166

@@ -221,7 +221,7 @@ export async function findEmbeddingsForSource(
221221
const contentQueryEmbedding = await generateEmbedding(contentQuery);
222222
const contentResults = await vectorIndex.query({
223223
vector: contentQueryEmbedding,
224-
topK: 1000,
224+
topK: 100,
225225
includeMetadata: true,
226226
});
227227

@@ -251,7 +251,7 @@ export async function findEmbeddingsForSource(
251251
const filenameQueryEmbedding = await generateEmbedding(chunkDocumentName);
252252
const filenameResults = await vectorIndex.query({
253253
vector: filenameQueryEmbedding,
254-
topK: 1000,
254+
topK: 100,
255255
includeMetadata: true,
256256
});
257257

@@ -306,7 +306,7 @@ export async function findEmbeddingsForSource(
306306
const genericQueryEmbedding = await generateEmbedding(genericQuery);
307307
const genericResults = await vectorIndex.query({
308308
vector: genericQueryEmbedding,
309-
topK: 1000,
309+
topK: 100,
310310
includeMetadata: true,
311311
});
312312

@@ -389,7 +389,7 @@ export async function findAllOrganizationEmbeddings(
389389
// Respect Upstash Vector limit of 1000
390390
const results = await vectorIndex.query({
391391
vector: queryEmbedding,
392-
topK: 1000, // Max allowed by Upstash Vector
392+
topK: 100, // Max allowed by Upstash Vector
393393
includeMetadata: true,
394394
});
395395

apps/app/src/lib/vector/core/find-similar.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ export async function findSimilarContent(
4949
// so we'll filter results after retrieval
5050
const results = await vectorIndex.query({
5151
vector: queryEmbedding,
52-
topK: limit * 2, // Get more results to account for filtering
52+
topK: 100, // Get more results to account for filtering
5353
includeMetadata: true,
5454
});
5555

apps/app/src/lib/vector/sync/sync-organization.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -259,7 +259,7 @@ async function performSync(organizationId: string): Promise<void> {
259259
return; // Skip empty context
260260
}
261261

262-
const chunks = chunkText(contextText, 500, 50);
262+
const chunks = chunkText(contextText, 8000, 50);
263263

264264
if (chunks.length === 0) {
265265
return; // Skip if no chunks

0 commit comments

Comments
 (0)