@@ -27,8 +27,8 @@ use datafusion_functions_json;
2727use delta_kernel:: arrow:: record_batch:: RecordBatch ;
2828use deltalake:: datafusion:: parquet:: file:: properties:: WriterProperties ;
2929use deltalake:: kernel:: transaction:: CommitProperties ;
30- use deltalake:: { DeltaOps , DeltaTable , DeltaTableBuilder } ;
3130use deltalake:: PartitionFilter ;
31+ use deltalake:: { DeltaOps , DeltaTable , DeltaTableBuilder } ;
3232use futures:: StreamExt ;
3333use serde:: { Deserialize , Serialize } ;
3434use sqlx:: { postgres:: PgPoolOptions , PgPool } ;
@@ -363,10 +363,16 @@ impl Database {
363363 Err ( e) => {
364364 retry_count += 1 ;
365365 if retry_count >= max_retries {
366- error ! ( "Failed to initialize shared Foyer cache after {} retries: {}. Continuing without cache." , max_retries, e) ;
366+ error ! (
367+ "Failed to initialize shared Foyer cache after {} retries: {}. Continuing without cache." ,
368+ max_retries, e
369+ ) ;
367370 break None ;
368371 }
369- warn ! ( "Failed to initialize shared Foyer cache (attempt {}/{}): {}. Retrying..." , retry_count, max_retries, e) ;
372+ warn ! (
373+ "Failed to initialize shared Foyer cache (attempt {}/{}): {}. Retrying..." ,
374+ retry_count, max_retries, e
375+ ) ;
370376 tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) . await ;
371377 }
372378 }
@@ -416,8 +422,7 @@ impl Database {
416422 let db = Arc :: new ( self . clone ( ) ) ;
417423
418424 // Light optimize job - every 5 minutes for small recent files
419- let light_optimize_schedule = env:: var ( "TIMEFUSION_LIGHT_OPTIMIZE_SCHEDULE" )
420- . unwrap_or_else ( |_| "0 */5 * * * *" . to_string ( ) ) ;
425+ let light_optimize_schedule = env:: var ( "TIMEFUSION_LIGHT_OPTIMIZE_SCHEDULE" ) . unwrap_or_else ( |_| "0 */5 * * * *" . to_string ( ) ) ;
421426
422427 if !light_optimize_schedule. is_empty ( ) {
423428 info ! ( "Light optimize job scheduled with cron expression: {}" , light_optimize_schedule) ;
@@ -431,12 +436,10 @@ impl Database {
431436 for ( ( project_id, table_name) , table) in db. project_configs . read ( ) . await . iter ( ) {
432437 match db. optimize_table_light ( table) . await {
433438 Ok ( _) => {
434- debug ! ( "Light optimize completed for project '{}' table '{}'" ,
435- project_id, table_name) ;
439+ info ! ( "Light optimize completed for project '{}' table '{}'" , project_id, table_name) ;
436440 }
437441 Err ( e) => {
438- error ! ( "Light optimize failed for project '{}' table '{}': {}" ,
439- project_id, table_name, e) ;
442+ error ! ( "Light optimize failed for project '{}' table '{}': {}" , project_id, table_name, e) ;
440443 }
441444 }
442445 }
@@ -453,7 +456,10 @@ impl Database {
453456 let optimize_schedule = env:: var ( "TIMEFUSION_OPTIMIZE_SCHEDULE" ) . unwrap_or_else ( |_| "0 */30 * * * *" . to_string ( ) ) ;
454457
455458 if !optimize_schedule. is_empty ( ) {
456- info ! ( "Optimize job scheduled with cron expression: {} (processes last 28 hours only)" , optimize_schedule) ;
459+ info ! (
460+ "Optimize job scheduled with cron expression: {} (processes last 28 hours only)" ,
461+ optimize_schedule
462+ ) ;
457463
458464 let optimize_job = Job :: new_async ( & optimize_schedule, {
459465 let db = db. clone ( ) ;
@@ -1308,8 +1314,6 @@ impl Database {
13081314 /// Targets files < 10MB from today's partition only
13091315 pub async fn optimize_table_light ( & self , table_ref : & Arc < RwLock < DeltaTable > > ) -> Result < ( ) > {
13101316 let start_time = std:: time:: Instant :: now ( ) ;
1311- info ! ( "Starting light Delta table optimization for small recent files" ) ;
1312-
13131317 // Get a clone of the table to avoid holding the lock during the operation
13141318 let table_clone = {
13151319 let table = table_ref. read ( ) . await ;
@@ -1322,11 +1326,9 @@ impl Database {
13221326 // Only optimize today's partition for light optimization
13231327 let today = Utc :: now ( ) . date_naive ( ) ;
13241328 info ! ( "Light optimizing files from date: {}" , today) ;
1325-
1329+
13261330 // Create partition filter for today only
1327- let partition_filters = vec ! [
1328- PartitionFilter :: try_from( ( "date" , "=" , today. to_string( ) . as_str( ) ) ) ?,
1329- ] ;
1331+ let partition_filters = vec ! [ PartitionFilter :: try_from( ( "date" , "=" , today. to_string( ) . as_str( ) ) ) ?] ;
13301332
13311333 let optimize_result = DeltaOps ( table_clone)
13321334 . optimize ( )
@@ -1342,9 +1344,7 @@ impl Database {
13421344 let duration = start_time. elapsed ( ) ;
13431345 info ! (
13441346 "Light optimization completed in {:?}: {} files removed, {} files added" ,
1345- duration,
1346- metrics. num_files_removed,
1347- metrics. num_files_added,
1347+ duration, metrics. num_files_removed, metrics. num_files_added,
13481348 ) ;
13491349
13501350 // Update the table reference with the optimized version
0 commit comments