Skip to content

Commit cb672ac

Browse files
authored
feat: allow for lazy loading files in operations (#3872)
# Description Thus far we had a bunch of `unwrap`s in our `DeltaOps` that would panic for a missing `EagerSnapshot`. In this PR we handle a missing snapshot by resolving the options inside an operation's future. This also made it quite simple to re-load files in case we need them but `require_files=false`. This effectively allows lazy loading the files while we still default to eagerly loading the data. The PR does look quite large, but the patterns are very repetitive. * make eager snapshot optional on builders * resolve snapshot at beginning of operation future To safeguard against trying to execute operations without files loaded, we pushed the respective check on the delta config into the methods accessing the files. Signed-off-by: Robert Pack <[email protected]>
1 parent 7b98e28 commit cb672ac

29 files changed

+354
-323
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
8383
num_cpus = { version = "1" }
8484

8585
[workspace.metadata.typos]
86-
files.extend-exclude = ["CHANGELOG.md"]
86+
files.extend-exclude = ["CHANGELOG.md", "crates/benchmarks/queries/tpcds/*.sql"]
8787
default.extend-ignore-re = [
8888
# Custom ignore regex patterns: https://github.com/crate-ci/typos/blob/master/docs/reference.md#example-configurations
8989
"(?s)//\\s*spellchecker:ignore-next-line[^\\n]*\\n[^\\n]*",

crates/core/src/delta_datafusion/cdf/scan.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ pub struct DeltaCdfTableProvider {
2828
impl DeltaCdfTableProvider {
2929
/// Build a DeltaCDFTableProvider
3030
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
31-
let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec();
31+
let mut fields = cdf_builder
32+
.snapshot
33+
.as_ref()
34+
.ok_or(DeltaTableError::generic(
35+
"expected initialized snapshot for DeltaCdfTableProvider",
36+
))?
37+
.input_schema()
38+
.fields()
39+
.to_vec();
3240
for f in ADD_PARTITION_SCHEMA.clone() {
3341
fields.push(f.into());
3442
}

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ impl<'a> DeltaScanBuilder<'a> {
626626
let mut pruned_batches = Vec::new();
627627
let mut mask_offset = 0;
628628

629-
for batch in &self.snapshot.files {
629+
for batch in self.snapshot.files()? {
630630
let batch_size = batch.num_rows();
631631
let batch_mask = &mask[mask_offset..mask_offset + batch_size];
632632
let batch_mask_array = BooleanArray::from(batch_mask.to_vec());

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,25 @@ impl Snapshot {
463463
pub struct EagerSnapshot {
464464
snapshot: Snapshot,
465465
// logical files in the snapshot
466-
pub(crate) files: Vec<RecordBatch>,
466+
files: Vec<RecordBatch>,
467+
}
468+
469+
pub(crate) async fn resolve_snapshot(
470+
log_store: &dyn LogStore,
471+
maybe_snapshot: Option<EagerSnapshot>,
472+
require_files: bool,
473+
) -> DeltaResult<EagerSnapshot> {
474+
if let Some(snapshot) = maybe_snapshot {
475+
if require_files {
476+
snapshot.with_files(log_store).await
477+
} else {
478+
Ok(snapshot)
479+
}
480+
} else {
481+
let mut config = DeltaTableConfig::default();
482+
config.require_files = require_files;
483+
EagerSnapshot::try_new(log_store, config, None).await
484+
}
467485
}
468486

469487
impl EagerSnapshot {
@@ -474,15 +492,36 @@ impl EagerSnapshot {
474492
version: Option<i64>,
475493
) -> DeltaResult<Self> {
476494
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
495+
Self::try_new_with_snapshot(log_store, snapshot).await
496+
}
477497

478-
let files = match config.require_files {
498+
pub(crate) async fn try_new_with_snapshot(
499+
log_store: &dyn LogStore,
500+
snapshot: Snapshot,
501+
) -> DeltaResult<Self> {
502+
let files = match snapshot.load_config().require_files {
479503
true => snapshot.files(log_store, None).try_collect().await?,
480504
false => vec![],
481505
};
482-
483506
Ok(Self { snapshot, files })
484507
}
485508

509+
pub(crate) async fn with_files(mut self, log_store: &dyn LogStore) -> DeltaResult<Self> {
510+
if self.snapshot.config.require_files {
511+
return Ok(self);
512+
}
513+
self.snapshot.config.require_files = true;
514+
Self::try_new_with_snapshot(log_store, self.snapshot).await
515+
}
516+
517+
pub(crate) fn files(&self) -> DeltaResult<&[RecordBatch]> {
518+
if self.snapshot.config.require_files {
519+
Ok(&self.files)
520+
} else {
521+
Err(DeltaTableError::NotInitializedWithFiles("files".into()))
522+
}
523+
}
524+
486525
/// Update the snapshot to the given version
487526
pub(crate) async fn update(
488527
&mut self,
@@ -588,6 +627,12 @@ impl EagerSnapshot {
588627
log_store: &dyn LogStore,
589628
predicate: Option<PredicateRef>,
590629
) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
630+
if !self.snapshot.load_config().require_files {
631+
return Box::pin(once(ready(Err(DeltaTableError::NotInitializedWithFiles(
632+
"file_views".into(),
633+
)))));
634+
}
635+
591636
self.snapshot
592637
.files_from(
593638
log_store,

crates/core/src/operations/add_column.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ use itertools::Itertools;
99
use super::{CustomExecuteHandler, Operation};
1010
use crate::kernel::schema::merge_delta_struct;
1111
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
12-
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt};
12+
use crate::kernel::{
13+
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt,
14+
};
1315
use crate::logstore::LogStoreRef;
1416
use crate::protocol::DeltaOperation;
1517
use crate::{DeltaResult, DeltaTable, DeltaTableError};
1618

1719
/// Add new columns and/or nested fields to a table
1820
pub struct AddColumnBuilder {
1921
/// A snapshot of the table's state
20-
snapshot: EagerSnapshot,
22+
snapshot: Option<EagerSnapshot>,
2123
/// Fields to add/merge into schema
2224
fields: Option<Vec<StructField>>,
2325
/// Delta object store for handling data files
@@ -27,7 +29,7 @@ pub struct AddColumnBuilder {
2729
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
2830
}
2931

30-
impl Operation<()> for AddColumnBuilder {
32+
impl Operation for AddColumnBuilder {
3133
fn log_store(&self) -> &LogStoreRef {
3234
&self.log_store
3335
}
@@ -38,7 +40,7 @@ impl Operation<()> for AddColumnBuilder {
3840

3941
impl AddColumnBuilder {
4042
/// Create a new builder
41-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
43+
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
4244
Self {
4345
snapshot,
4446
log_store,
@@ -75,7 +77,9 @@ impl std::future::IntoFuture for AddColumnBuilder {
7577
let this = self;
7678

7779
Box::pin(async move {
78-
let mut metadata = this.snapshot.metadata().clone();
80+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;
81+
82+
let mut metadata = snapshot.metadata().clone();
7983
let fields = match this.fields.clone() {
8084
Some(v) => v,
8185
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
@@ -95,10 +99,10 @@ impl std::future::IntoFuture for AddColumnBuilder {
9599
));
96100
}
97101

98-
let table_schema = this.snapshot.schema();
102+
let table_schema = snapshot.schema();
99103
let new_table_schema = merge_delta_struct(table_schema.as_ref(), fields_right)?;
100104

101-
let current_protocol = this.snapshot.protocol();
105+
let current_protocol = snapshot.protocol();
102106

103107
let new_protocol = current_protocol
104108
.clone()
@@ -121,7 +125,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
121125
.with_actions(actions)
122126
.with_operation_id(operation_id)
123127
.with_post_commit_hook_handler(this.get_custom_execute_handler())
124-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
128+
.build(Some(&snapshot), this.log_store.clone(), operation)
125129
.await?;
126130

127131
this.post_execute(operation_id).await?;

crates/core/src/operations/add_feature.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use itertools::Itertools;
88

99
use super::{CustomExecuteHandler, Operation};
1010
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
11-
use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures};
11+
use crate::kernel::{resolve_snapshot, EagerSnapshot, ProtocolExt as _, TableFeatures};
1212
use crate::logstore::LogStoreRef;
1313
use crate::protocol::DeltaOperation;
1414
use crate::DeltaTable;
@@ -17,7 +17,7 @@ use crate::{DeltaResult, DeltaTableError};
1717
/// Enable table features for a table
1818
pub struct AddTableFeatureBuilder {
1919
/// A snapshot of the table's state
20-
snapshot: EagerSnapshot,
20+
snapshot: Option<EagerSnapshot>,
2121
/// Name of the feature
2222
name: Vec<TableFeatures>,
2323
/// Allow protocol versions to be increased by setting features
@@ -29,7 +29,7 @@ pub struct AddTableFeatureBuilder {
2929
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
3030
}
3131

32-
impl super::Operation<()> for AddTableFeatureBuilder {
32+
impl super::Operation for AddTableFeatureBuilder {
3333
fn log_store(&self) -> &LogStoreRef {
3434
&self.log_store
3535
}
@@ -40,7 +40,7 @@ impl super::Operation<()> for AddTableFeatureBuilder {
4040

4141
impl AddTableFeatureBuilder {
4242
/// Create a new builder
43-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
43+
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
4444
Self {
4545
name: vec![],
4646
allow_protocol_versions_increase: false,
@@ -92,6 +92,8 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
9292
let this = self;
9393

9494
Box::pin(async move {
95+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;
96+
9597
let name = if this.name.is_empty() {
9698
return Err(DeltaTableError::Generic("No features provided".to_string()));
9799
} else {
@@ -107,7 +109,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
107109
let reader_features = reader_features.into_iter().flatten().collect_vec();
108110
let writer_features = writer_features.into_iter().flatten().collect_vec();
109111

110-
let mut protocol = this.snapshot.protocol().clone();
112+
let mut protocol = snapshot.protocol().clone();
111113

112114
if !this.allow_protocol_versions_increase {
113115
if !reader_features.is_empty()
@@ -135,7 +137,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
135137
.with_actions(actions)
136138
.with_operation_id(operation_id)
137139
.with_post_commit_hook_handler(this.get_custom_execute_handler())
138-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
140+
.build(Some(&snapshot), this.log_store.clone(), operation)
139141
.await?;
140142

141143
this.post_execute(operation_id).await?;

crates/core/src/operations/constraints.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use super::{CustomExecuteHandler, Operation};
1515
use crate::delta_datafusion::expr::fmt_expr_to_sql;
1616
use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
1717
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
18-
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
18+
use crate::kernel::{
19+
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner,
20+
};
1921
use crate::logstore::LogStoreRef;
2022
use crate::operations::datafusion_utils::Expression;
2123
use crate::protocol::DeltaOperation;
@@ -25,7 +27,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
2527
/// Build a constraint to add to a table
2628
pub struct ConstraintBuilder {
2729
/// A snapshot of the table's state
28-
snapshot: EagerSnapshot,
30+
snapshot: Option<EagerSnapshot>,
2931
/// Name of the constraint
3032
name: Option<String>,
3133
/// Constraint expression
@@ -39,7 +41,7 @@ pub struct ConstraintBuilder {
3941
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
4042
}
4143

42-
impl super::Operation<()> for ConstraintBuilder {
44+
impl super::Operation for ConstraintBuilder {
4345
fn log_store(&self) -> &LogStoreRef {
4446
&self.log_store
4547
}
@@ -50,7 +52,7 @@ impl super::Operation<()> for ConstraintBuilder {
5052

5153
impl ConstraintBuilder {
5254
/// Create a new builder
53-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
55+
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
5456
Self {
5557
name: None,
5658
expr: None,
@@ -101,11 +103,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
101103
let this = self;
102104

103105
Box::pin(async move {
104-
if !this.snapshot.load_config().require_files {
105-
return Err(DeltaTableError::NotInitializedWithFiles(
106-
"ADD CONSTRAINTS".into(),
107-
));
108-
}
106+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;
107+
109108
let operation_id = this.get_operation_id();
110109
this.pre_execute(operation_id).await?;
111110

@@ -118,7 +117,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
118117
.expr
119118
.ok_or_else(|| DeltaTableError::Generic("No Expression provided".to_string()))?;
120119

121-
let mut metadata = this.snapshot.metadata().clone();
120+
let mut metadata = snapshot.metadata().clone();
122121
let configuration_key = format!("delta.constraints.{name}");
123122

124123
if metadata.configuration().contains_key(&configuration_key) {
@@ -132,10 +131,9 @@ impl std::future::IntoFuture for ConstraintBuilder {
132131
.unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
133132
register_store(this.log_store.clone(), session.runtime_env().as_ref());
134133

135-
let scan =
136-
DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
137-
.build()
138-
.await?;
134+
let scan = DeltaScanBuilder::new(&snapshot, this.log_store.clone(), session.as_ref())
135+
.build()
136+
.await?;
139137

140138
let schema = scan.schema().to_dfschema()?;
141139
let expr = into_expr(expr, &schema, session.as_ref())?;
@@ -175,7 +173,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
175173
metadata =
176174
metadata.add_config_key(format!("delta.constraints.{name}"), expr_str.clone())?;
177175

178-
let old_protocol = this.snapshot.protocol();
176+
let old_protocol = snapshot.protocol();
179177
let protocol = ProtocolInner {
180178
min_reader_version: if old_protocol.min_reader_version() > 1 {
181179
old_protocol.min_reader_version()
@@ -213,7 +211,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
213211
.with_actions(actions)
214212
.with_operation_id(operation_id)
215213
.with_post_commit_hook_handler(this.custom_execute_handler.clone())
216-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
214+
.build(Some(&snapshot), this.log_store.clone(), operation)
217215
.await?;
218216

219217
if let Some(handler) = this.custom_execute_handler {

crates/core/src/operations/convert_to_delta.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl Default for ConvertToDeltaBuilder {
168168
}
169169
}
170170

171-
impl super::Operation<()> for ConvertToDeltaBuilder {
171+
impl super::Operation for ConvertToDeltaBuilder {
172172
fn log_store(&self) -> &LogStoreRef {
173173
self.log_store
174174
.as_ref()

crates/core/src/operations/create.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub struct CreateBuilder {
6868
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
6969
}
7070

71-
impl super::Operation<()> for CreateBuilder {
71+
impl super::Operation for CreateBuilder {
7272
fn log_store(&self) -> &LogStoreRef {
7373
self.log_store
7474
.as_ref()

0 commit comments

Comments
 (0)