Skip to content

Commit adc29d8

Browse files
committed
fixing dalai worker types n' stuff
1 parent a7a4068 commit adc29d8

File tree

1 file changed

+29
-30
lines changed

1 file changed

+29
-30
lines changed

dalai/worker.ts

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ import path from 'node:path'
33
import fs from 'node:fs/promises'
44
import { S3Client, GetObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3'
55
import { pipeline } from 'node:stream'
6-
import { createWriteStream, createReadStream } from 'node:fs'
7-
import Redis from 'ioredis'
8-
import { pdfToPng } from 'pdf-to-png-converter'
9-
import { v4 as uuidv4 } from 'uuid'
6+
import { createWriteStream } from 'node:fs'
7+
import Redis, { RedisOptions } from 'ioredis'
8+
import { pdfToPng, PngPageOutput } from 'pdf-to-png-converter'
109
import { promisify } from 'node:util'
1110
import pdfToText from 'pdf-parse-fork'
1211
import dotenv from 'dotenv'
@@ -15,18 +14,18 @@ dotenv.config()
1514

1615
const pipelineAsync = promisify(pipeline)
1716

18-
async function downloadS3ToFile(s3, bucket, key, destPath) {
17+
const downloadS3ToFile = async (s3, bucket, key, destPath) => {
1918
const res = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }))
2019
await fs.mkdir(path.dirname(destPath), { recursive: true })
2120
await pipelineAsync(res.Body, createWriteStream(destPath))
2221
}
2322

24-
async function uploadFileToS3(s3, bucket, key, filePath, contentType) {
23+
const uploadFileToS3 = async (s3, bucket, key, filePath, contentType) => {
2524
const Body = await fs.readFile(filePath)
2625
await s3.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body, ContentType: contentType }))
2726
}
2827

29-
async function pathExists(p) {
28+
const pathExists = async (p) => {
3029
try {
3130
await fs.access(p)
3231
return true
@@ -35,7 +34,7 @@ async function pathExists(p) {
3534
}
3635
}
3736

38-
function guessContentType(filePath) {
37+
const guessContentType = (filePath: string) => {
3938
const ext = path.extname(filePath).toLowerCase()
4039
if (ext === '.txt') return 'text/plain charset=utf-8'
4140
if (ext === '.json') return 'application/json'
@@ -49,19 +48,19 @@ function guessContentType(filePath) {
4948

5049
// --- Config ---
5150

52-
const REDIS_HOST = process.env.REDIS_HOST
53-
const REDIS_PORT = process.env.REDIS_PORT
54-
const CA = process.env.CA || undefined
55-
const CERT = process.env.CERT
56-
const KEY = process.env.KEY
51+
const REDIS_HOST = process.env.REDIS_HOST ?? ''
52+
const REDIS_PORT = parseInt(process.env.REDIS_PORT ?? '6379')
53+
const CA = process.env.CA ?? ''
54+
const CERT = process.env.CERT ?? ''
55+
const KEY = process.env.KEY ?? ''
5756

58-
let creds = {
57+
let creds: RedisOptions = {
5958
host: REDIS_HOST,
6059
port: REDIS_PORT,
6160
maxRetriesPerRequest: null,
6261
}
6362

64-
if (CA !== undefined) {
63+
if (CA.length > 0) {
6564
creds = {
6665
...creds,
6766
tls: {
@@ -77,12 +76,12 @@ const RETRY_COUNT = 1
7776

7877
const connection = new Redis(creds)
7978

80-
const QUEUE_NAME = process.env.LLAMA_SCAN_QUEUE || 'llama-scan-queue'
81-
const S3_HOST = process.env.S3_HOST || ''
82-
const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY
83-
const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY
79+
const QUEUE_NAME = process.env.LLAMA_SCAN_QUEUE ?? 'llama-scan-queue'
80+
const S3_HOST = process.env.S3_HOST ?? ''
81+
const S3_ACCESS_KEY = process.env.S3_ACCESS_KEY ?? ''
82+
const S3_SECRET_ACCESS_KEY = process.env.S3_SECRET_ACCESS_KEY ?? ''
8483
const OLLAMA_URL = process.env.LAAMA_API_URL ?? process.env.OLLAMA_URL
85-
const LAAMA_API_TOKEN = process.LAAMA_API_TOKEN ?? ''
84+
const LAAMA_API_TOKEN = process.env.LAAMA_API_TOKEN ?? ''
8685

8786
const s3 = new S3Client({
8887
region: 'eu-north-1',
@@ -94,8 +93,8 @@ const s3 = new S3Client({
9493
},
9594
})
9695

97-
async function retryOllamaCall(fn, maxRetries = 3) {
98-
let lastError
96+
const retryOllamaCall = async (fn: Function, maxRetries = 3) => {
97+
let lastError: Error | undefined = undefined
9998
for (let i = 0; i < maxRetries; i++) {
10099
// Health check before each attempt
101100
try {
@@ -128,7 +127,7 @@ const worker = new Worker(
128127
throw new Error('outputBucket is required in job data')
129128
}
130129

131-
const jobIdPath = job.id.replaceAll('\/', '_')
130+
const jobIdPath = job?.id?.replace('\//g', '_') ?? ''
132131

133132
const uploadsDir = './uploads'
134133
const jobRootDir = path.join(uploadsDir, jobIdPath)
@@ -156,7 +155,7 @@ const worker = new Worker(
156155
/**
157156
* Convert PDF pages to text
158157
*/
159-
function pagerender(pageData) {
158+
const pagerender = (pageData) => {
160159
let render_options = {
161160
normalizeWhitespace: false,
162161
disableCombineTextItems: false,
@@ -199,12 +198,12 @@ const worker = new Worker(
199198
/**
200199
* Convert PDF pages to PNG images
201200
*/
202-
let pngPages
201+
let pngPages: PngPageOutput[]
203202
try {
204203
pngPages = await pdfToPng(inputLocalPath, {
205204
outputFileMaskFunc: (pageNumber) => `page_${pageNumber}.png`,
206205
outputFolder: outputImagesDir,
207-
});
206+
})
208207
} catch (error) {
209208
console.error(`Job ${job.id} failed: PDF to PNG conversion failed`, error)
210209
throw new Error('PDF to PNG conversion failed')
@@ -294,8 +293,8 @@ const worker = new Worker(
294293
text = text.replace(/^```markdown/, '').replace(/```$/, '').trim()
295294
}
296295
// Add page number to the end of the first line if it's a heading
297-
function appendToFirstLine(content, suffix) {
298-
return content.replace(/^[^\r\n]*/, (match) => match + suffix)
296+
const appendToFirstLine = (content: string, suffix: string) => {
297+
return content.replace(/^[^\r\n]*/, (match: string) => match + suffix)
299298
}
300299
if (text.trim().startsWith('#')) {
301300
text = appendToFirstLine(text, ` (Page ${pngPage.pageNumber})`)
@@ -343,15 +342,15 @@ const worker = new Worker(
343342

344343
console.log(`Worker started. Listening to queue "${QUEUE_NAME}"...`)
345344

346-
worker.on('completed', (job, result) => {
345+
worker.on('completed', (job, _result) => {
347346
console.log(`Job ${job.id} completed.`)
348347
})
349348

350349
worker.on('failed', (job, err) => {
351350
console.error(`Job ${job?.id} failed:`, err)
352351
})
353352

354-
async function shutdown() {
353+
const shutdown = async () => {
355354
console.log('Shutting down worker...')
356355
try { await worker.close() } catch { }
357356
process.exit(0)

0 commit comments

Comments
 (0)