Skip to content

Commit 82301bd

Browse files
committed
chore: resolve feedback
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 974c2a1 commit 82301bd

File tree

2 files changed

+20
-15
lines changed

2 files changed

+20
-15
lines changed

crates/core/src/protocol/log_compaction.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::protocol::to_rb;
1313

1414
use crate::{DeltaResult, DeltaTable, DeltaTableError};
1515

16+
#[tracing::instrument(skip(log_store, snapshot), fields(operation = "log_compaction", start_version = start_version, end_version = end_version, table_uri = %log_store.root_url()))]
1617
pub(crate) async fn compact_logs_for(
1718
start_version: u64,
1819
end_version: u64,
@@ -29,12 +30,19 @@ pub(crate) async fn compact_logs_for(
2930
"Invalid version range: end_version {end_version} must be greater than start_version {start_version}"
3031
)));
3132
}
33+
let mut inner_snapshot = snapshot.inner.clone();
3234

33-
let new_snapshot = KernelSnapshot::builder_from(snapshot.inner.clone())
34-
.at_version(end_version)
35-
.build(task_engine.as_ref())?;
35+
if end_version > inner_snapshot.version() {
36+
inner_snapshot = spawn_blocking_with_span(move || {
37+
KernelSnapshot::builder_from(inner_snapshot)
38+
.at_version(end_version)
39+
.build(task_engine.as_ref())
40+
})
41+
.await
42+
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
43+
}
3644

37-
let mut lc_writer = new_snapshot.log_compaction_writer(start_version, end_version)?;
45+
let mut lc_writer = inner_snapshot.log_compaction_writer(start_version, end_version)?;
3846

3947
let lc_url = lc_writer.compaction_path();
4048
let lc_path = Path::from_url_path(lc_url.path())?;

python/src/lib.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,8 +1563,8 @@ impl RawDeltaTable {
15631563
pub fn compact_logs(
15641564
&self,
15651565
py: Python,
1566-
starting_version: i64,
1567-
ending_version: i64,
1566+
starting_version: u64,
1567+
ending_version: u64,
15681568
) -> PyResult<()> {
15691569
py.detach(|| {
15701570
let operation_id = Uuid::new_v4();
@@ -1585,15 +1585,12 @@ impl RawDeltaTable {
15851585
#[allow(clippy::await_holding_lock)]
15861586
let result = rt().block_on(async {
15871587
match self._table.lock() {
1588-
Ok(table) => compact_logs(
1589-
&table,
1590-
starting_version as u64,
1591-
ending_version as u64,
1592-
Some(operation_id),
1593-
)
1594-
.await
1595-
.map_err(PythonError::from)
1596-
.map_err(PyErr::from),
1588+
Ok(table) => {
1589+
compact_logs(&table, starting_version, ending_version, Some(operation_id))
1590+
.await
1591+
.map_err(PythonError::from)
1592+
.map_err(PyErr::from)
1593+
}
15971594
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
15981595
}
15991596
});

0 commit comments

Comments
 (0)