Skip to content

Commit ce6ffbf

Browse files
committed
feat: expose peek next commit function to python
1 parent b3f478e commit ce6ffbf

File tree

5 files changed

+64
-1
lines changed

5 files changed

+64
-1
lines changed

crates/deltalake-core/src/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ pub enum DeltaTableError {
7676
#[error("Invalid table version: {0}")]
7777
InvalidVersion(i64),
7878

79+
/// Error returned when the DeltaTable has no delta log version.
80+
#[error("Delta log not found for table version: {0}")]
81+
DeltaLogNotFound(i64),
82+
7983
/// Error returned when the DeltaTable has no data files.
8084
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
8185
MissingDataFile {

crates/deltalake-core/src/table/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,23 @@ impl DeltaTable {
484484
self.update().await
485485
}
486486

487+
/// Get the commit obj from the version
488+
pub async fn get_obj_from_version(
489+
&self,
490+
current_version: i64,
491+
) -> Result<Bytes, DeltaTableError> {
492+
let commit_uri = commit_uri_from_version(current_version);
493+
let commit_log_bytes = self.storage.get(&commit_uri).await;
494+
let commit_log_bytes = match commit_log_bytes {
495+
Err(ObjectStoreError::NotFound { .. }) => {
496+
return Err(DeltaTableError::DeltaLogNotFound(current_version));
497+
},
498+
Err(err) => Err(err),
499+
Ok(result) => result.bytes().await,
500+
}?;
501+
Ok(commit_log_bytes)
502+
}
503+
487504
/// Get the list of actions for the next commit
488505
pub async fn peek_next_commit(
489506
&self,

python/deltalake/table.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import operator
33
import warnings
4+
import logging
45
from dataclasses import dataclass
56
from datetime import datetime, timedelta
67
from functools import reduce
@@ -36,7 +37,7 @@
3637
from ._internal import RawDeltaTable
3738
from ._util import encode_partition_value
3839
from .data_catalog import DataCatalog
39-
from .exceptions import DeltaProtocolError
40+
from .exceptions import DeltaError, DeltaProtocolError
4041
from .fs import DeltaStorageHandler
4142
from .schema import Schema
4243

@@ -250,6 +251,7 @@ def __init__(
250251
251252
"""
252253
self._storage_options = storage_options
254+
self._latest_version = -1
253255
self._table = RawDeltaTable(
254256
str(table_uri),
255257
version=version,
@@ -796,6 +798,31 @@ def update_incremental(self) -> None:
796798
"""
797799
self._table.update_incremental()
798800

801+
def peek_next_commit(
802+
self, version: int
803+
) -> Tuple[Optional[List[Dict[Any, Any]]], int]:
804+
"""
805+
Peek next commit of the input version.
806+
"""
807+
actions = []
808+
next_version = version + 1
809+
if next_version > self._latest_version:
810+
self._latest_version = self._table.get_latest_version()
811+
while next_version <= self._latest_version:
812+
try:
813+
commit_log_bytes = self._table.get_obj(next_version)
814+
for commit_action in commit_log_bytes.split(b"\n"):
815+
if commit_action:
816+
actions.append(json.loads(commit_action))
817+
return actions, next_version
818+
except DeltaError as e:
819+
if str(e) == f"Delta log not found for table version: {next_version}":
820+
next_version += 1
821+
else:
822+
raise
823+
logging.info(f"Provided Delta Version is up to date. Version: {version}")
824+
return None, version
825+
799826
def create_checkpoint(self) -> None:
800827
self._table.create_checkpoint()
801828

python/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ impl RawDeltaTable {
154154
Ok(self._table.version())
155155
}
156156

157+
pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> {
158+
let commit_log_bytes = rt()?
159+
.block_on(self._table.get_obj_from_version(version))
160+
.map_err(PythonError::from)?;
161+
return Ok(PyBytes::new(py, &commit_log_bytes));
162+
}
163+
157164
pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
158165
let metadata = self._table.get_metadata().map_err(PythonError::from)?;
159166
Ok(RawDeltaTableMetaData {

python/tests/test_table_read.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,14 @@ def test_writer_fails_on_protocol():
495495
dt.to_pandas()
496496

497497

498+
@pytest.mark.parametrize("version, expected", [(2, (5, 3))])
499+
def test_peek_next_commit(version, expected):
500+
table_path = "../rust/tests/data/simple_table"
501+
dt = DeltaTable(table_path)
502+
actions, current_version = dt.peek_next_commit(version=version)
503+
assert (len(actions), current_version) == expected
504+
505+
498506
class ExcPassThroughThread(Thread):
499507
"""Wrapper around `threading.Thread` that propagates exceptions."""
500508

0 commit comments

Comments
 (0)