Skip to content

Commit ea2f0af

Browse files
authored
Merge pull request #37 from noxify/batch-jobs
Batch processing
2 parents d130607 + 5066d2f commit ea2f0af

File tree

40 files changed

+1424
-1089
lines changed

40 files changed

+1424
-1089
lines changed

.changeset/fine-taxes-drive.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@vorsteh-queue/adapter-drizzle": minor
3+
"@vorsteh-queue/adapter-kysely": minor
4+
"@vorsteh-queue/adapter-prisma": minor
5+
---
6+
7+
Added support for batch processing

.changeset/sweet-oranges-cut.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
"@vorsteh-queue/core": minor
3+
---
4+
5+
### Added
6+
7+
- Batch processing support: You can now register batch handlers via `queue.registerBatch`, allowing the queue to process multiple jobs at once according to configurable batch sizes and timing.
8+
- New `batch` configuration options: `minSize`, `maxSize`, and `waitFor` allow fine-grained control over when and how batches are processed.
9+
- Type-safe batch jobs: Batch jobs are strictly separated from scheduled/single jobs and **do not support** cron, delay, or repeat options.
10+
- Adapter API extended: All core adapters now support efficient batch operations.
11+
- Events for batch lifecycle: The queue emits `batch:processing`, `batch:completed`, and `batch:failed` events for batch jobs.
12+
13+
**Handler exclusivity:** A queue can handle only batch jobs or single jobs — not both. Attempting to register both handler types in the same queue will throw an error. This ensures clear and predictable processing.
14+
15+
#### Example
16+
17+
```ts
18+
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
19+
20+
type EmailPayload = { to: string; body: string }
21+
type EmailResult = { ok: boolean }
22+
23+
const adapter = new MemoryQueueAdapter()
24+
const queue = new Queue<EmailPayload, EmailResult>(adapter, {
25+
name: "batch-demo",
26+
batch: { minSize: 5, maxSize: 20, waitFor: 1000 },
27+
})
28+
29+
queue.registerBatch("send-emails", async (jobs) => {
30+
// jobs is an array of up to 20 jobs
31+
await sendBulkEmails(jobs.map((j) => j.payload))
32+
return jobs.map(() => ({ ok: true }))
33+
})
34+
35+
// Add jobs as usual
36+
await queue.addJobs("send-emails", [
37+
{ to: "[email protected]", body: "Hi A" },
38+
{ to: "[email protected]", body: "Hi B" },
39+
// ...
40+
])
41+
42+
queue.start()
43+
```

.github/workflows/tests.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ jobs:
4646
- name: Setup pnpm
4747
uses: pnpm/action-setup@v4
4848
with:
49-
version: 10.15.1
5049
run_install: false
5150

5251
- shell: bash
@@ -95,7 +94,6 @@ jobs:
9594
- name: Setup pnpm
9695
uses: pnpm/action-setup@v4
9796
with:
98-
version: 10.15.1
9997
run_install: false
10098

10199
- shell: bash
@@ -127,7 +125,6 @@ jobs:
127125
- name: Setup pnpm
128126
uses: pnpm/action-setup@v4
129127
with:
130-
version: 10.15.1
131128
run_install: false
132129

133130
- shell: bash
@@ -161,7 +158,6 @@ jobs:
161158
- name: Setup pnpm
162159
uses: pnpm/action-setup@v4
163160
with:
164-
version: 10.15.1
165161
run_install: false
166162

167163
- shell: bash

README.md

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
## Features
88

99
- **Type-safe**: Full TypeScript support with generic job payloads
10-
- **Multiple adapters**: Drizzle ORM (PostgreSQL), Prisma ORM (PostgreSQL), and in-memory implementations
10+
- **Multiple adapters**: Drizzle ORM (PostgreSQL), Prisma ORM (PostgreSQL), Kysely (PostgreSQL) and in-memory implementations
1111
- **Priority queues**: Numeric priority system (lower = higher priority)
1212
- **Delayed jobs**: Schedule jobs for future execution
1313
- **Recurring jobs**: Cron expressions and interval-based repetition
14+
- **Batch processing**: Process multiple jobs concurrently for higher efficiency and better performance by supporting parallel execution and grouping of jobs.
1415
- **UTC-first timezone support**: Reliable timezone handling with UTC storage
1516
- **Progress tracking**: Real-time job progress updates
1617
- **Event system**: Listen to job lifecycle events
@@ -24,6 +25,7 @@
2425
│ ├── core/ # Core queue logic and interfaces
2526
│ ├── adapter-drizzle/ # Drizzle ORM adapter (PostgreSQL)
2627
│ └── adapter-prisma/ # Prisma ORM adapter (PostgreSQL)
28+
│ └── adapter-kysely/ # Kysely adapter (PostgreSQL)
2729
├── examples/ # Standalone usage examples
2830
└── tooling/ # Shared development tools
2931
```
@@ -49,9 +51,14 @@ pnpm add @vorsteh-queue/core @vorsteh-queue/adapter-drizzle
4951

5052
```typescript
5153
// Drizzle ORM with PostgreSQL
54+
55+
// Prisma ORM with PostgreSQL
56+
import { PrismaClient } from "@prisma/client"
5257
import { drizzle } from "drizzle-orm/node-postgres"
5358
import { Pool } from "pg"
59+
5460
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-drizzle"
61+
import { PostgresPrismaQueueAdapter } from "@vorsteh-queue/adapter-prisma"
5562
import { Queue } from "@vorsteh-queue/core"
5663

5764
interface EmailPayload {
@@ -69,10 +76,6 @@ const pool = new Pool({ connectionString: "postgresql://..." })
6976
const db = drizzle(pool)
7077
const queue = new Queue(new PostgresQueueAdapter(db), { name: "my-queue" })
7178

72-
// Prisma ORM with PostgreSQL
73-
import { PrismaClient } from "@prisma/client"
74-
import { PostgresPrismaQueueAdapter } from "@vorsteh-queue/adapter-prisma"
75-
7679
const prisma = new PrismaClient()
7780
const queue = new Queue(new PostgresPrismaQueueAdapter(prisma), { name: "my-queue" })
7881

@@ -139,6 +142,26 @@ await queue.add("health-check", payload, {
139142
})
140143
```
141144

145+
## Batch Processing
146+
147+
Process multiple jobs in a single batch for higher throughput and efficiency.
148+
149+
```typescript
150+
queue.registerBatch<{ file: string }, { ok: boolean }>("process-files", async (jobs) => {
151+
console.log(`Processing batch of ${jobs.length} files...`)
152+
return jobs.map(() => ({ ok: true }))
153+
})
154+
155+
await queue.addJobs("process-files", [{ file: "a.csv" }, { file: "b.csv" }, { file: "c.csv" }])
156+
157+
queue.on("batch:processing", (jobs) => {
158+
console.log(`Batch started: ${jobs.length} jobs`)
159+
})
160+
queue.on("batch:completed", (jobs) => {
161+
console.log(`Batch completed: ${jobs.length} jobs`)
162+
})
163+
```
164+
142165
## Job Cleanup
143166

144167
```typescript

apps/docs/package.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"@mdx-js/loader": "3.1.1",
2323
"@mdx-js/node-loader": "3.1.1",
2424
"@mdx-js/react": "3.1.1",
25-
"@next/mdx": "15.5.3",
25+
"@next/mdx": "15.5.4",
2626
"@radix-ui/react-collapsible": "^1.1.12",
2727
"@radix-ui/react-compose-refs": "1.1.2",
2828
"@radix-ui/react-dialog": "^1.1.15",
@@ -41,7 +41,7 @@
4141
"interweave": "13.1.1",
4242
"lucide-react": "0.544.0",
4343
"multimatch": "7.0.0",
44-
"next": "15.5.3",
44+
"next": "15.5.4",
4545
"next-themes": "latest",
4646
"p-map": "7.0.3",
4747
"react": "19.1.1",
@@ -53,23 +53,24 @@
5353
"remark-mdx-frontmatter": "5.2.0",
5454
"remark-squeeze-paragraphs": "6.0.0",
5555
"remark-strip-badges": "7.0.0",
56-
"renoun": "10.1.0",
56+
"renoun": "10.1.2",
5757
"tm-grammars": "1.24.13",
5858
"tm-themes": "1.10.9",
5959
"ts-morph": "27.0.0",
60-
"tw-animate-css": "^1.3.8",
60+
"tw-animate-css": "^1.4.0",
6161
"use-debounce": "10.0.6",
6262
"zod": "4.1.11"
6363
},
6464
"devDependencies": {
6565
"@tailwindcss/postcss": "4.1.13",
66-
"@tailwindcss/typography": "0.5.18",
66+
"@tailwindcss/typography": "0.5.19",
6767
"@types/mdx": "2.0.13",
6868
"@types/node": "22.18.6",
6969
"@types/react": "19.1.13",
7070
"@types/react-dom": "19.1.9",
7171
"@types/serve-handler": "6.1.4",
7272
"@vorsteh-queue/adapter-drizzle": "workspace:*",
73+
"@vorsteh-queue/adapter-kysely": "workspace:*",
7374
"@vorsteh-queue/adapter-prisma": "workspace:*",
7475
"@vorsteh-queue/core": "workspace:*",
7576
"@vorsteh-queue/eslint-config": "workspace:*",
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DATABASE_URL=postgresql://postgres:password@localhost:5432/queue_db
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { defineConfig } from "drizzle-kit"
2+
3+
export default defineConfig({
4+
schema: "./src/schema.ts",
5+
out: "./drizzle",
6+
dialect: "postgresql",
7+
dbCredentials: {
8+
url: process.env.DATABASE_URL || "postgresql://postgres:postgres@localhost:5432/queue_tracking",
9+
},
10+
})
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"name": "batch-processing-example",
3+
"version": "1.0.0",
4+
"description": "batch processing example using Drizzle ORM with postgres.js",
5+
"type": "module",
6+
"private": true,
7+
"scripts": {
8+
"dev": "tsx src/index.ts",
9+
"db:push": "drizzle-kit push"
10+
},
11+
"dependencies": {
12+
"@vorsteh-queue/adapter-drizzle": "workspace:*",
13+
"@vorsteh-queue/core": "workspace:*",
14+
"drizzle-orm": "^0.44.5",
15+
"postgres": "^3.4.7"
16+
},
17+
"devDependencies": {
18+
"drizzle-kit": "^0.31.4",
19+
"tsx": "4.20.5",
20+
"typescript": "^5.9.2"
21+
}
22+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
---
2+
title: Batch Processing Example
3+
navTitle: Batch Processing
4+
description: Demonstrates batch job processing. Shows how to register batch handlers, dispatch jobs in batches, and monitor batch events for robust, event-driven workflows.
5+
---
6+
7+
## Setup
8+
9+
Use the CLI to create this example:
10+
11+
```bash
12+
npx create-vorsteh-queue@latest my-project --template batch-processing
13+
cd my-project
14+
cp .env.example .env
15+
# Edit .env with your PostgreSQL database URL
16+
pnpm db:push
17+
pnpm dev
18+
```
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { drizzle } from "drizzle-orm/postgres-js"
2+
import postgres from "postgres"
3+
4+
import * as schema from "./schema"
5+
6+
// Shared database connection
7+
const client = postgres(
8+
process.env.DATABASE_URL || "postgresql://postgres:password@localhost:5432/queue_tracking",
9+
{ max: 10 } // Connection pool
10+
)
11+
12+
export const db = drizzle(client, { schema })
13+
export { client }

0 commit comments

Comments
 (0)