Skip to content

Commit 53ba1e8

Browse files
committed
chore: return a bool on snapshot update to indicate whether files were read
This makes it easier to understand whether or not the function has actually _read_ the ReplayStream Signed-off-by: R. Tyler Croy <[email protected]>
1 parent b8b59d1 commit 53ba1e8

File tree

6 files changed

+41
-35
lines changed

6 files changed

+41
-35
lines changed

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -432,13 +432,18 @@ impl EagerSnapshot {
432432
}
433433

434434
/// Update the snapshot to the given version
435+
///
436+
/// This will return a true value if the [LogStore] was read from. This can be helpful for
437+
/// understanding whether the snapshot loaded data or not
435438
pub async fn update(
436439
&mut self,
437440
log_store: &dyn LogStore,
438441
target_version: Option<i64>,
439-
) -> DeltaResult<()> {
442+
) -> DeltaResult<bool> {
443+
// Whether or not data has been read by this function
444+
let mut read_data = false;
440445
if Some(self.version()) == target_version {
441-
return Ok(());
446+
return Ok(read_data);
442447
}
443448

444449
let new_slice = self
@@ -447,7 +452,7 @@ impl EagerSnapshot {
447452
.await?;
448453

449454
if new_slice.is_none() {
450-
return Ok(());
455+
return Ok(read_data);
451456
}
452457
let new_slice = new_slice.unwrap();
453458

@@ -457,9 +462,15 @@ impl EagerSnapshot {
457462
.flat_map(get_visitor)
458463
.collect::<Vec<_>>();
459464

465+
// If files is `None` then this can exit early because the snapshot has intentionally been
466+
// loaded _without_ files
467+
if self.files.is_none() {
468+
self.process_visitors(visitors)?;
469+
return Ok(read_data);
470+
}
471+
460472
let mut schema_actions: HashSet<_> =
461473
visitors.iter().flat_map(|v| v.required_actions()).collect();
462-
let require_files = self.files.is_some();
463474
let files = std::mem::take(&mut self.files);
464475

465476
schema_actions.insert(ActionType::Add);
@@ -479,23 +490,17 @@ impl EagerSnapshot {
479490
let log_stream = new_slice.commit_stream(log_store, &read_schema, &self.snapshot.config)?;
480491

481492
let mapper = LogMapper::try_new(&self.snapshot, None)?;
493+
let files =
494+
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, &mut visitors)?
495+
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
496+
.try_collect()
497+
.await?;
482498

483-
if require_files {
484-
let files = ReplayStream::try_new(
485-
log_stream,
486-
checkpoint_stream,
487-
&self.snapshot,
488-
&mut visitors,
489-
)?
490-
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
491-
.try_collect()
492-
.await?;
493-
494-
self.files = Some(files);
495-
}
499+
self.files = Some(files);
500+
read_data = true;
496501
self.process_visitors(visitors)?;
497502

498-
Ok(())
503+
Ok(read_data)
499504
}
500505

501506
/// Get the underlying snapshot
@@ -550,7 +555,7 @@ impl EagerSnapshot {
550555
pub fn log_data(&self) -> LogDataHandler<'_> {
551556
static EMPTY: Vec<RecordBatch> = vec![];
552557
LogDataHandler::new(
553-
&self.files.as_ref().unwrap_or(&EMPTY),
558+
self.files.as_ref().unwrap_or(&EMPTY),
554559
self.metadata(),
555560
self.schema(),
556561
)

crates/core/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,11 @@ mod tests {
231231
let table_newest_version = crate::open_table(path).await.unwrap();
232232
let mut table_to_update = crate::open_table_with_version(path, 0).await.unwrap();
233233
// calling update several times should not produce any duplicates
234-
table_to_update.update().await.unwrap();
235-
table_to_update.update().await.unwrap();
236-
table_to_update.update().await.unwrap();
234+
// The first call should have read some data
235+
assert_eq!(true, table_to_update.update().await.unwrap());
236+
// Subsequent calls should not
237+
assert_eq!(false, table_to_update.update().await.unwrap());
238+
assert_eq!(false, table_to_update.update().await.unwrap());
237239

238240
assert_eq!(
239241
table_newest_version.get_files_iter().unwrap().collect_vec(),

crates/core/src/table/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl DeltaTableBuilder {
290290
DeltaVersion::Newest => table.load().await?,
291291
DeltaVersion::Version(v) => table.load_version(v).await?,
292292
DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
293-
}
293+
};
294294
Ok(table)
295295
}
296296
}

crates/core/src/table/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,13 @@ impl DeltaTable {
195195
}
196196

197197
/// Load DeltaTable with data from latest checkpoint
198-
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
198+
pub async fn load(&mut self) -> Result<bool, DeltaTableError> {
199199
self.update_incremental(None).await
200200
}
201201

202202
/// Updates the DeltaTable to the most recent state committed to the transaction log by
203203
/// loading the last checkpoint and incrementally applying each version since.
204-
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
204+
pub async fn update(&mut self) -> Result<bool, DeltaTableError> {
205205
self.update_incremental(None).await
206206
}
207207

@@ -210,21 +210,21 @@ impl DeltaTable {
210210
pub async fn update_incremental(
211211
&mut self,
212212
max_version: Option<i64>,
213-
) -> Result<(), DeltaTableError> {
213+
) -> Result<bool, DeltaTableError> {
214214
match self.state.as_mut() {
215215
Some(state) => state.update(&self.log_store, max_version).await,
216216
_ => {
217217
let state =
218218
DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version)
219219
.await?;
220220
self.state = Some(state);
221-
Ok(())
221+
Ok(true)
222222
}
223223
}
224224
}
225225

226226
/// Loads the DeltaTable state for the given version.
227-
pub async fn load_version(&mut self, version: i64) -> Result<(), DeltaTableError> {
227+
pub async fn load_version(&mut self, version: i64) -> Result<bool, DeltaTableError> {
228228
if let Some(snapshot) = &self.state {
229229
if snapshot.version() > version {
230230
self.state = None;
@@ -379,7 +379,7 @@ impl DeltaTable {
379379
pub async fn load_with_datetime(
380380
&mut self,
381381
datetime: DateTime<Utc>,
382-
) -> Result<(), DeltaTableError> {
382+
) -> Result<bool, DeltaTableError> {
383383
let mut min_version: i64 = -1;
384384
let log_store = self.log_store();
385385
let prefix = log_store.log_path();

crates/core/src/table/state.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,8 @@ impl DeltaTableState {
204204
&mut self,
205205
log_store: &dyn LogStore,
206206
version: Option<i64>,
207-
) -> Result<(), DeltaTableError> {
208-
self.snapshot.update(log_store, version).await?;
209-
Ok(())
207+
) -> Result<bool, DeltaTableError> {
208+
self.snapshot.update(log_store, version).await
210209
}
211210

212211
/// Obtain Add actions for files that match the filter

python/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl RawDeltaTable {
306306
/// Load the internal [RawDeltaTable] with the table state from the specified `version`
307307
///
308308
/// This will acquire the internal lock since it is a mutating operation!
309-
pub fn load_version(&self, py: Python, version: i64) -> PyResult<()> {
309+
pub fn load_version(&self, py: Python, version: i64) -> PyResult<bool> {
310310
py.allow_threads(|| {
311311
#[allow(clippy::await_holding_lock)]
312312
rt().block_on(async {
@@ -359,7 +359,7 @@ impl RawDeltaTable {
359359
})
360360
}
361361

362-
pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<()> {
362+
pub fn load_with_datetime(&self, py: Python, ds: &str) -> PyResult<bool> {
363363
py.allow_threads(|| {
364364
let datetime =
365365
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
@@ -1014,7 +1014,7 @@ impl RawDeltaTable {
10141014
.collect())
10151015
}
10161016

1017-
pub fn update_incremental(&self) -> PyResult<()> {
1017+
pub fn update_incremental(&self) -> PyResult<bool> {
10181018
#[allow(clippy::await_holding_lock)]
10191019
#[allow(deprecated)]
10201020
Ok(rt()

0 commit comments

Comments
 (0)