
Commit de0a277

pdf parsing with job
1 parent 624d13b commit de0a277

4 files changed, +36 -33 lines changed


compose.yaml

Lines changed: 16 additions & 11 deletions

```diff
@@ -42,21 +42,26 @@ services:
       - redis_data:/data
 
   # Comment out if you use local ollama. Set OLLAMA_URL=host.docker.internal:11434 if using recent Docker Desktop.
-  # ollama:
-  #   image: ollama/ollama
-  #   ports:
-  #     - 11434:11434 # expose the Ollama API to the host
-  #   volumes:
-  #     - ollama_data:/root/.ollama # persistent model storage
+  ollama:
+    image: ollama/ollama
+    ports:
+      - 11434:11434 # expose the Ollama API to the host
+    volumes:
+      - ollama_data:/root/.ollama # persistent model storage
+    entrypoint: ["/bin/bash", "-c", "\
+      ollama serve & \
+      sleep 5 && \
+      ollama pull qwen2.5vl:7b && \
+      wait"]
 
   dalai:
-    # image: toska/dalai:latest
+    # # image: toska/dalai:latest
     build:
       context: ../dalai
       dockerfile: ../dalai/dev.Dockerfile
     environment:
-      - OLLAMA_URL=http://host.docker.internal:11434
-      #- OLLAMA_URL=http://ollama:11434
+      # - OLLAMA_URL=http://host.docker.internal:11434
+      - OLLAMA_URL=http://ollama:11434
       - REDIS_HOST=redis
       - REDIS_PORT=6379
       - S3_HOST=http://minio:9000
@@ -67,7 +72,7 @@ services:
       - dalai_data:/app
     container_name: gptwrapper_dalai
     depends_on:
-    - redis
+      - redis
 
   minio:
     image: minio/minio:latest
@@ -95,7 +100,7 @@ services:
         /bin/sh -c "
         set -e;
         until (mc alias set local http://minio:9000 minioadmin minioadmin); do
-          echo 'Waiting for MinIO...'; sleep 10;
+          echo 'Waiting for MinIO...'; sleep 2;
         done;
         mc mb --ignore-existing local/mybucket;
         tail -f /dev/null
```
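
With this change compose runs Ollama as its own service, whose entrypoint pre-pulls qwen2.5vl:7b, and the dalai service now reaches it through OLLAMA_URL=http://ollama:11434 instead of the host. A minimal sketch of how a dependent service could wait for that pull to finish, assuming Node 18+ fetch and the standard Ollama /api/tags endpoint (the helper name is illustrative and not part of this commit):

```ts
// Minimal sketch, assuming Node 18+ global fetch and the Ollama /api/tags endpoint.
// The helper name and retry policy are illustrative, not part of this commit.
export const waitForOllamaModel = async (model = 'qwen2.5vl:7b') => {
  const baseUrl = process.env.OLLAMA_URL ?? 'http://ollama:11434'

  for (let attempt = 0; attempt < 60; attempt++) {
    try {
      const res = await fetch(`${baseUrl}/api/tags`) // lists locally pulled models
      if (res.ok) {
        const { models } = (await res.json()) as { models: { name: string }[] }
        if (models.some((m) => m.name.startsWith(model))) return
      }
    } catch {
      // Ollama is not up yet: the entrypoint sleeps, then pulls the model.
    }
    await new Promise((resolve) => setTimeout(resolve, 2000))
  }

  throw new Error(`Model ${model} not available at ${baseUrl} after waiting`)
}
```

Since the entrypoint only sleeps 5 seconds before pulling, the first startup can take a while; polling /api/tags avoids racing the pull.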

src/server/services/jobs/pdfParsing.job.ts

Lines changed: 2 additions & 3 deletions

```diff
@@ -31,9 +31,8 @@ const queue = new Queue('llama-scan-queue', {
 export const pdfQueueEvents = new QueueEvents('llama-scan-queue', { connection })
 
 /**
- * Adds a pdf parsing job to the queue. The file must be uploaded to S3 beforehand. The jobId is based on the filename and prefix - resubmitting with the same jobId while the previous job is running has no effect.
- * @param filename
- * @param prefix
+ * Adds a pdf parsing job to the queue. The file must be uploaded to S3 beforehand. The jobId is based on the ragFile - resubmitting with the same jobId while the previous job is running has no effect.
+ * @param ragFile
  * @returns the job
  */
 export const submitPdfParsingJob = async (ragFile: RagFile) => {
```
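
The doc comment now keys the jobId to the ragFile rather than a filename/prefix pair, so resubmitting the same file while a parse is still running is a no-op. The body of submitPdfParsingJob is not part of this diff; a sketch of how that deduplication is typically done with BullMQ, with the job name and payload fields assumed:

```ts
import { Queue } from 'bullmq'
import type { RagFile } from '../../db/models'

// Sketch only: the real body of submitPdfParsingJob is not shown in this diff.
// BullMQ deduplicates on jobId, so re-adding a job whose id is still waiting or
// active returns the existing job rather than enqueueing a second one.
const queue = new Queue('llama-scan-queue', {
  connection: { host: process.env.REDIS_HOST, port: 6379 },
})

export const submitPdfParsingJobSketch = async (ragFile: RagFile) =>
  queue.add(
    'parse-pdf', // job name assumed; payload fields are assumptions too
    { ragFileId: ragFile.id, filename: ragFile.filename },
    { jobId: `pdf-parse-${ragFile.id}` }, // deterministic id derived from the ragFile
  )
```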

src/server/services/rag/fileStore.ts

Lines changed: 0 additions & 10 deletions

```diff
@@ -8,7 +8,6 @@ import {
 } from '@aws-sdk/client-s3'
 import type { RagFile, RagIndex } from '../../db/models'
 import { ApplicationError } from '../../util/ApplicationError'
-import { pdfToText } from '../../util/pdfToText'
 import { S3_BUCKET } from '../../util/config'
 import { s3Client } from '../../util/s3client'
 
@@ -129,12 +128,3 @@ const streamToString = (stream: any): Promise<string> => {
     stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')))
   })
 }
-
-const streamToBuffer = (stream: any): Promise<Buffer> => {
-  return new Promise((resolve, reject) => {
-    const chunks: any[] = []
-    stream.on('data', (chunk: any) => chunks.push(chunk))
-    stream.on('error', reject)
-    stream.on('end', () => resolve(Buffer.concat(chunks)))
-  })
-}
```

src/server/services/rag/ingestion.ts

Lines changed: 18 additions & 9 deletions

```diff
@@ -34,17 +34,26 @@ export const ingestRagFiles = async (ragIndex: RagIndex) => {
     console.time(`Ingestion ${ragFile.filename}`)
 
     await ragFile.save()
-
-    const job = await submitPdfParsingJob(ragFile)
+    let needToParse = false
 
     try {
-      await job.waitUntilFinished(pdfQueueEvents)
-    } catch (error: any) {
-      console.error('Error waiting for PDF parsing job to finish:', error)
-      ragFile.pipelineStage = 'error'
-      ragFile.error = 'PDF parsing failed'
-      await ragFile.save()
-      return
+      await FileStore.readRagFileTextContent(ragFile)
+    } catch (error) {
+      needToParse = true
+    }
+
+    if (needToParse) {
+      const job = await submitPdfParsingJob(ragFile)
+
+      try {
+        await job.waitUntilFinished(pdfQueueEvents)
+      } catch (error: any) {
+        console.error('Error waiting for PDF parsing job to finish:', error)
+        ragFile.pipelineStage = 'error'
+        ragFile.error = 'PDF parsing failed'
+        await ragFile.save()
+        return
+      }
     }
 
     const text = await FileStore.readRagFileTextContent(ragFile)
```
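
The ingestion loop now parses lazily: it first tries FileStore.readRagFileTextContent, and only submits a parsing job and blocks on waitUntilFinished when that read throws. Condensed into a single helper, the flow looks roughly like this (ensureParsedText is an assumed name, import paths are guessed from the diff, and the pipelineStage error handling from the commit is omitted):

```ts
import type { RagFile } from '../../db/models'
import * as FileStore from './fileStore' // import style and path assumed
import { submitPdfParsingJob, pdfQueueEvents } from '../jobs/pdfParsing.job'

// Condensed sketch of the new flow: reuse text parsed on an earlier run,
// otherwise run the parsing job and wait for the worker before re-reading.
export const ensureParsedText = async (ragFile: RagFile): Promise<string> => {
  try {
    // Fast path: the parsed text is already in the file store.
    return await FileStore.readRagFileTextContent(ragFile)
  } catch {
    // Slow path: enqueue the PDF parsing job and block until it finishes.
    const job = await submitPdfParsingJob(ragFile)
    await job.waitUntilFinished(pdfQueueEvents)
    return await FileStore.readRagFileTextContent(ragFile)
  }
}
```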
