Skip to content

Commit 8118585

Browse files
CodingAnarchyclaude
andcommitted
feat: Complete queue pause/resume system for operational control v1.11.0
Implement comprehensive queue pause and resume functionality across all Hammerwork interfaces: **🔧 Core Library (hammerwork)** - Add pause_queue(), resume_queue(), is_queue_paused(), get_queue_pause_info(), get_paused_queues() to DatabaseQueue trait - Full PostgreSQL and MySQL backend implementation with optimized queries - Database migration 014 adding hammerwork_queue_pause table with audit fields - Worker integration: automatically skip job dequeuing for paused queues - Graceful operation: jobs in progress continue, new jobs blocked - QueuePauseInfo struct for comprehensive pause state tracking **🌐 Web Interface (hammerwork-web)** - Interactive pause/resume buttons with dynamic UI updates - Visual status badges: 🟢 Active / 🟡 Paused with color-coded styling - Enhanced queue API responses including pause state information - Real-time user feedback with success/error notifications - Updated queue table layout with Status column for operational visibility **🖥️ CLI Tools (cargo-hammerwork)** - queue pause/resume commands with comprehensive feedback - queue paused command to list all paused queues with details - Enhanced queue list command showing pause status with emoji indicators - Full audit trail support recording CLI operations for compliance **🏗️ Database Schema** - New hammerwork_queue_pause table with queue_name, timestamps, audit fields - Automatic timestamp management for PostgreSQL (triggers) and MySQL - Proper indexing on paused_at for efficient query performance - Cross-database compatibility with database-specific SQL optimizations **✨ Key Features** - Persistent pause state across worker restarts and database reconnections - Audit trail tracking who paused/resumed queues and when - Immediate operational control for maintenance windows and emergencies - Consistent interface across library, web UI, and CLI - Zero downtime: pausing doesn't interrupt running jobs 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 8ba0eeb commit 8118585

File tree

16 files changed

+812
-56
lines changed

16 files changed

+812
-56
lines changed

CHANGELOG.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,44 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [1.11.0] - 2025-07-14
9+
10+
### Added
11+
- **⏸️ Queue Pause/Resume Functionality**
12+
- Complete queue pause and resume system for operational control and maintenance windows
13+
- New `pause_queue()`, `resume_queue()`, `is_queue_paused()`, `get_queue_pause_info()`, and `get_paused_queues()` methods in DatabaseQueue trait
14+
- Database migration 014 adding `hammerwork_queue_pause` table for persistent pause state storage
15+
- Full PostgreSQL and MySQL backend implementation with optimized queries and proper indexing
16+
- Worker integration automatically respecting paused queues - workers skip job dequeuing when queues are paused
17+
- Graceful operation: jobs already in progress continue to completion while new jobs are blocked
18+
- Audit trail support tracking who paused/resumed queues and when for operational transparency
19+
20+
- **🌐 Web UI Queue Management**
21+
- Enhanced web dashboard with visual queue status indicators showing active/paused state
22+
- Interactive pause/resume buttons with dynamic UI updates based on current queue state
23+
- Real-time status badges with color-coded indicators: 🟢 Active, 🟡 Paused
24+
- Immediate user feedback with success/error notifications for all queue operations
25+
- Updated queue API endpoints supporting pause/resume actions via `/api/queues/{name}/actions`
26+
- Extended queue information API responses including `is_paused`, `paused_at`, and `paused_by` fields
27+
28+
- **🏗️ Database Schema and Migration**
29+
- New `hammerwork_queue_pause` table with queue_name (primary key), timestamps, and audit fields
30+
- Automatic timestamp management for PostgreSQL (triggers) and MySQL (ON UPDATE CURRENT_TIMESTAMP)
31+
- Proper indexing on `paused_at` for efficient query performance
32+
- Cross-database compatibility with database-specific SQL optimizations
33+
34+
### Enhanced
35+
- **📊 API Responses**
36+
- Queue information now includes pause status, pause timestamp, and who initiated the pause
37+
- Enhanced queue statistics with operational state visibility
38+
- Improved error handling and user feedback for all queue management operations
39+
40+
- **🎨 Web Interface**
41+
- Updated queue table layout with new Status column for better visibility
42+
- Added success/warning button styles for pause/resume actions
43+
- Enhanced CSS styling with consistent color scheme and visual feedback
44+
- Improved user experience with contextual action buttons
45+
846
## [1.10.0] - 2025-07-14
947

1048
### Added

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ members = [
99
resolver = "2"
1010

1111
[workspace.package]
12-
version = "1.10.0"
12+
version = "1.11.0"
1313
edition = "2024"
1414
license = "MIT"
1515
repository = "https://github.com/CodingAnarchy/hammerwork"
@@ -19,7 +19,7 @@ documentation = "https://docs.rs/hammerwork"
1919
rust-version = "1.86"
2020

2121
[workspace.dependencies]
22-
hammerwork = { version = "1.10.0", path = "." }
22+
hammerwork = { version = "1.11.0", path = "." }
2323
tokio = { version = "1.0", features = ["full"] }
2424
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "chrono", "uuid", "json"] }
2525
chrono = { version = "0.4", features = ["serde"] }

cargo-hammerwork/CHANGELOG.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,35 @@ All notable changes to the cargo-hammerwork CLI tool will be documented in this
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [1.11.0] - 2025-07-14
9+
10+
### Added
11+
- **⏸️ Queue Pause/Resume Commands**
12+
- `queue pause` - Pause job processing for specific queues with audit tracking
13+
- `queue resume` - Resume job processing for previously paused queues
14+
- `queue paused` - List all currently paused queues with detailed information
15+
- Enhanced `queue list` command to show pause status for all queues
16+
- Complete PostgreSQL and MySQL support with proper error handling and user feedback
17+
- CLI operations automatically record who performed the action for audit trails
18+
19+
- **📊 Enhanced Queue Management**
20+
- Visual status indicators in queue listings: ⏸️ Paused / ▶️ Active
21+
- Detailed pause information including timestamp, user, and reason
22+
- Improved queue overview with operational state visibility
23+
- Better error messages and confirmation feedback for queue operations
24+
25+
### Enhanced
26+
- **🗄️ Database Migration System**
27+
- Updated migration registry to include queue pause functionality (migration 014)
28+
- Enhanced migration tracking and execution for comprehensive schema management
29+
- Cross-database compatibility improvements for PostgreSQL and MySQL migrations
30+
31+
- **🚀 CLI User Experience**
32+
- Enhanced queue listing with status column for better operational visibility
33+
- Improved command feedback with emoji indicators and clear messaging
34+
- Comprehensive error handling for queue operations
35+
- Consistent command-line interface across all queue management functions
36+
837
## [1.5.0] - 2025-07-02
938

1039
### Added

cargo-hammerwork/src/commands/queue.rs

Lines changed: 188 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ pub enum QueueCommand {
4848
#[arg(short = 'n', long, help = "Queue name")]
4949
queue: String,
5050
},
51+
#[command(about = "List all paused queues")]
52+
Paused {
53+
#[arg(short, long, help = "Database connection URL")]
54+
database_url: Option<String>,
55+
},
5156
#[command(about = "Get queue health status")]
5257
Health {
5358
#[arg(short, long, help = "Database connection URL")]
@@ -85,6 +90,9 @@ impl QueueCommand {
8590
QueueCommand::Resume { queue, .. } => {
8691
resume_queue(pool, queue).await?;
8792
}
93+
QueueCommand::Paused { .. } => {
94+
list_paused_queues(pool).await?;
95+
}
8896
QueueCommand::Health { queue, .. } => {
8997
show_queue_health(pool, queue.clone()).await?;
9098
}
@@ -99,6 +107,7 @@ impl QueueCommand {
99107
QueueCommand::Clear { database_url, .. } => database_url,
100108
QueueCommand::Pause { database_url, .. } => database_url,
101109
QueueCommand::Resume { database_url, .. } => database_url,
110+
QueueCommand::Paused { database_url, .. } => database_url,
102111
QueueCommand::Health { database_url, .. } => database_url,
103112
};
104113

@@ -114,21 +123,24 @@ impl QueueCommand {
114123
async fn list_queues(pool: DatabasePool) -> Result<()> {
115124
let query = r#"
116125
SELECT
117-
queue_name,
118-
COUNT(*) as total_jobs,
119-
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
120-
COUNT(CASE WHEN status = 'running' THEN 1 END) as running,
121-
COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
122-
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
123-
COUNT(CASE WHEN status = 'dead' THEN 1 END) as dead
124-
FROM hammerwork_jobs
125-
GROUP BY queue_name
126-
ORDER BY queue_name
126+
j.queue_name,
127+
COUNT(j.id) as total_jobs,
128+
COUNT(CASE WHEN j.status = 'pending' THEN 1 END) as pending,
129+
COUNT(CASE WHEN j.status = 'running' THEN 1 END) as running,
130+
COUNT(CASE WHEN j.status = 'completed' THEN 1 END) as completed,
131+
COUNT(CASE WHEN j.status = 'failed' THEN 1 END) as failed,
132+
COUNT(CASE WHEN j.status = 'dead' THEN 1 END) as dead,
133+
CASE WHEN p.queue_name IS NOT NULL THEN true ELSE false END as is_paused
134+
FROM hammerwork_jobs j
135+
LEFT JOIN hammerwork_queue_pause p ON j.queue_name = p.queue_name
136+
GROUP BY j.queue_name, p.queue_name
137+
ORDER BY j.queue_name
127138
"#;
128139

129140
let mut table = comfy_table::Table::new();
130141
table.set_header(vec![
131142
"Queue",
143+
"Status",
132144
"Total",
133145
"Pending",
134146
"Running",
@@ -148,9 +160,17 @@ async fn list_queues(pool: DatabasePool) -> Result<()> {
148160
let completed: i64 = row.try_get("completed")?;
149161
let failed: i64 = row.try_get("failed")?;
150162
let dead: i64 = row.try_get("dead")?;
163+
let is_paused: bool = row.try_get("is_paused")?;
164+
165+
let status = if is_paused {
166+
"⏸️ Paused"
167+
} else {
168+
"▶️ Active"
169+
};
151170

152171
table.add_row(vec![
153172
queue_name,
173+
status.to_string(),
154174
total.to_string(),
155175
pending.to_string(),
156176
running.to_string(),
@@ -170,9 +190,17 @@ async fn list_queues(pool: DatabasePool) -> Result<()> {
170190
let completed: i64 = row.try_get("completed")?;
171191
let failed: i64 = row.try_get("failed")?;
172192
let dead: i64 = row.try_get("dead")?;
193+
let is_paused: bool = row.try_get("is_paused")?;
194+
195+
let status = if is_paused {
196+
"⏸️ Paused"
197+
} else {
198+
"▶️ Active"
199+
};
173200

174201
table.add_row(vec![
175202
queue_name,
203+
status.to_string(),
176204
total.to_string(),
177205
pending.to_string(),
178206
running.to_string(),
@@ -379,30 +407,90 @@ async fn clear_queue(
379407
Ok(())
380408
}
381409

382-
async fn pause_queue(_pool: DatabasePool, queue: &str) -> Result<()> {
383-
// Note: This is a placeholder. In a real implementation, you might:
384-
// 1. Add a queue_paused table or column
385-
// 2. Update worker logic to respect paused queues
386-
// 3. Store pause state in Redis or similar
410+
async fn pause_queue(pool: DatabasePool, queue: &str) -> Result<()> {
411+
match pool {
412+
DatabasePool::Postgres(pg_pool) => {
413+
let result = sqlx::query(
414+
r#"
415+
INSERT INTO hammerwork_queue_pause (queue_name, paused_by, paused_at, created_at, updated_at)
416+
VALUES ($1, $2, NOW(), NOW(), NOW())
417+
ON CONFLICT (queue_name)
418+
DO UPDATE SET
419+
paused_by = EXCLUDED.paused_by,
420+
paused_at = NOW(),
421+
updated_at = NOW()
422+
"#,
423+
)
424+
.bind(queue)
425+
.bind("cli")
426+
.execute(&pg_pool)
427+
.await?;
428+
429+
if result.rows_affected() > 0 {
430+
println!("⏸️ Queue '{}' has been paused", queue);
431+
info!("Queue '{}' has been paused via CLI", queue);
432+
} else {
433+
println!("⚠️ Failed to pause queue '{}'", queue);
434+
}
435+
}
436+
DatabasePool::MySQL(mysql_pool) => {
437+
let result = sqlx::query(
438+
r#"
439+
INSERT INTO hammerwork_queue_pause (queue_name, paused_by, paused_at, created_at, updated_at)
440+
VALUES (?, ?, NOW(), NOW(), NOW())
441+
ON DUPLICATE KEY UPDATE
442+
paused_by = VALUES(paused_by),
443+
paused_at = NOW(),
444+
updated_at = NOW()
445+
"#,
446+
)
447+
.bind(queue)
448+
.bind("cli")
449+
.execute(&mysql_pool)
450+
.await?;
451+
452+
if result.rows_affected() > 0 {
453+
println!("⏸️ Queue '{}' has been paused", queue);
454+
info!("Queue '{}' has been paused via CLI", queue);
455+
} else {
456+
println!("⚠️ Failed to pause queue '{}'", queue);
457+
}
458+
}
459+
}
387460

388-
println!(
389-
"⏸️ Queue '{}' paused (placeholder - implement pause logic)",
390-
queue
391-
);
392-
info!("Queue '{}' has been paused", queue);
393461
Ok(())
394462
}
395463

396-
async fn resume_queue(_pool: DatabasePool, queue: &str) -> Result<()> {
397-
// Note: This is a placeholder. In a real implementation, you would:
398-
// 1. Remove from paused queues table/column
399-
// 2. Resume worker processing for this queue
464+
async fn resume_queue(pool: DatabasePool, queue: &str) -> Result<()> {
465+
match pool {
466+
DatabasePool::Postgres(pg_pool) => {
467+
let result = sqlx::query("DELETE FROM hammerwork_queue_pause WHERE queue_name = $1")
468+
.bind(queue)
469+
.execute(&pg_pool)
470+
.await?;
471+
472+
if result.rows_affected() > 0 {
473+
println!("▶️ Queue '{}' has been resumed", queue);
474+
info!("Queue '{}' has been resumed via CLI", queue);
475+
} else {
476+
println!("ℹ️ Queue '{}' was not paused", queue);
477+
}
478+
}
479+
DatabasePool::MySQL(mysql_pool) => {
480+
let result = sqlx::query("DELETE FROM hammerwork_queue_pause WHERE queue_name = ?")
481+
.bind(queue)
482+
.execute(&mysql_pool)
483+
.await?;
484+
485+
if result.rows_affected() > 0 {
486+
println!("▶️ Queue '{}' has been resumed", queue);
487+
info!("Queue '{}' has been resumed via CLI", queue);
488+
} else {
489+
println!("ℹ️ Queue '{}' was not paused", queue);
490+
}
491+
}
492+
}
400493

401-
println!(
402-
"▶️ Queue '{}' resumed (placeholder - implement resume logic)",
403-
queue
404-
);
405-
info!("Queue '{}' has been resumed", queue);
406494
Ok(())
407495
}
408496

@@ -573,3 +661,74 @@ async fn show_queue_health(pool: DatabasePool, queue: Option<String>) -> Result<
573661

574662
Ok(())
575663
}
664+
665+
async fn list_paused_queues(pool: DatabasePool) -> Result<()> {
666+
let query = r#"
667+
SELECT
668+
queue_name,
669+
paused_at,
670+
paused_by,
671+
reason
672+
FROM hammerwork_queue_pause
673+
ORDER BY paused_at DESC
674+
"#;
675+
676+
let mut table = comfy_table::Table::new();
677+
table.set_header(vec![
678+
"Queue Name",
679+
"Paused At",
680+
"Paused By",
681+
"Reason",
682+
]);
683+
684+
match pool {
685+
DatabasePool::Postgres(pg_pool) => {
686+
let rows = sqlx::query(query).fetch_all(&pg_pool).await?;
687+
688+
if rows.is_empty() {
689+
println!("✅ No paused queues found - all queues are active");
690+
return Ok(());
691+
}
692+
693+
for row in rows {
694+
let queue_name: String = row.try_get("queue_name")?;
695+
let paused_at: chrono::DateTime<chrono::Utc> = row.try_get("paused_at")?;
696+
let paused_by: Option<String> = row.try_get("paused_by")?;
697+
let reason: Option<String> = row.try_get("reason")?;
698+
699+
table.add_row(vec![
700+
queue_name,
701+
paused_at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
702+
paused_by.unwrap_or_else(|| "Unknown".to_string()),
703+
reason.unwrap_or_else(|| "-".to_string()),
704+
]);
705+
}
706+
}
707+
DatabasePool::MySQL(mysql_pool) => {
708+
let rows = sqlx::query(query).fetch_all(&mysql_pool).await?;
709+
710+
if rows.is_empty() {
711+
println!("✅ No paused queues found - all queues are active");
712+
return Ok(());
713+
}
714+
715+
for row in rows {
716+
let queue_name: String = row.try_get("queue_name")?;
717+
let paused_at: chrono::DateTime<chrono::Utc> = row.try_get("paused_at")?;
718+
let paused_by: Option<String> = row.try_get("paused_by")?;
719+
let reason: Option<String> = row.try_get("reason")?;
720+
721+
table.add_row(vec![
722+
queue_name,
723+
paused_at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
724+
paused_by.unwrap_or_else(|| "Unknown".to_string()),
725+
reason.unwrap_or_else(|| "-".to_string()),
726+
]);
727+
}
728+
}
729+
}
730+
731+
println!("⏸️ Paused Queues");
732+
println!("{}", table);
733+
Ok(())
734+
}

0 commit comments

Comments
 (0)