@@ -15,6 +15,8 @@ use rand::random;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
 use etl::types::PgNumeric;
 use serde_json::json;
+use std::collections::HashMap;
+use std::num::NonZeroU64;
 use std::str::FromStr;
 use std::sync::Arc;
 use uuid::Uuid;
@@ -1213,3 +1215,86 @@ async fn test_large_transaction_batching() {
     // Due to the batch timeout, in practice, there will be more commits than the batch size.
     assert!(commits.len() >= (insert_count / batch_size));
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn compaction_minimizes_small_files() {
+    init_test_tracing();
+
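+    // Spin up the source Postgres database and register the users table schema.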
+    let database = spawn_source_database().await;
+    let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;
+
+    let delta_database = setup_delta_connection().await;
+
+    let store = NotifyingStore::new();
+
+    // Configure compaction to run after every commit for the users table.
+    let mut table_config: HashMap<String, Arc<etl_destinations::deltalake::DeltaTableConfig>> =
+        HashMap::new();
+    table_config.insert(
+        database_schema.users_schema().name.name.clone(),
+        Arc::new(etl_destinations::deltalake::DeltaTableConfig {
+            compact_after_commits: Some(NonZeroU64::new(1).unwrap()),
+            ..Default::default()
+        }),
+    );
+
+    let raw_destination = delta_database
+        .build_destination_with_config(store.clone(), table_config)
+        .await;
+    let destination = TestDestinationWrapper::wrap(raw_destination);
+
+    // Use a batch size of 1 so each insert becomes a separate commit and small file.
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline_with(
+        &database.config,
+        pipeline_id,
+        database_schema.publication_name(),
+        store.clone(),
+        destination.clone(),
+        Some(BatchConfig {
+            max_size: 1,
+            max_fill_ms: 1000,
+        }),
+    );
+
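+    // Register a notification that fires once the users table finishes its initial sync.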
+    let users_state_notify = store
+        .notify_on_table_state_type(
+            database_schema.users_schema().id,
+            TableReplicationPhaseType::SyncDone,
+        )
+        .await;
+
+    pipeline.start().await.unwrap();
+    users_state_notify.notified().await;
+
+    // Generate several inserts to create many small files (one per commit).
+    let insert_count: u64 = 12;
+    let event_notify = destination
+        .wait_for_events_count(vec![(EventType::Insert, insert_count)])
+        .await;
+
+    for i in 1..=insert_count {
+        database
+            .insert_values(
+                database_schema.users_schema().name.clone(),
+                &["name", "age"],
+                &[&format!("c_user_{i}"), &(i as i32)],
+            )
+            .await
+            .unwrap();
+    }
+
+    event_notify.notified().await;
+
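+    // Give the destination a moment to run the per-commit compaction before shutting down.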
+    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+    pipeline.shutdown_and_wait().await.unwrap();
+
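+    // Reload the Delta table to inspect the files that remain after compaction.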
+    let users_table = delta_database
+        .load_table(&database_schema.users_schema().name)
+        .await
+        .unwrap();
+
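+    // With compaction after every commit, the table should not keep one small file per insert.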
+    assert_table_snapshot!("compaction_minimizes_small_files", users_table.clone());
+    assert!(users_table.snapshot().unwrap().file_paths_iter().count() <= 12);
+}