Skip to content

Commit e5a7963

Browse files
committed
chore!: remove get_earliest_version
Signed-off-by: Robert Pack <[email protected]>
1 parent 2f096a9 commit e5a7963

File tree

7 files changed

+4
-99
lines changed

7 files changed

+4
-99
lines changed

crates/aws/src/logstore/default_logstore.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,6 @@ impl LogStore for S3LogStore {
118118
get_latest_version(self, current_version).await
119119
}
120120

121-
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
122-
get_earliest_version(self, current_version).await
123-
}
124-
125121
fn object_store(&self, _operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
126122
self.prefixed_store.clone()
127123
}

crates/aws/src/logstore/dynamodb_logstore.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,6 @@ impl LogStore for S3DynamoDbLogStore {
310310
}
311311
}
312312

313-
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
314-
get_earliest_version(self, current_version).await
315-
}
316-
317313
fn object_store(&self, _operation_id: Option<Uuid>) -> ObjectStoreRef {
318314
self.prefixed_store.clone()
319315
}

crates/core/src/logstore/default_logstore.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,6 @@ impl LogStore for DefaultLogStore {
111111
super::get_latest_version(self, current_version).await
112112
}
113113

114-
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
115-
super::get_earliest_version(self, current_version).await
116-
}
117-
118114
fn object_store(&self, _: Option<Uuid>) -> Arc<dyn ObjectStore> {
119115
self.prefixed_store.clone()
120116
}

crates/core/src/logstore/mod.rs

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
//!
4848
//! ## Configuration
4949
//!
50-
use std::cmp::min;
5150
use std::collections::HashMap;
5251
use std::io::{BufRead, BufReader, Cursor};
5352
use std::sync::{Arc, LazyLock};
@@ -62,7 +61,7 @@ use delta_kernel::engine::default::DefaultEngine;
6261
use delta_kernel::log_segment::LogSegment;
6362
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
6463
use delta_kernel::{AsAny, Engine};
65-
use futures::{StreamExt, TryStreamExt};
64+
use futures::StreamExt;
6665
use object_store::ObjectStoreScheme;
6766
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
6867
use regex::Regex;
@@ -303,9 +302,6 @@ pub trait LogStore: Send + Sync + AsAny {
303302
/// Find latest version currently stored in the delta log.
304303
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;
305304

306-
/// Find earliest version currently stored in the delta log.
307-
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;
308-
309305
/// Get the list of actions for the next commit
310306
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
311307
let next_version = current_version + 1;
@@ -461,10 +457,6 @@ impl<T: LogStore + ?Sized> LogStore for Arc<T> {
461457
T::get_latest_version(self, start_version).await
462458
}
463459

464-
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64> {
465-
T::get_earliest_version(self, start_version).await
466-
}
467-
468460
async fn peek_next_commit(&self, current_version: i64) -> DeltaResult<PeekCommit> {
469461
T::peek_next_commit(self, current_version).await
470462
}
@@ -693,58 +685,6 @@ pub async fn get_latest_version(
693685
Ok(segment.end_version as i64)
694686
}
695687

696-
/// Default implementation for retrieving the earliest version
697-
pub async fn get_earliest_version(
698-
log_store: &dyn LogStore,
699-
current_version: i64,
700-
) -> DeltaResult<i64> {
701-
let current_version = if current_version < 0 {
702-
0
703-
} else {
704-
current_version
705-
};
706-
707-
let storage = log_store.engine(None).await.storage_handler();
708-
let log_root = log_store.log_root_url();
709-
710-
let segment = spawn_blocking(move || {
711-
LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
712-
})
713-
.await
714-
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
715-
716-
let version_start = segment.checkpoint_version.unwrap_or(current_version as u64);
717-
718-
// list files to find min version
719-
let version = async {
720-
let mut min_version: i64 = version_start as i64;
721-
let prefix = Some(log_store.log_path());
722-
let offset_path = commit_uri_from_version(version_start as i64);
723-
let object_store = log_store.object_store(None);
724-
725-
// Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
726-
let mut files = object_store
727-
.list(prefix)
728-
.try_filter(move |f| futures::future::ready(f.location < offset_path))
729-
.boxed();
730-
731-
while let Some(obj_meta) = files.next().await {
732-
let obj_meta = obj_meta?;
733-
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
734-
min_version = min(min_version, log_version);
735-
}
736-
}
737-
738-
if min_version < 0 {
739-
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
740-
}
741-
742-
Ok::<i64, DeltaTableError>(min_version)
743-
}
744-
.await?;
745-
Ok(version)
746-
}
747-
748688
/// Read delta log for a specific version
749689
pub async fn read_commit_entry(
750690
storage: &dyn ObjectStore,
@@ -792,6 +732,8 @@ pub async fn abort_commit_entry(
792732

793733
#[cfg(test)]
794734
pub(crate) mod tests {
735+
use futures::TryStreamExt;
736+
795737
use super::*;
796738

797739
#[test]

crates/core/src/table/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,6 @@ impl DeltaTable {
179179
self.log_store.get_latest_version(self.version()).await
180180
}
181181

182-
/// returns the earliest available version of the table
183-
pub async fn get_earliest_version(&self) -> Result<i64, DeltaTableError> {
184-
self.log_store.get_earliest_version(self.version()).await
185-
}
186-
187182
/// Currently loaded version of the table
188183
pub fn version(&self) -> i64 {
189184
self.state.as_ref().map(|s| s.version()).unwrap_or(-1)

crates/lakefs/src/logstore.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,6 @@ impl LogStore for LakeFSLogStore {
357357
get_latest_version(self, current_version).await
358358
}
359359

360-
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
361-
get_earliest_version(self, current_version).await
362-
}
363-
364360
fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
365361
match operation_id {
366362
Some(id) => {

python/src/lib.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use deltalake::datafusion::logical_expr::LogicalPlanBuilder;
2222
use deltalake::datafusion::prelude::SessionContext;
2323
use deltalake::delta_datafusion::DeltaCdfTableProvider;
2424
use deltalake::errors::DeltaTableError;
25-
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL};
25+
use deltalake::kernel::transaction::{CommitBuilder, CommitProperties, TableReference};
2626
use deltalake::kernel::{
2727
scalars::ScalarExt, Action, Add, LogicalFile, Remove, StructType, Transaction,
2828
};
@@ -336,22 +336,6 @@ impl RawDeltaTable {
336336
})
337337
}
338338

339-
fn get_earliest_version(&self, py: Python) -> PyResult<i64> {
340-
py.allow_threads(|| {
341-
#[allow(clippy::await_holding_lock)]
342-
rt().block_on(async {
343-
match self._table.lock() {
344-
Ok(table) => table
345-
.get_earliest_version()
346-
.await
347-
.map_err(PythonError::from)
348-
.map_err(PyErr::from),
349-
Err(e) => Err(PyRuntimeError::new_err(e.to_string())),
350-
}
351-
})
352-
})
353-
}
354-
355339
fn get_num_index_cols(&self) -> PyResult<i32> {
356340
self.with_table(|t| {
357341
Ok(t.snapshot()

0 commit comments

Comments
 (0)