Skip to content

Commit fa1df05

Browse files
chunk data and queue job worker
1 parent 237be68 commit fa1df05

File tree

11 files changed

+46
-19
lines changed

11 files changed

+46
-19
lines changed

README.MD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
- [x] Upload Multiple Files
2-
- [ ] implement queue worker
2+
- [x] implement queue worker
33
- [x] read pdf and chunk the text
44
- [ ] combine those knowledge
55
- [ ] store in vector db

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"scripts": {
77
"dev": "nodemon src/index.ts",
88
"build": "tsc --project tsconfig.prod.json",
9+
"work": "nodemon src/jobs/workers/filesWorker.ts",
910
"start": "node dist/src/index.js"
1011
},
1112
"keywords": [],

src/controllers/pdf/pdf-upload.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
import { Request, Response, NextFunction } from "express";
1+
import { Request, Response } from "express";
22
import { processPdf } from "../../service/process-document";
3+
import fileQueue from "../../jobs/queues/filesQueue";
34

4-
export const pdfUploadFilesController = (req: Request, res: Response): any => {
5+
export const pdfUploadFilesController = async (
6+
req: Request,
7+
res: Response
8+
): Promise<any> => {
59
if (!req.files || (req.files as Express.Multer.File[]).length === 0) {
610
return res.status(400).json("No files were uploaded");
711
}
@@ -13,7 +17,18 @@ export const pdfUploadFilesController = (req: Request, res: Response): any => {
1317
path: file.path,
1418
}));
1519

16-
processPdf(fileInfo[0].path);
20+
try {
21+
for (const file of files) {
22+
console.log(`Adding job to queue for file: ${file.originalname}`);
23+
24+
await fileQueue.add("process-pdf", {
25+
filePath: file.path,
26+
originalName: file.originalname,
27+
});
28+
}
29+
} catch (error) {
30+
console.error("Queue Job Worker Error", error instanceof Error ? error.message : error);
31+
}
1732

1833
res.status(200).json({
1934
message: `${files.length} files uploaded successfully!`,

src/jobs/queues/filesQueue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import Redis from "ioredis";
33

44
const connection = new Redis({
55
host: process.env.REDIS_HOST,
6-
port: parseInt(process.env.REDIS_PORT),
6+
port: parseInt(process.env.REDIS_PORT) || 6379,
77
});
88

99
const fileQueue = new Queue("FileQueue", { connection });

src/jobs/workers/filesWorker.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import "dotenv/config";
12
import { Job, Worker } from "bullmq";
23
import Redis from "ioredis";
34
import { processPdf } from "../../service/process-document";
45

6+
console.log(process.env.REDIS_PORT);
7+
58
const connection = new Redis({
69
host: process.env.REDIS_HOST,
7-
port: parseInt(process.env.REDIS_PORT),
10+
port: parseInt(process.env.REDIS_PORT) || 6379,
811
maxRetriesPerRequest: null,
912
});
1013

@@ -15,9 +18,15 @@ const fileWorker = new Worker(
1518
`[Worker] Picked up job #${job.id} for file: ${job.data.originalName}`
1619
);
1720

18-
await processPdf(job.data.filePath);
21+
const chunk = await processPdf(job.data.filePath);
22+
23+
console.log(` ✅ Chunk Data from ${job.data.originalName} ${chunk}`);
24+
25+
console.log(` ✅ [Worker] Finished job #${job.id}`);
1926

20-
console.log(`[Worker] Finished job #${job.id}`);
27+
console.log(
28+
"-------------------------------------------------------------------"
29+
);
2130
},
2231
{ connection }
2332
);

src/service/process-document.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ export async function processPdf(filePath: string) {
1212
const extractedText = pdfData.text;
1313

1414
console.log(`✅ Text extracted successfully.`);
15-
console.log(` - Total Pages: ${pdfData.numpages}`);
16-
console.log(` - Text Length: ${extractedText.length} characters\n`);
15+
// console.log(` - Total Pages: ${pdfData.numpages}`);
16+
// console.log(` - Text Length: ${extractedText.length} characters\n`);
1717

1818
const splitter = new RecursiveCharacterTextSplitter({
1919
chunkSize: 1000, // Max characters per chunk
@@ -22,18 +22,20 @@ export async function processPdf(filePath: string) {
2222

2323
const chunks = await splitter.splitText(extractedText);
2424

25-
console.log(`✅ Text chunked successfully.`);
25+
console.log(`✅ Text chunked successfully for ${filePath}.`);
2626
console.log(` - Number of chunks created: ${chunks.length}\n`);
2727

28-
for (let i = 0; i < Math.min(3, chunks.length); i++) {
29-
console.log(`\n--- Chunk ${i + 1} (Length: ${chunks[i].length}) ---`);
30-
console.log(chunks[i]);
31-
console.log("-------------------------------------\n");
32-
}
28+
// for (let i = 0; i < Math.min(1, chunks.length); i++) {
29+
// console.log(`\n--- Chunk ${i + 1} (Length: ${chunks[i].length}) ---`);
30+
// console.log(chunks[i]);
31+
// console.log("-------------------------------------\n");
32+
// }
3333

34-
if (chunks.length > 3) {
35-
console.log(`... and ${chunks.length - 3} more chunks.`);
36-
}
34+
// if (chunks.length > 3) {
35+
// console.log(`... and ${chunks.length - 3} more chunks.`);
36+
// }
37+
38+
return chunks;
3739
} catch (error) {
3840
console.error("An error occurred during PDF processing:", error);
3941
}
829 KB
Binary file not shown.
2.02 MB
Binary file not shown.
2.02 MB
Binary file not shown.
1000 KB
Binary file not shown.

0 commit comments

Comments
 (0)