Skip to content

Commit b026360

Browse files
CodingAnarchyclaude
andcommitted
fix: Ensure status field consistency and performance improvements
- Ensured all job status fields are consistently handled as strings across PostgreSQL and MySQL - Fixed BatchStatus handling in MySQL get_batch_status to use string matching instead of JSON deserialization - Replaced inefficient serde_json::to_string() calls with direct .as_str() method for DependencyStatus serialization - Added backward compatibility for both quoted (old format) and unquoted (new format) status values in database deserialization - Updated error handling to use appropriate HammerworkError variants instead of deprecated Other variant - Improved performance by eliminating unnecessary JSON serialization/deserialization in status field operations - Enhanced consistency between PostgreSQL and MySQL implementations for status field handling - Updated test imports to fix compilation errors in batch and archive tests 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 93bb29c commit b026360

File tree

6 files changed

+182
-24
lines changed

6 files changed

+182
-24
lines changed

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [1.15.5] - 2025-08-29
11+
12+
### Fixed
13+
- **🔧 Status Field Consistency and Performance Improvements**
14+
- Ensured all job status fields are consistently handled as strings across PostgreSQL and MySQL
15+
- Fixed `BatchStatus` handling in MySQL `get_batch_status` to use string matching instead of JSON deserialization
16+
- Replaced inefficient `serde_json::to_string()` calls with direct `.as_str()` method for `DependencyStatus` serialization
17+
- Added backward compatibility for both quoted (old format) and unquoted (new format) status values in database deserialization
18+
- Updated error handling to use appropriate `HammerworkError` variants instead of deprecated `Other` variant
19+
- Improved performance by eliminating unnecessary JSON serialization/deserialization in status field operations
20+
- Enhanced consistency between PostgreSQL and MySQL implementations for status field handling
21+
1022
## [1.15.4] - 2025-08-26
1123

1224
### Fixed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ members = [
99
resolver = "2"
1010

1111
[workspace.package]
12-
version = "1.15.4"
12+
version = "1.15.5"
1313
edition = "2024"
1414
license = "MIT"
1515
repository = "https://github.com/CodingAnarchy/hammerwork"
@@ -19,7 +19,7 @@ documentation = "https://docs.rs/hammerwork"
1919
rust-version = "1.86"
2020

2121
[workspace.dependencies]
22-
hammerwork = { version = "1.15.1", path = "." }
22+
hammerwork = { version = "1.15.5", path = "." }
2323
tokio = { version = "1.0", features = ["full"] }
2424
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "chrono", "uuid", "json"] }
2525
chrono = { version = "0.4", features = ["serde"] }

src/queue/mysql.rs

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,25 @@ impl JobRow {
8585
id: uuid::Uuid::parse_str(&self.id)?,
8686
queue_name: self.queue_name,
8787
payload: self.payload,
88-
status: serde_json::from_str(&self.status)?,
88+
status: {
89+
// Handle both quoted (old format) and unquoted (new format) status values
90+
let cleaned_str = self.status.trim_matches('"');
91+
match cleaned_str {
92+
"Pending" => JobStatus::Pending,
93+
"Running" => JobStatus::Running,
94+
"Completed" => JobStatus::Completed,
95+
"Failed" => JobStatus::Failed,
96+
"Dead" => JobStatus::Dead,
97+
"TimedOut" => JobStatus::TimedOut,
98+
"Retrying" => JobStatus::Retrying,
99+
"Archived" => JobStatus::Archived,
100+
_ => {
101+
return Err(crate::error::HammerworkError::Processing(
102+
format!("Unknown job status: {}", cleaned_str),
103+
));
104+
}
105+
}
106+
},
89107
priority: JobPriority::from_i32(self.priority).unwrap_or(JobPriority::Normal),
90108
attempts: self.attempts,
91109
max_attempts: self.max_attempts,
@@ -444,7 +462,21 @@ impl DeadJobRow {
444462
id: uuid::Uuid::parse_str(&self.id)?,
445463
queue_name: self.queue_name,
446464
payload: self.payload,
447-
status: serde_json::from_str(&self.status).unwrap_or(JobStatus::Dead),
465+
status: {
466+
// Handle both quoted (old format) and unquoted (new format) status values
467+
let cleaned_str = self.status.trim_matches('"');
468+
match cleaned_str {
469+
"Pending" => JobStatus::Pending,
470+
"Running" => JobStatus::Running,
471+
"Completed" => JobStatus::Completed,
472+
"Failed" => JobStatus::Failed,
473+
"Dead" => JobStatus::Dead,
474+
"TimedOut" => JobStatus::TimedOut,
475+
"Retrying" => JobStatus::Retrying,
476+
"Archived" => JobStatus::Archived,
477+
_ => JobStatus::Dead, // Default fallback for unknown status
478+
}
479+
},
448480
priority: JobPriority::from_i32(self.priority).unwrap_or(JobPriority::Normal),
449481
attempts: self.attempts,
450482
max_attempts: self.max_attempts,
@@ -769,7 +801,6 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
769801
}
770802

771803
async fn enqueue_batch(&self, batch: crate::batch::JobBatch) -> Result<crate::batch::BatchId> {
772-
use crate::batch::BatchStatus;
773804

774805
// Validate the batch first
775806
batch.validate()?;
@@ -790,7 +821,7 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
790821
.bind(0i32) // completed_jobs
791822
.bind(0i32) // failed_jobs
792823
.bind(batch.jobs.len() as i32) // pending_jobs
793-
.bind(serde_json::to_string(&BatchStatus::Pending)?)
824+
.bind(crate::batch::BatchStatus::Pending)
794825
.bind(serde_json::to_string(&batch.failure_mode)?)
795826
.bind(batch.created_at)
796827
.bind(serde_json::to_value(&batch.metadata)?)
@@ -876,7 +907,7 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
876907
let completed_jobs: i32 = batch_row.get("completed_jobs");
877908
let failed_jobs: i32 = batch_row.get("failed_jobs");
878909
let pending_jobs: i32 = batch_row.get("pending_jobs");
879-
let status: String = batch_row.get("status");
910+
let status_str: String = batch_row.get("status");
880911
let created_at: DateTime<Utc> = batch_row.get("created_at");
881912
let completed_at: Option<DateTime<Utc>> = batch_row.get("completed_at");
882913
let error_summary: Option<String> = batch_row.get("error_summary");
@@ -894,13 +925,30 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
894925
.filter_map(|(id_str, error)| uuid::Uuid::parse_str(&id_str).ok().map(|id| (id, error)))
895926
.collect();
896927

928+
let status = {
929+
// Handle both quoted (old format) and unquoted (new format) status values
930+
let cleaned_str = status_str.trim_matches('"');
931+
match cleaned_str {
932+
"Pending" => crate::batch::BatchStatus::Pending,
933+
"Processing" => crate::batch::BatchStatus::Processing,
934+
"Completed" => crate::batch::BatchStatus::Completed,
935+
"PartiallyFailed" => crate::batch::BatchStatus::PartiallyFailed,
936+
"Failed" => crate::batch::BatchStatus::Failed,
937+
_ => {
938+
return Err(crate::error::HammerworkError::Batch {
939+
message: format!("Unknown batch status: {}", cleaned_str),
940+
});
941+
}
942+
}
943+
};
944+
897945
Ok(BatchResult {
898946
batch_id,
899947
total_jobs: total_jobs as u32,
900948
completed_jobs: completed_jobs as u32,
901949
failed_jobs: failed_jobs as u32,
902950
pending_jobs: pending_jobs as u32,
903-
status: serde_json::from_str(&status)?,
951+
status,
904952
created_at,
905953
completed_at,
906954
error_summary,
@@ -2041,7 +2089,7 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
20412089
} else {
20422090
Some(serde_json::to_value(&job.depends_on)?)
20432091
})
2044-
.bind(serde_json::to_string(&job.dependency_status)?)
2092+
.bind(job.dependency_status.as_str())
20452093
.bind(serde_json::to_value(&job.result_config)?)
20462094
.bind(job.trace_id)
20472095
.bind(job.correlation_id)
@@ -2255,8 +2303,22 @@ impl DatabaseQueue for crate::queue::JobQueue<MySql> {
22552303
archived_jobs.push(ArchivedJob {
22562304
id: uuid::Uuid::parse_str(&row.get::<String, _>("id"))?,
22572305
queue_name: row.get("queue_name"),
2258-
status: serde_json::from_str::<JobStatus>(&row.get::<String, _>("status"))
2259-
.unwrap_or(JobStatus::Dead),
2306+
status: {
2307+
// Handle both quoted (old format) and unquoted (new format) status values
2308+
let status_str: String = row.get("status");
2309+
let cleaned_str = status_str.trim_matches('"');
2310+
match cleaned_str {
2311+
"Pending" => JobStatus::Pending,
2312+
"Running" => JobStatus::Running,
2313+
"Completed" => JobStatus::Completed,
2314+
"Failed" => JobStatus::Failed,
2315+
"Dead" => JobStatus::Dead,
2316+
"TimedOut" => JobStatus::TimedOut,
2317+
"Retrying" => JobStatus::Retrying,
2318+
"Archived" => JobStatus::Archived,
2319+
_ => JobStatus::Dead, // Default fallback for unknown status
2320+
}
2321+
},
22602322
created_at: row.get("created_at"),
22612323
archived_at: row.get("archived_at"),
22622324
archival_reason: serde_json::from_str(&row.get::<String, _>("archival_reason"))
@@ -2464,7 +2526,7 @@ impl crate::queue::JobQueue<sqlx::MySql> {
24642526
} else {
24652527
Some(serde_json::to_value(&job.dependents)?)
24662528
})
2467-
.bind(serde_json::to_string(&job.dependency_status)?)
2529+
.bind(job.dependency_status.as_str())
24682530
.bind(job.workflow_id.map(|id| id.to_string()))
24692531
.bind(job.workflow_name)
24702532
.bind(job.trace_id)

src/queue/postgres.rs

Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,25 @@ impl JobRow {
8585
id: self.id,
8686
queue_name: self.queue_name,
8787
payload: self.payload,
88-
status: serde_json::from_str(&self.status)?,
88+
status: {
89+
// Handle both quoted (old format) and unquoted (new format) status values
90+
let cleaned_str = self.status.trim_matches('"');
91+
match cleaned_str {
92+
"Pending" => JobStatus::Pending,
93+
"Running" => JobStatus::Running,
94+
"Completed" => JobStatus::Completed,
95+
"Failed" => JobStatus::Failed,
96+
"Dead" => JobStatus::Dead,
97+
"TimedOut" => JobStatus::TimedOut,
98+
"Retrying" => JobStatus::Retrying,
99+
"Archived" => JobStatus::Archived,
100+
_ => {
101+
return Err(crate::error::HammerworkError::Processing(
102+
format!("Unknown job status: {}", cleaned_str),
103+
));
104+
}
105+
}
106+
},
89107
priority: JobPriority::from_i32(self.priority).unwrap_or(JobPriority::Normal),
90108
attempts: self.attempts,
91109
max_attempts: self.max_attempts,
@@ -419,7 +437,21 @@ impl DeadJobRow {
419437
id: self.id,
420438
queue_name: self.queue_name,
421439
payload: self.payload,
422-
status: serde_json::from_str(&self.status).unwrap_or(JobStatus::Dead),
440+
status: {
441+
// Handle both quoted (old format) and unquoted (new format) status values
442+
let cleaned_str = self.status.trim_matches('"');
443+
match cleaned_str {
444+
"Pending" => JobStatus::Pending,
445+
"Running" => JobStatus::Running,
446+
"Completed" => JobStatus::Completed,
447+
"Failed" => JobStatus::Failed,
448+
"Dead" => JobStatus::Dead,
449+
"TimedOut" => JobStatus::TimedOut,
450+
"Retrying" => JobStatus::Retrying,
451+
"Archived" => JobStatus::Archived,
452+
_ => JobStatus::Dead, // Default fallback for unknown status
453+
}
454+
},
423455
priority: JobPriority::from_i32(self.priority).unwrap_or(JobPriority::Normal),
424456
attempts: self.attempts,
425457
max_attempts: self.max_attempts,
@@ -586,7 +618,25 @@ impl DatabaseQueue for crate::queue::JobQueue<Postgres> {
586618
id,
587619
queue_name,
588620
payload,
589-
status: serde_json::from_str(&status)?,
621+
status: {
622+
// Handle both quoted (old format) and unquoted (new format) status values
623+
let cleaned_str = status.trim_matches('"');
624+
match cleaned_str {
625+
"Pending" => JobStatus::Pending,
626+
"Running" => JobStatus::Running,
627+
"Completed" => JobStatus::Completed,
628+
"Failed" => JobStatus::Failed,
629+
"Dead" => JobStatus::Dead,
630+
"TimedOut" => JobStatus::TimedOut,
631+
"Retrying" => JobStatus::Retrying,
632+
"Archived" => JobStatus::Archived,
633+
_ => {
634+
return Err(crate::error::HammerworkError::Processing(
635+
format!("Unknown job status: {}", cleaned_str),
636+
));
637+
}
638+
}
639+
},
590640
priority: JobPriority::from_i32(priority).unwrap_or(JobPriority::Normal),
591641
attempts,
592642
max_attempts,
@@ -766,7 +816,25 @@ impl DatabaseQueue for crate::queue::JobQueue<Postgres> {
766816
id,
767817
queue_name,
768818
payload,
769-
status: serde_json::from_str(&status)?,
819+
status: {
820+
// Handle both quoted (old format) and unquoted (new format) status values
821+
let cleaned_str = status.trim_matches('"');
822+
match cleaned_str {
823+
"Pending" => JobStatus::Pending,
824+
"Running" => JobStatus::Running,
825+
"Completed" => JobStatus::Completed,
826+
"Failed" => JobStatus::Failed,
827+
"Dead" => JobStatus::Dead,
828+
"TimedOut" => JobStatus::TimedOut,
829+
"Retrying" => JobStatus::Retrying,
830+
"Archived" => JobStatus::Archived,
831+
_ => {
832+
return Err(crate::error::HammerworkError::Processing(
833+
format!("Unknown job status: {}", cleaned_str),
834+
));
835+
}
836+
}
837+
},
770838
priority: JobPriority::from_i32(priority).unwrap_or(JobPriority::Normal),
771839
attempts,
772840
max_attempts,
@@ -897,7 +965,7 @@ impl DatabaseQueue for crate::queue::JobQueue<Postgres> {
897965
.bind(0i32) // completed_jobs
898966
.bind(0i32) // failed_jobs
899967
.bind(batch.jobs.len() as i32) // pending_jobs
900-
.bind(serde_json::to_string(&BatchStatus::Pending)?)
968+
.bind(BatchStatus::Pending)
901969
.bind(serde_json::to_string(&batch.failure_mode)?)
902970
.bind(batch.created_at)
903971
.bind(serde_json::to_value(&batch.metadata)?)
@@ -2191,7 +2259,7 @@ impl DatabaseQueue for crate::queue::JobQueue<Postgres> {
21912259
.bind(job.timezone)
21922260
.bind(job.batch_id)
21932261
.bind(&job.depends_on)
2194-
.bind(serde_json::to_string(&job.dependency_status)?)
2262+
.bind(job.dependency_status.as_str())
21952263
.bind(serde_json::to_value(&job.result_config)?)
21962264
.bind(job.trace_id)
21972265
.bind(job.correlation_id)
@@ -2416,8 +2484,22 @@ impl DatabaseQueue for crate::queue::JobQueue<Postgres> {
24162484
archived_jobs.push(ArchivedJob {
24172485
id: row.get("id"),
24182486
queue_name: row.get("queue_name"),
2419-
status: serde_json::from_str::<JobStatus>(&row.get::<String, _>("status"))
2420-
.unwrap_or(JobStatus::Dead),
2487+
status: {
2488+
// Handle both quoted (old format) and unquoted (new format) status values
2489+
let status_str: String = row.get("status");
2490+
let cleaned_str = status_str.trim_matches('"');
2491+
match cleaned_str {
2492+
"Pending" => JobStatus::Pending,
2493+
"Running" => JobStatus::Running,
2494+
"Completed" => JobStatus::Completed,
2495+
"Failed" => JobStatus::Failed,
2496+
"Dead" => JobStatus::Dead,
2497+
"TimedOut" => JobStatus::TimedOut,
2498+
"Retrying" => JobStatus::Retrying,
2499+
"Archived" => JobStatus::Archived,
2500+
_ => JobStatus::Dead, // Default fallback for unknown status
2501+
}
2502+
},
24212503
created_at: row.get("created_at"),
24222504
archived_at: row.get("archived_at"),
24232505
archival_reason: serde_json::from_str(&row.get::<String, _>("archival_reason"))
@@ -2618,7 +2700,7 @@ impl crate::queue::JobQueue<Postgres> {
26182700
.bind(job.result_config.max_size_bytes.map(|s| s as i64))
26192701
.bind(&job.depends_on)
26202702
.bind(&job.dependents)
2621-
.bind(serde_json::to_string(&job.dependency_status)?)
2703+
.bind(job.dependency_status.as_str())
26222704
.bind(job.workflow_id)
26232705
.bind(job.workflow_name)
26242706
.bind(job.trace_id)

tests/batch_tests.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
mod test_utils;
22

33
use hammerwork::{
4-
Job,
5-
batch::{JobBatch, PartialFailureMode},
4+
Job, JobStatus,
5+
batch::{BatchStatus, JobBatch, PartialFailureMode},
6+
queue::DatabaseQueue,
67
};
78
use serde_json::json;
89
// use std::sync::Arc;

tests/comprehensive_archive_tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
mod test_utils;
22

33
use chrono::Utc;
4-
use hammerwork::archive::{ArchivalReason, ArchiveEvent};
4+
use hammerwork::{Job, queue::DatabaseQueue, archive::{ArchivalReason, ArchiveEvent}};
5+
use serde_json::json;
56
use uuid::Uuid;
67

78
/// Test ArchiveEvent serialization and deserialization

0 commit comments

Comments
 (0)