Skip to content

Commit b3acc15

Browse files
Add compaction test
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent e731f79 commit b3acc15

File tree

2 files changed

+111
-0
lines changed

2 files changed

+111
-0
lines changed

etl-destinations/tests/deltalake_pipeline.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use rand::random;
1515
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
1616
use etl::types::PgNumeric;
1717
use serde_json::json;
18+
use std::collections::HashMap;
19+
use std::num::NonZeroU64;
1820
use std::str::FromStr;
1921
use std::sync::Arc;
2022
use uuid::Uuid;
@@ -1213,3 +1215,86 @@ async fn test_large_transaction_batching() {
12131215
// Due to the batch timeout, in practice, there will be more commits than the batch size.
12141216
assert!(commits.len() >= (insert_count / batch_size));
12151217
}
1218+
1219+
#[tokio::test(flavor = "multi_thread")]
1220+
async fn compaction_minimizes_small_files() {
1221+
init_test_tracing();
1222+
1223+
let database = spawn_source_database().await;
1224+
let database_schema = setup_test_database_schema(&database, TableSelection::UsersOnly).await;
1225+
1226+
let delta_database = setup_delta_connection().await;
1227+
1228+
let store = NotifyingStore::new();
1229+
1230+
// Configure compaction to run after every commit for the users table.
1231+
let mut table_config: HashMap<String, Arc<etl_destinations::deltalake::DeltaTableConfig>> =
1232+
HashMap::new();
1233+
table_config.insert(
1234+
database_schema.users_schema().name.name.clone(),
1235+
Arc::new(etl_destinations::deltalake::DeltaTableConfig {
1236+
compact_after_commits: Some(NonZeroU64::new(1).unwrap()),
1237+
..Default::default()
1238+
}),
1239+
);
1240+
1241+
let raw_destination = delta_database
1242+
.build_destination_with_config(store.clone(), table_config)
1243+
.await;
1244+
let destination = TestDestinationWrapper::wrap(raw_destination);
1245+
1246+
// Use a batch size of 1 so each insert becomes a separate commit and small file.
1247+
let pipeline_id: PipelineId = random();
1248+
let mut pipeline = create_pipeline_with(
1249+
&database.config,
1250+
pipeline_id,
1251+
database_schema.publication_name(),
1252+
store.clone(),
1253+
destination.clone(),
1254+
Some(BatchConfig {
1255+
max_size: 1,
1256+
max_fill_ms: 1000,
1257+
}),
1258+
);
1259+
1260+
let users_state_notify = store
1261+
.notify_on_table_state_type(
1262+
database_schema.users_schema().id,
1263+
TableReplicationPhaseType::SyncDone,
1264+
)
1265+
.await;
1266+
1267+
pipeline.start().await.unwrap();
1268+
users_state_notify.notified().await;
1269+
1270+
// Generate several inserts to create many small files (one per commit).
1271+
let insert_count: u64 = 12;
1272+
let event_notify = destination
1273+
.wait_for_events_count(vec![(EventType::Insert, insert_count)])
1274+
.await;
1275+
1276+
for i in 1..=insert_count {
1277+
database
1278+
.insert_values(
1279+
database_schema.users_schema().name.clone(),
1280+
&["name", "age"],
1281+
&[&format!("c_user_{i}"), &(i as i32)],
1282+
)
1283+
.await
1284+
.unwrap();
1285+
}
1286+
1287+
event_notify.notified().await;
1288+
1289+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1290+
1291+
pipeline.shutdown_and_wait().await.unwrap();
1292+
1293+
let users_table = delta_database
1294+
.load_table(&database_schema.users_schema().name)
1295+
.await
1296+
.unwrap();
1297+
1298+
assert_table_snapshot!("compaction_minimizes_small_files", users_table.clone());
1299+
assert!(users_table.snapshot().unwrap().file_paths_iter().count() <= 12);
1300+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
---
2+
source: etl-destinations/tests/deltalake_pipeline.rs
3+
expression: users_table
4+
---
5+
# Schema
6+
- id: Primitive(Long) nullable=false
7+
- name: Primitive(String) nullable=false
8+
- age: Primitive(Integer) nullable=false
9+
10+
# Data
11+
+----+-----------+-----+
12+
| id | name | age |
13+
+----+-----------+-----+
14+
| 1 | c_user_1 | 1 |
15+
| 2 | c_user_2 | 2 |
16+
| 3 | c_user_3 | 3 |
17+
| 4 | c_user_4 | 4 |
18+
| 5 | c_user_5 | 5 |
19+
| 6 | c_user_6 | 6 |
20+
| 7 | c_user_7 | 7 |
21+
| 8 | c_user_8 | 8 |
22+
| 9 | c_user_9 | 9 |
23+
| 10 | c_user_10 | 10 |
24+
| 11 | c_user_11 | 11 |
25+
| 12 | c_user_12 | 12 |
26+
+----+-----------+-----+

0 commit comments

Comments
 (0)