+ import { GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
import { Worker } from 'bullmq'
- import path from 'node:path'
+ import dotenv from 'dotenv'
+ import Redis, { type RedisOptions } from 'ioredis'
+ import { createWriteStream } from 'node:fs'
import fs from 'node:fs/promises'
- import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'
+ import path from 'node:path'
import { pipeline } from 'node:stream'
- import { createWriteStream, createReadStream } from 'node:fs'
- import Redis from 'ioredis'
- import { pdfToPng } from 'pdf-to-png-converter'
- import { v4 as uuidv4 } from 'uuid'
import { promisify } from 'node:util'
import pdfToText from 'pdf-parse-fork'
- import dotenv from 'dotenv'
+ import { pdfToPng, type PngPageOutput } from 'pdf-to-png-converter'

dotenv.config()

const pipelineAsync = promisify(pipeline)

- async function downloadS3ToFile(s3, bucket, key, destPath) {
+ async function downloadS3ToFile(s3: S3Client, bucket, key: string, destPath: string) {
  const res = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }))
  await fs.mkdir(path.dirname(destPath), { recursive: true })
+   if (!res.Body) {
+     throw new Error('No Body in S3 GetObject response')
+   }
  await pipelineAsync(res.Body, createWriteStream(destPath))
}

- async function uploadFileToS3(s3, bucket, key, filePath, contentType) {
+ async function uploadFileToS3(s3: S3Client, bucket, key: string, filePath: string, contentType: string) {
  const Body = await fs.readFile(filePath)
  await s3.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body, ContentType: contentType }))
}
@@ -35,7 +37,7 @@ async function pathExists(p) {
  }
}

- function guessContentType(filePath) {
+ function guessContentType(filePath: string) {
  const ext = path.extname(filePath).toLowerCase()
  if (ext === '.txt') return 'text/plain charset=utf-8'
  if (ext === '.json') return 'application/json'
@@ -55,9 +57,9 @@ const CA = process.env.CA || undefined
const CERT = process.env.CERT
const KEY = process.env.KEY

- let creds = {
+ let creds: RedisOptions = {
  host: REDIS_HOST,
-   port: REDIS_PORT,
+   port: Number(REDIS_PORT) || 6379,
  maxRetriesPerRequest: null,
}

@@ -69,7 +71,7 @@ if (CA !== undefined) {
      cert: CERT,
      key: KEY,
      servername: REDIS_HOST,
-     }
+     },
  }
}

@@ -79,10 +81,10 @@ const connection = new Redis(creds)

const QUEUE_NAME = process.env.LLAMA_SCAN_QUEUE || 'llama-scan-queue'
const S3_HOST = process.env.S3_HOST || ''
- const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY
- const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY
+ const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY || ''
+ const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY || ''
const OLLAMA_URL = process.env.LAAMA_API_URL ?? process.env.OLLAMA_URL
- const LAAMA_API_TOKEN = process.LAAMA_API_TOKEN ?? ''
+ const LAAMA_API_TOKEN = process.env.LAAMA_API_TOKEN ?? ''

const s3 = new S3Client({
  region: 'eu-north-1',
@@ -94,15 +96,15 @@ const s3 = new S3Client({
  },
})

- async function retryOllamaCall(fn, maxRetries = 3) {
-   let lastError
+ async function retryOllamaCall<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
+   let lastError: any
  for (let i = 0; i < maxRetries; i++) {
    // Health check before each attempt
    try {
      return await fn()
    } catch (err) {
      lastError = err
-       await new Promise(r => setTimeout(r, 1000 * (i + 1)))
+       await new Promise((r) => setTimeout(r, 1000 * (i + 1)))
    }
  }
  throw lastError
@@ -113,11 +115,7 @@ async function retryOllamaCall(fn, maxRetries = 3) {
const worker = new Worker(
  QUEUE_NAME,
  async (job) => {
-     const {
-       s3Bucket,
-       s3Key,
-       outputBucket,
-     } = job.data || {}
+     const { s3Bucket, s3Key, outputBucket } = job.data || {}

    console.log(`Processing job ${job.id}`)

@@ -128,7 +126,11 @@ const worker = new Worker(
      throw new Error('outputBucket is required in job data')
    }

-     const jobIdPath = job.id.replaceAll('\/', '_')
+     const jobId = job.id
+     if (!jobId) {
+       throw new Error('Job ID is missing')
+     }
+     const jobIdPath = jobId.replaceAll('\/', '_')

    const uploadsDir = './uploads'
    const jobRootDir = path.join(uploadsDir, jobIdPath)
@@ -154,24 +156,25 @@ const worker = new Worker(
    }

    /**
-     * Convert PDF pages to text
-     */
+      * Convert PDF pages to text
+      */
    function pagerender(pageData) {
      let render_options = {
        normalizeWhitespace: false,
        disableCombineTextItems: false,
      }
      return pageData.getTextContent(render_options).then((textContent) => {
-         let lastY, text = ''
+         let lastY: number | null = null,
+           text = ''
        for (let item of textContent.items) {
-           if (lastY == item.transform[5] || !lastY) {
+           if (lastY === item.transform[5] || !lastY) {
            text += item.str
          } else {
-             text += "\n" + item.str
+             text += '\n' + item.str
          }
          lastY = item.transform[5]
        }
-         return `${JSON.stringify({ text, pageNumber: pageData.pageNumber })}\n`;
+         return `${JSON.stringify({ text, pageNumber: pageData.pageNumber })}\n`
      })
    }

@@ -180,16 +183,19 @@ const worker = new Worker(
    try {
      const dataBuffer = await fs.readFile(inputLocalPath)
      const data = await pdfToText(dataBuffer, { pagerender })
-       const jsonObjStrs = data.text.split('\n').filter(line => line.trim().startsWith('{') && line.trim().endsWith('}'))
-       jsonObjStrs.map(line => {
-         try {
-           return JSON.parse(line)
-         } catch {
-           return null
-         }
-       }).filter(page => page !== null && typeof page.pageNumber === 'number' && typeof page.text === 'string').forEach(page => {
-         pages[page.pageNumber] = page.text
-       })
+       const jsonObjStrs = data.text.split('\n').filter((line) => line.trim().startsWith('{') && line.trim().endsWith('}'))
+       jsonObjStrs
+         .map((line) => {
+           try {
+             return JSON.parse(line)
+           } catch {
+             return null
+           }
+         })
+         .filter((page) => page !== null && typeof page.pageNumber === 'number' && typeof page.text === 'string')
+         .forEach((page) => {
+           pages[page.pageNumber] = page.text
+         })
      console.log(`Job ${job.id}: PDF to text conversion complete`)
    } catch (error) {
      console.error(`Job ${job.id} failed: PDF to text conversion failed`, error)
@@ -199,12 +205,12 @@ const worker = new Worker(
    /**
     * Convert PDF pages to PNG images
     */
-     let pngPages
+     let pngPages: PngPageOutput[] = []
    try {
      pngPages = await pdfToPng(inputLocalPath, {
        outputFileMaskFunc: (pageNumber) => `page_${pageNumber}.png`,
        outputFolder: outputImagesDir,
-       });
+       })
    } catch (error) {
      console.error(`Job ${job.id} failed: PDF to PNG conversion failed`, error)
      throw new Error('PDF to PNG conversion failed')
@@ -251,8 +257,8 @@ const worker = new Worker(
            But you are always obligated to keep the **image** tags intact.`,
          prompt: `Parsed PDF text:\n${pdfText}\n\nImage transcription:`,
          stream: false,
-           images: [image.toString('base64')]
-         })
+           images: [image.toString('base64')],
+         }),
      })
      if (!response.ok) {
        const errorBody = await response.text()
@@ -269,7 +275,7 @@ const worker = new Worker(
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
-           'token': LAAMA_API_TOKEN
+           token: LAAMA_API_TOKEN,
        },
        body: JSON.stringify({
          model: 'qwen2.5vl:7b',
@@ -282,16 +288,19 @@ const worker = new Worker(
            Remember you are always obligated to keep the **image** tags and tags insides intact.`,
          prompt: `Transcription:\n${transcription}\n\nPDF:\n${pdfText}\n\nCombined Markdown:`,
          stream: false,
-         })
+         }),
      })
      if (!response2.ok) {
        const errorBody = await response2.text()
        throw new Error(`Ollama Markdown API request failed with status ${response2.status}: ${errorBody}`)
      }
      const data2 = await response2.json()
      let text = data2?.response || ''
-       if (text.trim().startsWith("```markdown")) {
-         text = text.replace(/^```markdown/, '').replace(/```$/, '').trim()
+       if (text.trim().startsWith('```markdown')) {
+         text = text
+           .replace(/^```markdown/, '')
+           .replace(/```$/, '')
+           .trim()
      }
      // Add page number to the end of the first line if it's a heading
      function appendToFirstLine(content, suffix) {
@@ -312,7 +321,6 @@ const worker = new Worker(
      }

      resultingMarkdown += `\n\n${finalText}`
-
    }

    const resultFileName = `${inputFileName}.md`
@@ -333,12 +341,14 @@ const worker = new Worker(
        output: { bucket: outputBucket },
      }
    } finally {
-       try { await fs.rm(jobRootDir, { recursive: true, force: true }) } catch {}
+       try {
+         await fs.rm(jobRootDir, { recursive: true, force: true })
+       } catch {}
    }
  },
  {
    connection,
-   }
+   },
)

console.log(`Worker started. Listening to queue "${QUEUE_NAME}"...`)
@@ -353,7 +363,9 @@ worker.on('failed', (job, err) => {

async function shutdown() {
  console.log('Shutting down worker...')
-   try { await worker.close() } catch {}
+   try {
+     await worker.close()
+   } catch {}
  process.exit(0)
}
process.on('SIGINT', shutdown)
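
For context, a minimal producer sketch, not part of this diff: it shows how a job carrying the s3Bucket, s3Key, and outputBucket fields that the worker above destructures from job.data could be enqueued with BullMQ. The queue/job names, Redis options, and bucket/key values here are assumptions for illustration only.

// Hypothetical producer (assumption, not in this PR): enqueue a PDF-scan job
// with the same job.data shape the worker above expects.
import { Queue } from 'bullmq'
import Redis from 'ioredis'

const producerConnection = new Redis({ host: process.env.REDIS_HOST, maxRetriesPerRequest: null })
const scanQueue = new Queue(process.env.LLAMA_SCAN_QUEUE || 'llama-scan-queue', { connection: producerConnection })

await scanQueue.add('scan', {
  s3Bucket: 'incoming-pdfs', // hypothetical input bucket
  s3Key: 'reports/example.pdf', // hypothetical object key
  outputBucket: 'parsed-markdown', // hypothetical output bucket for the generated .md
})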