From 444e7d0d13ee8afc675ded7861f5593a512afef7 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 5 Aug 2025 19:27:57 +0000 Subject: [PATCH 1/5] delete column implementation --- crates/iceberg/src/transaction/mod.rs | 8 + .../iceberg/src/transaction/update_schema.rs | 144 ++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 crates/iceberg/src/transaction/update_schema.rs diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 06549a95c..48f320e16 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -60,6 +60,7 @@ mod snapshot; mod sort_order; mod update_location; mod update_properties; +mod update_schema; mod update_statistics; mod upgrade_format_version; @@ -81,6 +82,7 @@ use crate::transaction::append::FastAppendAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; +use crate::transaction::update_schema::UpdateSchemaAction; use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; @@ -163,6 +165,12 @@ impl Transaction { UpdateStatisticsAction::new() } + /// Update schema of the table. + pub fn update_schema(&self) -> UpdateSchemaAction { + let current_schema = self.table.metadata().current_schema().clone(); + UpdateSchemaAction::new(current_schema) + } + /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs new file mode 100644 index 000000000..3e5509e2e --- /dev/null +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::{Error, ErrorKind, Result}; +use crate::spec::{Schema, SchemaBuilder, SchemaRef}; +use crate::table::Table; +use crate::transaction::snapshot::SnapshotProducer; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{TableRequirement, TableUpdate}; + +/// UpdateSchemaAction is a transaction action for performing schema evolution to the table. +pub struct UpdateSchemaAction { + /// Schema update attributes. + case_sensitive: bool, + /// Current schema before update. + schema: SchemaRef, + /// Current field ids. + identifier_field_ids: HashSet, + /// Columns to drop on the table. + deletes: HashSet, +} + +impl UpdateSchemaAction { + pub(crate) fn new(schema: SchemaRef) -> Self { + let identifier_field_ids = schema.identifier_field_ids().collect::>(); + Self { + case_sensitive: false, + schema, + identifier_field_ids, + deletes: HashSet::new(), + } + } + + /// Set case sensitivity when updating schema by column names. + pub fn set_case_sensitivity(mut self, case_sensitivity: bool) -> Self { + self.case_sensitive = case_sensitivity; + self + } + + /// Deletes a column from a table. + /// + /// # Arguments + /// + /// * `column_name` - The path to the column. + /// + /// # Returns + /// + /// Returns the `UpdateSchema` with the delete operation staged. + pub fn delete_column(&mut self, column_name: Vec) -> Result<&mut Self> { + let full_name = column_name.join("."); + + // Get field id to drop. + let field = if self.case_sensitive { + self.schema.field_by_name(&full_name) + } else { + self.schema.field_by_name_case_insensitive(&full_name) + } + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Delete column name,'{}' , doesn't exist in the schema", + full_name + ), + ) + })?; + + // Validate columns to drop cannot be the table identifier. + if self.identifier_field_ids.contains(&field.id) { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!( + "Column '{}' is the table identifier, which canot be dropped.", + full_name + ), + )); + } + + self.deletes.insert(field.id); + + Ok(self) + } + + /// Get updated schema. + fn get_updated_schema(&self) -> Result { + let old_schema_id = self.schema.schema_id(); + let new_schema_id = old_schema_id + 1; + + let mut new_fields = vec![]; + for (field_id, field) in self.schema.field_id_to_fields() { + if self.deletes.contains(field_id) { + continue; + } + new_fields.push(field.clone()); + } + + let schema_builder = Schema::builder(); + let new_schema = schema_builder + .with_schema_id(new_schema_id) + .with_identifier_field_ids(self.identifier_field_ids.clone()) + .with_fields(new_fields) + .build()?; + Ok(new_schema) + } +} + +#[async_trait] +impl TransactionAction for UpdateSchemaAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let mut updates: Vec = vec![]; + let requirements: Vec = vec![]; + if self.deletes.is_empty() { + return Ok(ActionCommit::new(updates, requirements)); + } + + let new_schema = self.get_updated_schema()?; + let new_schema_id = new_schema.schema_id(); + updates.push(TableUpdate::AddSchema { schema: new_schema }); + updates.push(TableUpdate::SetCurrentSchema { + schema_id: new_schema_id, + }); + Ok(ActionCommit::new(updates, requirements)) + } +} From 477519eb6a3c9eaf9d642ddddca961018cb63912 Mon Sep 17 00:00:00 2001 From: dentiny Date: Tue, 5 Aug 2025 22:11:39 +0000 Subject: [PATCH 2/5] add unit test --- .../iceberg/src/transaction/update_schema.rs | 116 ++++++++++++++++-- 1 file changed, 109 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index 3e5509e2e..217cb3dfc 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -15,20 +15,20 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; -use uuid::Uuid; use crate::error::{Error, ErrorKind, Result}; -use crate::spec::{Schema, SchemaBuilder, SchemaRef}; +use crate::spec::{Schema, SchemaRef, TableMetadataBuilder}; use crate::table::Table; -use crate::transaction::snapshot::SnapshotProducer; use crate::transaction::{ActionCommit, TransactionAction}; use crate::{TableRequirement, TableUpdate}; /// UpdateSchemaAction is a transaction action for performing schema evolution to the table. +/// +/// TODO(hjiang): Currently only drop column is supported, need to implement a few more schema evolution operation, i.e., add columns, rename columns, change nullability, update default, etc. pub struct UpdateSchemaAction { /// Schema update attributes. case_sensitive: bool, @@ -90,13 +90,20 @@ impl UpdateSchemaAction { return Err(Error::new( ErrorKind::PreconditionFailed, format!( - "Column '{}' is the table identifier, which canot be dropped.", + "Column '{}' is the table identifier, which cannot be dropped.", full_name ), )); } + // Validate not all columns are dropped. self.deletes.insert(field.id); + if self.schema.field_id_to_fields().len() == self.deletes.len() { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!("Cannot delete all columns '{}' in the table.", full_name), + )); + } Ok(self) } @@ -134,11 +141,106 @@ impl TransactionAction for UpdateSchemaAction { } let new_schema = self.get_updated_schema()?; - let new_schema_id = new_schema.schema_id(); updates.push(TableUpdate::AddSchema { schema: new_schema }); updates.push(TableUpdate::SetCurrentSchema { - schema_id: new_schema_id, + schema_id: TableMetadataBuilder::LAST_ADDED, }); Ok(ActionCommit::new(updates, requirements)) } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; + + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::Schema; + use crate::transaction::tests::make_v2_table; + use crate::transaction::{ApplyTransactionAction, Transaction}; + use crate::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation}; + + /// Test util function to get [`TableCreation`]. + async fn create_table( + catalog: &mut MemoryCatalog, + schema: Arc, + warehouse_location: &str, + ) { + let table_name = "test1".to_string(); + let namespace_ident = NamespaceIdent::from_vec(vec!["ns1".to_string()]).unwrap(); + + let table_creation = TableCreation::builder() + .name(table_name.clone()) + .location(format!( + "{}/{}/{}", + warehouse_location, + namespace_ident.to_url_string(), + table_name + )) + .schema(schema.as_ref().clone()) + .build(); + + catalog + .create_namespace(&namespace_ident, /*properties=*/ HashMap::new()) + .await + .unwrap(); + catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(); + } + + #[test] + fn test_delete_empty_columns() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let action = tx.update_schema(); + assert!(action.deletes.is_empty()); + } + + #[test] + fn test_fail_to_delete_identifier_column() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut action = tx.update_schema(); + let res = action.delete_column(vec!["x".to_string()]); + assert!(res.is_err()); + } + + #[test] + fn test_delete_non_identifier_column() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut action = tx.update_schema(); + action.delete_column(vec!["z".to_string()]).unwrap(); + assert_eq!(action.deletes, HashSet::from([(3)])); + } + + // Test column deletion with memory catalog. + #[tokio::test] + async fn test_delete_columns_with_catalog() { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new().unwrap(); + let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); + + let table = make_v2_table(); + let mut memory_catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let schema = table.metadata().current_schema().clone(); + create_table(&mut memory_catalog, schema, &warehouse_location).await; + + let mut tx = Transaction::new(&table); + let mut action = tx.update_schema(); + action.delete_column(vec!["z".to_string()]).unwrap(); + tx = action.apply(tx).unwrap(); + + let table = tx.commit(&memory_catalog).await.unwrap(); + let schema = table.metadata().current_schema(); + assert!(schema.field_by_id(/*field_id=*/ 1).is_some()); + assert!(schema.field_by_id(/*field_id=*/ 2).is_some()); + assert!(schema.field_by_id(/*field_id=*/ 3).is_none()); + assert_eq!(schema.highest_field_id(), 2); + assert_eq!(schema.identifier_field_ids().len(), 2); + } +} From 57531f6d15540e4737bf714b48c03c2661715cd1 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 11 Aug 2025 07:35:35 +0000 Subject: [PATCH 3/5] check at commit --- .../iceberg/src/transaction/update_schema.rs | 182 ++++++++++++------ 1 file changed, 124 insertions(+), 58 deletions(-) diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index 217cb3dfc..8f9e94467 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -34,20 +34,16 @@ pub struct UpdateSchemaAction { case_sensitive: bool, /// Current schema before update. schema: SchemaRef, - /// Current field ids. - identifier_field_ids: HashSet, - /// Columns to drop on the table. - deletes: HashSet, + /// Columns names to drop from the table. + drops: HashSet, } impl UpdateSchemaAction { pub(crate) fn new(schema: SchemaRef) -> Self { - let identifier_field_ids = schema.identifier_field_ids().collect::>(); Self { case_sensitive: false, schema, - identifier_field_ids, - deletes: HashSet::new(), + drops: HashSet::new(), } } @@ -57,65 +53,76 @@ impl UpdateSchemaAction { self } - /// Deletes a column from a table. + /// drops a column from a table. /// /// # Arguments /// - /// * `column_name` - The path to the column. + /// * `column_name` - column to drop. /// /// # Returns /// - /// Returns the `UpdateSchema` with the delete operation staged. - pub fn delete_column(&mut self, column_name: Vec) -> Result<&mut Self> { - let full_name = column_name.join("."); + /// Returns the `UpdateSchema` with the drop operation staged. + pub fn drop_column(mut self, column_name: String) -> Self { + self.drops.insert(column_name); + self + } - // Get field id to drop. - let field = if self.case_sensitive { - self.schema.field_by_name(&full_name) - } else { - self.schema.field_by_name_case_insensitive(&full_name) - } - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Delete column name,'{}' , doesn't exist in the schema", - full_name - ), - ) - })?; - - // Validate columns to drop cannot be the table identifier. - if self.identifier_field_ids.contains(&field.id) { + /// Validate columns to drop, and get their field ids. + fn get_field_ids_to_drop(&self) -> Result> { + // Validate not all columns are dropped. + if self.schema.field_id_to_fields().len() == self.drops.len() { return Err(Error::new( ErrorKind::PreconditionFailed, - format!( - "Column '{}' is the table identifier, which cannot be dropped.", - full_name - ), + format!("Cannot drop all columns in the table."), )); } - // Validate not all columns are dropped. - self.deletes.insert(field.id); - if self.schema.field_id_to_fields().len() == self.deletes.len() { - return Err(Error::new( - ErrorKind::PreconditionFailed, - format!("Cannot delete all columns '{}' in the table.", full_name), - )); + let identifier_field_ids = self.schema.identifier_field_ids().collect::>(); + let mut field_ids_to_drop = HashSet::new(); + + // Get field id to drop. + for cur_column_name in self.drops.iter() { + let field = if self.case_sensitive { + self.schema.field_by_name(&cur_column_name) + } else { + self.schema.field_by_name_case_insensitive(&cur_column_name) + } + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "drop column name,'{}' , doesn't exist in the schema", + cur_column_name + ), + ) + })?; + + // Validate columns to drop cannot be the table identifier. + if identifier_field_ids.contains(&field.id) { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!( + "Column '{}' is the table identifier, which cannot be dropped.", + cur_column_name + ), + )); + } + + field_ids_to_drop.insert(field.id); } - Ok(self) + Ok(field_ids_to_drop) } /// Get updated schema. fn get_updated_schema(&self) -> Result { + let field_ids_to_drop = self.get_field_ids_to_drop()?; let old_schema_id = self.schema.schema_id(); let new_schema_id = old_schema_id + 1; let mut new_fields = vec![]; for (field_id, field) in self.schema.field_id_to_fields() { - if self.deletes.contains(field_id) { + if field_ids_to_drop.contains(field_id) { continue; } new_fields.push(field.clone()); @@ -124,7 +131,7 @@ impl UpdateSchemaAction { let schema_builder = Schema::builder(); let new_schema = schema_builder .with_schema_id(new_schema_id) - .with_identifier_field_ids(self.identifier_field_ids.clone()) + .with_identifier_field_ids(self.schema.identifier_field_ids()) .with_fields(new_fields) .build()?; Ok(new_schema) @@ -136,7 +143,7 @@ impl TransactionAction for UpdateSchemaAction { async fn commit(self: Arc, _table: &Table) -> Result { let mut updates: Vec = vec![]; let requirements: Vec = vec![]; - if self.deletes.is_empty() { + if self.drops.is_empty() { return Ok(ActionCommit::new(updates, requirements)); } @@ -193,34 +200,71 @@ mod tests { } #[test] - fn test_delete_empty_columns() { + fn test_drop_empty_columns() { let table = make_v2_table(); let tx = Transaction::new(&table); let action = tx.update_schema(); - assert!(action.deletes.is_empty()); + assert!(action.drops.is_empty()); } #[test] - fn test_fail_to_delete_identifier_column() { + fn test_drop_column() { let table = make_v2_table(); let tx = Transaction::new(&table); let mut action = tx.update_schema(); - let res = action.delete_column(vec!["x".to_string()]); - assert!(res.is_err()); + action = action.drop_column("z".to_string()); + assert_eq!(action.drops, HashSet::from([("z".to_string())])); } - #[test] - fn test_delete_non_identifier_column() { + // Test invalid column drop: identifier ids get dropped. + #[tokio::test] + async fn test_drop_identifier_ids_with_catalog() { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new().unwrap(); + let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); + let table = make_v2_table(); - let tx = Transaction::new(&table); + let mut memory_catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let schema = table.metadata().current_schema().clone(); + create_table(&mut memory_catalog, schema, &warehouse_location).await; + + let mut tx = Transaction::new(&table); let mut action = tx.update_schema(); - action.delete_column(vec!["z".to_string()]).unwrap(); - assert_eq!(action.deletes, HashSet::from([(3)])); + action = action.drop_column("x".to_string()); + tx = action.apply(tx).unwrap(); + + let res = tx.commit(&memory_catalog).await; + assert!(res.is_err()); } - // Test column deletion with memory catalog. + // Test empty columns drop with memory catalog. #[tokio::test] - async fn test_delete_columns_with_catalog() { + async fn test_drop_empty_columns_with_catalog() { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new().unwrap(); + let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); + + let table = make_v2_table(); + let mut memory_catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let schema = table.metadata().current_schema().clone(); + create_table(&mut memory_catalog, schema, &warehouse_location).await; + + let mut tx = Transaction::new(&table); + let action = tx.update_schema(); + tx = action.apply(tx).unwrap(); + + let table = tx.commit(&memory_catalog).await.unwrap(); + let schema = table.metadata().current_schema(); + assert!(schema.field_by_id(/*field_id=*/ 1).is_some()); + assert!(schema.field_by_id(/*field_id=*/ 2).is_some()); + assert!(schema.field_by_id(/*field_id=*/ 3).is_some()); + assert_eq!(schema.highest_field_id(), 3); + assert_eq!(schema.identifier_field_ids().len(), 2); + } + + // Test column drop with memory catalog. + #[tokio::test] + async fn test_drop_columns_with_catalog() { let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let temp_dir = TempDir::new().unwrap(); let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); @@ -232,7 +276,7 @@ mod tests { let mut tx = Transaction::new(&table); let mut action = tx.update_schema(); - action.delete_column(vec!["z".to_string()]).unwrap(); + action = action.drop_column("z".to_string()); tx = action.apply(tx).unwrap(); let table = tx.commit(&memory_catalog).await.unwrap(); @@ -243,4 +287,26 @@ mod tests { assert_eq!(schema.highest_field_id(), 2); assert_eq!(schema.identifier_field_ids().len(), 2); } + + // Test case sensitive column drop with memory catalog. + #[tokio::test] + async fn test_drop_case_sensitive_columns_with_catalog() { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new().unwrap(); + let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); + + let table = make_v2_table(); + let mut memory_catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let schema = table.metadata().current_schema().clone(); + create_table(&mut memory_catalog, schema, &warehouse_location).await; + + let mut tx = Transaction::new(&table); + let mut action = tx.update_schema(); + action = action.set_case_sensitivity(true); + action = action.drop_column("Z".to_string()); + tx = action.apply(tx).unwrap(); + + let res = tx.commit(&memory_catalog).await; + assert!(res.is_err()); + } } From e700b0db4d2abcc8a64866efe3dd830a6046a249 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 11 Aug 2025 07:43:57 +0000 Subject: [PATCH 4/5] preallocation --- crates/iceberg/src/transaction/update_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index 8f9e94467..0c3d8bf98 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -78,7 +78,7 @@ impl UpdateSchemaAction { } let identifier_field_ids = self.schema.identifier_field_ids().collect::>(); - let mut field_ids_to_drop = HashSet::new(); + let mut field_ids_to_drop = HashSet::with_capacity(self.drops.len()); // Get field id to drop. for cur_column_name in self.drops.iter() { From 7147934a6402bb6fbfa97f28c404528e93a881e6 Mon Sep 17 00:00:00 2001 From: dentiny Date: Mon, 11 Aug 2025 07:53:08 +0000 Subject: [PATCH 5/5] lint --- crates/iceberg/src/transaction/update_schema.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index 0c3d8bf98..a33abf3fd 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -73,7 +73,7 @@ impl UpdateSchemaAction { if self.schema.field_id_to_fields().len() == self.drops.len() { return Err(Error::new( ErrorKind::PreconditionFailed, - format!("Cannot drop all columns in the table."), + "Cannot drop all columns in the table.".to_string(), )); } @@ -83,9 +83,9 @@ impl UpdateSchemaAction { // Get field id to drop. for cur_column_name in self.drops.iter() { let field = if self.case_sensitive { - self.schema.field_by_name(&cur_column_name) + self.schema.field_by_name(cur_column_name) } else { - self.schema.field_by_name_case_insensitive(&cur_column_name) + self.schema.field_by_name_case_insensitive(cur_column_name) } .ok_or_else(|| { Error::new(