@@ -4,6 +4,7 @@ use crate::statistics::DeltaStatisticsExtractor;
44use anyhow:: Result ;
55use arrow_schema:: SchemaRef ;
66use async_trait:: async_trait;
7+ use chrono:: Utc ;
78use datafusion:: arrow:: array:: { Array , AsArray } ;
89use datafusion:: common:: not_impl_err;
910use datafusion:: common:: stats:: Precision ;
@@ -27,6 +28,7 @@ use delta_kernel::arrow::record_batch::RecordBatch;
2728use deltalake:: datafusion:: parquet:: file:: properties:: WriterProperties ;
2829use deltalake:: kernel:: transaction:: CommitProperties ;
2930use deltalake:: { DeltaOps , DeltaTable , DeltaTableBuilder } ;
31+ use deltalake:: PartitionFilter ;
3032use futures:: StreamExt ;
3133use serde:: { Deserialize , Serialize } ;
3234use sqlx:: { postgres:: PgPoolOptions , PgPool } ;
@@ -413,11 +415,45 @@ impl Database {
413415 let scheduler = JobScheduler :: new ( ) . await ?;
414416 let db = Arc :: new ( self . clone ( ) ) ;
415417
418+ // Light optimize job - every 5 minutes for small recent files
419+ let light_optimize_schedule = env:: var ( "TIMEFUSION_LIGHT_OPTIMIZE_SCHEDULE" )
420+ . unwrap_or_else ( |_| "0 */5 * * * *" . to_string ( ) ) ;
421+
422+ if !light_optimize_schedule. is_empty ( ) {
423+ info ! ( "Light optimize job scheduled with cron expression: {}" , light_optimize_schedule) ;
424+
425+ let light_optimize_job = Job :: new_async ( & light_optimize_schedule, {
426+ let db = db. clone ( ) ;
427+ move |_, _| {
428+ let db = db. clone ( ) ;
429+ Box :: pin ( async move {
430+ info ! ( "Running scheduled light optimize on recent small files" ) ;
431+ for ( ( project_id, table_name) , table) in db. project_configs . read ( ) . await . iter ( ) {
432+ match db. optimize_table_light ( table) . await {
433+ Ok ( _) => {
434+ debug ! ( "Light optimize completed for project '{}' table '{}'" ,
435+ project_id, table_name) ;
436+ }
437+ Err ( e) => {
438+ error ! ( "Light optimize failed for project '{}' table '{}': {}" ,
439+ project_id, table_name, e) ;
440+ }
441+ }
442+ }
443+ } )
444+ }
445+ } ) ?;
446+
447+ scheduler. add ( light_optimize_job) . await ?;
448+ } else {
449+ info ! ( "Light optimize job scheduling skipped - empty schedule" ) ;
450+ }
451+
416452 // Optimize job - configurable schedule (default: every 30mins)
417453 let optimize_schedule = env:: var ( "TIMEFUSION_OPTIMIZE_SCHEDULE" ) . unwrap_or_else ( |_| "0 */30 * * * *" . to_string ( ) ) ;
418454
419455 if !optimize_schedule. is_empty ( ) {
420- info ! ( "Optimize job scheduled with cron expression: {}" , optimize_schedule) ;
456+ info ! ( "Optimize job scheduled with cron expression: {} (processes last 28 hours only) " , optimize_schedule) ;
421457
422458 let optimize_job = Job :: new_async ( & optimize_schedule, {
423459 let db = db. clone ( ) ;
@@ -1197,7 +1233,7 @@ impl Database {
11971233 pub async fn optimize_table ( & self , table_ref : & Arc < RwLock < DeltaTable > > , _target_size : Option < i64 > ) -> Result < ( ) > {
11981234 // Log the start of the optimization operation
11991235 let start_time = std:: time:: Instant :: now ( ) ;
1200- info ! ( "Starting Delta table optimization with Z-ordering" ) ;
1236+ info ! ( "Starting Delta table optimization with Z-ordering (last 28 hours only) " ) ;
12011237
12021238 // Get a clone of the table to avoid holding the lock during the operation
12031239 let table_clone = {
@@ -1211,12 +1247,23 @@ impl Database {
12111247 . parse :: < i64 > ( )
12121248 . unwrap_or ( DEFAULT_OPTIMIZE_TARGET_SIZE ) ;
12131249
1250+ // Calculate dates for filtering - last 2 days (today and yesterday)
1251+ let today = Utc :: now ( ) . date_naive ( ) ;
1252+ let yesterday = ( Utc :: now ( ) - chrono:: Duration :: days ( 1 ) ) . date_naive ( ) ;
1253+ info ! ( "Optimizing files from dates: {} and {}" , yesterday, today) ;
1254+
1255+ // Create partition filters for the last 2 days
1256+ let partition_filters = vec ! [
1257+ PartitionFilter :: try_from( ( "date" , "=" , today. to_string( ) . as_str( ) ) ) ?,
1258+ PartitionFilter :: try_from( ( "date" , "=" , yesterday. to_string( ) . as_str( ) ) ) ?,
1259+ ] ;
1260+
12141261 // Run optimize operation with Z-order on the timestamp and id columns
12151262 let writer_properties = Self :: create_writer_properties ( ) ;
12161263
1217- // Note: Z-order functionality is achieved through sorting_columns in writer_properties
12181264 let optimize_result = DeltaOps ( table_clone)
12191265 . optimize ( )
1266+ . with_filters ( & partition_filters)
12201267 . with_type ( deltalake:: operations:: optimize:: OptimizeType :: ZOrder (
12211268 get_default_schema ( ) . z_order_columns . clone ( ) ,
12221269 ) )
@@ -1257,6 +1304,62 @@ impl Database {
12571304 }
12581305 }
12591306
1307+ /// Light optimization for small recent files
1308+ /// Targets files < 10MB from today's partition only
1309+ pub async fn optimize_table_light ( & self , table_ref : & Arc < RwLock < DeltaTable > > ) -> Result < ( ) > {
1310+ let start_time = std:: time:: Instant :: now ( ) ;
1311+ info ! ( "Starting light Delta table optimization for small recent files" ) ;
1312+
1313+ // Get a clone of the table to avoid holding the lock during the operation
1314+ let table_clone = {
1315+ let table = table_ref. read ( ) . await ;
1316+ table. clone ( )
1317+ } ;
1318+
1319+ // Target 64MB files for quick compaction of small files
1320+ let target_size = 67_108_864 ; // 64MB
1321+
1322+ // Only optimize today's partition for light optimization
1323+ let today = Utc :: now ( ) . date_naive ( ) ;
1324+ info ! ( "Light optimizing files from date: {}" , today) ;
1325+
1326+ // Create partition filter for today only
1327+ let partition_filters = vec ! [
1328+ PartitionFilter :: try_from( ( "date" , "=" , today. to_string( ) . as_str( ) ) ) ?,
1329+ ] ;
1330+
1331+ let optimize_result = DeltaOps ( table_clone)
1332+ . optimize ( )
1333+ . with_filters ( & partition_filters)
1334+ . with_type ( deltalake:: operations:: optimize:: OptimizeType :: Compact )
1335+ . with_target_size ( target_size)
1336+ . with_writer_properties ( Self :: create_writer_properties ( ) )
1337+ . with_min_commit_interval ( tokio:: time:: Duration :: from_secs ( 60 ) ) // 1 minute min interval
1338+ . await ;
1339+
1340+ match optimize_result {
1341+ Ok ( ( new_table, metrics) ) => {
1342+ let duration = start_time. elapsed ( ) ;
1343+ info ! (
1344+ "Light optimization completed in {:?}: {} files removed, {} files added" ,
1345+ duration,
1346+ metrics. num_files_removed,
1347+ metrics. num_files_added,
1348+ ) ;
1349+
1350+ // Update the table reference with the optimized version
1351+ let mut table = table_ref. write ( ) . await ;
1352+ * table = new_table;
1353+
1354+ Ok ( ( ) )
1355+ }
1356+ Err ( e) => {
1357+ error ! ( "Light optimization operation failed: {}" , e) ;
1358+ Err ( anyhow:: anyhow!( "Light table optimization failed: {}" , e) )
1359+ }
1360+ }
1361+ }
1362+
12601363 /// Vacuum the Delta table to clean up old files that are no longer needed
12611364 /// This reduces storage costs and improves query performance
12621365 async fn vacuum_table ( & self , table_ref : & Arc < RwLock < DeltaTable > > , retention_hours : u64 ) {
0 commit comments