Skip to content

Commit 497a70b

Browse files
committed
fix: ensure a consistent RecordBatch schema is used during checkpoint writes
Since the kernel is just spitting back whatever data it had read verbatim, we are now assuming that our first batch emitted for the checkpoint is "correct" and then casting everything thereafter to its schema. This also drops the use of the Table API, which @zachschuermann is removing in the next release of kernel. Fixes #3527. Signed-off-by: R. Tyler Croy <[email protected]>
1 parent b0a231c commit 497a70b

File tree

1 file changed

+24
-7
lines changed

1 file changed

+24
-7
lines changed

crates/core/src/protocol/checkpoints.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use arrow_array::{BooleanArray, RecordBatch};
77
use chrono::Utc;
88
use delta_kernel::engine::arrow_data::ArrowEngineData;
99
use delta_kernel::engine_data::FilteredEngineData;
10-
use delta_kernel::snapshot::LastCheckpointHint;
11-
use delta_kernel::{FileMeta, Table};
10+
use delta_kernel::snapshot::{LastCheckpointHint, Snapshot};
11+
use delta_kernel::FileMeta;
1212
use futures::{StreamExt, TryStreamExt};
1313
use object_store::path::Path;
1414
use object_store::{Error, ObjectStore};
@@ -37,12 +37,11 @@ pub(crate) async fn create_checkpoint_for(
3737
};
3838
let engine = log_store.engine(operation_id).await;
3939

40-
let table = Table::try_from_uri(table_root)?;
41-
4240
let task_engine = engine.clone();
43-
let snapshot = spawn_blocking(move || table.snapshot(task_engine.as_ref(), Some(version)))
44-
.await
45-
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
41+
let snapshot =
42+
spawn_blocking(move || Snapshot::try_new(table_root, task_engine.as_ref(), Some(version)))
43+
.await
44+
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
4645
let snapshot = Arc::new(snapshot);
4746

4847
let cp_writer = snapshot.checkpoint()?;
@@ -65,6 +64,13 @@ pub(crate) async fn create_checkpoint_for(
6564
let mut writer = AsyncArrowWriter::try_new(object_store_writer, first_batch.schema(), None)?;
6665
writer.write(&first_batch).await?;
6766

67+
// Hold onto the schema used for future batches.
68+
// This ensures that each batch is consistent since the kernel will return the data that it
69+
// read from prior checkpoints regardless of whether they are identical in schema.
70+
//
71+
// See: <https://github.com/delta-io/delta-rs/issues/3527>
72+
let checkpoint_schema = first_batch.schema();
73+
6874
let mut current_batch;
6975
loop {
7076
(current_batch, cp_data) = spawn_blocking(move || {
@@ -79,6 +85,17 @@ pub(crate) async fn create_checkpoint_for(
7985
let Some(batch) = current_batch else {
8086
break;
8187
};
88+
89+
// If the subsequently yielded batches do not match the first batch written for whatever
90+
// reason, attempt to safely cast the batches to ensure a coherent checkpoint parquet file
91+
//
92+
// See also: <https://github.com/delta-io/delta-rs/issues/3527>
93+
let batch = if batch.schema() != checkpoint_schema {
94+
crate::cast_record_batch(&batch, checkpoint_schema.clone(), true, true)?
95+
} else {
96+
batch
97+
};
98+
8299
writer.write(&batch).await?;
83100
}
84101

0 commit comments

Comments
 (0)