Commit 78989ac

CodingAnarchy and claude committed
Add comprehensive job timeout functionality
Implements robust timeout detection and handling for job processing with complete database persistence, statistics integration, and monitoring.

Core Features:
- Per-job timeout configuration with Job::with_timeout() builder
- Worker-level default timeouts with Worker::with_default_timeout()
- TimedOut job status for jobs exceeding timeout duration
- Async timeout detection using tokio::time::timeout
- Intelligent timeout precedence (job-specific overrides worker default)

Database Integration:
- timeout_seconds and timed_out_at columns in PostgreSQL and MySQL schemas
- mark_job_timed_out() method added to DatabaseQueue trait
- Complete timeout support in both database implementations
- Database queries updated for timeout field handling

Statistics & Monitoring:
- timed_out field in JobStatistics for timeout event tracking
- timed_out_count in QueueStats for per-queue timeout monitoring
- Timeout events included in error rate calculations
- JobEventType::TimedOut for comprehensive event tracking

Testing & Examples:
- 14 comprehensive tests covering timeout functionality and edge cases
- Enhanced PostgreSQL and MySQL examples with timeout demonstrations
- Various timeout scenarios (10s, 60s, 600s) and priority-based configs
- Complete test coverage for timeout detection, statistics, and database ops

Technical Implementation:
- Graceful timeout handling without affecting other jobs
- Proper async resource management with tokio timeout
- Backward compatible with existing job queue functionality
- Type-safe timeout configuration throughout the API

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent 46f4636 commit 78989ac
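
The precedence rule named in the commit message (a job-specific timeout overrides the worker default) can be sketched with plain `Option` values. This is a minimal illustration under assumptions, not Hammerwork's actual code: `effective_timeout` and its parameter names are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of the Hammerwork API): resolve which
/// timeout applies to a job, given the job's own setting and the worker's
/// default.
fn effective_timeout(
    job_timeout: Option<Duration>,
    worker_default: Option<Duration>,
) -> Option<Duration> {
    // `Option::or` keeps the job's own value when it is present and falls
    // back to the worker default otherwise. `None` means no timeout applies.
    job_timeout.or(worker_default)
}
```

With the values used in the examples in this commit, a job built with a 10-second timeout on a worker whose default is 30 seconds would resolve to 10 seconds, and a job with no timeout of its own would resolve to 30 seconds.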

7 files changed: +831 -57 lines changed

CHANGELOG.md

Lines changed: 48 additions & 0 deletions
@@ -5,6 +5,53 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.2.2] - 2025-06-26
+
+### Added
+- **Comprehensive Job Timeout Functionality**
+  - `TimedOut` job status for jobs that exceed their timeout duration
+  - Per-job timeout configuration with `Job::with_timeout()` builder method
+  - Worker-level default timeouts with `Worker::with_default_timeout()`
+  - Timeout detection using `tokio::time::timeout` for efficient async timeout handling
+  - `timeout` and `timed_out_at` fields added to `Job` struct for complete timeout tracking
+  - Automatic timeout event recording in statistics with `JobEventType::TimedOut`
+
+- **Enhanced Database Support for Timeouts**
+  - `timeout_seconds` and `timed_out_at` columns added to database schema
+  - `mark_job_timed_out()` method added to `DatabaseQueue` trait
+  - Complete timeout support in both PostgreSQL and MySQL implementations
+  - Database queries updated to handle timeout fields in job lifecycle operations
+  - Timeout counts integrated into queue statistics with `timed_out_count` field
+
+- **Timeout Statistics and Monitoring**
+  - `timed_out` field added to `JobStatistics` for timeout event tracking
+  - Timeout events included in error rate calculations for comprehensive metrics
+  - `timed_out_count` added to `QueueStats` for per-queue timeout monitoring
+  - Enhanced statistics display in examples showing timeout metrics
+  - Timeout event processing in `InMemoryStatsCollector`
+
+- **Comprehensive Testing**
+  - 14 new comprehensive tests covering timeout functionality
+  - Job timeout detection logic testing with edge cases
+  - Worker timeout configuration and precedence testing
+  - Timeout statistics integration testing
+  - Database operation interface testing for timeout methods
+  - Job lifecycle testing with timeout scenarios
+
+- **Enhanced Examples**
+  - Updated PostgreSQL example with timeout configuration demonstrations
+  - Updated MySQL example with various timeout scenarios (10s, 60s, 600s)
+  - Job timeout precedence examples (job-specific vs worker defaults)
+  - Priority-based timeout configuration examples (VIP vs standard jobs)
+  - Comprehensive timeout statistics display in both examples
+
+### Technical Implementation
+- Timeout precedence: job-specific timeout takes priority over worker default timeout
+- Graceful timeout handling: jobs are marked as `TimedOut` without affecting other jobs
+- Async timeout detection: uses `tokio::time::timeout` for efficient resource management
+- Database consistency: timeout information persisted and retrievable across job lifecycle
+- Statistics integration: timeout events fully integrated into existing statistics framework
+
 ## [0.2.1] - 2025-06-25
 
 ### Removed
@@ -107,6 +154,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ---
 
 ## Release Links
+- [0.2.2](https://github.com/CodingAnarchy/hammerwork/releases/tag/v0.2.2)
 - [0.2.1](https://github.com/CodingAnarchy/hammerwork/releases/tag/v0.2.1)
 - [0.2.0](https://github.com/CodingAnarchy/hammerwork/releases/tag/v0.2.0)
 - [0.1.0](https://github.com/CodingAnarchy/hammerwork/releases/tag/v0.1.0)
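
The changelog says timeout events are included in error rate calculations but does not give the formula. One plausible reading is sketched below, assuming the rate is the share of processed jobs that failed, died, or timed out. The `Counts` struct and `error_rate` function are hypothetical stand-ins that borrow the field names printed by the examples; they are not the library's `JobStatistics` type.

```rust
/// Hypothetical counters mirroring the fields the examples print.
struct Counts {
    total_processed: u64,
    failed: u64,
    dead: u64,
    timed_out: u64,
}

/// Assumed formula: (failed + dead + timed_out) / total_processed,
/// returned as a fraction so callers can multiply by 100 for display.
fn error_rate(counts: &Counts) -> f64 {
    if counts.total_processed == 0 {
        // Avoid dividing by zero when nothing has been processed yet.
        return 0.0;
    }
    let errors = counts.failed + counts.dead + counts.timed_out;
    errors as f64 / counts.total_processed as f64
}
```

Under this assumption, timed-out jobs raise the error rate in the same way as failed and dead jobs, which matches the examples printing `error_rate * 100.0` as a percentage.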

examples/mysql_example.rs

Lines changed: 68 additions & 16 deletions
@@ -86,15 +86,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
         }) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
     });
 
-    // Create workers for different queues with statistics collection
+    // Create workers for different queues with statistics collection and timeout configuration
     let image_worker = Worker::new(queue.clone(), "image_processing".to_string(), image_handler)
         .with_poll_interval(tokio::time::Duration::from_secs(1))
         .with_max_retries(2)
+        .with_default_timeout(tokio::time::Duration::from_secs(120)) // 2-minute timeout for image processing
         .with_stats_collector(Arc::clone(&stats_collector));
 
     let email_worker = Worker::new(queue.clone(), "email_queue".to_string(), email_handler)
         .with_poll_interval(tokio::time::Duration::from_millis(500))
         .with_max_retries(3)
+        .with_default_timeout(tokio::time::Duration::from_secs(30)) // 30-second timeout for emails
         .with_stats_collector(Arc::clone(&stats_collector));
 
     let mut worker_pool = WorkerPool::new()
@@ -107,15 +109,15 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     {
         use hammerwork::queue::DatabaseQueue;
 
-        // Create various test jobs including some that will fail
+        // Create various test jobs with timeout configurations
         let successful_image_job = Job::new(
             "image_processing".to_string(),
             json!({
                 "image_url": "https://example.com/photo.jpg",
                 "resize": "800x600",
                 "format": "webp"
             }),
-        );
+        ).with_timeout(std::time::Duration::from_secs(60)); // Custom 1-minute timeout for this job
 
         let failing_image_job = Job::new(
             "image_processing".to_string(),
@@ -124,7 +126,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
                 "resize": "800x600",
                 "format": "webp"
             }),
-        );
+        ); // Uses worker default timeout (2 minutes)
 
         let email_job = Job::new(
             "email_queue".to_string(),
@@ -133,7 +135,31 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
                 "subject": "Your image has been processed",
                 "template": "image_ready"
             }),
-        );
+        ).with_timeout(std::time::Duration::from_secs(10)); // Quick timeout for email sending
+
+        // High-priority email with longer timeout and more retries
+        let priority_email_job = Job::new(
+            "email_queue".to_string(),
+            json!({
+
+                "subject": "VIP Image Processing Complete",
+                "template": "vip_notification",
+                "priority": "high"
+            }),
+        ).with_timeout(std::time::Duration::from_secs(45)) // Longer timeout for VIP
+         .with_max_attempts(5); // More retry attempts for important emails
+
+        // Large image processing job with extended timeout
+        let large_image_job = Job::new(
+            "image_processing".to_string(),
+            json!({
+                "image_url": "https://example.com/huge_photo.raw",
+                "resize": "4K",
+                "format": "png",
+                "effects": ["sharpen", "color_correct", "denoise"]
+            }),
+        ).with_timeout(std::time::Duration::from_secs(600)) // 10-minute timeout for large processing
+         .with_max_attempts(2); // Fewer retries for expensive operations
 
         // Schedule a delayed job
         let delayed_job = Job::with_delay(
@@ -149,38 +175,49 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
         let job1_id = queue.enqueue(successful_image_job).await?;
         let job2_id = queue.enqueue(failing_image_job).await?;
         let job3_id = queue.enqueue(email_job).await?;
-        let job4_id = queue.enqueue(delayed_job).await?;
-
-        info!("Enqueued test jobs: {}, {}, {}, {}", job1_id, job2_id, job3_id, job4_id);
+        let job4_id = queue.enqueue(priority_email_job).await?;
+        let job5_id = queue.enqueue(large_image_job).await?;
+        let job6_id = queue.enqueue(delayed_job).await?;
+
+        info!("Enqueued test jobs with various timeout configurations:");
+        info!(" {} - image processing with 60s timeout", job1_id);
+        info!(" {} - failing image job (uses worker default 2min timeout)", job2_id);
+        info!(" {} - email with 10s timeout", job3_id);
+        info!(" {} - VIP email with 45s timeout", job4_id);
+        info!(" {} - large image with 10min timeout", job5_id);
+        info!(" {} - delayed email (1min delay)", job6_id);
 
         // Let jobs process for a bit
         tokio::time::sleep(tokio::time::Duration::from_secs(8)).await;
 
-        // Demonstrate statistics collection
-        info!("=== Job Processing Statistics ===");
+        // Demonstrate statistics collection with timeout tracking
+        info!("=== Job Processing Statistics (Including Timeouts) ===");
         let system_stats = stats_collector.get_system_statistics(std::time::Duration::from_secs(300)).await?;
-        info!("System Stats - Total: {}, Completed: {}, Failed: {}, Dead: {}, Error Rate: {:.2}%",
+        info!("System Stats - Total: {}, Completed: {}, Failed: {}, Dead: {}, Timed Out: {}, Error Rate: {:.2}%",
             system_stats.total_processed,
             system_stats.completed,
             system_stats.failed,
             system_stats.dead,
+            system_stats.timed_out,
             system_stats.error_rate * 100.0
         );
 
         // Get statistics for each queue
         let image_stats = stats_collector.get_queue_statistics("image_processing", std::time::Duration::from_secs(300)).await?;
-        info!("Image Processing Stats - Total: {}, Completed: {}, Failed: {}, Avg Time: {:.2}ms",
+        info!("Image Processing Stats - Total: {}, Completed: {}, Failed: {}, Timed Out: {}, Avg Time: {:.2}ms",
             image_stats.total_processed,
             image_stats.completed,
             image_stats.failed,
+            image_stats.timed_out,
             image_stats.avg_processing_time_ms
         );
 
         let email_stats = stats_collector.get_queue_statistics("email_queue", std::time::Duration::from_secs(300)).await?;
-        info!("Email Queue Stats - Total: {}, Completed: {}, Failed: {}, Avg Time: {:.2}ms",
+        info!("Email Queue Stats - Total: {}, Completed: {}, Failed: {}, Timed Out: {}, Avg Time: {:.2}ms",
             email_stats.total_processed,
             email_stats.completed,
             email_stats.failed,
+            email_stats.timed_out,
             email_stats.avg_processing_time_ms
         );
 
@@ -204,14 +241,15 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
             dead_summary.dead_jobs_by_queue
         );
 
-        // Show queue statistics from database
+        // Show queue statistics from database (including timeout counts)
         let all_queue_stats = queue.get_all_queue_stats().await?;
         for queue_stat in all_queue_stats {
-            info!("Queue '{}' - Pending: {}, Running: {}, Dead: {}, Completed: {}",
+            info!("Queue '{}' - Pending: {}, Running: {}, Dead: {}, Timed Out: {}, Completed: {}",
                 queue_stat.queue_name,
                 queue_stat.pending_count,
                 queue_stat.running_count,
                 queue_stat.dead_count,
+                queue_stat.timed_out_count,
                 queue_stat.completed_count
             );
         }
@@ -226,7 +264,21 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
         }
     }
 
-    info!("MySQL example completed successfully with statistics and dead job management.");
+    info!("MySQL example completed successfully!");
+    info!("Demonstrated features:");
+    info!(" ✓ Job timeout configuration (per-job and worker defaults)");
+    info!(" ✓ Various timeout scenarios (10s email, 60s image, 10min large jobs)");
+    info!(" ✓ Timeout statistics tracking and monitoring");
+    info!(" ✓ Dead job management and error analysis");
+    info!(" ✓ Comprehensive queue statistics with timeout counts");
+    info!("");
+    info!("Timeout Features Demonstrated:");
+    info!(" • Worker default timeouts (30s for email, 2min for image processing)");
+    info!(" • Job-specific timeouts (10s, 45s, 60s, 600s examples)");
+    info!(" • Priority-based timeout configuration (VIP vs standard jobs)");
+    info!(" • Timeout event tracking in statistics");
+    info!(" • Database timeout count tracking");
+    info!("");
     info!("In a real application, you would start the worker pool to run indefinitely:");
     info!("worker_pool.start().await?;");
 
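
The commit message attributes timeout detection to `tokio::time::timeout`, but the worker code is not shown in this diff. To illustrate the general idea of bounding a job's runtime and classifying the outcome, here is a dependency-free stand-in built on a thread and `std::sync::mpsc::Receiver::recv_timeout`. It is a swapped-in technique, not the async mechanism the commit uses, and `Outcome` and `run_with_timeout` are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical outcome type, loosely mirroring completed / timed-out / failed jobs.
#[derive(Debug, PartialEq)]
enum Outcome {
    Completed,
    TimedOut,
    Failed,
}

/// Run `work` on its own thread and wait at most `limit` for it to finish.
/// Unlike dropping a future under `tokio::time::timeout`, the thread is NOT
/// cancelled on timeout; it keeps running in the background.
fn run_with_timeout<F>(work: F, limit: Duration) -> Outcome
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        work();
        // Ignore the send error: the receiver may already have given up.
        let _ = done_tx.send(());
    });
    match done_rx.recv_timeout(limit) {
        Ok(()) => Outcome::Completed,
        Err(RecvTimeoutError::Timeout) => Outcome::TimedOut,
        // The sender was dropped without sending, e.g. because `work` panicked.
        Err(RecvTimeoutError::Disconnected) => Outcome::Failed,
    }
}
```

Only the job that exceeds its limit is classified as timed out; other jobs run on their own threads and are unaffected, which is the property the changelog describes as graceful timeout handling.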
examples/postgres_example.rs

Lines changed: 69 additions & 12 deletions
@@ -52,15 +52,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
         }) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
     });
 
-    // Create and start workers with statistics collection
+    // Create and start workers with statistics collection and timeout configuration
     let worker1 = Worker::new(queue.clone(), "email".to_string(), handler.clone())
         .with_poll_interval(tokio::time::Duration::from_secs(1))
         .with_max_retries(3)
+        .with_default_timeout(tokio::time::Duration::from_secs(30)) // 30 second default timeout
         .with_stats_collector(Arc::clone(&stats_collector));
 
     let worker2 = Worker::new(queue.clone(), "notifications".to_string(), handler.clone())
         .with_poll_interval(tokio::time::Duration::from_secs(2))
         .with_max_retries(2)
+        .with_default_timeout(tokio::time::Duration::from_secs(60)) // 60 second default timeout
         .with_stats_collector(Arc::clone(&stats_collector));
 
     let mut worker_pool = WorkerPool::new()
@@ -73,47 +75,75 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     {
         use hammerwork::queue::DatabaseQueue;
 
+        // Create jobs with various timeout configurations
         let job1 = Job::new(
             "email".to_string(),
             json!({"to": "[email protected]", "subject": "Hello"}),
-        );
+        ); // Uses worker default timeout (30s)
+
         let job2 = Job::new(
             "notifications".to_string(),
             json!({"message": "Welcome!", "user_id": 123}),
-        );
+        ).with_timeout(std::time::Duration::from_secs(10)); // Custom 10-second timeout
+
         let job3 = Job::new("email".to_string(), json!({"action": "fail"})); // This will fail
-        let job4 = Job::new("email".to_string(), json!({"action": "fail"})); // This will also fail and become dead
+
+        let job4 = Job::new("email".to_string(), json!({"action": "fail"})) // This will also fail and become dead
+            .with_timeout(std::time::Duration::from_secs(5)); // Short timeout for demo
+
+        // Job with a very long timeout for heavy processing
+        let job5 = Job::new(
+            "email".to_string(),
+            json!({"to": "[email protected]", "subject": "Heavy Processing"}),
+        ).with_timeout(std::time::Duration::from_secs(300)) // 5-minute timeout
+         .with_max_attempts(5); // More retry attempts for important jobs
 
         let job1_id = queue.enqueue(job1).await?;
         let job2_id = queue.enqueue(job2).await?;
         let job3_id = queue.enqueue(job3).await?;
         let job4_id = queue.enqueue(job4).await?;
+        let job5_id = queue.enqueue(job5).await?;
 
-        info!("Enqueued test jobs: {}, {}, {}, {}", job1_id, job2_id, job3_id, job4_id);
+        info!("Enqueued test jobs with timeouts:");
+        info!(" {} - uses worker default timeout (30s)", job1_id);
+        info!(" {} - custom 10s timeout", job2_id);
+        info!(" {} - will fail (uses default timeout)", job3_id);
+        info!(" {} - will fail with 5s timeout", job4_id);
+        info!(" {} - heavy processing with 5min timeout", job5_id);
 
         // Let jobs process for a bit
         tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
 
-        // Demonstrate statistics collection
-        info!("=== Job Processing Statistics ===");
+        // Demonstrate statistics collection with timeout tracking
+        info!("=== Job Processing Statistics (Including Timeouts) ===");
         let system_stats = stats_collector.get_system_statistics(std::time::Duration::from_secs(300)).await?;
-        info!("System Stats - Total: {}, Completed: {}, Failed: {}, Dead: {}, Error Rate: {:.2}%",
+        info!("System Stats - Total: {}, Completed: {}, Failed: {}, Dead: {}, Timed Out: {}, Error Rate: {:.2}%",
             system_stats.total_processed,
             system_stats.completed,
             system_stats.failed,
             system_stats.dead,
+            system_stats.timed_out,
             system_stats.error_rate * 100.0
         );
 
         // Get queue-specific statistics
         let email_stats = stats_collector.get_queue_statistics("email", std::time::Duration::from_secs(300)).await?;
-        info!("Email Queue Stats - Total: {}, Completed: {}, Failed: {}, Avg Processing Time: {:.2}ms",
+        info!("Email Queue Stats - Total: {}, Completed: {}, Failed: {}, Timed Out: {}, Avg Processing Time: {:.2}ms",
             email_stats.total_processed,
             email_stats.completed,
             email_stats.failed,
+            email_stats.timed_out,
             email_stats.avg_processing_time_ms
         );
 
+        let notifications_stats = stats_collector.get_queue_statistics("notifications", std::time::Duration::from_secs(300)).await?;
+        info!("Notifications Queue Stats - Total: {}, Completed: {}, Timed Out: {}, Avg Processing Time: {:.2}ms",
+            notifications_stats.total_processed,
+            notifications_stats.completed,
+            notifications_stats.timed_out,
+            notifications_stats.avg_processing_time_ms
+        );
+
         // Demonstrate dead job management
         info!("=== Dead Job Management ===");
         let dead_jobs = queue.get_dead_jobs(Some(10), None).await?;
@@ -131,15 +161,30 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
             dead_summary.error_patterns
         );
 
-        // Demonstrate queue statistics from database
+        // Demonstrate queue statistics from database (including timeout counts)
         let queue_stats = queue.get_queue_stats("email").await?;
-        info!("Email Queue DB Stats - Pending: {}, Running: {}, Dead: {}, Completed: {}",
+        info!("Email Queue DB Stats - Pending: {}, Running: {}, Dead: {}, Timed Out: {}, Completed: {}",
             queue_stats.pending_count,
             queue_stats.running_count,
             queue_stats.dead_count,
+            queue_stats.timed_out_count,
             queue_stats.completed_count
         );
 
+        // Show all queue statistics
+        let all_queue_stats = queue.get_all_queue_stats().await?;
+        info!("=== All Queue Statistics ===");
+        for stats in &all_queue_stats {
+            info!("Queue '{}' - Pending: {}, Running: {}, Dead: {}, Timed Out: {}, Completed: {}",
+                stats.queue_name,
+                stats.pending_count,
+                stats.running_count,
+                stats.dead_count,
+                stats.timed_out_count,
+                stats.completed_count
+            );
+        }
+
         // If there are dead jobs, demonstrate retry functionality
         if !dead_jobs.is_empty() {
             let dead_job_id = dead_jobs[0].id;
@@ -149,7 +194,19 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
         }
     }
 
-    info!("Example completed successfully. Check the statistics and dead job management features.");
+    info!("Example completed successfully!");
+    info!("Demonstrated features:");
+    info!(" ✓ Job timeout configuration (per-job and worker defaults)");
+    info!(" ✓ Timeout statistics tracking and monitoring");
+    info!(" ✓ Dead job management and retry functionality");
+    info!(" ✓ Comprehensive job processing statistics");
+    info!("");
+    info!("Timeout Features Shown:");
+    info!(" • Worker default timeouts (30s for email, 60s for notifications)");
+    info!(" • Job-specific timeouts (10s, 5s, 300s examples)");
+    info!(" • Timeout event tracking in statistics");
+    info!(" • Timeout counts in database queue statistics");
+    info!("");
     info!("In a real application, you would start the worker pool to run indefinitely:");
     info!("worker_pool.start().await?;");
 
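
The examples configure timeouts as `Duration` values, while the changelog says they are persisted in a `timeout_seconds` column. The schema and query code are not part of this diff, so the following is only a sketch of one way the mapping could work, assuming a nullable 32-bit integer column holding whole seconds. Both function names are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical conversion for writing a job's timeout to a nullable
/// integer `timeout_seconds` column. Sub-second precision is truncated and
/// very large values are clamped to `i32::MAX`.
fn to_timeout_seconds(timeout: Option<Duration>) -> Option<i32> {
    timeout.map(|d| d.as_secs().min(i32::MAX as u64) as i32)
}

/// Hypothetical conversion for reading the column back into a `Duration`.
/// Negative values (which should not occur) are treated as zero.
fn from_timeout_seconds(seconds: Option<i32>) -> Option<Duration> {
    seconds.map(|s| Duration::from_secs(s.max(0) as u64))
}
```

Under these assumptions the 300-second timeout on `job5` would be stored as `300`, and a job with no timeout of its own would be stored as `NULL`, leaving the worker default to apply at run time.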