Skip to content

Commit 338c78a

Browse files
committed
feat: make EagerSnapshot file-loading be optional forever
Previously the files would be loaded on a conflict; now they're never loaded. Signed-off-by: Itamar Turner-Trauring <[email protected]>
1 parent f8dcef3 commit 338c78a

File tree

3 files changed

+57
-31
lines changed

3 files changed

+57
-31
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,8 @@ impl<'a> DeltaScanBuilder<'a> {
721721
// Should we update datafusion_table_statistics to optionally take the mask?
722722
let stats = if let Some(mask) = pruning_mask {
723723
let es = self.snapshot.snapshot();
724-
let pruned_stats = prune_file_statistics(&es.files, mask);
724+
let empty = vec![];
725+
let pruned_stats = prune_file_statistics(es.files.as_ref().unwrap_or(&empty), mask);
725726
LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
726727
} else {
727728
self.snapshot.datafusion_table_statistics()

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::io::{BufRead, BufReader, Cursor};
2121
use ::serde::{Deserialize, Serialize};
2222
use arrow_array::RecordBatch;
2323
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
24+
use either::Either;
2425
use futures::stream::BoxStream;
2526
use futures::{StreamExt, TryStreamExt};
2627
use object_store::path::Path;
@@ -343,8 +344,8 @@ pub struct EagerSnapshot {
343344
pub(crate) transactions: Option<HashMap<String, Transaction>>,
344345

345346
// NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
346-
// we do not yet enforce a consistent schema across all batches we read from the log.
347-
pub(crate) files: Vec<RecordBatch>,
347+
// we do not yet enforce a consistent schema across all batches we read from the log. If `None`, that indicates this was created with `config.require_files` set to `false`.
348+
pub(crate) files: Option<Vec<RecordBatch>>,
348349
}
349350

350351
impl EagerSnapshot {
@@ -371,13 +372,13 @@ impl EagerSnapshot {
371372
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
372373

373374
let files = match config.require_files {
374-
true => {
375+
true => Some(
375376
snapshot
376377
.files(log_store, &mut visitors)?
377378
.try_collect()
378-
.await?
379-
}
380-
false => vec![],
379+
.await?,
380+
),
381+
false => None,
381382
};
382383

383384
let mut sn = Self {
@@ -425,7 +426,7 @@ impl EagerSnapshot {
425426
.collect::<DeltaResult<Vec<_>>>()?;
426427
Ok(Self {
427428
snapshot,
428-
files,
429+
files: Some(files),
429430
tracked_actions: Default::default(),
430431
transactions: None,
431432
})
@@ -459,12 +460,13 @@ impl EagerSnapshot {
459460

460461
let mut schema_actions: HashSet<_> =
461462
visitors.iter().flat_map(|v| v.required_actions()).collect();
463+
let require_files = self.files.is_some();
462464
let files = std::mem::take(&mut self.files);
463465

464466
schema_actions.insert(ActionType::Add);
465467
let checkpoint_stream = if new_slice.checkpoint_files.is_empty() {
466-
// NOTE: we don't need to add the visitor relevant data here, as it is repüresented in the state already
467-
futures::stream::iter(files.into_iter().map(Ok)).boxed()
468+
// NOTE: we don't need to add the visitor relevant data here, as it is represented in the state already
469+
futures::stream::iter(files.unwrap_or_default().into_iter().map(Ok)).boxed()
468470
} else {
469471
let read_schema =
470472
StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
@@ -479,13 +481,19 @@ impl EagerSnapshot {
479481

480482
let mapper = LogMapper::try_new(&self.snapshot, None)?;
481483

482-
let files =
483-
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, &mut visitors)?
484-
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
485-
.try_collect()
486-
.await?;
484+
if require_files {
485+
let files = ReplayStream::try_new(
486+
log_stream,
487+
checkpoint_stream,
488+
&self.snapshot,
489+
&mut visitors,
490+
)?
491+
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
492+
.try_collect()
493+
.await?;
487494

488-
self.files = files;
495+
self.files = Some(files);
496+
}
489497
self.process_visitors(visitors)?;
490498

491499
Ok(())
@@ -541,17 +549,31 @@ impl EagerSnapshot {
541549

542550
/// Get a [`LogDataHandler`] for the snapshot to inspect the currently loaded state of the log.
543551
pub fn log_data(&self) -> LogDataHandler<'_> {
544-
LogDataHandler::new(&self.files, self.metadata(), self.schema())
552+
static EMPTY: Vec<RecordBatch> = vec![];
553+
LogDataHandler::new(
554+
&self.files.as_ref().unwrap_or(&EMPTY),
555+
self.metadata(),
556+
self.schema(),
557+
)
558+
}
559+
560+
/// Iterate over tracked `&RecordBatch`, if any.
561+
fn files_iter(&self) -> impl Iterator<Item = &RecordBatch> {
562+
if let Some(ref files) = self.files {
563+
Either::Left(files.iter())
564+
} else {
565+
Either::Right(std::iter::empty())
566+
}
545567
}
546568

547569
/// Get the number of files in the snapshot
548570
pub fn files_count(&self) -> usize {
549-
self.files.iter().map(|f| f.num_rows()).sum()
571+
self.files_iter().map(|f| f.num_rows()).sum()
550572
}
551573

552574
/// Get the files in the snapshot
553575
pub fn file_actions(&self) -> DeltaResult<impl Iterator<Item = Add> + '_> {
554-
Ok(self.files.iter().flat_map(|b| read_adds(b)).flatten())
576+
Ok(self.files_iter().flat_map(|b| read_adds(b)).flatten())
555577
}
556578

557579
/// Get a file action iterator for the given version
@@ -561,7 +583,7 @@ impl EagerSnapshot {
561583

562584
/// Get an iterator for the CDC files added in this version
563585
pub fn cdc_files(&self) -> DeltaResult<impl Iterator<Item = AddCDCFile> + '_> {
564-
Ok(self.files.iter().flat_map(|b| read_cdf_adds(b)).flatten())
586+
Ok(self.files_iter().flat_map(|b| read_cdf_adds(b)).flatten())
565587
}
566588

567589
/// Iterate over all latest app transactions
@@ -635,15 +657,18 @@ impl EagerSnapshot {
635657
LogMapper::try_new(&self.snapshot, None)?
636658
};
637659

638-
self.files = files
639-
.into_iter()
640-
.chain(
641-
self.files
642-
.iter()
643-
.flat_map(|batch| scanner.process_files_batch(batch, false)),
644-
)
645-
.map(|b| mapper.map_batch(b))
646-
.collect::<DeltaResult<Vec<_>>>()?;
660+
if self.files.is_some() {
661+
self.files = Some(
662+
files
663+
.into_iter()
664+
.chain(
665+
self.files_iter()
666+
.flat_map(|batch| scanner.process_files_batch(batch, false)),
667+
)
668+
.map(|b| mapper.map_batch(b))
669+
.collect::<DeltaResult<Vec<_>>>()?,
670+
);
671+
}
647672

648673
if let Some(metadata) = metadata {
649674
self.snapshot.metadata = metadata;

crates/core/src/kernel/snapshot/serde.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl Serialize for EagerSnapshot {
128128
seq.serialize_element(&self.snapshot)?;
129129
seq.serialize_element(&self.tracked_actions)?;
130130
seq.serialize_element(&self.transactions)?;
131-
for batch in self.files.iter() {
131+
for batch in self.files_iter() {
132132
let mut buffer = vec![];
133133
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref())
134134
.map_err(serde::ser::Error::custom)?;
@@ -176,7 +176,7 @@ impl<'de> Visitor<'de> for EagerSnapshotVisitor {
176176
}
177177
Ok(EagerSnapshot {
178178
snapshot,
179-
files,
179+
files: Some(files),
180180
tracked_actions,
181181
transactions,
182182
})

0 commit comments

Comments
 (0)