Skip to content

Commit b8b59d1

Browse files
pythonspeedrtyler
authored andcommitted
feat: make EagerSnapshot file-loading be optional forever
Previously the files would be loaded on a conflict, now they're never loaded. Signed-off-by: Itamar Turner-Trauring <[email protected]>
1 parent 0372bff commit b8b59d1

File tree

3 files changed

+57
-31
lines changed

3 files changed

+57
-31
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,8 @@ impl<'a> DeltaScanBuilder<'a> {
721721
// Should we update datafusion_table_statistics to optionally take the mask?
722722
let stats = if let Some(mask) = pruning_mask {
723723
let es = self.snapshot.snapshot();
724-
let pruned_stats = prune_file_statistics(&es.files, mask);
724+
let empty = vec![];
725+
let pruned_stats = prune_file_statistics(es.files.as_ref().unwrap_or(&empty), mask);
725726
LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
726727
} else {
727728
self.snapshot.datafusion_table_statistics()

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::io::{BufRead, BufReader, Cursor};
2121
use ::serde::{Deserialize, Serialize};
2222
use arrow_array::RecordBatch;
2323
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
24+
use either::Either;
2425
use futures::stream::BoxStream;
2526
use futures::{StreamExt, TryStreamExt};
2627
use object_store::path::Path;
@@ -342,8 +343,8 @@ pub struct EagerSnapshot {
342343
pub(crate) transactions: Option<HashMap<String, Transaction>>,
343344

344345
// NOTE: this is a Vec of RecordBatch instead of a single RecordBatch because
345-
// we do not yet enforce a consistent schema across all batches we read from the log.
346-
pub(crate) files: Vec<RecordBatch>,
346+
// we do not yet enforce a consistent schema across all batches we read from the log. If `None`, that indicates this was created with `config.require_files` set to `false`.
347+
pub(crate) files: Option<Vec<RecordBatch>>,
347348
}
348349

349350
impl EagerSnapshot {
@@ -370,13 +371,13 @@ impl EagerSnapshot {
370371
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
371372

372373
let files = match config.require_files {
373-
true => {
374+
true => Some(
374375
snapshot
375376
.files(log_store, &mut visitors)?
376377
.try_collect()
377-
.await?
378-
}
379-
false => vec![],
378+
.await?,
379+
),
380+
false => None,
380381
};
381382

382383
let mut sn = Self {
@@ -424,7 +425,7 @@ impl EagerSnapshot {
424425
.collect::<DeltaResult<Vec<_>>>()?;
425426
Ok(Self {
426427
snapshot,
427-
files,
428+
files: Some(files),
428429
tracked_actions: Default::default(),
429430
transactions: None,
430431
})
@@ -458,12 +459,13 @@ impl EagerSnapshot {
458459

459460
let mut schema_actions: HashSet<_> =
460461
visitors.iter().flat_map(|v| v.required_actions()).collect();
462+
let require_files = self.files.is_some();
461463
let files = std::mem::take(&mut self.files);
462464

463465
schema_actions.insert(ActionType::Add);
464466
let checkpoint_stream = if new_slice.checkpoint_files.is_empty() {
465-
// NOTE: we don't need to add the visitor relevant data here, as it is repüresented in the state already
466-
futures::stream::iter(files.into_iter().map(Ok)).boxed()
467+
// NOTE: we don't need to add the visitor relevant data here, as it is represented in the state already
468+
futures::stream::iter(files.unwrap_or_default().into_iter().map(Ok)).boxed()
467469
} else {
468470
let read_schema =
469471
StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
@@ -478,13 +480,19 @@ impl EagerSnapshot {
478480

479481
let mapper = LogMapper::try_new(&self.snapshot, None)?;
480482

481-
let files =
482-
ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot, &mut visitors)?
483-
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
484-
.try_collect()
485-
.await?;
483+
if require_files {
484+
let files = ReplayStream::try_new(
485+
log_stream,
486+
checkpoint_stream,
487+
&self.snapshot,
488+
&mut visitors,
489+
)?
490+
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
491+
.try_collect()
492+
.await?;
486493

487-
self.files = files;
494+
self.files = Some(files);
495+
}
488496
self.process_visitors(visitors)?;
489497

490498
Ok(())
@@ -540,17 +548,31 @@ impl EagerSnapshot {
540548

541549
/// Get a [`LogDataHandler`] for the snapshot to inspect the currently loaded state of the log.
542550
pub fn log_data(&self) -> LogDataHandler<'_> {
543-
LogDataHandler::new(&self.files, self.metadata(), self.schema())
551+
static EMPTY: Vec<RecordBatch> = vec![];
552+
LogDataHandler::new(
553+
&self.files.as_ref().unwrap_or(&EMPTY),
554+
self.metadata(),
555+
self.schema(),
556+
)
557+
}
558+
559+
/// Iterate over tracked `&RecordBatch`, if any.
560+
fn files_iter(&self) -> impl Iterator<Item = &RecordBatch> {
561+
if let Some(ref files) = self.files {
562+
Either::Left(files.iter())
563+
} else {
564+
Either::Right(std::iter::empty())
565+
}
544566
}
545567

546568
/// Get the number of files in the snapshot
547569
pub fn files_count(&self) -> usize {
548-
self.files.iter().map(|f| f.num_rows()).sum()
570+
self.files_iter().map(|f| f.num_rows()).sum()
549571
}
550572

551573
/// Get the files in the snapshot
552574
pub fn file_actions(&self) -> DeltaResult<impl Iterator<Item = Add> + '_> {
553-
Ok(self.files.iter().flat_map(|b| read_adds(b)).flatten())
575+
Ok(self.files_iter().flat_map(|b| read_adds(b)).flatten())
554576
}
555577

556578
/// Get a file action iterator for the given version
@@ -560,7 +582,7 @@ impl EagerSnapshot {
560582

561583
/// Get an iterator for the CDC files added in this version
562584
pub fn cdc_files(&self) -> DeltaResult<impl Iterator<Item = AddCDCFile> + '_> {
563-
Ok(self.files.iter().flat_map(|b| read_cdf_adds(b)).flatten())
585+
Ok(self.files_iter().flat_map(|b| read_cdf_adds(b)).flatten())
564586
}
565587

566588
/// Iterate over all latest app transactions
@@ -634,15 +656,18 @@ impl EagerSnapshot {
634656
LogMapper::try_new(&self.snapshot, None)?
635657
};
636658

637-
self.files = files
638-
.into_iter()
639-
.chain(
640-
self.files
641-
.iter()
642-
.flat_map(|batch| scanner.process_files_batch(batch, false)),
643-
)
644-
.map(|b| mapper.map_batch(b))
645-
.collect::<DeltaResult<Vec<_>>>()?;
659+
if self.files.is_some() {
660+
self.files = Some(
661+
files
662+
.into_iter()
663+
.chain(
664+
self.files_iter()
665+
.flat_map(|batch| scanner.process_files_batch(batch, false)),
666+
)
667+
.map(|b| mapper.map_batch(b))
668+
.collect::<DeltaResult<Vec<_>>>()?,
669+
);
670+
}
646671

647672
if let Some(metadata) = metadata {
648673
self.snapshot.metadata = metadata;

crates/core/src/kernel/snapshot/serde.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl Serialize for EagerSnapshot {
128128
seq.serialize_element(&self.snapshot)?;
129129
seq.serialize_element(&self.tracked_actions)?;
130130
seq.serialize_element(&self.transactions)?;
131-
for batch in self.files.iter() {
131+
for batch in self.files_iter() {
132132
let mut buffer = vec![];
133133
let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref())
134134
.map_err(serde::ser::Error::custom)?;
@@ -176,7 +176,7 @@ impl<'de> Visitor<'de> for EagerSnapshotVisitor {
176176
}
177177
Ok(EagerSnapshot {
178178
snapshot,
179-
files,
179+
files: Some(files),
180180
tracked_actions,
181181
transactions,
182182
})

0 commit comments

Comments
 (0)