Skip to content

Commit ccaacc2

Browse files
committed
Add job result storage functionality
1 parent 4c64d96 commit ccaacc2

File tree

22 files changed

+795
-93
lines changed

22 files changed

+795
-93
lines changed

.changeset/clean-brooms-cut.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
---
2+
"@vorsteh-queue/core": minor
3+
"@vorsteh-queue/adapter-drizzle": minor
4+
"@vorsteh-queue/adapter-prisma": minor
5+
---
6+
7+
Add job result storage functionality
8+
9+
Job handlers can now return values that are automatically stored in the database and made available through events and job records.
10+
11+
**New Features:**
12+
13+
- Added optional `result` field to job records for storing handler return values
14+
- Enhanced `BaseJob` interface with generic result type parameter
15+
- Updated all adapters (Drizzle, Prisma, Memory) to support result storage
16+
- Results are accessible in `job:completed` events and job queries
17+
18+
**Database Changes:**
19+
20+
- Added `result` JSONB column to queue_jobs table in both Drizzle and Prisma schemas
21+
- Column is optional and backward compatible with existing jobs
22+
23+
**API Changes:**
24+
25+
- `JobHandler<TPayload, TResult>` now supports result type parameter
26+
- `updateJobStatus()` method accepts optional result parameter
27+
- Job completion events include result data
28+
29+
**Examples:**
30+
31+
```typescript
32+
// Handler with typed result
33+
queue.register<EmailPayload, EmailResult>("send-email", async (job) => {
34+
await sendEmail(job.payload)
35+
return { messageId: "msg_123", sent: true } // Stored in job.result
36+
})
37+
38+
// Access results in events
39+
queue.on("job:completed", (job) => {
40+
console.log("Result:", job.result) // { messageId: "msg_123", sent: true }
41+
})
42+
```

README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ queue.register<EmailPayload, EmailResult>("send-email", async (job) => {
8888
// Send email logic here
8989
await sendEmail(job.payload)
9090

91+
// Return result - will be stored in job.result field
9192
return {
9293
messageId: "msg_123",
9394
sent: true
@@ -196,6 +197,49 @@ await queue.add("business-task", payload, {
196197
4. **Simple and predictable** - no runtime timezone complexity
197198
5. **Server timezone independent** - works consistently across environments
198199

200+
## Job Results
201+
202+
Job handlers can return results that are automatically stored and made available:
203+
204+
```typescript
205+
interface ProcessResult {
206+
processed: number
207+
errors: string[]
208+
duration: number
209+
}
210+
211+
queue.register<{ items: string[] }, ProcessResult>("process-data", async (job) => {
212+
const startTime = Date.now()
213+
const errors: string[] = []
214+
let processed = 0
215+
216+
for (const item of job.payload.items) {
217+
try {
218+
await processItem(item)
219+
processed++
220+
} catch (error) {
221+
errors.push(`Failed to process ${item}: ${error.message}`)
222+
}
223+
}
224+
225+
// Return result - automatically stored in job.result field
226+
return {
227+
processed,
228+
errors,
229+
duration: Date.now() - startTime
230+
}
231+
})
232+
233+
// Access results in events
234+
queue.on("job:completed", (job) => {
235+
const result = job.result as ProcessResult
236+
console.log(`Processed ${result.processed} items in ${result.duration}ms`)
237+
if (result.errors.length > 0) {
238+
console.warn(`Errors: ${result.errors.join(", ")}`)
239+
}
240+
})
241+
```
242+
199243
## Progress Tracking
200244

201245
```typescript
@@ -235,6 +279,7 @@ queue.on("job:processing", (job) => {
235279

236280
queue.on("job:completed", (job) => {
237281
console.log(`🎉 Job ${job.name} completed successfully`)
282+
console.log(`📊 Result:`, job.result) // Access job result
238283
})
239284

240285
queue.on("job:failed", (job) => {

examples/README.md

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,87 @@ Standalone examples demonstrating different configurations of vorsteh-queue.
55
## Available Examples
66

77
### [drizzle-pg](./drizzle-pg/)
8+
89
Basic usage with Drizzle ORM and node-postgres driver.
10+
911
- Simple job processing
1012
- Priority handling
1113
- Delayed jobs
1214

15+
### [drizzle-pglite](./drizzle-pglite/)
16+
17+
Zero-setup example using Drizzle ORM with PGlite (embedded PostgreSQL).
18+
19+
- No external database required
20+
- Multiple job types with results
21+
- Progress tracking
22+
- Event system
23+
1324
### [drizzle-postgres](./drizzle-postgres/)
25+
1426
Advanced usage with Drizzle ORM and postgres.js driver.
27+
1528
- Multiple job types
1629
- Recurring jobs
1730
- Event handling
1831
- Queue monitoring
1932

33+
### [event-system](./event-system/)
34+
35+
Comprehensive event monitoring and statistics.
36+
37+
- Job lifecycle events
38+
- Real-time statistics
39+
- Performance monitoring
40+
- Error tracking
41+
42+
### [pm2-workers](./pm2-workers/)
43+
44+
Production deployment with PM2 process manager.
45+
46+
- Multiple worker processes
47+
- Process management
48+
- Scaling configuration
49+
- Production monitoring
50+
51+
### [prisma-client](./prisma-client/)
52+
53+
Prisma ORM with PostgreSQL using driver adapters.
54+
55+
- Type-safe database operations
56+
- Modern Prisma client w/o rust engine
57+
- Job management
58+
59+
### [prisma-client-js](./prisma-client-js/)
60+
61+
Prisma ORM with traditional prisma-client-js provider.
62+
63+
- Legacy Prisma setup
64+
- PostgreSQL integration
65+
- Job management
66+
67+
### [progress-tracking](./progress-tracking/)
68+
69+
Real-time job progress tracking demonstration.
70+
71+
- Progress updates
72+
- Long-running jobs
73+
- Progress monitoring
74+
- Status reporting
75+
76+
### [result-storage](./result-storage/)
77+
78+
Job result storage and retrieval with progress tracking.
79+
80+
- Job return values
81+
- Result persistence
82+
- Progress tracking
83+
- Error handling
84+
2085
## Quick Start
2186

2287
1. Choose an example directory
2388
2. Follow the README instructions in that directory
2489
3. Ensure PostgreSQL is running locally or update the DATABASE_URL
2590

26-
Each example is self-contained and can be used as a starting point for your own implementation.
91+
Each example is self-contained and can be used as a starting point for your own implementation.

examples/prisma-client-js/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ model QueueJob {
2424
completedAt DateTime? @map("completed_at") @db.Timestamptz(6)
2525
failedAt DateTime? @map("failed_at") @db.Timestamptz(6)
2626
error Json? @db.JsonB
27+
result Json? @db.JsonB
2728
progress Int? @default(0)
2829
cron String? @db.VarChar(255)
2930
repeatEvery Int? @map("repeat_every")

examples/prisma-client/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ model QueueJob {
3131
completedAt DateTime? @map("completed_at") @db.Timestamptz(6)
3232
failedAt DateTime? @map("failed_at") @db.Timestamptz(6)
3333
error Json? @db.JsonB
34+
result Json? @db.JsonB
3435
progress Int? @default(0)
3536
cron String? @db.VarChar(255)
3637
repeatEvery Int? @map("repeat_every")

examples/result-storage/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Result Storage Example
2+
3+
This example demonstrates how to use job results in Vorsteh Queue. Job handlers can return values that are automatically stored in the database and made available through events and job records.
4+
5+
## Features Demonstrated
6+
7+
- **Result Storage**: Job handlers return results that are automatically stored
8+
- **Type Safety**: Full TypeScript support for job payloads and results
9+
- **Progress Tracking**: Real-time progress updates during job processing
10+
- **Error Handling**: Proper error collection and reporting in results
11+
- **Event Access**: Accessing results through job completion events
12+
13+
## Running the Example
14+
15+
```bash
16+
pnpm install
17+
pnpm start
18+
```
19+
20+
## Key Concepts
21+
22+
### Job Handler Results
23+
24+
```typescript
25+
interface ProcessDataResult {
26+
processed: number
27+
failed: number
28+
duration: number
29+
errors: string[]
30+
}
31+
32+
queue.register<ProcessDataPayload, ProcessDataResult>("process-data", async (job) => {
33+
// Process data...
34+
35+
// Return result - automatically stored in job.result field
36+
return {
37+
processed: 45,
38+
failed: 5,
39+
duration: 1250,
40+
errors: ["Failed to process item-23"]
41+
}
42+
})
43+
```
44+
45+
### Accessing Results
46+
47+
Results are available in job completion events:
48+
49+
```typescript
50+
queue.on("job:completed", (job) => {
51+
console.log("Job result:", job.result)
52+
// Result is typed according to your handler's return type
53+
})
54+
```
55+
56+
### Database Storage
57+
58+
Results are stored as JSON in the database `result` field and can be queried later for analytics, debugging, or reporting purposes.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"name": "result-storage-example",
3+
"version": "1.0.0",
4+
"description": "demonstrating job result storage and retrieval with progress tracking using Drizzle ORM with PGlite (embedded PostgreSQL)",
5+
"private": true,
6+
"type": "module",
7+
"scripts": {
8+
"dev": "tsx src/index.ts",
9+
"db:push": "drizzle-kit push"
10+
},
11+
"dependencies": {
12+
"@electric-sql/pglite": "^0.3.5",
13+
"@vorsteh-queue/adapter-drizzle": "workspace:*",
14+
"@vorsteh-queue/core": "workspace:*",
15+
"drizzle-orm": "^0.44.3"
16+
},
17+
"devDependencies": {
18+
"drizzle-kit": "^0.31.4",
19+
"tsx": "4.20.3",
20+
"typescript": "^5.8.3"
21+
}
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { PGlite } from "@electric-sql/pglite"
2+
import { drizzle } from "drizzle-orm/pglite"
3+
4+
import * as schema from "./schema"
5+
6+
// Shared embedded database connection
7+
const client = new PGlite()
8+
9+
export const db = drizzle(client, { schema })
10+
export { client }

0 commit comments

Comments
 (0)