Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ static LOG_DOMAIN_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
)]))
});

static LOG_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
METADATA_NAME,
Metadata::to_schema(),
)]))
});

static LOG_PROTOCOL_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
PROTOCOL_NAME,
Protocol::to_schema(),
)]))
});

#[internal_api]
/// Gets the schema for all actions that can appear in commits
/// logs. This excludes actions that can only appear in checkpoints.
Expand Down Expand Up @@ -158,6 +172,14 @@ pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
&LOG_DOMAIN_METADATA_SCHEMA
}

pub(crate) fn get_log_metadata_schema() -> &'static SchemaRef {
&LOG_METADATA_SCHEMA
}

pub(crate) fn get_log_protocol_schema() -> &'static SchemaRef {
&LOG_PROTOCOL_SCHEMA
}

/// Returns true if the schema contains file actions (add or remove)
/// columns.
#[internal_api]
Expand Down
232 changes: 223 additions & 9 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use url::Url;
use crate::actions::deletion_vector::DeletionVectorPath;
use crate::actions::{
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
get_log_domain_metadata_schema, get_log_remove_schema, get_log_txn_schema, CommitInfo,
DomainMetadata, SetTransaction, INTERNAL_DOMAIN_PREFIX,
get_log_domain_metadata_schema, get_log_metadata_schema, get_log_protocol_schema,
get_log_remove_schema, get_log_txn_schema, CommitInfo, DomainMetadata, Metadata, Protocol,
SetTransaction, INTERNAL_DOMAIN_PREFIX,
};
#[cfg(feature = "catalog-managed")]
use crate::committer::FileSystemCommitter;
Expand All @@ -26,6 +27,7 @@ use crate::scan::log_replay::{
use crate::scan::scan_row_schema;
use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType, StructTypeBuilder};
use crate::snapshot::SnapshotRef;
use crate::table_configuration::TableConfiguration;
use crate::utils::{current_time_ms, require};
use crate::{
DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData,
Expand Down Expand Up @@ -147,6 +149,10 @@ pub struct Transaction {
domain_removals: Vec<String>,
// Whether this transaction contains any logical data changes.
data_change: bool,
// New protocol to be committed in this transaction.
new_protocol: Option<Protocol>,
// New metadata to be committed in this transaction.
new_metadata: Option<Metadata>,
}

impl std::fmt::Debug for Transaction {
Expand Down Expand Up @@ -191,6 +197,8 @@ impl Transaction {
domain_metadata_additions: vec![],
domain_removals: vec![],
data_change: true,
new_protocol: None,
new_metadata: None,
})
}

Expand Down Expand Up @@ -262,7 +270,29 @@ impl Transaction {
.into_iter()
.map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine));

// Step 2: Construct commit info with ICT if enabled
// Step 2: Validate Protocol and Metadata updates if present
let commit_version = self.read_snapshot.version() + 1;
let new_protocol = self.new_protocol.clone();
let new_metadata = self.new_metadata.clone();
if new_protocol.is_some() || new_metadata.is_some() {
let final_protocol = new_protocol
.as_ref()
.unwrap_or_else(|| self.read_snapshot.table_configuration().protocol())
.clone();
let final_metadata = new_metadata
.as_ref()
.unwrap_or_else(|| self.read_snapshot.table_configuration().metadata())
.clone();

TableConfiguration::try_new(
final_metadata,
final_protocol,
self.read_snapshot.table_root().clone(),
commit_version,
)?;
}

// Step 3: Construct commit info with ICT if enabled
let in_commit_timestamp =
self.read_snapshot
.get_in_commit_timestamp(engine)?
Expand All @@ -281,19 +311,28 @@ impl Transaction {
let commit_info_action =
commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine);

// Step 3: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
let commit_version = self.read_snapshot.version() + 1;
// Step 4: Convert Protocol and Metadata to EngineData if present
let protocol_action = new_protocol
.map(|p| p.into_engine_data(get_log_protocol_schema().clone(), engine))
.transpose()?;
let metadata_action = new_metadata
.map(|m| m.into_engine_data(get_log_metadata_schema().clone(), engine))
.transpose()?;

// Step 5: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
let (add_actions, row_tracking_domain_metadata) =
self.generate_adds(engine, commit_version)?;

// Step 4: Generate all domain metadata actions (user and system domains)
// Step 6: Generate all domain metadata actions (user and system domains)
let domain_metadata_actions =
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;

// Step 5: Generate remove actions
// Step 7: Generate remove actions
let remove_actions = self.generate_remove_actions(engine)?;

let actions = iter::once(commit_info_action)
.chain(protocol_action.into_iter().map(Ok))
.chain(metadata_action.into_iter().map(Ok))
.chain(add_actions)
.chain(set_transaction_actions)
.chain(domain_metadata_actions);
Expand All @@ -302,7 +341,7 @@ impl Transaction {
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected))
.chain(remove_actions);

// Step 6: Commit via the committer
// Step 8: Commit via the committer
#[cfg(feature = "catalog-managed")]
if self.committer.any_ref().is::<FileSystemCommitter>()
&& self
Expand Down Expand Up @@ -406,6 +445,24 @@ impl Transaction {
self
}

/// Update the protocol for this transaction.
/// The new protocol will be validated during commit to ensure it is compatible with the metadata.
#[internal_api]
#[allow(dead_code)]
pub(crate) fn update_protocol(mut self, protocol: Protocol) -> Self {
self.new_protocol = Some(protocol);
self
}

/// Update the metadata for this transaction.
/// The new metadata will be validated during commit to ensure it is compatible with the protocol.
#[internal_api]
#[allow(dead_code)]
pub(crate) fn update_metadata(mut self, metadata: Metadata) -> Self {
self.new_metadata = Some(metadata);
self
}

/// Validate that user domains don't conflict with system domains or each other.
fn validate_user_domain_operations(&self) -> DeltaResult<()> {
let mut seen_domains = HashSet::new();
Expand Down Expand Up @@ -1032,10 +1089,17 @@ pub struct RetryableTransaction {
#[cfg(test)]
mod tests {
use super::*;
use crate::actions::{Metadata, Protocol};
use crate::engine::sync::SyncEngine;
use crate::schema::MapType;
use crate::error::Error;
use crate::schema::{DataType, MapType, StructField, StructType};
use crate::utils::current_time_ms;
use crate::Snapshot;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;

// TODO: create a finer-grained unit tests for transactions (issue#1091)
#[test]
Expand Down Expand Up @@ -1107,4 +1171,154 @@ mod tests {

Ok(())
}

fn setup_snapshot() -> (TempDir, Arc<SyncEngine>, SnapshotRef) {
let source_path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let temp_dir = TempDir::new().unwrap();
let target_path = temp_dir.path();

let log_dir = target_path.join("_delta_log");
std::fs::create_dir_all(&log_dir).unwrap();

std::fs::copy(
source_path.join("_delta_log/00000000000000000000.json"),
log_dir.join("00000000000000000000.json"),
)
.unwrap();
std::fs::copy(
source_path.join("_delta_log/00000000000000000001.json"),
log_dir.join("00000000000000000001.json"),
)
.unwrap();

let url = Url::from_directory_path(target_path).unwrap();
let engine = Arc::new(SyncEngine::new());
let snapshot = Snapshot::builder_for(url)
.at_version(1)
.build(engine.as_ref())
.unwrap();

(temp_dir, engine, snapshot)
}

#[test]
fn test_transaction_update_metadata_only() -> DeltaResult<()> {
let (_temp_dir, engine, snapshot) = setup_snapshot();

let current_metadata = snapshot.table_configuration().metadata();
let new_metadata = Metadata::try_new(
current_metadata.name().map(|s| s.to_string()),
Some("Updated Description".to_string()),
current_metadata.parse_schema()?,
current_metadata.partition_columns().clone(),
current_time_ms()?,
current_metadata.configuration().clone(),
)?;

let transaction = snapshot
.transaction(Box::new(FileSystemCommitter::new()))?
.update_metadata(new_metadata)
.with_data_change(false);

let result = transaction.commit(engine.as_ref());
assert!(
result.is_ok(),
"Metadata only commit failed: {:?}",
result.err()
);

Ok(())
}

#[test]
fn test_transaction_validation_failure() -> DeltaResult<()> {
let (_temp_dir, engine, snapshot) = setup_snapshot();

let schema_with_ntz = StructType::new_unchecked(vec![
StructField::nullable("id", DataType::LONG),
StructField::nullable("ts", DataType::TIMESTAMP_NTZ),
]);

let invalid_metadata = Metadata::try_new(
None,
None,
schema_with_ntz,
vec![],
current_time_ms()?,
HashMap::new(),
)?;

let old_protocol = Protocol::try_new(1, 2, None::<Vec<String>>, None::<Vec<String>>)?;

let transaction = snapshot
.transaction(Box::new(FileSystemCommitter::new()))?
.update_metadata(invalid_metadata)
.update_protocol(old_protocol);

let result = transaction.commit(engine.as_ref());

match result {
Ok(_) => panic!("Transaction should have failed validation"),
Err(Error::Unsupported(msg)) => {
assert!(
msg.contains("timestampNtz"),
"Error message should mention timestampNtz, got: {}",
msg
);
}
Err(e) => panic!("Expected Unsupported error, got: {:?}", e),
}

Ok(())
}

#[test]
fn test_transaction_valid_protocol_upgrade() -> DeltaResult<()> {
let (_temp_dir, engine, snapshot) = setup_snapshot();

let new_protocol = Protocol::try_new(1, 3, None::<Vec<String>>, None::<Vec<String>>)?;

let transaction = snapshot
.transaction(Box::new(FileSystemCommitter::new()))?
.update_protocol(new_protocol)
.with_data_change(false);

let result = transaction.commit(engine.as_ref());
assert!(
result.is_ok(),
"Protocol upgrade failed: {:?}",
result.err()
);

Ok(())
}

#[test]
fn test_transaction_update_protocol_and_metadata() -> DeltaResult<()> {
let (_temp_dir, engine, snapshot) = setup_snapshot();

let current_metadata = snapshot.table_configuration().metadata();
let new_metadata = Metadata::try_new(
current_metadata.name().map(|s| s.to_string()),
Some("Updated Description".to_string()),
current_metadata.parse_schema()?,
current_metadata.partition_columns().clone(),
current_time_ms()?,
current_metadata.configuration().clone(),
)?;

let new_protocol = Protocol::try_new(1, 3, None::<Vec<String>>, None::<Vec<String>>)?;

let transaction = snapshot
.transaction(Box::new(FileSystemCommitter::new()))?
.update_metadata(new_metadata)
.update_protocol(new_protocol)
.with_data_change(false);

let result = transaction.commit(engine.as_ref());
assert!(result.is_ok(), "Combined update failed: {:?}", result.err());

Ok(())
}
}
Loading