626 changes: 626 additions & 0 deletions kernel/src/distributed/driver.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions kernel/src/distributed/mod.rs
@@ -0,0 +1,3 @@
pub(crate) mod driver;

pub(crate) use driver::{DriverPhase, DriverPhaseResult};

Check failure on line 3 in kernel/src/distributed/mod.rs, reported by every GitHub Actions job (run-examples, test, msrv-run-tests, build, coverage, arrow_integration_test, docs):

unused import: `DriverPhaseResult`
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
@@ -88,11 +88,13 @@ mod action_reconciliation;
pub mod actions;
pub mod checkpoint;
pub mod committer;
mod distributed;
pub mod engine_data;
pub mod error;
pub mod expressions;
mod log_compaction;
mod log_path;
mod log_reader;
pub mod scan;
pub mod schema;
pub mod snapshot;
133 changes: 133 additions & 0 deletions kernel/src/log_reader/commit.rs
@@ -0,0 +1,133 @@
//! Commit phase for log replay - processes JSON commit files.

use std::sync::Arc;

Check failure on line 3 in kernel/src/log_reader/commit.rs, reported by the same GitHub Actions jobs:

unused import: `std::sync::Arc`

use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME};

Check failure on line 5 in kernel/src/log_reader/commit.rs, reported by the same GitHub Actions jobs:

unused imports: `ADD_NAME`, `REMOVE_NAME`, and `get_commit_schema`
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine};

/// Phase that processes JSON commit files.
pub(crate) struct CommitReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl CommitReader {
/// Create a new commit phase from a log segment.
///
/// # Parameters
/// - `engine`: Engine for reading files
/// - `log_segment`: The log segment to process
/// - `schema`: Schema to use when reading the commit files
pub(crate) fn try_new(
engine: &dyn Engine,
log_segment: &LogSegment,
schema: SchemaRef,
) -> DeltaResult<Self> {
let commit_files = log_segment.find_commit_cover();
let actions = engine
.json_handler()
.read_json_files(&commit_files, schema, None)?
.map(|batch| batch.map(|b| ActionsBatch::new(b, true)));

Ok(Self {
actions: Box::new(actions),
})
}
}

impl Iterator for CommitReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.actions.next()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::DefaultEngine;
use crate::log_replay::LogReplayProcessor;
use crate::scan::log_replay::ScanLogReplayProcessor;
use crate::scan::state_info::StateInfo;
use crate::scan::COMMIT_READ_SCHEMA;
use object_store::local::LocalFileSystem;
use std::path::PathBuf;
use std::sync::Arc as StdArc;

fn load_test_table(
table_name: &str,
) -> DeltaResult<(
StdArc<DefaultEngine<TokioBackgroundExecutor>>,
StdArc<crate::Snapshot>,
url::Url,
)> {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/data");
path.push(table_name);

let path = std::fs::canonicalize(path)
.map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?;

let url = url::Url::from_directory_path(path)
.map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;

let store = StdArc::new(LocalFileSystem::new());
let engine = StdArc::new(DefaultEngine::new(store));
let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;

Ok((engine, snapshot, url))
}

#[test]
fn test_commit_phase_processes_commits() -> DeltaResult<()> {
let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
let log_segment = StdArc::new(snapshot.log_segment().clone());

let state_info = StdArc::new(StateInfo::try_new(
snapshot.schema(),
snapshot.table_configuration(),
None,
(),
)?);

let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
let schema = COMMIT_READ_SCHEMA.clone();
let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?;

let mut batch_count = 0;
let mut file_paths = Vec::new();

for result in commit_phase {
let batch = result?;
let metadata = processor.process_actions_batch(batch)?;
let paths = metadata.visit_scan_files(
vec![],
|ps: &mut Vec<String>, path, _, _, _, _, _| {
ps.push(path.to_string());
},
)?;
file_paths.extend(paths);
batch_count += 1;
}

// table-without-dv-small has exactly 1 commit file
assert_eq!(
batch_count, 1,
"table-without-dv-small should have exactly 1 commit batch"
);

// table-without-dv-small has exactly 1 add file
file_paths.sort();
let expected_files =
vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"];
assert_eq!(
file_paths, expected_files,
"CommitReader should find exactly the expected file"
);

Ok(())
}
}
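
For orientation, a minimal usage sketch of the new `CommitReader` (not part of this diff): it assumes `engine: &dyn Engine`, a `log_segment`, a read `schema`, and a `processor` such as `ScanLogReplayProcessor` are already in scope, as in the test above.

// Sketch only: drain the commit phase through a log replay processor.
// Every binding here is an assumption mirroring the test above.
let commit_phase = CommitReader::try_new(engine, &log_segment, schema)?;
for batch in commit_phase {
    let actions = batch?; // ActionsBatch read from a JSON commit file
    processor.process_actions_batch(actions)?; // e.g. ScanLogReplayProcessor
}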
183 changes: 183 additions & 0 deletions kernel/src/log_reader/leaf.rs
@@ -0,0 +1,183 @@
//! Sidecar phase for log replay - processes sidecar/leaf parquet files.

use std::sync::Arc;

use crate::log_replay::ActionsBatch;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine, FileMeta};

/// Phase that processes sidecar or leaf parquet files.
///
/// This phase is distributable - you can partition `files` and create multiple
/// instances on different executors.
#[allow(unused)]
pub(crate) struct LeafCheckpointReader {
actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl LeafCheckpointReader {
/// Create a new sidecar phase from a file list.
///
/// # Distributability
///
/// This phase is designed to be distributable. To distribute:
/// 1. Partition `files` across N executors
/// 2. Create N `LeafCheckpointReader` instances, one per executor with its file partition
///
/// # Parameters
/// - `files`: Sidecar/leaf files to process
/// - `engine`: Engine for reading files
/// - `schema`: Schema to use when reading sidecar files (projected based on processor requirements)
#[allow(unused)]
pub(crate) fn new(
files: Vec<FileMeta>,
engine: Arc<dyn Engine>,
schema: SchemaRef,
) -> DeltaResult<Self> {
let actions = engine
.parquet_handler()
.read_parquet_files(&files, schema, None)?
.map(|batch| batch.map(|b| ActionsBatch::new(b, false)));

Ok(Self {
actions: Box::new(actions),
})
}
}

impl Iterator for LeafCheckpointReader {
type Item = DeltaResult<ActionsBatch>;

fn next(&mut self) -> Option<Self::Item> {
self.actions.next()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::actions::{get_commit_schema, ADD_NAME};
use crate::engine::default::DefaultEngine;
use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
use crate::log_replay::LogReplayProcessor;
use crate::scan::log_replay::ScanLogReplayProcessor;
use crate::scan::state_info::StateInfo;
use crate::{Error, Snapshot, SnapshotRef};
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

fn load_test_table(
table_name: &str,
) -> DeltaResult<(Arc<dyn Engine>, SnapshotRef, Url, TempDir)> {
let test_dir = test_utils::load_test_data("tests/data", table_name)
.map_err(|e| Error::generic(format!("Failed to load test data: {}", e)))?;
let test_path = test_dir.path().join(table_name);

let url = url::Url::from_directory_path(&test_path)
.map_err(|_| Error::generic("Failed to create URL from path"))?;

let store = Arc::new(LocalFileSystem::new());
let engine = Arc::new(DefaultEngine::new(store));
let snapshot = Snapshot::builder_for(url.clone()).build(engine.as_ref())?;

Ok((engine, snapshot, url, test_dir))
}

#[test]
fn test_sidecar_phase_processes_files() -> DeltaResult<()> {
let (engine, snapshot, _table_root, _tempdir) =
load_test_table("v2-checkpoints-json-with-sidecars")?;
let log_segment = snapshot.log_segment();

let state_info = Arc::new(StateInfo::try_new(
snapshot.schema(),
snapshot.table_configuration(),
None,
(),
)?);

let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;

// First we need to run through the manifest phase to get the sidecar files
if log_segment.checkpoint_parts.is_empty() {
println!("Test table has no checkpoint parts, skipping");
return Ok(());
}

// Get the first checkpoint part
let checkpoint_file = &log_segment.checkpoint_parts[0];
let manifest_file = checkpoint_file.location.clone();

let mut manifest_phase =
ManifestPhase::new(manifest_file, log_segment.log_root.clone(), engine.clone())?;

// Drain manifest phase and apply processor
for batch in manifest_phase.by_ref() {
let batch = batch?;
processor.process_actions_batch(batch)?;
}

let after_manifest = manifest_phase.finalize()?;

match after_manifest {
AfterManifest::Sidecars { sidecars } => {
println!("Testing with {} sidecar files", sidecars.len());

let schema = get_commit_schema().project(&[ADD_NAME])?;

let sidecar_phase =
LeafCheckpointReader::new(sidecars, engine.clone(), schema)?;

let mut sidecar_file_paths = Vec::new();
let mut batch_count = 0;

for result in sidecar_phase {
let batch = result?;
let metadata = processor.process_actions_batch(batch)?;
let paths = metadata.visit_scan_files(
vec![],
|ps: &mut Vec<String>, path, _, _, _, _, _| {
ps.push(path.to_string());
},
)?;
sidecar_file_paths.extend(paths);
batch_count += 1;
}

sidecar_file_paths.sort();

// v2-checkpoints-json-with-sidecars has exactly 2 sidecar files with 101 total files
assert_eq!(
batch_count, 2,
"LeafCheckpointReader should process exactly 2 sidecar batches"
);

assert_eq!(
sidecar_file_paths.len(),
101,
"LeafCheckpointReader should find exactly 101 files from sidecars"
);

// Verify first few files match expected (sampling to keep test readable)
let expected_first_files = vec![
"test%25file%25prefix-part-00000-01086c52-1b86-48d0-8889-517fe626849d-c000.snappy.parquet",
"test%25file%25prefix-part-00000-0fd71c0e-fd08-4685-87d6-aae77532d3ea-c000.snappy.parquet",
"test%25file%25prefix-part-00000-2710dd7f-9fa5-429d-b3fb-c005ba16e062-c000.snappy.parquet",
];

assert_eq!(
&sidecar_file_paths[..3],
&expected_first_files[..],
"LeafCheckpointReader should process files in expected order"
);
}
AfterManifest::Done => {
println!("No sidecars found - test inconclusive");
}
}

Ok(())
}
}
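
To make the distributability contract in `LeafCheckpointReader`'s doc comment concrete, here is a hedged sketch of the partitioning step it describes. The chunking strategy, `num_executors`, and the surrounding bindings (`sidecars`, `engine`, `schema`) are illustrative assumptions, not part of this PR.

// Sketch: split the sidecar files into one partition per executor, then
// build an independent LeafCheckpointReader for each partition.
// `sidecars`, `engine`, `schema`, and a positive `num_executors` are assumed inputs.
let chunk_size = (sidecars.len() + num_executors - 1) / num_executors; // ceiling division
let readers: Vec<LeafCheckpointReader> = sidecars
    .chunks(chunk_size.max(1))
    .map(|part| LeafCheckpointReader::new(part.to_vec(), engine.clone(), schema.clone()))
    .collect::<DeltaResult<_>>()?;
// Each reader can now be shipped to, and drained on, a different executor,
// exactly as the single-reader test above drains one.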