diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 228fab320..20eeb438a 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -123,6 +123,20 @@ static LOG_DOMAIN_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { )])) }); +static LOG_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new_unchecked([StructField::nullable( + METADATA_NAME, + Metadata::to_schema(), + )])) +}); + +static LOG_PROTOCOL_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new_unchecked([StructField::nullable( + PROTOCOL_NAME, + Protocol::to_schema(), + )])) +}); + #[internal_api] /// Gets the schema for all actions that can appear in commits /// logs. This excludes actions that can only appear in checkpoints. @@ -158,6 +172,16 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef { &LOG_DOMAIN_METADATA_SCHEMA } +/// Returns the shared, lazily built schema whose only field is the nullable `METADATA_NAME` column typed by `Metadata::to_schema()`. +pub(crate) fn get_log_metadata_schema() -> &'static SchemaRef { + &LOG_METADATA_SCHEMA +} + +/// Returns the shared, lazily built schema whose only field is the nullable `PROTOCOL_NAME` column typed by `Protocol::to_schema()`. +pub(crate) fn get_log_protocol_schema() -> &'static SchemaRef { + &LOG_PROTOCOL_SCHEMA +} + /// Returns true if the schema contains file actions (add or remove) /// columns. 
#[internal_api] diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 8cf7f36f1..f13b6ef71 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -9,8 +9,9 @@ use url::Url; use crate::actions::deletion_vector::DeletionVectorPath; use crate::actions::{ as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema, - get_log_domain_metadata_schema, get_log_remove_schema, get_log_txn_schema, CommitInfo, - DomainMetadata, SetTransaction, INTERNAL_DOMAIN_PREFIX, + get_log_domain_metadata_schema, get_log_metadata_schema, get_log_protocol_schema, + get_log_remove_schema, get_log_txn_schema, CommitInfo, DomainMetadata, Metadata, Protocol, + SetTransaction, INTERNAL_DOMAIN_PREFIX, }; #[cfg(feature = "catalog-managed")] use crate::committer::FileSystemCommitter; @@ -26,6 +27,7 @@ use crate::scan::log_replay::{ use crate::scan::scan_row_schema; use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType, StructTypeBuilder}; use crate::snapshot::SnapshotRef; +use crate::table_configuration::TableConfiguration; use crate::utils::{current_time_ms, require}; use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, @@ -147,6 +149,10 @@ pub struct Transaction { domain_removals: Vec, // Whether this transaction contains any logical data changes. data_change: bool, + // Protocol action to write in this commit; `None` means no protocol action is written. + new_protocol: Option, + // Metadata action to write in this commit; `None` means no metadata action is written. 
+ new_metadata: Option, } impl std::fmt::Debug for Transaction { @@ -191,6 +197,8 @@ impl Transaction { domain_metadata_additions: vec![], domain_removals: vec![], data_change: true, + new_protocol: None, + new_metadata: None, }) } @@ -262,7 +270,29 @@ impl Transaction { .into_iter() .map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine)); - // Step 2: Construct commit info with ICT if enabled + // Step 2: If a new Protocol and/or Metadata is staged, validate the resulting pair (the read snapshot supplies whichever one is not staged) + let commit_version = self.read_snapshot.version() + 1; + let new_protocol = self.new_protocol.clone(); + let new_metadata = self.new_metadata.clone(); + if new_protocol.is_some() || new_metadata.is_some() { + let final_protocol = new_protocol + .as_ref() + .unwrap_or_else(|| self.read_snapshot.table_configuration().protocol()) + .clone(); + let final_metadata = new_metadata + .as_ref() + .unwrap_or_else(|| self.read_snapshot.table_configuration().metadata()) + .clone(); + + TableConfiguration::try_new( + final_metadata, + final_protocol, + self.read_snapshot.table_root().clone(), + commit_version, + )?; + } + + // Step 3: Construct commit info with ICT if enabled let in_commit_timestamp = self.read_snapshot .get_in_commit_timestamp(engine)? @@ -281,19 +311,28 @@ impl Transaction { let commit_info_action = commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine); - // Step 3: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark) - let commit_version = self.read_snapshot.version() + 1; + // Step 4: Convert the staged Protocol and Metadata, if any, to EngineData using their single-action log schemas + let protocol_action = new_protocol + .map(|p| p.into_engine_data(get_log_protocol_schema().clone(), engine)) + .transpose()?; + let metadata_action = new_metadata + .map(|m| m.into_engine_data(get_log_metadata_schema().clone(), engine)) + .transpose()?; + + // Step 5: Generate add actions and get data for domain metadata actions (e.g. 
row tracking high watermark) let (add_actions, row_tracking_domain_metadata) = self.generate_adds(engine, commit_version)?; - // Step 4: Generate all domain metadata actions (user and system domains) + // Step 6: Generate all domain metadata actions (user and system domains) let domain_metadata_actions = self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?; - // Step 5: Generate remove actions + // Step 7: Generate remove actions let remove_actions = self.generate_remove_actions(engine)?; let actions = iter::once(commit_info_action) + .chain(protocol_action.into_iter().map(Ok)) + .chain(metadata_action.into_iter().map(Ok)) .chain(add_actions) .chain(set_transaction_actions) .chain(domain_metadata_actions); @@ -302,7 +341,7 @@ impl Transaction { .map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected)) .chain(remove_actions); - // Step 6: Commit via the committer + // Step 8: Commit via the committer #[cfg(feature = "catalog-managed")] if self.committer.any_ref().is::() && self @@ -406,6 +445,24 @@ impl Transaction { self } + /// Stages `protocol` as the protocol action this transaction will write, replacing any previously staged protocol. + /// During commit it is validated via `TableConfiguration::try_new` together with the staged metadata, or the read snapshot's metadata if none is staged. + #[internal_api] + #[allow(dead_code)] + pub(crate) fn update_protocol(mut self, protocol: Protocol) -> Self { + self.new_protocol = Some(protocol); + self + } + + /// Stages `metadata` as the metadata action this transaction will write, replacing any previously staged metadata. + /// During commit it is validated via `TableConfiguration::try_new` together with the staged protocol, or the read snapshot's protocol if none is staged. + #[internal_api] + #[allow(dead_code)] + pub(crate) fn update_metadata(mut self, metadata: Metadata) -> Self { + self.new_metadata = Some(metadata); + self + } + /// Validate that user domains don't conflict with system domains or each other. 
fn validate_user_domain_operations(&self) -> DeltaResult<()> { let mut seen_domains = HashSet::new(); @@ -1032,10 +1089,17 @@ pub struct RetryableTransaction { #[cfg(test)] mod tests { use super::*; + use crate::actions::{Metadata, Protocol}; use crate::engine::sync::SyncEngine; - use crate::schema::MapType; + use crate::error::Error; + use crate::schema::{DataType, MapType, StructField, StructType}; + use crate::utils::current_time_ms; use crate::Snapshot; + use std::collections::HashMap; use std::path::PathBuf; + use std::sync::Arc; + use tempfile::TempDir; + use url::Url; // TODO: create a finer-grained unit tests for transactions (issue#1091) #[test] @@ -1107,4 +1171,156 @@ mod tests { Ok(()) } + + /// Copies commits 0 and 1 of `tests/data/table-with-dv-small` into a fresh temp dir and builds a version-1 snapshot of that copy. + /// Returns the temp dir, the `SyncEngine`, and the snapshot; the tests below keep the temp dir bound as `_temp_dir` while they commit. + fn setup_snapshot() -> (TempDir, Arc, SnapshotRef) { + let source_path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let temp_dir = TempDir::new().unwrap(); + let target_path = temp_dir.path(); + + let log_dir = target_path.join("_delta_log"); + std::fs::create_dir_all(&log_dir).unwrap(); + + std::fs::copy( + source_path.join("_delta_log/00000000000000000000.json"), + log_dir.join("00000000000000000000.json"), + ) + .unwrap(); + std::fs::copy( + source_path.join("_delta_log/00000000000000000001.json"), + log_dir.join("00000000000000000001.json"), + ) + .unwrap(); + + let url = Url::from_directory_path(target_path).unwrap(); + let engine = Arc::new(SyncEngine::new()); + let snapshot = Snapshot::builder_for(url) + .at_version(1) + .build(engine.as_ref()) + .unwrap(); + + (temp_dir, engine, snapshot) + } + + #[test] + fn test_transaction_update_metadata_only() -> DeltaResult<()> { + let (_temp_dir, engine, snapshot) = setup_snapshot(); + + let current_metadata = snapshot.table_configuration().metadata(); + let new_metadata = Metadata::try_new( + current_metadata.name().map(|s| s.to_string()), + Some("Updated Description".to_string()), + current_metadata.parse_schema()?, + current_metadata.partition_columns().clone(), + 
current_time_ms()?, + current_metadata.configuration().clone(), + )?; + + let transaction = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .update_metadata(new_metadata) + .with_data_change(false); + + let result = transaction.commit(engine.as_ref()); + assert!( + result.is_ok(), + "Metadata only commit failed: {:?}", + result.err() + ); + + Ok(()) + } + + #[test] + fn test_transaction_validation_failure() -> DeltaResult<()> { + let (_temp_dir, engine, snapshot) = setup_snapshot(); + + let schema_with_ntz = StructType::new_unchecked(vec![ + StructField::nullable("id", DataType::LONG), + StructField::nullable("ts", DataType::TIMESTAMP_NTZ), + ]); + + let invalid_metadata = Metadata::try_new( + None, + None, + schema_with_ntz, + vec![], + current_time_ms()?, + HashMap::new(), + )?; + + let old_protocol = Protocol::try_new(1, 2, None::>, None::>)?; + + let transaction = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .update_metadata(invalid_metadata) + .update_protocol(old_protocol); + + let result = transaction.commit(engine.as_ref()); + + match result { + Ok(_) => panic!("Transaction should have failed validation"), + Err(Error::Unsupported(msg)) => { + assert!( + msg.contains("timestampNtz"), + "Error message should mention timestampNtz, got: {}", + msg + ); + } + Err(e) => panic!("Expected Unsupported error, got: {:?}", e), + } + + Ok(()) + } + + #[test] + fn test_transaction_valid_protocol_upgrade() -> DeltaResult<()> { + let (_temp_dir, engine, snapshot) = setup_snapshot(); + + let new_protocol = Protocol::try_new(1, 3, None::>, None::>)?; + + let transaction = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? 
+ .update_protocol(new_protocol) + .with_data_change(false); + + let result = transaction.commit(engine.as_ref()); + assert!( + result.is_ok(), + "Protocol upgrade failed: {:?}", + result.err() + ); + + Ok(()) + } + + #[test] + fn test_transaction_update_protocol_and_metadata() -> DeltaResult<()> { + let (_temp_dir, engine, snapshot) = setup_snapshot(); + + let current_metadata = snapshot.table_configuration().metadata(); + let new_metadata = Metadata::try_new( + current_metadata.name().map(|s| s.to_string()), + Some("Updated Description".to_string()), + current_metadata.parse_schema()?, + current_metadata.partition_columns().clone(), + current_time_ms()?, + current_metadata.configuration().clone(), + )?; + + let new_protocol = Protocol::try_new(1, 3, None::>, None::>)?; + + let transaction = snapshot + .transaction(Box::new(FileSystemCommitter::new()))? + .update_metadata(new_metadata) + .update_protocol(new_protocol) + .with_data_change(false); + + let result = transaction.commit(engine.as_ref()); + assert!(result.is_ok(), "Combined update failed: {:?}", result.err()); + + Ok(()) + } }