|
15 | 15 | // specific language governing permissions and limitations
|
16 | 16 | // under the License.
|
17 | 17 |
|
18 |
| -use std::collections::{HashMap, HashSet}; |
| 18 | +use std::collections::HashSet; |
19 | 19 | use std::sync::Arc;
|
20 | 20 |
|
21 | 21 | use async_trait::async_trait;
|
22 |
| -use uuid::Uuid; |
23 | 22 |
|
24 | 23 | use crate::error::{Error, ErrorKind, Result};
|
25 |
| -use crate::spec::{Schema, SchemaBuilder, SchemaRef}; |
| 24 | +use crate::spec::{Schema, SchemaRef, TableMetadataBuilder}; |
26 | 25 | use crate::table::Table;
|
27 |
| -use crate::transaction::snapshot::SnapshotProducer; |
28 | 26 | use crate::transaction::{ActionCommit, TransactionAction};
|
29 | 27 | use crate::{TableRequirement, TableUpdate};
|
30 | 28 |
|
31 | 29 | /// UpdateSchemaAction is a transaction action for performing schema evolution to the table.
|
| 30 | +/// |
| 31 | +/// TODO(hjiang): Currently only drop column is supported, need to implement a few more schema evolution operation, i.e., add columns, rename columns, change nullability, update default, etc. |
32 | 32 | pub struct UpdateSchemaAction {
|
33 | 33 | /// Schema update attributes.
|
34 | 34 | case_sensitive: bool,
|
@@ -90,13 +90,20 @@ impl UpdateSchemaAction {
|
90 | 90 | return Err(Error::new(
|
91 | 91 | ErrorKind::PreconditionFailed,
|
92 | 92 | format!(
|
93 |
| - "Column '{}' is the table identifier, which canot be dropped.", |
| 93 | + "Column '{}' is the table identifier, which cannot be dropped.", |
94 | 94 | full_name
|
95 | 95 | ),
|
96 | 96 | ));
|
97 | 97 | }
|
98 | 98 |
|
| 99 | + // Validate not all columns are dropped. |
99 | 100 | self.deletes.insert(field.id);
|
| 101 | + if self.schema.field_id_to_fields().len() == self.deletes.len() { |
| 102 | + return Err(Error::new( |
| 103 | + ErrorKind::PreconditionFailed, |
| 104 | + format!("Cannot delete all columns '{}' in the table.", full_name), |
| 105 | + )); |
| 106 | + } |
100 | 107 |
|
101 | 108 | Ok(self)
|
102 | 109 | }
|
@@ -134,11 +141,108 @@ impl TransactionAction for UpdateSchemaAction {
|
134 | 141 | }
|
135 | 142 |
|
136 | 143 | let new_schema = self.get_updated_schema()?;
|
137 |
| - let new_schema_id = new_schema.schema_id(); |
138 | 144 | updates.push(TableUpdate::AddSchema { schema: new_schema });
|
139 | 145 | updates.push(TableUpdate::SetCurrentSchema {
|
140 |
| - schema_id: new_schema_id, |
| 146 | + schema_id: TableMetadataBuilder::LAST_ADDED, |
141 | 147 | });
|
142 | 148 | Ok(ActionCommit::new(updates, requirements))
|
143 | 149 | }
|
144 | 150 | }
|
| 151 | + |
| 152 | +#[cfg(test)] |
| 153 | +mod tests { |
| 154 | + use std::collections::{HashMap, HashSet}; |
| 155 | + use std::sync::Arc; |
| 156 | + |
| 157 | + use tempfile::TempDir; |
| 158 | + |
| 159 | + use crate::io::FileIOBuilder; |
| 160 | + use crate::spec::Schema; |
| 161 | + use crate::transaction::tests::make_v2_table; |
| 162 | + use crate::transaction::{ApplyTransactionAction, Transaction}; |
| 163 | + use crate::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation, TableIdent}; |
| 164 | + |
| 165 | + /// Test util function to get [`TableCreation`]. |
| 166 | + async fn create_table( |
| 167 | + catalog: &mut MemoryCatalog, |
| 168 | + schema: Arc<Schema>, |
| 169 | + warehouse_location: &str, |
| 170 | + ) { |
| 171 | + let table_name = "test1".to_string(); |
| 172 | + let namespace_ident = NamespaceIdent::from_vec(vec!["ns1".to_string()]).unwrap(); |
| 173 | + // let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone()); |
| 174 | + // let file_io = FileIOBuilder::new_fs_io().build().unwrap(); |
| 175 | + |
| 176 | + let table_creation = TableCreation::builder() |
| 177 | + .name(table_name.clone()) |
| 178 | + .location(format!( |
| 179 | + "{}/{}/{}", |
| 180 | + warehouse_location, |
| 181 | + namespace_ident.to_url_string(), |
| 182 | + table_name |
| 183 | + )) |
| 184 | + .schema(schema.as_ref().clone()) |
| 185 | + .build(); |
| 186 | + |
| 187 | + catalog |
| 188 | + .create_namespace(&namespace_ident, /*properties=*/ HashMap::new()) |
| 189 | + .await |
| 190 | + .unwrap(); |
| 191 | + catalog |
| 192 | + .create_table(&namespace_ident, table_creation) |
| 193 | + .await |
| 194 | + .unwrap(); |
| 195 | + } |
| 196 | + |
| 197 | + #[test] |
| 198 | + fn test_delete_empty_columns() { |
| 199 | + let table = make_v2_table(); |
| 200 | + let tx = Transaction::new(&table); |
| 201 | + let action = tx.update_schema(); |
| 202 | + assert!(action.deletes.is_empty()); |
| 203 | + } |
| 204 | + |
| 205 | + #[test] |
| 206 | + fn test_fail_to_delete_identifier_column() { |
| 207 | + let table = make_v2_table(); |
| 208 | + let tx = Transaction::new(&table); |
| 209 | + let mut action = tx.update_schema(); |
| 210 | + let res = action.delete_column(vec!["x".to_string()]); |
| 211 | + assert!(res.is_err()); |
| 212 | + } |
| 213 | + |
| 214 | + #[test] |
| 215 | + fn test_delete_non_identifier_column() { |
| 216 | + let table = make_v2_table(); |
| 217 | + let tx = Transaction::new(&table); |
| 218 | + let mut action = tx.update_schema(); |
| 219 | + action.delete_column(vec!["z".to_string()]).unwrap(); |
| 220 | + assert_eq!(action.deletes, HashSet::from([(3)])); |
| 221 | + } |
| 222 | + |
| 223 | + // Test column deletion with memory catalog. |
| 224 | + #[tokio::test] |
| 225 | + async fn test_delete_columns_with_catalog() { |
| 226 | + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); |
| 227 | + let temp_dir = TempDir::new().unwrap(); |
| 228 | + let warehouse_location = temp_dir.path().to_str().unwrap().to_string(); |
| 229 | + |
| 230 | + let table = make_v2_table(); |
| 231 | + let mut memory_catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); |
| 232 | + let schema = table.metadata().current_schema().clone(); |
| 233 | + create_table(&mut memory_catalog, schema, &warehouse_location).await; |
| 234 | + |
| 235 | + let mut tx = Transaction::new(&table); |
| 236 | + let mut action = tx.update_schema(); |
| 237 | + action.delete_column(vec!["z".to_string()]).unwrap(); |
| 238 | + tx = action.apply(tx).unwrap(); |
| 239 | + |
| 240 | + let table = tx.commit(&memory_catalog).await.unwrap(); |
| 241 | + let schema = table.metadata().current_schema(); |
| 242 | + assert!(schema.field_by_id(/*field_id=*/ 1).is_some()); |
| 243 | + assert!(schema.field_by_id(/*field_id=*/ 2).is_some()); |
| 244 | + assert!(schema.field_by_id(/*field_id=*/ 3).is_none()); |
| 245 | + assert_eq!(schema.highest_field_id(), 2); |
| 246 | + assert_eq!(schema.identifier_field_ids().len(), 2); |
| 247 | + } |
| 248 | +} |
0 commit comments