1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -93,6 +93,7 @@ pub mod error;
pub mod expressions;
mod log_compaction;
mod log_path;
mod log_reader;
pub mod scan;
pub mod schema;
pub mod snapshot;
133 changes: 133 additions & 0 deletions kernel/src/log_reader/commit.rs
@@ -0,0 +1,133 @@
//! Commit phase for log replay - processes JSON commit files.

use std::sync::Arc;

Check failure on line 3 in kernel/src/log_reader/commit.rs (reported identically by all nine GitHub Actions jobs: build, run-examples, msrv-run-tests, coverage, test on ubuntu-latest and macOS-latest, arrow_integration_test on ubuntu-latest and macOS-latest, docs):

unused import: `std::sync::Arc`

use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME};

Check failure on line 5 in kernel/src/log_reader/commit.rs (same nine GitHub Actions jobs):

unused imports: `ADD_NAME`, `REMOVE_NAME`, and `get_commit_schema`
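
Both failures are the same two unused imports, promoted to errors in CI. The minimal fix is to delete the two `use` lines; if they are placeholders for code landing in a follow-up commit, an explicit allow would also quiet CI (a sketch, not part of this PR):

#[allow(unused_imports)] // remove once these imports are actually used
use std::sync::Arc;
#[allow(unused_imports)]
use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME};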
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine};

/// Phase that processes JSON commit files.
pub(crate) struct CommitReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl CommitReader {
/// Create a new commit phase from a log segment.
///
/// # Parameters
/// - `engine`: Engine for reading files
/// - `log_segment`: The log segment to process
/// - `schema`: Schema to use when reading the commit files
pub(crate) fn try_new(
engine: &dyn Engine,
log_segment: &LogSegment,
schema: SchemaRef,
) -> DeltaResult<Self> {
let commit_files = log_segment.find_commit_cover();
let actions = engine
.json_handler()
.read_json_files(&commit_files, schema, None)?
.map(|batch| batch.map(|b| ActionsBatch::new(b, true)));

Ok(Self {
actions: Box::new(actions),
})
}
}

impl Iterator for CommitReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.actions.next()
}
}
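
Consumption is just iteration: construct the reader, then feed each batch to a log replay processor. A minimal sketch of that flow (it assumes the `LogReplayProcessor` trait and `COMMIT_READ_SCHEMA` exactly as used by the test below; `replay_commits` itself is a hypothetical helper, not part of this PR):

fn replay_commits(
    engine: &dyn Engine,
    log_segment: &LogSegment,
    processor: &mut impl LogReplayProcessor,
) -> DeltaResult<()> {
    // Reads the segment's commit JSON files; each batch is tagged is_log_batch = true.
    let reader = CommitReader::try_new(engine, log_segment, crate::scan::COMMIT_READ_SCHEMA.clone())?;
    for batch in reader {
        // The processor dedups actions; the per-batch metadata it returns is unused here.
        let _metadata = processor.process_actions_batch(batch?)?;
    }
    Ok(())
}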

#[cfg(test)]
mod tests {
use super::*;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::DefaultEngine;
use crate::log_replay::LogReplayProcessor;
use crate::scan::log_replay::ScanLogReplayProcessor;
use crate::scan::state_info::StateInfo;
use crate::scan::COMMIT_READ_SCHEMA;
use object_store::local::LocalFileSystem;
use std::path::PathBuf;
use std::sync::Arc as StdArc;

fn load_test_table(
table_name: &str,
) -> DeltaResult<(
StdArc<DefaultEngine<TokioBackgroundExecutor>>,
StdArc<crate::Snapshot>,
url::Url,
)> {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/data");
path.push(table_name);

let path = std::fs::canonicalize(path)
.map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?;

let url = url::Url::from_directory_path(path)
.map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;

let store = StdArc::new(LocalFileSystem::new());
let engine = StdArc::new(DefaultEngine::new(store));
let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;

Ok((engine, snapshot, url))
}

#[test]
fn test_commit_phase_processes_commits() -> DeltaResult<()> {
let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
let log_segment = StdArc::new(snapshot.log_segment().clone());

let state_info = StdArc::new(StateInfo::try_new(
snapshot.schema(),
snapshot.table_configuration(),
None,
(),
)?);

let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
let schema = COMMIT_READ_SCHEMA.clone();
let mut commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?;

let mut batch_count = 0;
let mut file_paths = Vec::new();

for result in commit_phase {
let batch = result?;
let metadata = processor.process_actions_batch(batch)?;
let paths = metadata.visit_scan_files(
vec![],
|ps: &mut Vec<String>, path, _, _, _, _, _| {
ps.push(path.to_string());
},
)?;
file_paths.extend(paths);
batch_count += 1;
}

// table-without-dv-small has exactly 1 commit file
assert_eq!(
batch_count, 1,
"table-without-dv-small should have exactly 1 commit batch"
);

// table-without-dv-small has exactly 1 add file
file_paths.sort();
let expected_files =
vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"];
assert_eq!(
file_paths, expected_files,
"CommitReader should find exactly the expected file"
);

Ok(())
}
}
183 changes: 183 additions & 0 deletions kernel/src/log_reader/leaf.rs
@@ -0,0 +1,183 @@
//! Sidecar phase for log replay - processes sidecar/leaf parquet files.

use std::sync::Arc;

use crate::log_replay::ActionsBatch;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, FileMeta};

/// Phase that processes sidecar or leaf parquet files.
///
/// This phase is distributable - you can partition `files` and create multiple
/// instances on different executors.
#[allow(unused)]
pub(crate) struct LeafCheckpointReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl LeafCheckpointReader {
/// Create a new sidecar phase from a list of files.
///
/// # Distributability
///
/// This phase is designed to be distributable. To distribute:
/// 1. Partition `files` across N executors
/// 2. Create N `LeafCheckpointReader` instances, one per executor with its file partition
Comment on lines +21 to +25 (Collaborator):

This seems like an odd place to bring up distributability? The log replay phase that consumes this reader is the thing getting distributed, and which needs the careful choreography. Seems like this reader is just a detail in that bigger picture.

///
/// # Parameters
/// - `files`: Sidecar/leaf files to process
/// - `engine`: Engine for reading files
/// - `schema`: Schema to use when reading sidecar files (projected based on processor requirements)
#[allow(unused)]
pub(crate) fn new(
files: Vec<FileMeta>,
engine: Arc<dyn Engine>,
schema: SchemaRef,
) -> DeltaResult<Self> {
let actions = engine
.parquet_handler()
.read_parquet_files(&files, schema, None)?
.map(|batch| batch.map(|b| ActionsBatch::new(b, false)));
Comment (Collaborator):

Now that we're splitting up the phases instead of chaining everything into one big iterator, we should probably reconsider whether we still need these (batch, bool) pairs -- I suspect that whoever invokes the dedup visitor will now know, structurally, whether it's a log or checkpoint batch, and can just pass true or false accordingly?


Ok(Self {
actions: Box::new(actions),
})
}
}

impl Iterator for LeafCheckpointReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.actions.next()
}
}
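
The constructor's doc comment above describes distribution by partitioning the file list across executors. A minimal illustration of that partitioning step only (the `partition_round_robin` helper is hypothetical, and, per the review comment above, the real choreography would belong to the log replay phase that consumes these readers):

fn partition_round_robin(files: Vec<FileMeta>, n: usize) -> Vec<Vec<FileMeta>> {
    // Deal files out one at a time so partitions stay within one file of each other in size.
    let mut parts: Vec<Vec<FileMeta>> = (0..n).map(|_| Vec::new()).collect();
    for (i, file) in files.into_iter().enumerate() {
        parts[i % n].push(file);
    }
    parts
}

// Then, one reader per executor over its own slice of the sidecar files:
//
//     for part in partition_round_robin(sidecars, num_executors) {
//         let reader = LeafCheckpointReader::new(part, engine.clone(), schema.clone())?;
//         // ship this reader's batches to that executor's processor
//     }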

#[cfg(test)]
mod tests {
use super::*;
use crate::actions::{get_commit_schema, ADD_NAME};
use crate::engine::default::DefaultEngine;
use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
use crate::log_replay::LogReplayProcessor;
use crate::scan::log_replay::ScanLogReplayProcessor;
use crate::scan::state_info::StateInfo;
use crate::{Error, Snapshot, SnapshotRef};
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

fn load_test_table(
table_name: &str,
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef, Url, TempDir)> {
let test_dir = test_utils::load_test_data("tests/data", table_name)
.map_err(|e| Error::generic(format!("Failed to load test data: {}", e)))?;
let test_path = test_dir.path().join(table_name);

let url = url::Url::from_directory_path(&test_path)
.map_err(|_| Error::generic("Failed to create URL from path"))?;

let store = Arc::new(LocalFileSystem::new());
let engine = Arc::new(DefaultEngine::new(store));
let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;

Ok((engine, snapshot, url, test_dir))
}

#[test]
fn test_sidecar_phase_processes_files() -> DeltaResult<()> {
let (engine, snapshot, _table_root, _tempdir) =
load_test_table("v2-checkpoints-json-with-sidecars")?;
let log_segment = snapshot.log_segment();

let state_info = Arc::new(StateInfo::try_new(
snapshot.schema(),
snapshot.table_configuration(),
None,
(),
)?);

let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;

// First we need to run through manifest phase to get the sidecar files
if log_segment.checkpoint_parts.is_empty() {
println!("Test table has no checkpoint parts, skipping");
return Ok(());
}
Comment on lines +104 to +107 (Collaborator):

This seems very strange to have in a test?? Either the test case has a checkpoint or it doesn't.

And in this case it should have a checkpoint, and panicking at L110 below is a perfectly reasonable test failure mode if the checkpoint somehow went missing.

// Get the first checkpoint part
let checkpoint_file = &log_segment.checkpoint_parts[0];
let manifest_file = checkpoint_file.location.clone();

let mut manifest_phase =
ManifestPhase::new(manifest_file, log_segment.log_root.clone(), engine.clone())?;

// Drain manifest phase and apply processor
for batch in manifest_phase.by_ref() {
Comment (Collaborator):

aside: TIL that Iterator::by_ref is a thing!

let batch = batch?;
processor.process_actions_batch(batch)?;
Comment on lines +118 to +119 (Collaborator):

nit:

Suggested change
-    let batch = batch?;
-    processor.process_actions_batch(batch)?;
+    processor.process_actions_batch(batch?)?;

(not sure which way is more readable?)

}

let after_manifest = manifest_phase.finalize()?;

match after_manifest {
AfterManifest::Sidecars { sidecars } => {
println!("Testing with {} sidecar files", sidecars.len());

let schema = get_commit_schema().project(&[ADD_NAME])?;

let mut sidecar_phase =
LeafCheckpointReader::new(sidecars, engine.clone(), schema)?;

let mut sidecar_file_paths = Vec::new();
let mut batch_count = 0;

while let Some(result) = sidecar_phase.next() {
Comment (Collaborator):

Isn't this just

Suggested change
-    while let Some(result) = sidecar_phase.next() {
+    for result in sidecar_phase {

let batch = result?;
let metadata = processor.process_actions_batch(batch)?;
let paths = metadata.visit_scan_files(
vec![],
|ps: &mut Vec<String>, path, _, _, _, _, _| {
ps.push(path.to_string());
},
)?;
sidecar_file_paths.extend(paths);
batch_count += 1;
}

sidecar_file_paths.sort();

// v2-checkpoints-json-with-sidecars has exactly 2 sidecar files with 101 total files
assert_eq!(
batch_count, 2,
"LeafCheckpointReader should process exactly 2 sidecar batches"
);

assert_eq!(
sidecar_file_paths.len(),
101,
"LeafCheckpointReader should find exactly 101 files from sidecars"
);

// Verify first few files match expected (sampling to keep test readable)
let expected_first_files = vec![
"test%25file%25prefix-part-00000-01086c52-1b86-48d0-8889-517fe626849d-c000.snappy.parquet",
"test%25file%25prefix-part-00000-0fd71c0e-fd08-4685-87d6-aae77532d3ea-c000.snappy.parquet",
"test%25file%25prefix-part-00000-2710dd7f-9fa5-429d-b3fb-c005ba16e062-c000.snappy.parquet",
];

assert_eq!(
&sidecar_file_paths[..3],
&expected_first_files[..],
"LeafCheckpointReader should process files in expected order"
);
}
AfterManifest::Done => {
println!("No sidecars found - test inconclusive");
}
}

Ok(())
}
}