Skip to content

Commit c63acb9

Browse files
committed
Fix:A bunch of tiny improvements here and there
1 parent 6f40955 commit c63acb9

File tree

7 files changed

+328
-111
lines changed

7 files changed

+328
-111
lines changed

Cargo.lock

Lines changed: 163 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ rustls = "0.23.23"
3939
actix-service = "2.0.2"
4040
lazy_static = "1.5.0"
4141
bcrypt = "0.17.0"
42+
opentelemetry = "0.28.0"
43+
opentelemetry-otlp = "0.28.0"
44+
tracing-opentelemetry = "0.29.0"
45+
bincode = "1.3.3"
46+
opentelemetry_sdk = { version = "0.28.0", features = ["experimental_async_runtime"] }

src/database.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use datafusion::prelude::*;
22
use datafusion::arrow::record_batch::RecordBatch;
33
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
44
use datafusion::arrow::array::{StringArray, TimestampMicrosecondArray, Int32Array, Int64Array, ListBuilder, StringBuilder};
5-
use datafusion::common::DataFusionError;
65
use deltalake::{DeltaTableBuilder, DeltaOps};
76
use std::collections::HashMap;
87
use std::sync::{Arc, RwLock};
@@ -397,21 +396,18 @@ impl Database {
397396
let configs = self.project_configs.read().map_err(|e| anyhow::anyhow!("Lock error: {:?}", e))?;
398397
configs.get(project_id).ok_or_else(|| anyhow::anyhow!("Project not found"))?.clone()
399398
};
400-
let table_name = "table_events".to_string();
401-
let optimize_sql = format!("OPTIMIZE TABLE {}", table_name);
402-
match self.ctx.sql(&optimize_sql).await {
403-
Ok(_) => {
404-
let new_table = DeltaTableBuilder::from_uri(&conn_str).load().await?;
405-
*table_ref.write().map_err(|e| anyhow::anyhow!("Lock error: {:?}", e))? = new_table;
406-
COMPACTION_COUNTER.inc();
407-
tracing::info!("Compaction for project '{}' completed.", project_id);
408-
Ok(())
409-
}
410-
Err(e) => {
411-
tracing::error!("Failed to execute '{}': {:?}", optimize_sql, e);
412-
Err(anyhow::anyhow!("Compaction failed: {:?}", e))
413-
}
414-
}
399+
400+
let (table, _metrics) = DeltaOps::try_from_uri(&conn_str)
401+
.await
402+
.map_err(|e| anyhow::anyhow!("Failed to load table for optimization: {:?}", e))?
403+
.optimize()
404+
.await
405+
.map_err(|e| anyhow::anyhow!("Optimization failed: {:?}", e))?;
406+
407+
*table_ref.write().map_err(|e| anyhow::anyhow!("Lock error: {:?}", e))? = table;
408+
COMPACTION_COUNTER.inc();
409+
tracing::info!("Compaction for project '{}' completed.", project_id);
410+
Ok(())
415411
}
416412

417413
pub async fn compact_all_projects(&self) -> Result<()> {

0 commit comments

Comments
 (0)