@@ -31,6 +31,7 @@ use futures::{StreamExt, TryStreamExt};
3131use object_store:: Error ;
3232use object_store:: { path:: Path , ObjectStore } ;
3333use serde:: Serialize ;
34+ use tracing:: log:: * ;
3435
3536use super :: { CustomExecuteHandler , Operation } ;
3637use crate :: errors:: { DeltaResult , DeltaTableError } ;
@@ -76,6 +77,20 @@ pub trait Clock: Debug + Send + Sync {
7677 fn current_timestamp_millis ( & self ) -> i64 ;
7778}
7879
/// Type of Vacuum operation to perform
///
/// The mode is a plain, fieldless selector, so it is `Copy` and can be compared, hashed and
/// passed by value freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VacuumMode {
    /// A `lite` vacuum only removes files which are referenced in the `_delta_log` by a
    /// `remove` action. This is the default mode.
    #[default]
    Lite,
    /// A `full` vacuum removes _all_ data files which are no longer actively referenced in the
    /// `_delta_log`. For example, if parquet files exist in the table directory but are no
    /// longer mentioned as `add` actions in the transaction log, then this mode will scan storage
    /// and remove those files.
    Full,
}
93+
7994/// Vacuum a Delta table with the given options
8095/// See this module's documentation for more information
8196pub struct VacuumBuilder {
@@ -89,6 +104,8 @@ pub struct VacuumBuilder {
89104 enforce_retention_duration : bool ,
90105 /// Don't delete the files. Just determine which files can be deleted
91106 dry_run : bool ,
107+ /// Mode of vacuum that should be run
108+ mode : VacuumMode ,
92109 /// Override the source of time
93110 clock : Option < Arc < dyn Clock > > ,
94111 /// Additional information to add to the commit
@@ -144,6 +161,7 @@ impl VacuumBuilder {
144161 retention_period : None ,
145162 enforce_retention_duration : true ,
146163 dry_run : false ,
164+ mode : VacuumMode :: Lite ,
147165 clock : None ,
148166 commit_properties : CommitProperties :: default ( ) ,
149167 custom_execute_handler : None ,
@@ -156,6 +174,12 @@ impl VacuumBuilder {
156174 self
157175 }
158176
177+ /// Override the default vacuum mode (lite)
178+ pub fn with_mode ( mut self , mode : VacuumMode ) -> Self {
179+ self . mode = mode;
180+ self
181+ }
182+
159183 /// Only determine which files should be deleted
160184 pub fn with_dry_run ( mut self , dry_run : bool ) -> Self {
161185 self . dry_run = dry_run;
@@ -189,6 +213,10 @@ impl VacuumBuilder {
189213
190214 /// Determine which files can be deleted. Does not actually perform the deletion
191215 async fn create_vacuum_plan ( & self ) -> Result < VacuumPlan , VacuumError > {
216+ if self . mode == VacuumMode :: Full {
217+ info ! ( "Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!" ) ;
218+ }
219+
192220 let min_retention = Duration :: milliseconds (
193221 self . snapshot
194222 . table_config ( )
@@ -228,12 +256,24 @@ impl VacuumBuilder {
228256 while let Some ( obj_meta) = all_files. next ( ) . await {
229257 // TODO should we allow NotFound here in case we have a temporary commit file in the list
230258 let obj_meta = obj_meta. map_err ( DeltaTableError :: from) ?;
231- if valid_files. contains ( & obj_meta. location ) // file is still being tracked in table
232- || !expired_tombstones. contains ( obj_meta. location . as_ref ( ) ) // file is not an expired tombstone
233- || is_hidden_directory ( partition_columns, & obj_meta. location ) ?
234- {
259+ // file is still being tracked in table
260+ if valid_files. contains ( & obj_meta. location ) {
235261 continue ;
236262 }
263+ if is_hidden_directory ( partition_columns, & obj_meta. location ) ? {
264+ continue ;
265+ }
266+ // file is not an expired tombstone _and_ this is a "Lite" vacuum
267+ // If the file is not an expired tombstone and we have gotten to here with a
268+ // VacuumMode::Full then it should be added to the deletion plan
269+ if !expired_tombstones. contains ( obj_meta. location . as_ref ( ) ) {
270+ if self . mode == VacuumMode :: Lite {
271+ debug ! ( "The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed" , & obj_meta. location) ;
272+ continue ;
273+ } else {
274+ debug ! ( "The file {:?} was not referenced in a log file, but VacuumMode::Full means it *will be vacuumed*" , & obj_meta. location) ;
275+ }
276+ }
237277
238278 files_to_delete. push ( obj_meta. location ) ;
239279 file_sizes. push ( obj_meta. size as i64 ) ;
@@ -436,7 +476,44 @@ mod tests {
436476 use std:: time:: SystemTime ;
437477
438478 #[ tokio:: test]
439- async fn vacuum_delta_8_0_table ( ) {
479+ async fn test_vacuum_full ( ) -> DeltaResult < ( ) > {
480+ let table = open_table ( "../test/tests/data/simple_commit" ) . await ?;
481+
482+ let ( _table, result) = VacuumBuilder :: new ( table. log_store ( ) , table. snapshot ( ) ?. clone ( ) )
483+ . with_retention_period ( Duration :: hours ( 0 ) )
484+ . with_dry_run ( true )
485+ . with_mode ( VacuumMode :: Lite )
486+ . with_enforce_retention_duration ( false )
487+ . await ?;
488+ // When running lite, this table with superfluous parquet files should not have anything to
489+ // delete
490+ assert ! ( result. files_deleted. is_empty( ) ) ;
491+
492+ let ( _table, result) = VacuumBuilder :: new ( table. log_store ( ) , table. snapshot ( ) ?. clone ( ) )
493+ . with_retention_period ( Duration :: hours ( 0 ) )
494+ . with_dry_run ( true )
495+ . with_mode ( VacuumMode :: Full )
496+ . with_enforce_retention_duration ( false )
497+ . await ?;
498+ let mut files_deleted = result. files_deleted . clone ( ) ;
499+ files_deleted. sort ( ) ;
500+ // When running with full, these superfluous parquet files which are not actually
501+ // referenced in the _delta_log commits should be considered for the
502+ // low-orbit ion-cannon
503+ assert_eq ! (
504+ files_deleted,
505+ vec![
506+ "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet" ,
507+ "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet" ,
508+ "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet" ,
509+ "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet" ,
510+ ]
511+ ) ;
512+ Ok ( ( ) )
513+ }
514+
515+ #[ tokio:: test]
516+ async fn vacuum_delta_8_0_table ( ) -> DeltaResult < ( ) > {
440517 let table = open_table ( "../test/tests/data/delta-0.8.0" ) . await . unwrap ( ) ;
441518
442519 let result = VacuumBuilder :: new ( table. log_store ( ) , table. snapshot ( ) . unwrap ( ) . clone ( ) )
@@ -453,8 +530,7 @@ mod tests {
453530 . with_retention_period ( Duration :: hours ( 0 ) )
454531 . with_dry_run ( true )
455532 . with_enforce_retention_duration ( false )
456- . await
457- . unwrap ( ) ;
533+ . await ?;
458534 // do not enforce retention duration check with 0 hour will purge all files
459535 assert_eq ! (
460536 result. files_deleted,
@@ -465,8 +541,7 @@ mod tests {
465541 VacuumBuilder :: new ( table. log_store ( ) , table. snapshot ( ) . unwrap ( ) . clone ( ) )
466542 . with_retention_period ( Duration :: hours ( 169 ) )
467543 . with_dry_run ( true )
468- . await
469- . unwrap ( ) ;
544+ . await ?;
470545
471546 assert_eq ! (
472547 result. files_deleted,
@@ -483,9 +558,9 @@ mod tests {
483558 VacuumBuilder :: new ( table. log_store ( ) , table. snapshot ( ) . unwrap ( ) . clone ( ) )
484559 . with_retention_period ( Duration :: hours ( retention_hours as i64 ) )
485560 . with_dry_run ( true )
486- . await
487- . unwrap ( ) ;
561+ . await ?;
488562
489563 assert_eq ! ( result. files_deleted, empty) ;
564+ Ok ( ( ) )
490565 }
491566}
0 commit comments