Skip to content

Commit d6a6549

Browse files
committed
fix worker
1 parent e6b6f5e commit d6a6549

File tree

1 file changed

+30
-29
lines changed

1 file changed

+30
-29
lines changed

dalai/worker.ts

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import path from 'node:path'
33
import fs from 'node:fs/promises'
44
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'
55
import { pipeline } from 'node:stream'
6-
import { createWriteStream } from 'node:fs'
7-
import Redis, { RedisOptions } from 'ioredis'
8-
import { pdfToPng, PngPageOutput } from 'pdf-to-png-converter'
6+
import { createWriteStream, createReadStream } from 'node:fs'
7+
import Redis from 'ioredis'
8+
import { pdfToPng } from 'pdf-to-png-converter'
9+
import { v4 as uuidv4 } from 'uuid'
910
import { promisify } from 'node:util'
1011
import pdfToText from 'pdf-parse-fork'
1112
import dotenv from 'dotenv'
@@ -14,18 +15,18 @@ dotenv.config()
1415

1516
const pipelineAsync = promisify(pipeline)
1617

17-
const downloadS3ToFile = async (s3, bucket, key, destPath) => {
18+
/**
 * Download a single S3 object and write it to a local file.
 *
 * The parent directory of `destPath` is created (recursively) when missing.
 *
 * @param s3 S3 client used to send the GetObject command.
 * @param bucket Name of the bucket holding the object.
 * @param key Key of the object inside the bucket.
 * @param destPath Local path the object body is streamed to.
 * @throws Error when the GetObject response carries no body, or when the
 *   request, directory creation or stream copy fails.
 */
async function downloadS3ToFile(s3, bucket, key, destPath) {
  const res = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }))

  // Fail before touching the filesystem: without this guard an empty file is
  // created at destPath and pipeline() rejects with an opaque argument error.
  if (!res.Body) {
    throw new Error(`S3 object s3://${bucket}/${key} returned no body`)
  }

  await fs.mkdir(path.dirname(destPath), { recursive: true })
  await pipelineAsync(res.Body, createWriteStream(destPath))
}
2223

23-
const uploadFileToS3 = async (s3, bucket, key, filePath, contentType) => {
24+
/**
 * Upload a local file to S3 under the given bucket and key.
 *
 * The file is read fully into memory before it is sent.
 *
 * @param s3 S3 client used to send the PutObject command.
 * @param bucket Destination bucket name.
 * @param key Destination object key.
 * @param filePath Local path of the file to upload.
 * @param contentType Value stored as the object's Content-Type.
 */
async function uploadFileToS3(s3, bucket, key, filePath, contentType) {
  const fileContents = await fs.readFile(filePath)
  const putCommand = new PutObjectCommand({
    Bucket: bucket,
    Key: key,
    Body: fileContents,
    ContentType: contentType,
  })
  await s3.send(putCommand)
}
2728

28-
const pathExists = async (p) => {
29+
async function pathExists(p) {
2930
try {
3031
await fs.access(p)
3132
return true
@@ -34,7 +35,7 @@ const pathExists = async (p) => {
3435
}
3536
}
3637

37-
const guessContentType = (filePath: string) => {
38+
function guessContentType(filePath) {
3839
const ext = path.extname(filePath).toLowerCase()
3940
if (ext === '.txt') return 'text/plain charset=utf-8'
4041
if (ext === '.json') return 'application/json'
@@ -48,19 +49,19 @@ const guessContentType = (filePath: string) => {
4849

4950
// --- Config ---
5051

51-
const REDIS_HOST = process.env.REDIS_HOST ?? ''
52-
const REDIS_PORT = parseInt(process.env.REDIS_PORT ?? '6379')
53-
const CA = process.env.CA ?? ''
54-
const CERT = process.env.CERT ?? ''
55-
const KEY = process.env.KEY ?? ''
52+
const REDIS_HOST = process.env.REDIS_HOST
53+
// Environment values are strings; parse to a number (RedisOptions.port is typed as number)
// and fall back to the default Redis port 6379 when the variable is unset or not numeric.
const REDIS_PORT = Number.parseInt(process.env.REDIS_PORT ?? '', 10) || 6379
54+
const CA = process.env.CA || undefined
55+
const CERT = process.env.CERT
56+
const KEY = process.env.KEY
5657

57-
let creds: RedisOptions = {
58+
let creds = {
5859
host: REDIS_HOST,
5960
port: REDIS_PORT,
6061
maxRetriesPerRequest: null,
6162
}
6263

63-
if (CA.length > 0) {
64+
if (CA !== undefined) {
6465
creds = {
6566
...creds,
6667
tls: {
@@ -76,12 +77,12 @@ const RETRY_COUNT = 1
7677

7778
const connection = new Redis(creds)
7879

79-
const QUEUE_NAME = process.env.LLAMA_SCAN_QUEUE ?? 'llama-scan-queue'
80-
const S3_HOST = process.env.S3_HOST ?? ''
81-
const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY ?? ''
82-
const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY ?? ''
80+
const QUEUE_NAME = process.env.LLAMA_SCAN_QUEUE || 'llama-scan-queue'
81+
const S3_HOST = process.env.S3_HOST || ''
82+
const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY
83+
const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY
8384
const OLLAMA_URL = process.env.LAAMA_API_URL ?? process.env.OLLAMA_URL
84-
const LAAMA_API_TOKEN = process.env.LAAMA_API_TOKEN ?? ''
85+
// Read the token from the environment: `process` itself has no LAAMA_API_TOKEN
// property, so the lookup has to go through process.env. Defaults to ''.
const LAAMA_API_TOKEN = process.env.LAAMA_API_TOKEN ?? ''
8586

8687
const s3 = new S3Client({
8788
region: 'eu-north-1',
@@ -93,8 +94,8 @@ const s3 = new S3Client({
9394
},
9495
})
9596

96-
const retryOllamaCall = async (fn: Function, maxRetries = 3) => {
97-
let lastError: Error | undefined = undefined
97+
async function retryOllamaCall(fn, maxRetries = 3) {
98+
let lastError
9899
for (let i = 0; i < maxRetries; i++) {
99100
// Health check before each attempt
100101
try {
@@ -127,7 +128,7 @@ const worker = new Worker(
127128
throw new Error('outputBucket is required in job data')
128129
}
129130

130-
const jobIdPath = job?.id?.replace('\//g', '_') ?? ''
131+
const jobIdPath = job.id.replaceAll('\/', '_')
131132

132133
const uploadsDir = './uploads'
133134
const jobRootDir = path.join(uploadsDir, jobIdPath)
@@ -155,7 +156,7 @@ const worker = new Worker(
155156
/**
156157
* Convert PDF pages to text
157158
*/
158-
const pagerender = (pageData) => {
159+
function pagerender(pageData) {
159160
let render_options = {
160161
normalizeWhitespace: false,
161162
disableCombineTextItems: false,
@@ -198,12 +199,12 @@ const worker = new Worker(
198199
/**
199200
* Convert PDF pages to PNG images
200201
*/
201-
let pngPages: PngPageOutput[]
202+
let pngPages
202203
try {
203204
pngPages = await pdfToPng(inputLocalPath, {
204205
outputFileMaskFunc: (pageNumber) => `page_${pageNumber}.png`,
205206
outputFolder: outputImagesDir,
206-
})
207+
});
207208
} catch (error) {
208209
console.error(`Job ${job.id} failed: PDF to PNG conversion failed`, error)
209210
throw new Error('PDF to PNG conversion failed')
@@ -293,8 +294,8 @@ const worker = new Worker(
293294
text = text.replace(/^```markdown/, '').replace(/```$/, '').trim()
294295
}
295296
// Add page number to the end of the first line if it's a heading
296-
const appendToFirstLine = (content: string, suffix: string) => {
297-
return content.replace(/^[^\r\n]*/, (match: string) => match + suffix)
297+
/**
 * Append `suffix` to the end of the first line of `content`.
 *
 * The first line is everything before the first `\r` or `\n`; when `content`
 * is empty the result is just `suffix`. A replacer function is used so that
 * `$`-sequences inside `suffix` are inserted literally.
 *
 * @param content Text whose first line is extended.
 * @param suffix Text inserted right before the first line break.
 * @returns `content` with `suffix` appended to its first line.
 */
function appendToFirstLine(content: string, suffix: string): string {
  return content.replace(/^[^\r\n]*/, (firstLine: string) => firstLine + suffix)
}
299300
if (text.trim().startsWith('#')) {
300301
text = appendToFirstLine(text, ` (Page ${pngPage.pageNumber})`)
@@ -342,15 +343,15 @@ const worker = new Worker(
342343

343344
console.log(`Worker started. Listening to queue "${QUEUE_NAME}"...`)
344345

345-
worker.on('completed', (job, _result) => {
346+
// Log every job that finishes successfully; the job's result is not used here.
worker.on('completed', (completedJob, _returnValue) => {
  console.log(`Job ${completedJob.id} completed.`)
})
348349

349350
// Log every failed job together with its error; optional chaining covers the
// case where the event arrives without a job object.
worker.on('failed', (failedJob, error) => {
  console.error(`Job ${failedJob?.id} failed:`, error)
})
352353

353-
const shutdown = async () => {
354+
async function shutdown() {
354355
console.log('Shutting down worker...')
355356
try { await worker.close() } catch { }
356357
process.exit(0)

0 commit comments

Comments
 (0)