
Commit 16a38df

CodingAnarchy and claude committed

Add comprehensive batch processing support to Workers
This enhancement completes the Job Batching & Bulk Operations feature by adding optimized batch processing capabilities to the Worker system.

## Key Features Added

### Worker Batch Processing
- **BatchProcessingStats**: New statistics tracking for batch job processing
- **Batch-aware job processing**: Workers detect and optimize batch job handling
- **Enhanced monitoring**: Batch completion tracking and success rate calculation
- **Failure handling**: Batch-specific error logging and status updates

### Worker Configuration
- `with_batch_processing_enabled()`: Enable batch processing optimizations
- `get_batch_stats()`: Access batch processing statistics
- Automatic batch status monitoring and completion detection

### Enhanced Job Processing
- Batch ID tracking throughout the job lifecycle
- Batch completion rate monitoring (>95% success threshold)
- Per-batch failure rate and processing time statistics
- Automatic batch status updates on job completion

### Statistics & Monitoring
- Jobs processed/completed/failed per batch
- Batch success rates and completion tracking
- Average processing times for batch jobs
- Real-time batch progress monitoring

## Implementation Details

### Worker Enhancements
- Added `BatchProcessingStats` struct for comprehensive batch metrics
- Extended Worker with a `batch_processing_enabled` flag and `batch_stats` tracking
- Enhanced the `process_job()` method with batch-aware processing
- Added helper methods for batch status checking and failure handling

### New Components
- `update_batch_stats()`: Thread-safe statistics updates
- `check_and_update_batch_status()`: Monitors batch completion
- `handle_batch_job_failure()`: Batch-specific failure handling

### Testing & Examples
- Comprehensive test suite for worker batch processing
- Real-world example demonstrating batch worker capabilities
- Integration tests for batch statistics and monitoring

## Usage Example

```rust
let worker = Worker::new(queue, "batch_queue".to_string(), handler)
    .with_batch_processing_enabled(true)
    .with_stats_collector(stats_collector);

// Get batch processing statistics
let stats = worker.get_batch_stats();
println!("Batch success rate: {:.1}%", stats.batch_success_rate() * 100.0);
```

## Files Modified
- `src/worker.rs`: Enhanced with batch processing capabilities
- `src/lib.rs`: Export `BatchProcessingStats`
- `README.md`: Updated feature list and examples
- `examples/worker_batch_example.rs`: Comprehensive batch worker demo
- `tests/worker_batch_tests.rs`: Batch processing test suite

This completes the Job Batching & Bulk Operations feature, providing end-to-end support for high-performance batch job processing with comprehensive monitoring and statistics.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent 6339ce0 commit 16a38df
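
src/worker.rs itself is not rendered on this page, so the exact definition of `BatchProcessingStats` is not visible. A minimal sketch consistent with the commit message and the usage example above — only `jobs_processed`, `batches_completed`, and `batch_success_rate()` actually appear in this commit's code — might look like the following; the remaining field names are assumptions:

```rust
/// Sketch only: the real definition lives in src/worker.rs (not shown on
/// this page). Fields beyond `jobs_processed` and `batches_completed` are
/// assumptions inferred from the commit message.
#[derive(Debug, Default, Clone)]
pub struct BatchProcessingStats {
    pub jobs_processed: u64,         // batch jobs picked up by this worker
    pub jobs_completed: u64,         // assumed: batch jobs that succeeded
    pub jobs_failed: u64,            // assumed: batch jobs that failed
    pub batches_completed: u64,      // batches that reached a terminal state
    pub avg_processing_time_ms: f64, // assumed: mean time per batch job
}

impl BatchProcessingStats {
    /// Success rate as a fraction (0.0..=1.0), matching the
    /// `batch_success_rate() * 100.0` call in the usage example above.
    pub fn batch_success_rate(&self) -> f64 {
        let finished = self.jobs_completed + self.jobs_failed;
        if finished == 0 {
            1.0
        } else {
            self.jobs_completed as f64 / finished as f64
        }
    }
}
```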

File tree

5 files changed: +780 -2 lines changed

README.md
examples/worker_batch_example.rs
src/lib.rs
src/worker.rs
tests/worker_batch_tests.rs

README.md

Lines changed: 3 additions & 1 deletion

````diff
@@ -6,7 +6,7 @@ A high-performance, database-driven job queue for Rust with comprehensive featur
 
 - **Multi-database support**: PostgreSQL and MySQL backends
 - **Job prioritization**: Five priority levels with weighted and strict scheduling algorithms
-- **Batch operations**: High-performance bulk job enqueuing for improved throughput
+- **Batch operations**: High-performance bulk job enqueuing with optimized worker processing
 - **Cron scheduling**: Full cron expression support with timezone awareness
 - **Rate limiting**: Token bucket rate limiting with configurable burst limits
 - **Monitoring**: Prometheus metrics and advanced alerting (enabled by default)
@@ -108,6 +108,8 @@ Working examples in `examples/`:
 - `mysql_example.rs` - MySQL with workers and priorities
 - `cron_example.rs` - Cron scheduling with timezones
 - `priority_example.rs` - Priority system demonstration
+- `batch_example.rs` - Bulk job enqueuing and processing
+- `worker_batch_example.rs` - Worker batch processing features
 
 ```bash
 cargo run --example postgres_example --features postgres
````

examples/worker_batch_example.rs (new file)

Lines changed: 350 additions & 0 deletions

```rust
use hammerwork::{
    batch::{JobBatch, PartialFailureMode},
    worker::{BatchProcessingStats, Worker, WorkerPool, JobHandler},
    Job, JobQueue, JobPriority,
    queue::DatabaseQueue,
    stats::InMemoryStatsCollector,
    HammerworkError,
};
use serde_json::json;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::time::Duration;
use tracing::{info, warn, error};

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt().init();

    // Connect to PostgreSQL
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://localhost/hammerwork_test".to_string());

    let pool = Pool::<Postgres>::connect(&database_url).await?;
    let queue = Arc::new(JobQueue::new(pool));

    // Initialize database tables
    #[cfg(feature = "postgres")]
    {
        queue.create_tables().await?;
    }

    // Create statistics collector (annotated as the trait object the demo
    // function expects)
    let stats_collector: Arc<dyn hammerwork::StatisticsCollector> =
        Arc::new(InMemoryStatsCollector::new_default());

    // Demo: Worker with batch processing enabled
    info!("=== Worker Batch Processing Demo ===");
    batch_worker_demo(&queue, &stats_collector).await?;

    info!("Worker batch processing demo completed successfully!");
    Ok(())
}

async fn batch_worker_demo(
    queue: &Arc<JobQueue<Postgres>>,
    stats_collector: &Arc<dyn hammerwork::StatisticsCollector>,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
    info!("Setting up worker with batch processing enabled...");

    // Create a job handler that demonstrates batch processing features
    let handler: JobHandler = Arc::new(|job: Job| {
        Box::pin(async move {
            let job_type = job.payload.get("type").and_then(|v| v.as_str()).unwrap_or("unknown");
            let batch_info = if let Some(batch_id) = job.batch_id {
                format!(" (batch: {})", batch_id)
            } else {
                String::new()
            };

            match job_type {
                "data_processing" => {
                    let file_name = job.payload.get("file").and_then(|v| v.as_str()).unwrap_or("unknown");
                    info!("🔄 Processing data file: {}{}", file_name, batch_info);

                    // Simulate processing time based on file size
                    let size = job.payload.get("size_mb").and_then(|v| v.as_u64()).unwrap_or(1);
                    let processing_time = Duration::from_millis(size * 10); // 10ms per MB
                    tokio::time::sleep(processing_time).await;

                    // Simulate occasional processing failures
                    if file_name.contains("corrupted") {
                        return Err(HammerworkError::Processing(
                            format!("File {} is corrupted and cannot be processed", file_name),
                        ));
                    }

                    info!("✅ Completed processing: {}", file_name);
                }
                "image_resize" => {
                    let image_id = job.payload.get("image_id").and_then(|v| v.as_str()).unwrap_or("unknown");
                    let dimensions = job.payload.get("target_size").and_then(|v| v.as_str()).unwrap_or("1024x768");

                    info!("🖼️ Resizing image {} to {}{}", image_id, dimensions, batch_info);

                    // Simulate image processing
                    tokio::time::sleep(Duration::from_millis(100)).await;

                    // Simulate memory issues for large images
                    if dimensions.contains("4K") {
                        return Err(HammerworkError::Processing(
                            format!("Insufficient memory to resize image {} to {}", image_id, dimensions),
                        ));
                    }

                    info!("✅ Image resized: {}", image_id);
                }
                "email_send" => {
                    let recipient = job.payload.get("to").and_then(|v| v.as_str()).unwrap_or("unknown");
                    let subject = job.payload.get("subject").and_then(|v| v.as_str()).unwrap_or("No subject");

                    info!("📧 Sending email to {} with subject '{}'{}", recipient, subject, batch_info);

                    // Simulate email sending
                    tokio::time::sleep(Duration::from_millis(50)).await;

                    // Simulate email delivery failures
                    if recipient.contains("bounced") {
                        return Err(HammerworkError::Processing(
                            format!("Email delivery failed to {}: Recipient address bounced", recipient),
                        ));
                    }

                    info!("✅ Email sent to: {}", recipient);
                }
                _ => {
                    error!("❌ Unknown job type: {}", job_type);
                    return Err(HammerworkError::Processing(
                        format!("Unknown job type: {}", job_type),
                    ));
                }
            }

            Ok(())
        })
    });

    // Create worker with batch processing enabled
    let worker = Worker::new(
        queue.clone(),
        "batch_processing_queue".to_string(),
        handler,
    )
    .with_batch_processing_enabled(true) // Enable batch processing features
    .with_poll_interval(Duration::from_millis(100))
    .with_max_retries(3)
    .with_stats_collector(stats_collector.clone())
    .with_priority_weights(
        hammerwork::PriorityWeights::new()
            .with_weight(JobPriority::Critical, 50)
            .with_weight(JobPriority::High, 20)
            .with_weight(JobPriority::Normal, 10),
    );

    // Get initial batch statistics
    let initial_stats = worker.get_batch_stats();
    info!(
        "Initial batch stats - Jobs processed: {}, Batches completed: {}",
        initial_stats.jobs_processed, initial_stats.batches_completed
    );

    // Create multiple batches to demonstrate batch processing
    let batches = create_demo_batches();
    let mut batch_ids = Vec::new();

    for batch in batches {
        info!("Enqueueing batch: {} with {} jobs", batch.name, batch.job_count());
        let batch_id = queue.enqueue_batch(batch).await?;
        batch_ids.push(batch_id);
    }

    // Start the worker pool
    let mut worker_pool = WorkerPool::new();
    worker_pool.add_worker(worker);

    info!("Starting worker pool to process {} batches...", batch_ids.len());

    // Run the worker pool in the background
    let worker_handle = tokio::spawn(async move {
        if let Err(e) = worker_pool.start().await {
            error!("Worker pool error: {}", e);
        }
    });

    // Monitor batch progress for up to 60 seconds
    let mut completed_batches = 0;
    for _ in 0..60 {
        tokio::time::sleep(Duration::from_secs(1)).await;

        let mut all_complete = true;
        for &batch_id in &batch_ids {
            let batch_result = queue.get_batch_status(batch_id).await?;

            if batch_result.pending_jobs > 0 {
                all_complete = false;
            } else if batch_result.pending_jobs == 0 && batch_result.total_jobs > 0 {
                // This batch just completed
                if completed_batches < batch_ids.len() {
                    completed_batches += 1;
                    let success_rate = batch_result.success_rate();
                    info!(
                        "Batch {} completed! Success rate: {:.1}% ({}/{} jobs successful)",
                        batch_id,
                        success_rate * 100.0,
                        batch_result.completed_jobs,
                        batch_result.total_jobs
                    );
                }
            }
        }

        if all_complete {
            info!("🎉 All batches completed!");
            break;
        }
    }

    // Stop the worker
    worker_handle.abort();

    // Display final batch processing statistics
    info!("\n=== Final Batch Processing Statistics ===");

    for &batch_id in &batch_ids {
        let batch_result = queue.get_batch_status(batch_id).await?;
        info!(
            "Batch {}: {} total, {} completed, {} failed, {:.1}% success rate",
            batch_id,
            batch_result.total_jobs,
            batch_result.completed_jobs,
            batch_result.failed_jobs,
            batch_result.success_rate() * 100.0
        );
    }

    // Display overall statistics
    let overall_stats = stats_collector.get_system_statistics(Duration::from_secs(300)).await?;
    info!("\nOverall Statistics:");
    info!("  Total processed: {}", overall_stats.total_processed);
    info!("  Completed: {}", overall_stats.completed);
    info!("  Failed: {}", overall_stats.failed);
    info!("  Success rate: {:.1}%", (1.0 - overall_stats.error_rate) * 100.0);

    if overall_stats.average_processing_time_ms > 0.0 {
        info!("  Average processing time: {:.1}ms", overall_stats.average_processing_time_ms);
    }

    // Clean up batches
    for batch_id in batch_ids {
        queue.delete_batch(batch_id).await?;
    }

    info!("Batch processing demo completed and cleaned up successfully!\n");

    Ok(())
}

/// Create sample batches for demonstration
fn create_demo_batches() -> Vec<JobBatch> {
    vec![
        // Batch 1: Data processing jobs
        JobBatch::new("data_processing_batch")
            .with_jobs(vec![
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "data_processing",
                        "file": "dataset_001.csv",
                        "size_mb": 50
                    }),
                ).as_high_priority(),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "data_processing",
                        "file": "dataset_002.csv",
                        "size_mb": 30
                    }),
                ),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "data_processing",
                        "file": "corrupted_data.csv", // This will fail
                        "size_mb": 25
                    }),
                ),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "data_processing",
                        "file": "dataset_003.csv",
                        "size_mb": 40
                    }),
                ),
            ])
            .with_partial_failure_handling(PartialFailureMode::ContinueOnError)
            .with_metadata("department", "data_science")
            .with_metadata("priority", "high"),

        // Batch 2: Image processing jobs
        JobBatch::new("image_processing_batch")
            .with_jobs(vec![
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "image_resize",
                        "image_id": "IMG_001.jpg",
                        "target_size": "1920x1080"
                    }),
                ).as_critical(),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "image_resize",
                        "image_id": "IMG_002.jpg",
                        "target_size": "1024x768"
                    }),
                ),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "image_resize",
                        "image_id": "IMG_003.jpg",
                        "target_size": "4K" // This will fail due to memory
                    }),
                ),
            ])
            .with_partial_failure_handling(PartialFailureMode::CollectErrors)
            .with_metadata("campaign", "product_images")
            .with_metadata("quality", "high"),

        // Batch 3: Email notifications
        JobBatch::new("email_notification_batch")
            .with_jobs(vec![
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "email_send",
                        "subject": "Welcome to our service!"
                    }),
                ).as_high_priority(),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "email_send",
                        "subject": "Your order has shipped"
                    }),
                ),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "email_send",
                        "to": "[email protected]", // This will fail
                        "subject": "Monthly newsletter"
                    }),
                ),
                Job::new(
                    "batch_processing_queue".to_string(),
                    json!({
                        "type": "email_send",
                        "subject": "Password reset confirmation"
                    }),
                ).as_critical(),
            ])
            .with_partial_failure_handling(PartialFailureMode::ContinueOnError)
            .with_metadata("campaign_id", "Q4_2024")
            .with_metadata("email_type", "transactional"),
    ]
}
```
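
The worker-side helpers described in the commit message (`update_batch_stats()`, `check_and_update_batch_status()`, `handle_batch_job_failure()`) live in src/worker.rs, which this page does not render. A self-contained sketch of that bookkeeping — assuming the worker keeps per-batch counters behind a mutex and checks for completion after each batch job, with all names here illustrative rather than the crate's actual API — could look like:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Illustrative model of the per-batch bookkeeping the commit describes.
#[derive(Default)]
struct BatchCounters {
    pending: u64,
    completed: u64,
    failed: u64,
}

#[derive(Default)]
struct BatchTracker {
    // A mutex keeps the statistics updates thread-safe, as the commit
    // notes for update_batch_stats().
    counters: Arc<Mutex<HashMap<String, BatchCounters>>>,
}

impl BatchTracker {
    /// Record that `n` jobs from `batch_id` were enqueued.
    fn register_batch(&self, batch_id: &str, n: u64) {
        self.counters
            .lock()
            .unwrap()
            .entry(batch_id.to_string())
            .or_default()
            .pending = n;
    }

    /// Called when a batch job finishes; mirrors the combination of
    /// update_batch_stats() and check_and_update_batch_status().
    /// Returns the batch's final success rate once no jobs remain, so the
    /// caller can compare it against the >95% threshold mentioned above.
    fn record_job_result(&self, batch_id: &str, succeeded: bool) -> Option<f64> {
        let mut map = self.counters.lock().unwrap();
        let c = map.entry(batch_id.to_string()).or_default();
        c.pending = c.pending.saturating_sub(1);
        if succeeded {
            c.completed += 1;
        } else {
            // handle_batch_job_failure() would also log the error and
            // update the batch's status in the database.
            c.failed += 1;
        }

        if c.pending == 0 {
            let finished = c.completed + c.failed;
            Some(c.completed as f64 / finished.max(1) as f64)
        } else {
            None
        }
    }
}
```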

src/lib.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -129,7 +129,7 @@ pub use rate_limit::{RateLimit, RateLimiter, ThrottleConfig};
 pub use stats::{
     DeadJobSummary, InMemoryStatsCollector, JobStatistics, QueueStats, StatisticsCollector,
 };
-pub use worker::{Worker, WorkerPool};
+pub use worker::{BatchProcessingStats, Worker, WorkerPool};
 
 #[cfg(feature = "metrics")]
 pub use metrics::{MetricsConfig, PrometheusMetricsCollector};
```
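
With the re-export in place, downstream crates can name the stats type from the crate root. A minimal illustration, assuming only the `batch_success_rate()` accessor shown in the commit's usage example:

```rust
use hammerwork::BatchProcessingStats;

// Relies only on batch_success_rate(), which the usage example above shows.
fn report(stats: &BatchProcessingStats) {
    println!("Batch success rate: {:.1}%", stats.batch_success_rate() * 100.0);
}
```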
