Draft

Changes from all commits
1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -93,6 +93,7 @@ pub mod error;
 pub mod expressions;
 mod log_compaction;
 mod log_path;
+mod log_reader;
 pub mod scan;
 pub mod schema;
 pub mod snapshot;
133 changes: 133 additions & 0 deletions kernel/src/log_reader/commit.rs
@@ -0,0 +1,133 @@
//! Commit phase for log replay - processes JSON commit files.

use std::sync::Arc;

Check failure on line 3 in kernel/src/log_reader/commit.rs — unused import: `std::sync::Arc` (flagged by eight GitHub Actions jobs: msrv-run-tests, arrow_integration_test (macOS-latest), arrow_integration_test (ubuntu-latest), docs, build (ubuntu-latest), run-examples, test (ubuntu-latest), coverage)

use crate::actions::{get_commit_schema, ADD_NAME, REMOVE_NAME};

Check failure on line 5 in kernel/src/log_reader/commit.rs — unused imports: `ADD_NAME`, `REMOVE_NAME`, and `get_commit_schema` (flagged by the same eight GitHub Actions jobs)
use crate::log_replay::ActionsBatch;
use crate::log_segment::LogSegment;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine};

/// Phase that processes JSON commit files.
pub(crate) struct CommitReader {
    actions: Box<dyn Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
}

impl CommitReader {
    /// Create a new commit phase from a log segment.
    ///
    /// # Parameters
    /// - `engine`: Engine for reading files
    /// - `log_segment`: The log segment to process
    /// - `schema`: Schema to use when reading the commit files
    pub(crate) fn try_new(
        engine: &dyn Engine,
        log_segment: &LogSegment,
        schema: SchemaRef,
    ) -> DeltaResult<Self> {
        let commit_files = log_segment.find_commit_cover();
        let actions = engine
            .json_handler()
            .read_json_files(&commit_files, schema, None)?
            .map(|batch| batch.map(|b| ActionsBatch::new(b, true)));

        Ok(Self {
            actions: Box::new(actions),
        })
    }
}

impl Iterator for CommitReader {
    type Item = DeltaResult<ActionsBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.actions.next()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
    use crate::engine::default::DefaultEngine;
    use crate::log_replay::LogReplayProcessor;
    use crate::scan::log_replay::ScanLogReplayProcessor;
    use crate::scan::state_info::StateInfo;
    use crate::scan::COMMIT_READ_SCHEMA;
    use object_store::local::LocalFileSystem;
    use std::path::PathBuf;
    use std::sync::Arc as StdArc;
[Inline review thread on `use std::sync::Arc as StdArc;`]

Collaborator: ???

Is there some name collision that makes `Arc` unusable?

Collaborator (Author): *sigh* Claude that day REALLY didn't want to use `Arc`. Also this should've stayed draft, mb

Collaborator: Oh, it's marked draft. I just made a feedback pass because we've been iterating on this together anyway.

    fn load_test_table(
        table_name: &str,
    ) -> DeltaResult<(
        StdArc<DefaultEngine<TokioBackgroundExecutor>>,
        StdArc<crate::Snapshot>,
        url::Url,
    )> {
        let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        path.push("tests/data");
        path.push(table_name);

        let path = std::fs::canonicalize(path)
            .map_err(|e| crate::Error::Generic(format!("Failed to canonicalize path: {}", e)))?;

        let url = url::Url::from_directory_path(path)
            .map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;

        let store = StdArc::new(LocalFileSystem::new());
        let engine = StdArc::new(DefaultEngine::new(store));
        let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;

        Ok((engine, snapshot, url))
    }

    #[test]
    fn test_commit_phase_processes_commits() -> DeltaResult<()> {
        let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
        let log_segment = StdArc::new(snapshot.log_segment().clone());

        let state_info = StdArc::new(StateInfo::try_new(
            snapshot.schema(),
            snapshot.table_configuration(),
            None,
            (),
        )?);

        let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
        let schema = COMMIT_READ_SCHEMA.clone();
        let commit_phase = CommitReader::try_new(engine.as_ref(), &log_segment, schema)?;

        let mut batch_count = 0;
        let mut file_paths = Vec::new();

        for result in commit_phase {
            let batch = result?;
            let metadata = processor.process_actions_batch(batch)?;
            let paths = metadata.visit_scan_files(
                vec![],
                |ps: &mut Vec<String>, path, _, _, _, _, _| {
                    ps.push(path.to_string());
                },
            )?;
            file_paths.extend(paths);
            batch_count += 1;
        }

        // table-without-dv-small has exactly 1 commit file
        assert_eq!(
            batch_count, 1,
            "table-without-dv-small should have exactly 1 commit batch"
        );

        // table-without-dv-small has exactly 1 add file
        file_paths.sort();
        let expected_files =
            vec!["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"];
        assert_eq!(
            file_paths, expected_files,
            "CommitReader should find exactly the expected file"
        );

        Ok(())
    }
}
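
For reviewers skimming the new API: a minimal sketch of how `CommitReader` is meant to be driven (a hypothetical crate-internal helper, not part of this diff; the imports mirror the code above):

```rust
use crate::log_reader::commit::CommitReader;
use crate::log_segment::LogSegment;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Engine};

// Hypothetical helper: drain a CommitReader and count its batches.
// Each item is a DeltaResult<ActionsBatch>, flagged as coming from a commit file.
fn count_commit_batches(
    engine: &dyn Engine,
    log_segment: &LogSegment,
    schema: SchemaRef,
) -> DeltaResult<usize> {
    let reader = CommitReader::try_new(engine, log_segment, schema)?;
    let mut count = 0;
    for batch in reader {
        let _actions = batch?; // propagate per-batch read errors
        count += 1;
    }
    Ok(count)
}
```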
1 change: 1 addition & 0 deletions kernel/src/log_reader/mod.rs
@@ -0,0 +1 @@
pub(crate) mod commit;
13 changes: 3 additions & 10 deletions kernel/src/log_segment.rs
@@ -9,6 +9,7 @@ use crate::actions::{
     PROTOCOL_NAME, SIDECAR_NAME,
 };
 use crate::last_checkpoint_hint::LastCheckpointHint;
+use crate::log_reader::commit::CommitReader;
 use crate::log_replay::ActionsBatch;
 use crate::path::{LogPathFileType, ParsedLogPath};
 use crate::schema::{SchemaRef, StructField, ToSchema as _};
@@ -303,15 +304,7 @@ impl LogSegment {
         meta_predicate: Option<PredicateRef>,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
         // `replay` expects commit files to be sorted in descending order, so the return value here is correct
-        let commits_and_compactions = self.find_commit_cover();
-        let commit_stream = engine
-            .json_handler()
-            .read_json_files(
-                &commits_and_compactions,
-                commit_read_schema,
-                meta_predicate.clone(),
-            )?
-            .map_ok(|batch| ActionsBatch::new(batch, true));
+        let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;

         let checkpoint_stream =
             self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
@@ -340,7 +333,7 @@
     /// returns files in DESCENDING ORDER, as that's what `replay` expects. This function assumes
     /// that all files in `self.ascending_commit_files` and `self.ascending_compaction_files` are in
     /// range for this log segment. This invariant is maintained by our listing code.
-    fn find_commit_cover(&self) -> Vec<FileMeta> {
+    pub(crate) fn find_commit_cover(&self) -> Vec<FileMeta> {
         // Create an iterator sorted in ascending order by (initial version, end version), e.g.
         // [00.json, 00.09.compacted.json, 00.99.compacted.json, 01.json, 02.json, ..., 10.json,
         // 10.19.compacted.json, 11.json, ...]
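
Aside for reviewers, since `find_commit_cover` is now `pub(crate)`: the contract is that each version is covered exactly once, preferring a compaction over the single-commit files it subsumes, with the result returned newest-first. A standalone sketch of that idea (simplified and hypothetical, not the kernel's actual implementation; it assumes compaction ranges never partially overlap):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct LogFile {
    start: u64, // first version covered; a plain commit has start == end
    end: u64,   // last version covered
}

// Pick the widest file starting at each version, skip files a previous pick
// already covers, and return the cover newest-first (what `replay` expects).
fn find_cover(mut files: Vec<LogFile>) -> Vec<LogFile> {
    files.sort_by_key(|f| (f.start, f.end));
    let mut cover: Vec<LogFile> = Vec::new();
    for f in files {
        match cover.last_mut() {
            // a wider file with the same start supersedes the previous pick
            Some(last) if last.start == f.start => *last = f,
            // skip files fully covered by the previous pick
            Some(last) if f.end <= last.end => {}
            _ => cover.push(f),
        }
    }
    cover.reverse();
    cover
}

#[test]
fn cover_prefers_compactions() {
    let mut files: Vec<LogFile> = (0..=10).map(|v| LogFile { start: v, end: v }).collect();
    files.push(LogFile { start: 0, end: 9 }); // e.g. 00.09.compacted.json
    assert_eq!(
        find_cover(files),
        vec![LogFile { start: 10, end: 10 }, LogFile { start: 0, end: 9 }]
    );
}
```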
2 changes: 1 addition & 1 deletion kernel/src/scan/log_replay.rs
@@ -53,7 +53,7 @@ pub(crate) struct ScanLogReplayProcessor {

 impl ScanLogReplayProcessor {
     /// Create a new [`ScanLogReplayProcessor`] instance
-    fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
+    pub(crate) fn new(engine: &dyn Engine, state_info: Arc<StateInfo>) -> DeltaResult<Self> {
         // Extract the physical predicate from StateInfo's PhysicalPredicate enum.
         // The DataSkippingFilter and partition_filter components expect the predicate
         // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
@@ -41,7 +41,7 @@ pub(crate) mod state_info;

 // safety: we define get_commit_schema() and _know_ it contains ADD_NAME and REMOVE_NAME
 #[allow(clippy::unwrap_used)]
-static COMMIT_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+pub(crate) static COMMIT_READ_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     get_commit_schema()
         .project(&[ADD_NAME, REMOVE_NAME])
         .unwrap()