Skip to content

Commit 062a0fe

Browse files
scovichnicklan
andauthored
refactor: harmonize checkpoint and log compaction iterators (#1436)
## What changes are proposed in this pull request? Log replay for checkpoint writes and log compaction writes uses physically separate but logically identical iterator implementations. Eliminate the duplication by creating one iterator type that both can use. ## Breaking changes The two replaced iterators were pub ## How was this change tested? Existing unit tests. --------- Co-authored-by: Nick Lanham <[email protected]>
1 parent 561c7e1 commit 062a0fe

File tree

8 files changed

+121
-140
lines changed

8 files changed

+121
-140
lines changed

kernel/src/action_reconciliation/log_replay.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ pub(crate) struct ActionReconciliationProcessor {
6464
///
6565
/// It contains the filtered batch of actions to be included, along with statistics about the
6666
/// number of actions filtered for inclusion.
67+
///
68+
/// # Warning
69+
///
70+
/// This iterator must be fully consumed to ensure proper collection of statistics. Additionally,
71+
/// all yielded data must be written to the specified path before e.g. calling
72+
/// [`CheckpointWriter::finalize`]. Failing to do so may result in data loss or corruption.
6773
pub(crate) struct ActionReconciliationBatch {
6874
/// The filtered batch of actions.
6975
pub(crate) filtered_data: FilteredEngineData,
@@ -79,6 +85,81 @@ impl HasSelectionVector for ActionReconciliationBatch {
7985
}
8086
}
8187

88+
/// Iterator over action reconciliation data.
89+
///
90+
/// This iterator yields a stream of [`FilteredEngineData`] items while, tracking action
91+
/// counts. Used by both checkpoint and log compaction workflows.
92+
pub struct ActionReconciliationIterator {
93+
inner: Box<dyn Iterator<Item = DeltaResult<ActionReconciliationBatch>> + Send>,
94+
actions_count: i64,
95+
add_actions_count: i64,
96+
is_exhausted: bool,
97+
}
98+
99+
impl ActionReconciliationIterator {
100+
/// Create a new iterator with counters initialized to 0
101+
pub(crate) fn new(
102+
inner: Box<dyn Iterator<Item = DeltaResult<ActionReconciliationBatch>> + Send>,
103+
) -> Self {
104+
Self {
105+
inner,
106+
actions_count: 0,
107+
add_actions_count: 0,
108+
is_exhausted: false,
109+
}
110+
}
111+
112+
/// True if this iterator has been exhausted (ie all batches have been processed)
113+
pub(crate) fn is_exhausted(&self) -> bool {
114+
self.is_exhausted
115+
}
116+
117+
/// Get the total number of actions processed so far
118+
pub(crate) fn actions_count(&self) -> i64 {
119+
self.actions_count
120+
}
121+
122+
/// Get the total number of add actions processed so far
123+
pub(crate) fn add_actions_count(&self) -> i64 {
124+
self.add_actions_count
125+
}
126+
127+
/// Helper to transform a batch: update metrics and extract filtered data
128+
fn transform_batch(
129+
&mut self,
130+
batch: Option<DeltaResult<ActionReconciliationBatch>>,
131+
) -> Option<DeltaResult<FilteredEngineData>> {
132+
let Some(batch) = batch else {
133+
self.is_exhausted = true;
134+
return None;
135+
};
136+
Some(batch.map(|batch| {
137+
self.actions_count += batch.actions_count;
138+
self.add_actions_count += batch.add_actions_count;
139+
batch.filtered_data
140+
}))
141+
}
142+
}
143+
144+
impl std::fmt::Debug for ActionReconciliationIterator {
145+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146+
f.debug_struct("ActionReconciliationIterator")
147+
.field("actions_count", &self.actions_count)
148+
.field("add_actions_count", &self.add_actions_count)
149+
.field("is_exhausted", &self.is_exhausted)
150+
.finish()
151+
}
152+
}
153+
154+
impl Iterator for ActionReconciliationIterator {
155+
type Item = DeltaResult<FilteredEngineData>;
156+
157+
fn next(&mut self) -> Option<Self::Item> {
158+
let batch = self.inner.next();
159+
self.transform_batch(batch)
160+
}
161+
}
162+
82163
impl LogReplayProcessor for ActionReconciliationProcessor {
83164
type Output = ActionReconciliationBatch;
84165

kernel/src/action_reconciliation/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use crate::{DeltaResult, Error};
2323

2424
pub(crate) mod log_replay;
2525

26+
pub use log_replay::ActionReconciliationIterator;
27+
2628
const SECONDS_PER_MINUTE: u64 = 60;
2729
const MINUTES_PER_HOUR: u64 = 60;
2830
const HOURS_PER_DAY: u64 = 24;

kernel/src/checkpoint/mod.rs

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
//! ## Architecture
1717
//!
1818
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
19-
//! - [`CheckpointDataIterator`] - Iterator over the checkpoint data to be written
19+
//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written
2020
//!
2121
//! ## Usage
2222
//!
@@ -31,7 +31,7 @@
3131
//!
3232
//! ```no_run
3333
//! # use std::sync::Arc;
34-
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
34+
//! # use delta_kernel::ActionReconciliationIterator;
3535
//! # use delta_kernel::checkpoint::CheckpointWriter;
3636
//! # use delta_kernel::Engine;
3737
//! # use delta_kernel::Snapshot;
@@ -40,7 +40,7 @@
4040
//! # use delta_kernel::Error;
4141
//! # use delta_kernel::FileMeta;
4242
//! # use url::Url;
43-
//! fn write_checkpoint_file(path: Url, data: &CheckpointDataIterator) -> DeltaResult<FileMeta> {
43+
//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult<FileMeta> {
4444
//! todo!() /* engine-specific logic to write data to object storage*/
4545
//! }
4646
//!
@@ -89,7 +89,7 @@ use std::sync::{Arc, LazyLock};
8989
use crate::action_reconciliation::log_replay::{
9090
ActionReconciliationBatch, ActionReconciliationProcessor,
9191
};
92-
use crate::action_reconciliation::RetentionCalculator;
92+
use crate::action_reconciliation::{ActionReconciliationIterator, RetentionCalculator};
9393
use crate::actions::{
9494
Add, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CHECKPOINT_METADATA_NAME,
9595
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
@@ -145,43 +145,6 @@ static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(||
145145
)]))
146146
});
147147

148-
/// An iterator over the checkpoint data to be written to the file.
149-
///
150-
/// This iterator yields filtered checkpoint data batches ([`FilteredEngineData`]) and
151-
/// tracks action statistics required for finalizing the checkpoint.
152-
///
153-
/// # Warning
154-
/// The [`CheckpointDataIterator`] must be fully consumed to ensure proper collection of statistics for
155-
/// the checkpoint. Additionally, all yielded data must be written to the specified path before calling
156-
/// [`CheckpointWriter::finalize`]. Failing to do so may result in data loss or corruption.
157-
pub struct CheckpointDataIterator {
158-
/// The nested iterator that yields checkpoint batches with action counts
159-
checkpoint_batch_iterator:
160-
Box<dyn Iterator<Item = DeltaResult<ActionReconciliationBatch>> + Send>,
161-
/// Running total of actions included in the checkpoint
162-
actions_count: i64,
163-
/// Running total of add actions included in the checkpoint
164-
add_actions_count: i64,
165-
}
166-
167-
impl Iterator for CheckpointDataIterator {
168-
type Item = DeltaResult<FilteredEngineData>;
169-
170-
/// Advances the iterator and returns the next value.
171-
///
172-
/// This implementation transforms the `ActionReconciliationBatch` items from the nested iterator into
173-
/// [`FilteredEngineData`] items for the engine to write, while accumulating action counts from
174-
/// each batch. The [`CheckpointDataIterator`] is passed back to the kernel on call to
175-
/// [`CheckpointWriter::finalize`] for counts to be read and written to the `_last_checkpoint` file
176-
fn next(&mut self) -> Option<Self::Item> {
177-
Some(self.checkpoint_batch_iterator.next()?.map(|batch| {
178-
self.actions_count += batch.actions_count;
179-
self.add_actions_count += batch.add_actions_count;
180-
batch.filtered_data
181-
}))
182-
}
183-
}
184-
185148
/// Orchestrates the process of creating a checkpoint for a table.
186149
///
187150
/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -253,7 +216,7 @@ impl CheckpointWriter {
253216
/// # Parameters
254217
/// - `engine`: Implementation of [`Engine`] APIs.
255218
///
256-
/// # Returns: [`CheckpointDataIterator`] containing the checkpoint data
219+
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
257220
// This method is the core of the checkpoint generation process. It:
258221
// 1. Determines whether to write a V1 or V2 checkpoint based on the table's
259222
// `v2Checkpoints` feature support
@@ -262,7 +225,10 @@ impl CheckpointWriter {
262225
// 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
263226
// (i.e., if `v2Checkpoints` feature is supported by table)
264227
// 5. Generates the appropriate checkpoint path
265-
pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult<CheckpointDataIterator> {
228+
pub fn checkpoint_data(
229+
&self,
230+
engine: &dyn Engine,
231+
) -> DeltaResult<ActionReconciliationIterator> {
266232
let is_v2_checkpoints_supported = self
267233
.snapshot
268234
.table_configuration()
@@ -284,12 +250,10 @@ impl CheckpointWriter {
284250
let checkpoint_metadata =
285251
is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine));
286252

287-
// Wrap the iterator in a CheckpointDataIterator to track action counts
288-
Ok(CheckpointDataIterator {
289-
checkpoint_batch_iterator: Box::new(checkpoint_data.chain(checkpoint_metadata)),
290-
actions_count: 0,
291-
add_actions_count: 0,
292-
})
253+
// Wrap the iterator to track action counts
254+
Ok(ActionReconciliationIterator::new(Box::new(
255+
checkpoint_data.chain(checkpoint_metadata),
256+
)))
293257
}
294258

295259
/// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -313,10 +277,10 @@ impl CheckpointWriter {
313277
self,
314278
engine: &dyn Engine,
315279
metadata: &FileMeta,
316-
mut checkpoint_data: CheckpointDataIterator,
280+
checkpoint_data: ActionReconciliationIterator,
317281
) -> DeltaResult<()> {
318282
// Ensure the checkpoint data iterator is fully exhausted
319-
if checkpoint_data.checkpoint_batch_iterator.next().is_some() {
283+
if !checkpoint_data.is_exhausted() {
320284
return Err(Error::checkpoint_write(
321285
"The checkpoint data iterator must be fully consumed and written to storage before calling finalize"
322286
));
@@ -332,8 +296,8 @@ impl CheckpointWriter {
332296
let data = create_last_checkpoint_data(
333297
engine,
334298
self.version,
335-
checkpoint_data.actions_count,
336-
checkpoint_data.add_actions_count,
299+
checkpoint_data.actions_count(),
300+
checkpoint_data.add_actions_count(),
337301
size_in_bytes,
338302
);
339303

kernel/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,12 @@ pub mod history_manager;
148148
#[cfg(not(feature = "internal-api"))]
149149
pub(crate) mod history_manager;
150150

151-
pub use crate::engine_data::FilteredEngineData;
151+
pub use action_reconciliation::ActionReconciliationIterator;
152152
pub use delta_kernel_derive;
153-
pub use engine_data::{EngineData, RowVisitor};
153+
pub use engine_data::{EngineData, FilteredEngineData, RowVisitor};
154154
pub use error::{DeltaResult, Error};
155155
pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
156-
pub use log_compaction::{should_compact, LogCompactionDataIterator, LogCompactionWriter};
156+
pub use log_compaction::{should_compact, LogCompactionWriter};
157157
pub use snapshot::Snapshot;
158158
pub use snapshot::SnapshotRef;
159159

kernel/src/log_compaction/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727
//!
2828
//! ```no_run
2929
//! # use std::sync::Arc;
30-
//! # use delta_kernel::{LogCompactionDataIterator, LogCompactionWriter};
30+
//! # use delta_kernel::{ActionReconciliationIterator, LogCompactionWriter};
3131
//! # use delta_kernel::{Engine, Snapshot, DeltaResult, Error, FileMeta};
3232
//! # use url::Url;
3333
//!
3434
//! // Engine-specific function to write compaction data
35-
//! fn write_compaction_file(path: &Url, data: LogCompactionDataIterator) -> DeltaResult<FileMeta> {
35+
//! fn write_compaction_file(path: &Url, data: ActionReconciliationIterator) -> DeltaResult<FileMeta> {
3636
//! // In a real implementation, this would write the data to cloud storage
3737
//! todo!("Write data batches to storage at path: {}", path)
3838
//! }
@@ -84,7 +84,7 @@ use crate::schema::{SchemaRef, StructField, StructType, ToSchema as _};
8484

8585
mod writer;
8686

87-
pub use writer::{should_compact, LogCompactionDataIterator, LogCompactionWriter};
87+
pub use writer::{should_compact, LogCompactionWriter};
8888

8989
#[cfg(test)]
9090
mod tests;

kernel/src/log_compaction/tests.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ fn test_compaction_data() {
107107
let iterator = result.unwrap();
108108

109109
// Test iterator methods
110-
assert_eq!(iterator.total_actions(), 0);
111-
assert_eq!(iterator.total_add_actions(), 0);
110+
assert_eq!(iterator.actions_count(), 0);
111+
assert_eq!(iterator.add_actions_count(), 0);
112112

113113
// Test debug implementation
114114
let debug_str = format!("{:?}", iterator);
115-
assert!(debug_str.contains("LogCompactionDataIterator"));
115+
assert!(debug_str.contains("ActionReconciliationIterator"));
116116
assert!(debug_str.contains("actions_count"));
117117
assert!(debug_str.contains("add_actions_count"));
118118
}
@@ -152,8 +152,8 @@ fn test_compaction_data_with_actual_iterator() {
152152
let mut iterator = writer.compaction_data(&engine).unwrap();
153153

154154
let mut batch_count = 0;
155-
let initial_actions = iterator.total_actions();
156-
let initial_add_actions = iterator.total_add_actions();
155+
let initial_actions = iterator.actions_count();
156+
let initial_add_actions = iterator.add_actions_count();
157157

158158
// Both should start at 0
159159
assert_eq!(initial_actions, 0);
@@ -164,8 +164,8 @@ fn test_compaction_data_with_actual_iterator() {
164164
assert!(batch_result.is_ok());
165165

166166
// After processing some batches, the counts should be >= the initial counts
167-
assert!(iterator.total_actions() >= initial_actions);
168-
assert!(iterator.total_add_actions() >= initial_add_actions);
167+
assert!(iterator.actions_count() >= initial_actions);
168+
assert!(iterator.add_actions_count() >= initial_add_actions);
169169
}
170170

171171
assert!(batch_count > 0, "Expected to process at least one batch");
@@ -223,8 +223,8 @@ fn test_version_filtering() {
223223
);
224224

225225
let iterator = result.unwrap();
226-
assert!(iterator.total_actions() >= 0);
227-
assert!(iterator.total_add_actions() >= 0);
226+
assert!(iterator.actions_count() >= 0);
227+
assert!(iterator.add_actions_count() >= 0);
228228
}
229229
}
230230

0 commit comments

Comments
 (0)