Skip to content

Commit 1824127

Browse files
committed
run light optimize every 5 mins but with 16mb targets, and main optimize would have 128mb targets
1 parent f4aa919 commit 1824127

File tree

1 file changed

+8
-12
lines changed

1 file changed

+8
-12
lines changed

src/database.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub fn extract_project_id(batch: &RecordBatch) -> Option<String> {
5757

5858
// Constants for optimization and vacuum operations
5959
const DEFAULT_VACUUM_RETENTION_HOURS: u64 = 72; // 72 hours = 3 days
60-
const DEFAULT_OPTIMIZE_TARGET_SIZE: i64 = 536870912; // 512MB
60+
const DEFAULT_OPTIMIZE_TARGET_SIZE: i64 = 128 * 1024 * 1024; // 128MB
6161
const DEFAULT_PAGE_ROW_COUNT_LIMIT: usize = 20000;
6262
const ZSTD_COMPRESSION_LEVEL: i32 = 6; // Balance between compression ratio and speed
6363

@@ -468,7 +468,7 @@ impl Database {
468468
Box::pin(async move {
469469
info!("Running scheduled optimize on all tables");
470470
for ((project_id, table_name), table) in db.project_configs.read().await.iter() {
471-
if let Err(e) = db.optimize_table(table, None).await {
471+
if let Err(e) = db.optimize_table(table, table_name, None).await {
472472
error!("Optimize failed for project '{}' table '{}': {}", project_id, table_name, e);
473473
}
474474
}
@@ -975,7 +975,7 @@ impl Database {
975975
let delta_ops = DeltaOps::try_from_uri_with_storage_options(&storage_uri, storage_options.clone()).await?;
976976
let commit_properties = CommitProperties::default().with_create_checkpoint(true).with_cleanup_expired_logs(Some(true));
977977

978-
let checkpoint_interval = env::var("TIMEFUSION_CHECKPOINT_INTERVAL").unwrap_or_else(|_| "50".to_string());
978+
let checkpoint_interval = env::var("TIMEFUSION_CHECKPOINT_INTERVAL").unwrap_or_else(|_| "10".to_string());
979979

980980
let mut config = HashMap::new();
981981
config.insert("delta.checkpointInterval".to_string(), Some(checkpoint_interval));
@@ -1236,7 +1236,7 @@ impl Database {
12361236

12371237
/// Optimize the Delta table using Z-ordering on the z-order columns of the schema registered for `table_name` (falling back to the default schema's columns)
12381238
/// This improves query performance for time-based queries
1239-
pub async fn optimize_table(&self, table_ref: &Arc<RwLock<DeltaTable>>, _target_size: Option<i64>) -> Result<()> {
1239+
pub async fn optimize_table(&self, table_ref: &Arc<RwLock<DeltaTable>>, table_name: &str, _target_size: Option<i64>) -> Result<()> {
12401240
// Log the start of the optimization operation
12411241
let start_time = std::time::Instant::now();
12421242
info!("Starting Delta table optimization with Z-ordering (last 28 hours only)");
@@ -1271,7 +1271,7 @@ impl Database {
12711271
.optimize()
12721272
.with_filters(&partition_filters)
12731273
.with_type(deltalake::operations::optimize::OptimizeType::ZOrder(
1274-
get_default_schema().z_order_columns.clone(),
1274+
get_schema(table_name).unwrap_or_else(get_default_schema).z_order_columns.clone(),
12751275
))
12761276
.with_target_size(target_size)
12771277
.with_writer_properties(writer_properties)
@@ -1320,10 +1320,6 @@ impl Database {
13201320
table.clone()
13211321
};
13221322

1323-
// Target 64MB files for quick compaction of small files
1324-
let target_size = 67_108_864; // 64MB
1325-
1326-
// Only optimize today's partition for light optimization
13271323
let today = Utc::now().date_naive();
13281324
info!("Light optimizing files from date: {}", today);
13291325

@@ -1334,9 +1330,9 @@ impl Database {
13341330
.optimize()
13351331
.with_filters(&partition_filters)
13361332
.with_type(deltalake::operations::optimize::OptimizeType::Compact)
1337-
.with_target_size(target_size)
1333+
.with_target_size(16 * 1024 * 1024)
13381334
.with_writer_properties(Self::create_writer_properties())
1339-
.with_min_commit_interval(tokio::time::Duration::from_secs(60)) // 1 minute min interval
1335+
.with_min_commit_interval(tokio::time::Duration::from_secs(30)) // 30 second min interval
13401336
.await;
13411337

13421338
match optimize_result {
@@ -2242,7 +2238,7 @@ mod tests {
22422238

22432239
// Get the table and optimize it
22442240
if let Ok(table_ref) = db.get_or_create_table(&project, "otel_logs_and_spans").await {
2245-
let _ = db.optimize_table(&table_ref, Some(1024 * 1024)).await;
2241+
let _ = db.optimize_table(&table_ref, "otel_logs_and_spans", Some(1024 * 1024)).await;
22462242
}
22472243
})
22482244
};

0 commit comments

Comments
 (0)