Skip to content

Commit 007eae9

Browse files
committed
add example for batch processing
1 parent be36cb8 commit 007eae9

File tree

8 files changed

+152
-0
lines changed

8 files changed

+152
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DATABASE_URL=postgresql://postgres:password@localhost:5432/queue_db
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { defineConfig } from "drizzle-kit"
2+
3+
export default defineConfig({
4+
schema: "./src/schema.ts",
5+
out: "./drizzle",
6+
dialect: "postgresql",
7+
dbCredentials: {
8+
url: process.env.DATABASE_URL || "postgresql://postgres:postgres@localhost:5432/queue_tracking",
9+
},
10+
})
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"name": "batch-processing-example",
3+
"version": "1.0.0",
4+
"description": "batch processing example using Drizzle ORM with postgres.js",
5+
"type": "module",
6+
"private": true,
7+
"scripts": {
8+
"dev": "tsx src/index.ts",
9+
"db:push": "drizzle-kit push"
10+
},
11+
"dependencies": {
12+
"@vorsteh-queue/adapter-drizzle": "workspace:*",
13+
"@vorsteh-queue/core": "workspace:*",
14+
"drizzle-orm": "^0.44.5",
15+
"postgres": "^3.4.7"
16+
},
17+
"devDependencies": {
18+
"drizzle-kit": "^0.31.4",
19+
"tsx": "4.20.5",
20+
"typescript": "^5.9.2"
21+
}
22+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
title: Batch Processing Example
3+
navTitle: Batch Processing
4+
description: Demonstrates batch job processing. Shows how to register batch handlers, dispatch jobs in batches, and monitor batch events for robust, event-driven workflows.
5+
---
6+
7+
## Setup
8+
9+
Use the CLI to create this example:
10+
11+
```bash
12+
npx create-vorsteh-queue@latest my-project --template batch-processing
13+
cd my-project
14+
cp .env.example .env
15+
# Edit .env with your PostgreSQL database URL
16+
pnpm db:push
17+
pnpm dev
18+
```
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { drizzle } from "drizzle-orm/postgres-js"
2+
import postgres from "postgres"
3+
4+
import * as schema from "./schema"
5+
6+
// Shared database connection
7+
const client = postgres(
8+
process.env.DATABASE_URL || "postgresql://postgres:password@localhost:5432/queue_tracking",
9+
{ max: 10 } // Connection pool
10+
)
11+
12+
export const db = drizzle(client, { schema })
13+
export { client }
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-drizzle"
2+
import { Queue } from "@vorsteh-queue/core"
3+
4+
import { client, db } from "./database"
5+
6+
// Queue setup with batch config
7+
const queue = new Queue(new PostgresQueueAdapter(db), {
8+
name: "batch-demo",
9+
batch: { minSize: 3, maxSize: 10, waitFor: 2000 },
10+
removeOnComplete: 5,
11+
removeOnFail: 3,
12+
})
13+
14+
interface FilePayload {
15+
file: string
16+
}
17+
interface FileResult {
18+
ok: boolean
19+
}
20+
21+
// Register a batch handler for processing files
22+
queue.registerBatch<FilePayload, FileResult>("process-files", async (jobs) => {
23+
console.log(`Processing batch of ${jobs.length} files...`)
24+
// Simulate processing
25+
await Promise.all(
26+
jobs.map(async (job) => {
27+
await new Promise((resolve) => setTimeout(resolve, 200))
28+
console.log(` ✔️ Processed: ${job.payload.file}`)
29+
}),
30+
)
31+
return jobs.map(() => ({ ok: true }))
32+
})
33+
34+
// Listen to batch events
35+
queue.on("batch:processing", (jobs) => {
36+
console.log(`Batch processing started: ${jobs.length} jobs`)
37+
})
38+
queue.on("batch:completed", (jobs) => {
39+
console.log(`Batch completed: ${jobs.length} jobs`)
40+
})
41+
queue.on("batch:failed", ({ jobs, error }) => {
42+
console.error(`Batch failed: ${jobs.length} jobs`, error)
43+
})
44+
45+
async function main() {
46+
console.log("🚀 Starting Batch Processing Example\n")
47+
48+
// Add jobs in a batch
49+
await queue.addJobs("process-files", [
50+
{ file: "a.csv" },
51+
{ file: "b.csv" },
52+
{ file: "c.csv" },
53+
{ file: "d.csv" },
54+
])
55+
56+
// Start processing
57+
queue.start()
58+
console.log("🔄 Queue processing started!")
59+
60+
// Wait for batches to complete
61+
setTimeout(async () => {
62+
await queue.stop()
63+
await client.end()
64+
console.log("✅ Batch processing complete. Shutdown.")
65+
process.exit(0)
66+
}, 5000)
67+
}
68+
69+
main().catch((error) => {
70+
console.error("❌ Batch processing error:", error)
71+
process.exit(1)
72+
})
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import { postgresSchema } from "@vorsteh-queue/adapter-drizzle"
2+
3+
export const { queueJobs } = postgresSchema
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"compilerOptions": {
3+
"target": "ES2022",
4+
"module": "ESNext",
5+
"moduleResolution": "bundler",
6+
"strict": true,
7+
"esModuleInterop": true,
8+
"skipLibCheck": true,
9+
"forceConsistentCasingInFileNames": true
10+
},
11+
"include": ["src/**/*"],
12+
"exclude": ["node_modules", "dist"]
13+
}

0 commit comments

Comments
 (0)