
Commit e51afec

CodingAnarchy and claude committed
feat: Add comprehensive Job Tracing & Correlation system
- Add distributed tracing fields to Job struct (trace_id, correlation_id, parent_span_id, span_context)
- Create database migration 009_add_tracing for PostgreSQL and MySQL with optimized indexes
- Add job builder methods for tracing configuration (.with_trace_id(), .with_correlation_id(), etc.)
- Implement OpenTelemetry integration with OTLP export support (feature-gated)
- Add worker event hooks for job lifecycle monitoring (on_job_start, on_job_complete, etc.)
- Create comprehensive tracing.rs module with TraceId, CorrelationId, and TracingConfig types
- Add automatic span creation and trace context propagation for job processing
- Update documentation with tracing examples and feature flag information
- Maintain backward compatibility - existing jobs work unchanged
- Add 22 new tests for complete tracing functionality coverage

This enables production-ready distributed tracing for debugging and monitoring job processing across service boundaries, with integration to observability platforms like Jaeger, Zipkin, and DataDog.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent a6490ad commit e51afec

File tree: 14 files changed (+2048, −74 lines)

CHANGELOG.md

Lines changed: 73 additions & 0 deletions
@@ -5,6 +5,79 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.2.0] - 2025-06-29
+
+### Added
+- **🔍 Job Tracing & Correlation** - Comprehensive distributed tracing system for production observability
+  - **Core Tracing Fields**: Added `trace_id`, `correlation_id`, `parent_span_id`, and `span_context` fields to Job struct
+  - **Database Migrations**: New migration 009_add_tracing for PostgreSQL and MySQL with optimized indexes for trace/correlation ID lookups
+  - **Job Builder Methods**: Added `.with_trace_id()`, `.with_correlation_id()`, `.with_parent_span_id()`, and `.with_span_context()` for easy job tracing configuration
+  - **TraceId and CorrelationId Types**: New strongly-typed identifiers with generation, conversion, and validation methods
+  - **OpenTelemetry Integration**: Feature-gated OpenTelemetry support with OTLP export to Jaeger, Zipkin, DataDog, etc.
+  - **TracingConfig**: Complete OpenTelemetry configuration with service metadata, resource attributes, and endpoint configuration
+  - **Automatic Span Creation**: `create_job_span()` function creates spans with rich job metadata and trace context propagation
+  - **Span Context Management**: `set_job_trace_context()` for extracting and storing trace context from OpenTelemetry spans
+
+- **🎯 Worker Event Hooks** - Lifecycle event system for custom tracing and monitoring integration
+  - **JobHookEvent**: Event data structure with job metadata, timestamps, duration, and error information
+  - **JobEventHooks**: Configurable lifecycle callbacks for job start, completion, failure, timeout, and retry events
+  - **Builder Pattern**: Convenient `.on_job_start()`, `.on_job_complete()`, `.on_job_fail()`, `.on_job_timeout()`, `.on_job_retry()` methods
+  - **Worker Integration**: Event hooks integrated into job processing pipeline with automatic event firing
+  - **Automatic Span Management**: OpenTelemetry spans automatically created and updated throughout job lifecycle
+
+- **⚡ Production-Ready Tracing Infrastructure**
+  - **Feature Gated**: All tracing functionality behind optional `tracing` feature flag for minimal overhead
+  - **Backward Compatible**: Existing jobs and workers continue working unchanged
+  - **Database Optimized**: Indexed trace and correlation ID columns for efficient querying
+  - **OpenTelemetry Standards**: Full OTLP support with configurable exporters and sampling
+  - **Span Attributes**: Rich span metadata including job ID, queue name, priority, status, and custom business data
+  - **Error Tracking**: Automatic span status updates for success, failure, and timeout scenarios
+
+- **🧪 Comprehensive Testing** - 177 total tests including 22 new tracing-specific tests
+  - **Unit Tests**: Complete coverage of TraceId, CorrelationId, TracingConfig, and span creation functionality
+  - **Integration Tests**: Event hook testing with realistic job processing scenarios
+  - **Feature Testing**: Validation of tracing feature flag behavior and optional inclusion
+  - **Span Testing**: OpenTelemetry span creation and attribute validation
+
+### Enhanced
+- **📖 Documentation Updates**
+  - **README.md**: Added Job Tracing & Correlation feature to main features list with comprehensive example
+  - **Installation Guide**: Updated to show tracing feature installation options
+  - **Tracing Example**: Complete OpenTelemetry setup example with worker event hooks and correlation tracking
+  - **Feature Flags**: Updated to include `tracing` (optional) feature flag documentation
+  - **Database Schema**: Updated schema documentation to mention distributed tracing fields
+
+- **🗺️ ROADMAP.md**: Removed completed "Job Tracing & Correlation" feature from Phase 1 priorities
+
+### Technical Implementation
+- **OpenTelemetry Dependencies**: Added feature-gated dependencies: `opentelemetry`, `opentelemetry_sdk`, `opentelemetry-otlp`, `tracing-opentelemetry`
+- **Async Integration**: Full async/await support with tokio runtime integration
+- **Memory Efficient**: Trace context stored as optional strings with minimal memory overhead
+- **Type Safety**: Strongly typed trace and correlation IDs with comprehensive validation
+- **Database Agnostic**: Tracing works identically across PostgreSQL and MySQL backends
+- **Export Support**: OTLP export to all major observability platforms (Jaeger, Zipkin, DataDog, New Relic, etc.)
+
+### Usage Example
+```rust
+// Initialize tracing
+let config = TracingConfig::new()
+    .with_service_name("job-processor")
+    .with_otlp_endpoint("http://jaeger:4317");
+init_tracing(config).await?;
+
+// Create traced jobs
+let job = Job::new("email_queue".to_string(), json!({"to": "[email protected]"}))
+    .with_trace_id("trace-123")
+    .with_correlation_id("order-456");
+
+// Worker with event hooks
+let worker = Worker::new(queue, "email_queue".to_string(), handler)
+    .on_job_start(|event| { /* custom tracing logic */ })
+    .on_job_complete(|event| { /* success tracking */ });
+```
+
+This release provides comprehensive distributed tracing capabilities essential for debugging and monitoring job processing in production distributed systems.
+
 ## [1.1.0] - 2025-06-29
 
 ### Added
Cargo.toml

Lines changed: 7 additions & 2 deletions
@@ -8,7 +8,7 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "1.1.0"
+version = "1.2.0"
 edition = "2024"
 license = "MIT"
 repository = "https://github.com/CodingAnarchy/hammerwork"
@@ -18,7 +18,7 @@ documentation = "https://docs.rs/hammerwork"
 rust-version = "1.86"
 
 [workspace.dependencies]
-hammerwork = { version = "1.0.0", path = "." }
+hammerwork = { version = "1.2.0", path = "." }
 tokio = { version = "1.0", features = ["full"] }
 sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "chrono", "uuid", "json"] }
 chrono = { version = "0.4", features = ["serde"] }
@@ -77,13 +77,18 @@ reqwest = { version = "0.12", features = ["json"], optional = true }
 warp = { version = "0.3", optional = true }
 clap = { workspace = true }
 tracing-subscriber = { workspace = true }
+opentelemetry = { version = "0.22", optional = true }
+opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true }
+opentelemetry-otlp = { version = "0.15", features = ["tokio"], optional = true }
+tracing-opentelemetry = { version = "0.23", optional = true }
 
 [features]
 default = ["metrics", "alerting"]
 postgres = ["sqlx/postgres"]
 mysql = ["sqlx/mysql"]
 metrics = ["prometheus", "warp"]
 alerting = ["reqwest"]
+tracing = ["opentelemetry", "opentelemetry_sdk", "opentelemetry-otlp", "tracing-opentelemetry"]
 
 [dev-dependencies]
 tokio-test = { workspace = true }

README.md

Lines changed: 89 additions & 6 deletions
@@ -4,6 +4,7 @@ A high-performance, database-driven job queue for Rust with comprehensive featur
 
 ## Features
 
+- **🔍 Job Tracing & Correlation**: Comprehensive distributed tracing with OpenTelemetry integration, trace IDs, correlation IDs, and lifecycle event hooks
 - **🔗 Job Dependencies & Workflows**: Create complex data processing pipelines with job dependencies, sequential chains, and parallel processing with synchronization barriers
 - **Multi-database support**: PostgreSQL and MySQL backends with optimized dependency queries
 - **Advanced retry strategies**: Exponential backoff, linear, Fibonacci, and custom retry patterns with jitter
@@ -24,15 +25,18 @@ A high-performance, database-driven job queue for Rust with comprehensive featur
 ```toml
 [dependencies]
 # Default features include metrics and alerting
-hammerwork = { version = "1.0", features = ["postgres"] }
+hammerwork = { version = "1.2", features = ["postgres"] }
 # or
-hammerwork = { version = "1.0", features = ["mysql"] }
+hammerwork = { version = "1.2", features = ["mysql"] }
+
+# With distributed tracing
+hammerwork = { version = "1.2", features = ["postgres", "tracing"] }
 
 # Minimal installation
-hammerwork = { version = "1.0", features = ["postgres"], default-features = false }
+hammerwork = { version = "1.2", features = ["postgres"], default-features = false }
 ```
 
-**Feature Flags**: `postgres`, `mysql`, `metrics` (default), `alerting` (default)
+**Feature Flags**: `postgres`, `mysql`, `metrics` (default), `alerting` (default), `tracing` (optional)
 
 ## Quick Start
 
@@ -41,6 +45,7 @@ See the [Quick Start Guide](docs/quick-start.md) for complete examples with Post
 ## Documentation
 
 - **[Quick Start Guide](docs/quick-start.md)** - Get started with PostgreSQL and MySQL
+- **[Job Tracing & Correlation](docs/tracing.md)** - Distributed tracing, correlation IDs, and OpenTelemetry integration
 - **[Job Dependencies & Workflows](docs/workflows.md)** - Complex pipelines, job dependencies, and orchestration
 - **[Job Types & Configuration](docs/job-types.md)** - Job creation, priorities, timeouts, cron jobs
 - **[Worker Configuration](docs/worker-configuration.md)** - Worker setup, rate limiting, statistics
@@ -126,6 +131,83 @@ queue.enqueue_workflow(workflow).await?;
 
 Jobs will only execute when their dependencies are satisfied, enabling sophisticated data processing pipelines and business workflows.
 
+## Tracing Example
+
+Enable comprehensive distributed tracing with OpenTelemetry integration:
+
+```rust
+use hammerwork::{Job, JobQueue, Worker, tracing::{TracingConfig, init_tracing}, queue::DatabaseQueue};
+use serde_json::json;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Initialize distributed tracing
+    let tracing_config = TracingConfig::new()
+        .with_service_name("job-processor")
+        .with_service_version("1.0.0")
+        .with_environment("production")
+        .with_otlp_endpoint("http://jaeger:4317");
+
+    init_tracing(tracing_config).await?;
+
+    let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork").await?;
+    let queue = Arc::new(JobQueue::new(pool));
+
+    // Create traced jobs with correlation for business workflows
+    let trace_id = "trace-12345";
+    let correlation_id = "order-67890";
+
+    let payment_job = Job::new("payment_queue".to_string(), json!({
+        "order_id": "67890",
+        "amount": 299.99
+    }))
+    .with_trace_id(trace_id)
+    .with_correlation_id(correlation_id);
+
+    let email_job = Job::new("email_queue".to_string(), json!({
+        "order_id": "67890",
+        "template": "order_confirmation"
+    }))
+    .with_trace_id(trace_id)
+    .with_correlation_id(correlation_id)
+    .depends_on(&payment_job.id);
+
+    // Worker with lifecycle event hooks for observability
+    let handler = Arc::new(|job: Job| Box::pin(async move {
+        println!("Processing: {:?}", job.payload);
+        // Your business logic here
+        Ok(())
+    }));
+
+    let worker = Worker::new(queue.clone(), "payment_queue".to_string(), handler)
+        .on_job_start(|event| {
+            println!("Job {} started (trace: {}, correlation: {})",
+                event.job.id,
+                event.job.trace_id.unwrap_or_default(),
+                event.job.correlation_id.unwrap_or_default());
+        })
+        .on_job_complete(|event| {
+            println!("Job {} completed in {:?}",
+                event.job.id,
+                event.duration.unwrap_or_default());
+        })
+        .on_job_fail(|event| {
+            eprintln!("Job {} failed: {}",
+                event.job.id,
+                event.error.unwrap_or_default());
+        });
+
+    // Enqueue jobs - they'll be automatically traced
+    queue.enqueue(payment_job).await?;
+    queue.enqueue(email_job).await?;
+
+    Ok(())
+}
+```
+
+This enables end-to-end tracing across your entire job processing pipeline with automatic span creation, correlation tracking, and integration with observability platforms like Jaeger, Zipkin, or DataDog.
+
 ## Database Setup
 
 ### Using Migrations (Recommended)
@@ -160,12 +242,12 @@ queue.enqueue(job).await?;
 ### Database Schema
 
 Hammerwork uses optimized tables with comprehensive indexing:
-- **`hammerwork_jobs`** - Main job table with priorities, timeouts, cron scheduling, retry strategies, and result storage
+- **`hammerwork_jobs`** - Main job table with priorities, timeouts, cron scheduling, retry strategies, result storage, and distributed tracing fields
 - **`hammerwork_batches`** - Batch metadata and tracking (v0.7.0+)
 - **`hammerwork_job_results`** - Job result storage with TTL and expiration (v0.8.0+)
 - **`hammerwork_migrations`** - Migration tracking for schema evolution
 
-The schema supports all features including job prioritization, advanced retry strategies, timeouts, cron scheduling, batch processing, result storage with TTL, worker autoscaling, and comprehensive lifecycle tracking. See [Database Migrations](docs/migrations.md) for details.
+The schema supports all features including job prioritization, advanced retry strategies, timeouts, cron scheduling, batch processing, result storage with TTL, distributed tracing with trace/correlation IDs, worker autoscaling, and comprehensive lifecycle tracking. See [Database Migrations](docs/migrations.md) for details.
 
 ## Development
 
@@ -194,6 +276,7 @@ Working examples in `examples/`:
 - `retry_strategies.rs` - Advanced retry patterns with exponential backoff and jitter
 - `result_storage_example.rs` - Job result storage and retrieval
 - `autoscaling_example.rs` - Dynamic worker pool scaling based on queue depth
+- `tracing_example.rs` - Distributed tracing with OpenTelemetry and event hooks
 
 ```bash
 cargo run --example postgres_example --features postgres

ROADMAP.md

Lines changed: 1 addition & 53 deletions
@@ -2,60 +2,9 @@
 
 This roadmap outlines planned features for Hammerwork, prioritized by impact level and implementation complexity. Features are organized into phases based on their value proposition to users and estimated development effort.
 
-## ✅ Completed Features
-
-### 🔗 Job Dependencies & Workflows
-**Impact: Very High** | **Complexity: High** | **Status: ✅ COMPLETED**
-
-**Game-changing feature for complex data processing pipelines and business workflows.**
-
-**Core Implementation Complete:**
-- Job dependency tracking with `depends_on()` and `depends_on_jobs()` methods
-- `JobGroup` workflow orchestration with sequential and parallel job execution
-- Dependency graph validation with cycle detection
-- Database schema with dependency fields for PostgreSQL and MySQL
-- Dependency-aware job polling (only executes jobs with satisfied dependencies)
-- Failure policy configuration (`FailFast`, `ContinueOnFailure`, `Manual`)
-
-```rust
-// Sequential job chains
-let job1 = Job::new("process_data".to_string(), data1);
-let job2 = Job::new("transform_data".to_string(), data2)
-    .depends_on(&job1.id);
-let job3 = Job::new("export_data".to_string(), data3)
-    .depends_on(&job2.id);
-
-// Parallel job groups with barriers
-let job_group = JobGroup::new("data_pipeline")
-    .add_parallel_jobs(vec![job_a, job_b, job_c])
-    .then(final_job); // Runs after all parallel jobs complete
-```
-
-🚧 **Remaining Work:** Full workflow method implementations, completion triggers, and CLI integration.
-
 ## Phase 1: High Impact, Medium-High Complexity
 *Features that provide significant value but require more substantial implementation effort*
 
-### 🔍 Job Tracing & Correlation
-**Impact: High** | **Complexity: Medium-High** | **Priority: Medium-High**
-
-Essential for debugging and monitoring in distributed systems.
-
-```rust
-// Distributed tracing support
-let job = Job::new("process_order".to_string(), order_data)
-    .with_trace_id("trace-123")
-    .with_correlation_id("order-456")
-    .with_span_context(span_context);
-
-// Job lifecycle events
-worker.on_job_start(|job| tracing::info!("Job started: {}", job.id));
-worker.on_job_complete(|job, duration| {
-    metrics::histogram!("job.duration", duration, "queue" => job.queue_name);
-});
-```
-
 ### 🌐 Admin Dashboard & CLI Tools
 **Impact: High** | **Complexity: Medium-High** | **Priority: Medium**
 
@@ -240,8 +189,7 @@ Features are ordered within each phase by priority and should generally be imple
 
 **Phase 1 (Advanced Features) - CURRENT PRIORITIES**
 1. Job Dependencies & Workflows
-2. Job Tracing & Correlation
-3. Admin Dashboard & CLI Tools
+2. Admin Dashboard & CLI Tools
 
 **Phase 2 (Operational Features)**
 1. Job Archiving & Retention

src/error.rs

Lines changed: 3 additions & 0 deletions
@@ -37,6 +37,9 @@ pub enum HammerworkError {
 
     #[error("Workflow error: {message}")]
     Workflow { message: String },
+
+    #[error("Tracing error: {message}")]
+    Tracing { message: String },
 }
 
 #[cfg(test)]

0 commit comments