Skip to content

Commit 4a49062

Browse files
committed
Show accurate progress for scanning job
1 parent 5a7abb1 commit 4a49062

File tree

9 files changed: +88 −14 lines changed

dalai/worker.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,25 @@ const worker = new Worker(
117117
async (job) => {
118118
const { s3Bucket, s3Key, outputBucket } = job.data || {}
119119

120+
/**
121+
* Full progress from 0 to 100
122+
*/
123+
let _progress = 0
124+
/**
125+
*
126+
* @param progress fraction (0-1) of progress of the current section
127+
* @param sectionSize the size of the section as a percentage of the whole job (0-100). All section sizes should add up to 100.
128+
*/
129+
const incrementProgress = (progress: number, sectionSize: number) => {
130+
_progress += progress * sectionSize
131+
job
132+
.updateProgress({
133+
ragFileId: job.data.ragFileId,
134+
progress: _progress,
135+
})
136+
.catch(() => {})
137+
}
138+
120139
console.log(`Processing job ${job.id}`)
121140

122141
if (!s3Bucket || !s3Key) {
@@ -146,6 +165,8 @@ const worker = new Worker(
146165
await fs.mkdir(outputTextDir, { recursive: true })
147166
await fs.mkdir(outputImagesDir, { recursive: true })
148167

168+
incrementProgress(1, 1) // 1% - Setup directories
169+
149170
/**
150171
* Download the pdf
151172
*/
@@ -155,6 +176,8 @@ const worker = new Worker(
155176
throw new Error(`Failed to download s3://${s3Bucket}/${s3Key}: ${err.message || err}`)
156177
}
157178

179+
incrementProgress(1, 1) // 1% - Download PDF
180+
158181
/**
159182
* Convert PDF pages to text
160183
*/
@@ -202,6 +225,8 @@ const worker = new Worker(
202225
throw new Error('PDF to text conversion failed')
203226
}
204227

228+
incrementProgress(1, 2) // 2% - PDF to text
229+
205230
/**
206231
* Convert PDF pages to PNG images
207232
*/
@@ -216,6 +241,8 @@ const worker = new Worker(
216241
throw new Error('PDF to PNG conversion failed')
217242
}
218243

244+
incrementProgress(1, 6) // 6% - PDF to PNGs. Total so far: 10%
245+
219246
/**
220247
* Transcription & Markdown Generation (with Ollama health/retry, fallback to PDF text)
221248
*/
@@ -268,6 +295,10 @@ const worker = new Worker(
268295
const txt = data?.response || ''
269296
await fs.writeFile(existingTxtPath, txt, 'utf-8')
270297
console.log(`Job ${job.id}: transcription complete for page ${pngPage.pageNumber}/${pngPages.length}`)
298+
299+
const pageProgress = 0.5 / pngPages.length // Halfway through the page processing
300+
incrementProgress(pageProgress, 87) // 87% - VLM & Markdown
301+
271302
return txt
272303
}, RETRY_COUNT)
273304
finalText = await retryOllamaCall(async () => {
@@ -311,6 +342,7 @@ const worker = new Worker(
311342
}
312343
await fs.writeFile(existingMdPath, text, 'utf-8')
313344
console.log(`Job ${job.id}: markdown generation complete for page ${pngPage.pageNumber}/${pngPages.length}`)
345+
314346
return text
315347
}, RETRY_COUNT)
316348
} catch (error) {
@@ -320,6 +352,9 @@ const worker = new Worker(
320352
await fs.writeFile(existingMdPath, finalText, 'utf-8')
321353
}
322354

355+
const pageProgress = 0.5 / pngPages.length // Second half of the page processing done
356+
incrementProgress(pageProgress, 87) // 87% - VLM & Markdown. Total so far: 97%
357+
323358
resultingMarkdown += `\n\n${finalText}`
324359
}
325360

@@ -336,6 +371,8 @@ const worker = new Worker(
336371
throw new Error(`Failed uploading outputs to s3://${outputBucket}: ${err.message || err}`)
337372
}
338373

374+
incrementProgress(1, 3) // 97 + 3 = 100% - Upload results
375+
339376
return {
340377
input: { bucket: s3Bucket, key: s3Key },
341378
output: { bucket: outputBucket },

src/client/components/Rag/RagFile.tsx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import type { RagFileAttributes } from '../../../shared/types'
55
import { Box, Button, Container, Link, Typography } from '@mui/material'
66
import { RagFileInfo } from './RagFileDetails'
77
import type { RagIndexAttributes } from '../../../server/db/models/ragIndex'
8-
import { Chunk } from './Chunk'
98
import { useDeleteRagFileMutation, useDeleteRagFileTextMutation } from './api'
109
import { useTranslation } from 'react-i18next'
1110
import Markdown from 'react-markdown'

src/client/components/Rag/RagFileDetails.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export const RagFileInfo: React.FC<{
5454
<TableCell>
5555
<Box display="flex" alignItems="end" gap={1}>
5656
{IngestionPipelineStages[file.pipelineStage]}
57+
{file.pipelineStage === 'parsing' ? file.progress && ` (${file.progress.toFixed()}%)` : null}
5758
{ProgressIcon[file.pipelineStage]}
5859
</Box>
5960
</TableCell>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { DataTypes } from 'sequelize'
2+
3+
import type { Migration } from '../connection'
4+
5+
export const up: Migration = async ({ context: queryInterface }) => {
6+
await queryInterface.addColumn('rag_files', 'progress', {
7+
type: DataTypes.FLOAT,
8+
allowNull: true,
9+
})
10+
}
11+
12+
export const down: Migration = async ({ context: queryInterface }) => {
13+
await queryInterface.removeColumn('rag_files', 'progress')
14+
}

src/server/db/models/ragFile.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ class RagFile extends Model<InferAttributes<RagFile>, InferCreationAttributes<Ra
1111

1212
declare pipelineStage: IngestionPipelineStageKey
1313

14+
declare progress: CreationOptional<number | null>
15+
1416
declare error: CreationOptional<string | null>
1517

1618
declare filename: string
@@ -58,6 +60,10 @@ RagFile.init(
5860
type: DataTypes.STRING,
5961
allowNull: false,
6062
},
63+
progress: {
64+
type: DataTypes.FLOAT,
65+
allowNull: true,
66+
},
6167
error: {
6268
type: DataTypes.STRING,
6369
allowNull: true,

src/server/routes/rag/ragIndex.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { SearchSchema } from '../../../shared/rag'
1313
import { S3_BUCKET } from '../../util/config'
1414
import { s3Client } from '../../util/s3client'
1515
import { ingestRagFiles } from '../../services/rag/ingestion'
16+
import { getPdfParsingJobId, queue } from 'src/server/services/jobs/pdfParsing.job'
1617

1718
const ragIndexRouter = Router()
1819

src/server/services/jobs/pdfParsing.job.ts

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import IORedis from 'ioredis'
22
import { BMQ_REDIS_CA, BMQ_REDIS_CERT, BMQ_REDIS_HOST, BMQ_REDIS_KEY, BMQ_REDIS_PORT, S3_BUCKET } from '../../util/config'
3-
import { Queue, QueueEvents } from 'bullmq'
3+
import { Job, Queue, QueueEvents } from 'bullmq'
44
import { FileStore } from '../rag/fileStore'
55
import { RagFile } from '../../db/models'
66

@@ -24,30 +24,45 @@ if (BMQ_REDIS_CA !== 'none') {
2424

2525
const connection = new IORedis(creds)
2626

27-
const queue = new Queue('llama-scan-queue', {
27+
export const queue = new Queue('llama-scan-queue', {
2828
connection,
2929
})
3030

31+
export const getPdfParsingJobId = (ragFile: RagFile) => {
32+
const s3Key = FileStore.getRagFileKey(ragFile)
33+
return `scan:${S3_BUCKET}/${s3Key}`
34+
}
35+
3136
export const pdfQueueEvents = new QueueEvents('llama-scan-queue', { connection })
3237

38+
pdfQueueEvents.on('progress', async (progressEvent) => {
39+
const data = progressEvent.data as { ragFileId: number; progress: number }
40+
await RagFile.update({ progress: data.progress }, { where: { id: data.ragFileId } })
41+
})
42+
43+
type PDFJobData = {
44+
s3Bucket: string
45+
s3Key: string
46+
outputBucket: string
47+
ragFileId: number
48+
}
49+
3350
/**
3451
* Adds a pdf parsing job to the queue. The file must be uploaded to S3 beforehand. The jobId is based on the ragFile - resubmitting with the same jobId while the previous job is running has no effect.
3552
* @param ragFile
3653
* @returns the job
3754
*/
3855
export const submitPdfParsingJob = async (ragFile: RagFile) => {
3956
const s3Key = FileStore.getRagFileKey(ragFile)
40-
const jobId = `scan:${S3_BUCKET}/${s3Key}`
57+
const jobId = getPdfParsingJobId(ragFile)
4158
console.log(`Submitting PDF parsing job ${jobId}`)
42-
const job = await queue.add(
43-
jobId,
44-
{
45-
s3Bucket: S3_BUCKET,
46-
s3Key,
47-
outputBucket: S3_BUCKET,
48-
},
49-
{ jobId, removeOnComplete: true, removeOnFail: true },
50-
)
59+
const jobData: PDFJobData = {
60+
s3Bucket: S3_BUCKET,
61+
s3Key,
62+
outputBucket: S3_BUCKET,
63+
ragFileId: ragFile.id,
64+
}
65+
const job = await queue.add(jobId, jobData, { jobId, removeOnComplete: true, removeOnFail: true })
5166

5267
return job
5368
}

src/shared/ingestion.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ export const IngestionPipelineStages = {
22
uploading: 'Uploading',
33
parsing: 'Scanning',
44
embedding: 'Creating embeddings',
5-
storing: 'Storing',
5+
storing: 'Waiting',
66
completed: 'Completed',
77
error: 'Error',
88
} as const

src/shared/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export type RagFileAttributes = {
2424
updatedAt: string
2525
ragIndexId: number
2626
pipelineStage: IngestionPipelineStageKey
27+
progress: number | null
2728
fileType: string
2829
fileSize: number
2930
numChunks: number | null

Comments (0)