21 changes: 21 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.2.1] - 2025-06-30

### Fixed
- **πŸ› MySQL Query Field Completeness**
- Fixed `Database(ColumnNotFound("trace_id"))` errors in MySQL dequeue operations
- Updated MySQL `dequeue()` and `dequeue_with_priority_weights()` queries to include all tracing fields: `trace_id`, `correlation_id`, `parent_span_id`, `span_context`
- Ensures JobRow struct mapping works correctly with all database schema fields added in migration 009_add_tracing.mysql.sql
- Fixed two failing tests: `test_mysql_dequeue_includes_all_fields` and `test_mysql_dequeue_with_priority_weights_includes_all_fields`

### Enhanced
- **πŸ§ͺ Test Infrastructure Improvements**
- Improved test isolation using unique queue names to prevent test interference
- Fixed race conditions in result storage tests by implementing proper job completion polling (sketched below)
- Enhanced test database setup to use a migration-based approach, ensuring schema consistency
- Fixed 6 failing doctests in worker.rs by correcting async/await usage in documentation examples
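
A minimal sketch of the polling approach used to avoid sleep-based races; the helper name and signature here are illustrative, not hammerwork's actual test utilities:

```rust
use std::time::{Duration, Instant};

/// Poll an async condition until it holds or the timeout elapses.
/// Hypothetical helper; the real test suite may structure this differently.
async fn poll_until<F, Fut>(mut condition: F, timeout: Duration) -> bool
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if condition().await {
            return true;
        }
        // Short sleeps between checks instead of one long fixed sleep,
        // so the test proceeds as soon as the worker completes the job.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    false
}
```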

### Technical Implementation
- MySQL dequeue queries now SELECT all 35 fields required by the JobRow struct mapping (sketched below)
- Complete field list: id, queue_name, payload, status, priority, attempts, max_attempts, timeout_seconds, created_at, scheduled_at, started_at, completed_at, failed_at, timed_out_at, error_message, cron_schedule, next_run_at, recurring, timezone, batch_id, result_data, result_stored_at, result_expires_at, result_storage_type, result_ttl_seconds, result_max_size_bytes, depends_on, dependents, dependency_status, workflow_id, workflow_name, trace_id, correlation_id, parent_span_id, span_context
- All tests now passing: 228 unit tests, 135 doctests, 0 failures
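
To illustrate the failure mode, a trimmed-down sketch assuming an sqlx `FromRow`-style mapping (the real `JobRow` carries all 35 fields and may be mapped differently):

```rust
use sqlx::FromRow;
use uuid::Uuid;

// sqlx resolves struct fields by column name at runtime, so a dequeue
// SELECT that omits `trace_id` surfaces as Database(ColumnNotFound("trace_id"))
// rather than a compile-time error.
#[derive(FromRow)]
struct JobRow {
    id: Uuid,
    queue_name: String,
    trace_id: Option<String>,       // added by 009_add_tracing.mysql.sql
    correlation_id: Option<String>, // must appear in every dequeue SELECT
    // ...remaining fields elided
}
```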

## [1.2.0] - 2025-06-29

### Added
Expand Down
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -2,13 +2,14 @@
members = [
".",
"cargo-hammerwork",
"hammerwork-web",
"integrations/postgres-integration",
"integrations/mysql-integration",
]
resolver = "2"

[workspace.package]
version = "1.2.0"
version = "1.2.1"
edition = "2024"
license = "MIT"
repository = "https://github.com/CodingAnarchy/hammerwork"
56 changes: 56 additions & 0 deletions README.md
@@ -4,6 +4,7 @@ A high-performance, database-driven job queue for Rust with comprehensive features

## Features

- **πŸ“Š Web Dashboard**: Modern real-time web interface for monitoring queues, managing jobs, and system administration with authentication and WebSocket updates
- **πŸ” Job Tracing & Correlation**: Comprehensive distributed tracing with OpenTelemetry integration, trace IDs, correlation IDs, and lifecycle event hooks
- **πŸ”— Job Dependencies & Workflows**: Create complex data processing pipelines with job dependencies, sequential chains, and parallel processing with synchronization barriers
- **Multi-database support**: PostgreSQL and MySQL backends with optimized dependency queries
@@ -22,6 +23,8 @@ A high-performance, database-driven job queue for Rust with comprehensive features

## Installation

### Core Library

```toml
[dependencies]
# Default features include metrics and alerting
@@ -38,13 +41,32 @@ hammerwork = { version = "1.2", features = ["postgres"], default-features = false }

**Feature Flags**: `postgres`, `mysql`, `metrics` (default), `alerting` (default), `tracing` (optional)

### Web Dashboard (Optional)

```bash
# Install the web dashboard
cargo install hammerwork-web --features postgres
```

Or add it to your project's `Cargo.toml`:

```toml
[dependencies]
hammerwork-web = { version = "1.2", features = ["postgres"] }
```

Start the dashboard:

```bash
hammerwork-web --database-url postgresql://localhost/hammerwork
# Dashboard available at http://localhost:8080
```

## Quick Start

See the [Quick Start Guide](docs/quick-start.md) for complete examples with PostgreSQL and MySQL.

## Documentation

- **[Quick Start Guide](docs/quick-start.md)** - Get started with PostgreSQL and MySQL
- **[Web Dashboard](hammerwork-web/README.md)** - Real-time web interface for queue monitoring and job management
- **[Job Tracing & Correlation](docs/tracing.md)** - Distributed tracing, correlation IDs, and OpenTelemetry integration
- **[Job Dependencies & Workflows](docs/workflows.md)** - Complex pipelines, job dependencies, and orchestration
- **[Job Types & Configuration](docs/job-types.md)** - Job creation, priorities, timeouts, cron jobs
@@ -208,6 +230,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

This enables end-to-end tracing across your entire job processing pipeline with automatic span creation, correlation tracking, and integration with observability platforms like Jaeger, Zipkin, or DataDog.
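
A brief sketch of attaching trace context to a job. The builder method names and the `Job::new` signature here are assumptions for illustration, not hammerwork's confirmed API; see [docs/tracing.md](docs/tracing.md) for the real usage:

```rust
use hammerwork::Job;
use serde_json::json;

// Hypothetical builder calls -- consult docs/tracing.md for the actual API.
fn traced_job() -> Job {
    Job::new("email".to_string(), json!({ "to": "user@example.com" }))
        .with_trace_id("4bf92f3577b34da6a3ce929d0e0e4736")
        .with_correlation_id("order-12345")
}
```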

## Web Dashboard

Start the real-time web dashboard for monitoring and managing your job queues:

```bash
# Start with PostgreSQL
hammerwork-web --database-url postgresql://localhost/hammerwork

# Start with authentication
hammerwork-web \
--database-url postgresql://localhost/hammerwork \
--auth \
--username admin \
--password mypassword

# Start with custom configuration
hammerwork-web --config dashboard.toml
```

The dashboard provides:

- **Real-time Monitoring**: Live queue statistics, job counts, and throughput metrics
- **Job Management**: View, retry, cancel, and inspect jobs with detailed payload information
- **Queue Administration**: Clear queues, monitor performance, and manage priorities
- **Interactive Charts**: Throughput graphs and job status distributions
- **WebSocket Updates**: Real-time updates without page refresh
- **REST API**: Complete programmatic access to all dashboard features (see the example below)
- **Authentication**: Secure access with bcrypt password hashing and rate limiting

Access the dashboard at `http://localhost:8080` after starting the server.
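
For programmatic access, something like the following works, assuming reqwest with its `json` feature; the endpoint path and response shape are hypothetical, so check the hammerwork-web README for the documented API:

```rust
use serde_json::Value;

// Hypothetical endpoint; the real route may differ.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let stats: Value = reqwest::get("http://localhost:8080/api/queues")
        .await?
        .json()
        .await?;
    println!("{stats:#}"); // pretty-print the JSON response
    Ok(())
}
```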

## Database Setup

### Using Migrations (Recommended)
@@ -223,6 +276,9 @@ cargo hammerwork migrate --database-url postgresql://localhost/hammerwork

# Check migration status
cargo hammerwork status --database-url postgresql://localhost/hammerwork

# Start the web dashboard after migrations
hammerwork-web --database-url postgresql://localhost/hammerwork
```

### Application Usage
39 changes: 0 additions & 39 deletions ROADMAP.md
@@ -2,45 +2,6 @@

This roadmap outlines planned features for Hammerwork, prioritized by impact level and implementation complexity. Features are organized into phases based on their value proposition to users and estimated development effort.

## Phase 1: High Impact, Medium-High Complexity
*Features that provide significant value but require more substantial implementation effort*

### 🌐 Admin Dashboard & CLI Tools
**Impact: High** | **Complexity: Medium-High** | **Priority: Medium**

Critical for operational management and developer productivity.

**βœ… CLI Tools - COMPLETED v1.1.0+**
The `cargo-hammerwork` CLI provides comprehensive job queue management:
- Database migrations and setup
- Job management (enqueue, list, retry, cancel, inspect)
- Queue operations (list, clear, statistics)
- Worker management and monitoring
- Batch operations and cron scheduling
- Workflow visualization and dependency management
- Real-time monitoring and health checks

```bash
# Available CLI commands
cargo hammerwork migration run
cargo hammerwork job list --queue email
cargo hammerwork queue stats
cargo hammerwork worker start --queue default
cargo hammerwork monitor dashboard
```

**🚧 Web Dashboard - REMAINING**
Web-based admin interface for visual queue management:

```rust
// Future web-based admin interface
let admin_server = AdminServer::new()
.with_queue_monitoring()
.with_job_management()
.with_worker_controls()
.bind("127.0.0.1:8080");
```

## Phase 2: Medium Impact, Variable Complexity
*Valuable features for specific use cases or operational efficiency*

22 changes: 13 additions & 9 deletions cargo-hammerwork/src/commands/workflow.rs
@@ -528,28 +528,27 @@ impl WorkflowCommand {

for job in jobs {
if !visited.contains(&job.id) {
self.calculate_job_level(job, &job_map, &mut levels, &mut visited, 0);
Self::calculate_job_level(job, &job_map, &mut levels, &mut visited, 0);
}
}

// Group jobs by level
let mut result: HashMap<usize, Vec<&JobNode>> = HashMap::new();
for (job_id, level) in levels {
if let Some(job) = job_map.get(&job_id) {
result.entry(level).or_insert_with(Vec::new).push(job);
result.entry(level).or_default().push(job);
}
}

result.into_iter().collect()
}

fn calculate_job_level(
&self,
job: &JobNode,
job_map: &HashMap<String, &JobNode>,
levels: &mut HashMap<String, usize>,
visited: &mut HashSet<String>,
current_level: usize,
_current_level: usize,
) {
if visited.contains(&job.id) {
return;
@@ -564,7 +563,13 @@
.filter_map(|dep_id| {
if let Some(dep_job) = job_map.get(dep_id) {
if !visited.contains(dep_id) {
self.calculate_job_level(dep_job, job_map, levels, visited, current_level);
Self::calculate_job_level(
dep_job,
job_map,
levels,
visited,
_current_level,
);
}
levels.get(dep_id).copied()
} else {
@@ -803,21 +808,20 @@

for root in &roots {
if !visited.contains(&root.id) {
self.print_job_tree_node(root, &job_map, &mut visited, 0, target_job_id);
Self::print_job_tree_node(root, &job_map, &mut visited, 0, target_job_id);
}
}

// Handle any remaining unvisited jobs (cycles or disconnected components)
for job in jobs {
if !visited.contains(&job.id) {
println!(" [Disconnected]");
self.print_job_tree_node(job, &job_map, &mut visited, 0, target_job_id);
Self::print_job_tree_node(job, &job_map, &mut visited, 0, target_job_id);
}
}
}

fn print_job_tree_node(
&self,
job: &JobNode,
job_map: &HashMap<String, &JobNode>,
visited: &mut HashSet<String>,
@@ -847,7 +851,7 @@
for dependent_id in &job.dependents {
if let Some(dependent_job) = job_map.get(dependent_id) {
if !visited.contains(dependent_id) {
self.print_job_tree_node(
Self::print_job_tree_node(
dependent_job,
job_map,
visited,
8 changes: 4 additions & 4 deletions cargo-hammerwork/src/config/mod.rs
@@ -118,16 +118,16 @@ use std::path::PathBuf;
pub struct Config {
/// Database connection URL (e.g., "postgresql://localhost/hammerwork")
pub database_url: Option<String>,

/// Default queue name for operations
pub default_queue: Option<String>,

/// Default limit for list operations
pub default_limit: Option<u32>,

/// Log level (error, warn, info, debug, trace)
pub log_level: Option<String>,

/// Database connection pool size
pub connection_pool_size: Option<u32>,
}
2 changes: 1 addition & 1 deletion cargo-hammerwork/src/utils/database.rs
@@ -21,7 +21,7 @@
//!
//! // Connect to MySQL
//! let mysql_pool = DatabasePool::connect(
//! "mysql://localhost/hammerwork",
//! "mysql://localhost/hammerwork",
//! 5
//! ).await?;
//! # Ok(())
6 changes: 3 additions & 3 deletions cargo-hammerwork/src/utils/display.rs
@@ -70,7 +70,7 @@ use std::fmt;
/// use cargo_hammerwork::utils::display::JobTable;
///
/// let mut table = JobTable::new();
///
///
/// // Add multiple jobs
/// table.add_job_row(
/// "job-id-1", "email", "pending", "normal", 0,
@@ -80,7 +80,7 @@ use std::fmt;
/// "job-id-2", "data-processing", "running", "high", 1,
/// "2024-01-01 09:55:00", "2024-01-01 10:00:00"
/// );
///
///
/// // The table will display with color-coded status and priority
/// ```
pub struct JobTable {
@@ -189,7 +189,7 @@ impl fmt::Display for JobTable {
/// stats.add_stats_row("pending", "normal", 100);
/// stats.add_stats_row("running", "high", 10);
/// stats.add_stats_row("failed", "critical", 2);
///
///
/// // Display shows icons: 🟑 pending, πŸ”΅ running, πŸ”΄ failed
/// ```
pub struct StatsTable {
1 change: 0 additions & 1 deletion cargo-hammerwork/tests/sql_query_tests.rs
@@ -4,7 +4,6 @@ use sqlx::{MySqlPool, PgPool, Row};
/// Tests for SQL query validation and correctness
/// These tests validate that our dynamic SQL queries are syntactically correct
/// and produce expected results

#[cfg(test)]
mod postgres_tests {
use super::*;
10 changes: 8 additions & 2 deletions examples/autoscaling_example.rs
@@ -25,17 +25,23 @@ async fn main() -> Result<()> {
// Uncomment the appropriate line below based on your database setup

// For PostgreSQL:
#[cfg(feature = "postgres")]
#[cfg(all(feature = "postgres", not(feature = "mysql")))]
let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;

// For MySQL:
#[cfg(feature = "mysql")]
#[cfg(all(feature = "mysql", not(feature = "postgres")))]
let pool = sqlx::MySqlPool::connect("mysql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;

// Default to PostgreSQL when both features are enabled
#[cfg(all(feature = "postgres", feature = "mysql"))]
let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork")
.await
.map_err(hammerwork::HammerworkError::Database)?;

let queue = Arc::new(JobQueue::new(pool));

// Initialize database tables
6 changes: 4 additions & 2 deletions examples/priority_example.rs
@@ -404,14 +404,16 @@ async fn demonstrate_worker_pool_priorities(
let mut pool = WorkerPool::new().with_stats_collector(stats_collector.clone());

// Add workers with different priority configurations
let handler: Arc<
type JobHandler = Arc<
dyn Fn(
Job,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = hammerwork::Result<()>> + Send>,
> + Send
+ Sync,
> = Arc::new(|job: Job| {
>;

let handler: JobHandler = Arc::new(|job: Job| {
Box::pin(async move {
info!(
"πŸ”„ Pool worker processing: {} (Priority: {:?})",
8 changes: 6 additions & 2 deletions examples/retry_strategies.rs
@@ -47,12 +47,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = get_database_url();
info!("πŸ”— Connecting to database: {}", database_url);

#[cfg(feature = "postgres")]
#[cfg(all(feature = "postgres", not(feature = "mysql")))]
let pool = sqlx::PgPool::connect(&database_url).await?;

#[cfg(feature = "mysql")]
#[cfg(all(feature = "mysql", not(feature = "postgres")))]
let pool = sqlx::MySqlPool::connect(&database_url).await?;

// Default to PostgreSQL when both features are enabled
#[cfg(all(feature = "postgres", feature = "mysql"))]
let pool = sqlx::PgPool::connect(&database_url).await?;

let queue = Arc::new(JobQueue::new(pool));

// Note: Database tables should be created using migrations