@@ -217,7 +217,10 @@ async fn batch_enqueue(
217217 total_errors += 1 ;
218218 eprintln ! ( "❌ Line {}: Invalid JSON - {}" , line_num + 1 , e) ;
219219 if !continue_on_error {
220- return Err ( anyhow:: anyhow!( "JSON parsing failed at line {}" , line_num + 1 ) ) ;
220+ return Err ( anyhow:: anyhow!(
221+ "JSON parsing failed at line {}" ,
222+ line_num + 1
223+ ) ) ;
221224 }
222225 continue ;
223226 }
@@ -261,22 +264,32 @@ async fn batch_enqueue(
261264 println ! ( "Total errors: {}" , total_errors) ;
262265 }
263266
264- info ! ( "Batch enqueue completed: {} processed, {} errors" , total_processed, total_errors) ;
267+ info ! (
268+ "Batch enqueue completed: {} processed, {} errors" ,
269+ total_processed, total_errors
270+ ) ;
265271 Ok ( ( ) )
266272}
267273
268- async fn insert_single_job ( pool : & DatabasePool , queue : & str , payload : & Value , priority : & str ) -> Result < ( ) > {
274+ async fn insert_single_job (
275+ pool : & DatabasePool ,
276+ queue : & str ,
277+ payload : & Value ,
278+ priority : & str ,
279+ ) -> Result < ( ) > {
269280 let job_id = uuid:: Uuid :: new_v4 ( ) . to_string ( ) ;
270281 let now = chrono:: Utc :: now ( ) ;
271282
272283 match pool {
273284 DatabasePool :: Postgres ( pg_pool) => {
274- sqlx:: query ( r#"
285+ sqlx:: query (
286+ r#"
275287 INSERT INTO hammerwork_jobs (
276288 id, queue_name, payload, status, priority, attempts, max_attempts,
277289 created_at, scheduled_at
278290 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
279- "# )
291+ "# ,
292+ )
280293 . bind ( & job_id)
281294 . bind ( queue)
282295 . bind ( payload)
@@ -290,12 +303,14 @@ async fn insert_single_job(pool: &DatabasePool, queue: &str, payload: &Value, pr
290303 . await ?;
291304 }
292305 DatabasePool :: MySQL ( mysql_pool) => {
293- sqlx:: query ( r#"
306+ sqlx:: query (
307+ r#"
294308 INSERT INTO hammerwork_jobs (
295309 id, queue_name, payload, status, priority, attempts, max_attempts,
296310 created_at, scheduled_at
297311 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
298- "# )
312+ "# ,
313+ )
299314 . bind ( & job_id)
300315 . bind ( queue)
301316 . bind ( payload)
@@ -309,7 +324,7 @@ async fn insert_single_job(pool: &DatabasePool, queue: &str, payload: &Value, pr
309324 . await ?;
310325 }
311326 }
312-
327+
313328 Ok ( ( ) )
314329}
315330
@@ -323,27 +338,36 @@ async fn batch_retry(
323338 dry_run : bool ,
324339) -> Result < ( ) > {
325340 if !confirm && !dry_run {
326- println ! ( "⚠️ This will retry multiple jobs. Use --confirm to proceed or --dry-run to preview." ) ;
341+ println ! (
342+ "⚠️ This will retry multiple jobs. Use --confirm to proceed or --dry-run to preview."
343+ ) ;
327344 return Ok ( ( ) ) ;
328345 }
329346
330347 // Build filter conditions
331348 let mut conditions = Vec :: new ( ) ;
332-
349+
333350 if let Some ( queue_name) = & queue {
334351 conditions. push ( format ! ( "queue_name = '{}'" , queue_name) ) ;
335352 }
336-
353+
337354 match status. as_deref ( ) {
338355 Some ( "failed" ) => conditions. push ( "status = 'Failed'" . to_string ( ) ) ,
339356 Some ( "dead" ) => conditions. push ( "status = 'Dead'" . to_string ( ) ) ,
340357 None => conditions. push ( "status IN ('Failed', 'Dead')" . to_string ( ) ) ,
341- _ => return Err ( anyhow:: anyhow!( "Invalid status filter. Use 'failed' or 'dead'" ) ) ,
358+ _ => {
359+ return Err ( anyhow:: anyhow!(
360+ "Invalid status filter. Use 'failed' or 'dead'"
361+ ) ) ;
362+ }
342363 }
343364
344365 if let Some ( hours) = failed_since_hours {
345366 let cutoff = chrono:: Utc :: now ( ) - chrono:: Duration :: hours ( hours as i64 ) ;
346- conditions. push ( format ! ( "failed_at > '{}'" , cutoff. format( "%Y-%m-%d %H:%M:%S" ) ) ) ;
367+ conditions. push ( format ! (
368+ "failed_at > '{}'" ,
369+ cutoff. format( "%Y-%m-%d %H:%M:%S" )
370+ ) ) ;
347371 }
348372
349373 if max_attempts_reached {
@@ -357,7 +381,10 @@ async fn batch_retry(
357381 } ;
358382
359383 // Count jobs to retry
360- let count_query = format ! ( "SELECT COUNT(*) as count FROM hammerwork_jobs WHERE {}" , where_clause) ;
384+ let count_query = format ! (
385+ "SELECT COUNT(*) as count FROM hammerwork_jobs WHERE {}" ,
386+ where_clause
387+ ) ;
361388 let job_count = execute_count_query ( & pool, & count_query) . await ?;
362389
363390 println ! ( "🔄 Batch Retry Analysis" ) ;
@@ -406,27 +433,36 @@ async fn batch_cancel(
406433 dry_run : bool ,
407434) -> Result < ( ) > {
408435 if !confirm && !dry_run {
409- println ! ( "⚠️ This will cancel multiple jobs. Use --confirm to proceed or --dry-run to preview." ) ;
436+ println ! (
437+ "⚠️ This will cancel multiple jobs. Use --confirm to proceed or --dry-run to preview."
438+ ) ;
410439 return Ok ( ( ) ) ;
411440 }
412441
413442 // Build filter conditions
414443 let mut conditions = Vec :: new ( ) ;
415-
444+
416445 if let Some ( queue_name) = & queue {
417446 conditions. push ( format ! ( "queue_name = '{}'" , queue_name) ) ;
418447 }
419-
448+
420449 match status. as_deref ( ) {
421450 Some ( "pending" ) => conditions. push ( "status = 'Pending'" . to_string ( ) ) ,
422451 Some ( "running" ) => conditions. push ( "status = 'Running'" . to_string ( ) ) ,
423452 None => conditions. push ( "status IN ('Pending', 'Running')" . to_string ( ) ) ,
424- _ => return Err ( anyhow:: anyhow!( "Invalid status filter. Use 'pending' or 'running'" ) ) ,
453+ _ => {
454+ return Err ( anyhow:: anyhow!(
455+ "Invalid status filter. Use 'pending' or 'running'"
456+ ) ) ;
457+ }
425458 }
426459
427460 if let Some ( hours) = older_than_hours {
428461 let cutoff = chrono:: Utc :: now ( ) - chrono:: Duration :: hours ( hours as i64 ) ;
429- conditions. push ( format ! ( "created_at < '{}'" , cutoff. format( "%Y-%m-%d %H:%M:%S" ) ) ) ;
462+ conditions. push ( format ! (
463+ "created_at < '{}'" ,
464+ cutoff. format( "%Y-%m-%d %H:%M:%S" )
465+ ) ) ;
430466 }
431467
432468 let where_clause = if conditions. is_empty ( ) {
@@ -436,7 +472,10 @@ async fn batch_cancel(
436472 } ;
437473
438474 // Count jobs to cancel
439- let count_query = format ! ( "SELECT COUNT(*) as count FROM hammerwork_jobs WHERE {}" , where_clause) ;
475+ let count_query = format ! (
476+ "SELECT COUNT(*) as count FROM hammerwork_jobs WHERE {}" ,
477+ where_clause
478+ ) ;
440479 let job_count = execute_count_query ( & pool, & count_query) . await ?;
441480
442481 println ! ( "🚫 Batch Cancel Analysis" ) ;
@@ -463,4 +502,4 @@ async fn batch_cancel(
463502 println ! ( " Cancelled {} jobs" , deleted) ;
464503
465504 Ok ( ( ) )
466- }
505+ }
0 commit comments