@@ -34,20 +34,16 @@ pub struct UpdateSchemaAction {
34
34
case_sensitive : bool ,
35
35
/// Current schema before update.
36
36
schema : SchemaRef ,
37
- /// Current field ids.
38
- identifier_field_ids : HashSet < i32 > ,
39
- /// Columns to drop on the table.
40
- deletes : HashSet < i32 > ,
37
+ /// Column names to drop from the table.
38
+ drops : HashSet < String > ,
41
39
}
42
40
43
41
impl UpdateSchemaAction {
44
42
pub ( crate ) fn new ( schema : SchemaRef ) -> Self {
45
- let identifier_field_ids = schema. identifier_field_ids ( ) . collect :: < HashSet < i32 > > ( ) ;
46
43
Self {
47
44
case_sensitive : false ,
48
45
schema,
49
- identifier_field_ids,
50
- deletes : HashSet :: new ( ) ,
46
+ drops : HashSet :: new ( ) ,
51
47
}
52
48
}
53
49
@@ -57,65 +53,76 @@ impl UpdateSchemaAction {
57
53
self
58
54
}
59
55
60
- /// Deletes a column from a table.
56
+ /// Stages a column to be dropped from the table.
61
57
///
62
58
/// # Arguments
63
59
///
64
- /// * `column_name` - The path to the column .
60
+ /// * `column_name` - Name of the column to drop.
65
61
///
66
62
/// # Returns
67
63
///
68
- /// Returns the `UpdateSchema` with the delete operation staged.
69
- pub fn delete_column ( & mut self , column_name : Vec < String > ) -> Result < & mut Self > {
70
- let full_name = column_name. join ( "." ) ;
64
+ /// Returns the `UpdateSchemaAction` with the drop operation staged.
65
+ pub fn drop_column ( mut self , column_name : String ) -> Self {
66
+ self . drops . insert ( column_name) ;
67
+ self
68
+ }
71
69
72
- // Get field id to drop.
73
- let field = if self . case_sensitive {
74
- self . schema . field_by_name ( & full_name)
75
- } else {
76
- self . schema . field_by_name_case_insensitive ( & full_name)
77
- }
78
- . ok_or_else ( || {
79
- Error :: new (
80
- ErrorKind :: DataInvalid ,
81
- format ! (
82
- "Delete column name,'{}' , doesn't exist in the schema" ,
83
- full_name
84
- ) ,
85
- )
86
- } ) ?;
87
-
88
- // Validate columns to drop cannot be the table identifier.
89
- if self . identifier_field_ids . contains ( & field. id ) {
70
+ /// Validate columns to drop, and get their field ids.
71
+ fn get_field_ids_to_drop ( & self ) -> Result < HashSet < i32 > > {
72
+ // Validate not all columns are dropped.
73
+ if self . schema . field_id_to_fields ( ) . len ( ) == self . drops . len ( ) {
90
74
return Err ( Error :: new (
91
75
ErrorKind :: PreconditionFailed ,
92
- format ! (
93
- "Column '{}' is the table identifier, which cannot be dropped." ,
94
- full_name
95
- ) ,
76
+ format ! ( "Cannot drop all columns in the table." ) ,
96
77
) ) ;
97
78
}
98
79
99
- // Validate not all columns are dropped.
100
- self . deletes . insert ( field. id ) ;
101
- if self . schema . field_id_to_fields ( ) . len ( ) == self . deletes . len ( ) {
102
- return Err ( Error :: new (
103
- ErrorKind :: PreconditionFailed ,
104
- format ! ( "Cannot delete all columns '{}' in the table." , full_name) ,
105
- ) ) ;
80
+ let identifier_field_ids = self . schema . identifier_field_ids ( ) . collect :: < HashSet < i32 > > ( ) ;
81
+ let mut field_ids_to_drop = HashSet :: new ( ) ;
82
+
83
+ // Resolve each staged column name to its field id.
84
+ for cur_column_name in self . drops . iter ( ) {
85
+ let field = if self . case_sensitive {
86
+ self . schema . field_by_name ( & cur_column_name)
87
+ } else {
88
+ self . schema . field_by_name_case_insensitive ( & cur_column_name)
89
+ }
90
+ . ok_or_else ( || {
91
+ Error :: new (
92
+ ErrorKind :: DataInvalid ,
93
+ format ! (
94
+ "drop column name,'{}' , doesn't exist in the schema" ,
95
+ cur_column_name
96
+ ) ,
97
+ )
98
+ } ) ?;
99
+
100
+ // Validate columns to drop cannot be the table identifier.
101
+ if identifier_field_ids. contains ( & field. id ) {
102
+ return Err ( Error :: new (
103
+ ErrorKind :: PreconditionFailed ,
104
+ format ! (
105
+ "Column '{}' is the table identifier, which cannot be dropped." ,
106
+ cur_column_name
107
+ ) ,
108
+ ) ) ;
109
+ }
110
+
111
+ field_ids_to_drop. insert ( field. id ) ;
106
112
}
107
113
108
- Ok ( self )
114
+ Ok ( field_ids_to_drop )
109
115
}
110
116
111
117
/// Get updated schema.
112
118
fn get_updated_schema ( & self ) -> Result < Schema > {
119
+ let field_ids_to_drop = self . get_field_ids_to_drop ( ) ?;
113
120
let old_schema_id = self . schema . schema_id ( ) ;
114
121
let new_schema_id = old_schema_id + 1 ;
115
122
116
123
let mut new_fields = vec ! [ ] ;
117
124
for ( field_id, field) in self . schema . field_id_to_fields ( ) {
118
- if self . deletes . contains ( field_id) {
125
+ if field_ids_to_drop . contains ( field_id) {
119
126
continue ;
120
127
}
121
128
new_fields. push ( field. clone ( ) ) ;
@@ -124,7 +131,7 @@ impl UpdateSchemaAction {
124
131
let schema_builder = Schema :: builder ( ) ;
125
132
let new_schema = schema_builder
126
133
. with_schema_id ( new_schema_id)
127
- . with_identifier_field_ids ( self . identifier_field_ids . clone ( ) )
134
+ . with_identifier_field_ids ( self . schema . identifier_field_ids ( ) )
128
135
. with_fields ( new_fields)
129
136
. build ( ) ?;
130
137
Ok ( new_schema)
@@ -136,7 +143,7 @@ impl TransactionAction for UpdateSchemaAction {
136
143
async fn commit ( self : Arc < Self > , _table : & Table ) -> Result < ActionCommit > {
137
144
let mut updates: Vec < TableUpdate > = vec ! [ ] ;
138
145
let requirements: Vec < TableRequirement > = vec ! [ ] ;
139
- if self . deletes . is_empty ( ) {
146
+ if self . drops . is_empty ( ) {
140
147
return Ok ( ActionCommit :: new ( updates, requirements) ) ;
141
148
}
142
149
@@ -193,34 +200,97 @@ mod tests {
193
200
}
194
201
195
202
#[ test]
196
- fn test_delete_empty_columns ( ) {
203
+ fn test_drop_empty_columns ( ) {
197
204
let table = make_v2_table ( ) ;
198
205
let tx = Transaction :: new ( & table) ;
199
206
let action = tx. update_schema ( ) ;
200
- assert ! ( action. deletes . is_empty( ) ) ;
207
+ assert ! ( action. drops . is_empty( ) ) ;
201
208
}
202
209
203
210
#[ test]
204
- fn test_fail_to_delete_identifier_column ( ) {
211
+ fn test_drop_column ( ) {
205
212
let table = make_v2_table ( ) ;
206
213
let tx = Transaction :: new ( & table) ;
207
214
let mut action = tx. update_schema ( ) ;
208
- let res = action. delete_column ( vec ! [ "x" . to_string( ) ] ) ;
215
+ action = action. drop_column ( "z" . to_string ( ) ) ;
216
+ assert_eq ! ( action. drops, HashSet :: from( [ ( "z" . to_string( ) ) ] ) ) ;
217
+ }
218
+
219
+ // Test invalid column drop: dropping an identifier column must fail.
220
+ #[ tokio:: test]
221
+ async fn test_drop_identifier_ids_with_catalog ( ) {
222
+ let file_io = FileIOBuilder :: new_fs_io ( ) . build ( ) . unwrap ( ) ;
223
+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
224
+ let warehouse_location = temp_dir. path ( ) . to_str ( ) . unwrap ( ) . to_string ( ) ;
225
+
226
+ let table = make_v2_table ( ) ;
227
+ let mut memory_catalog = MemoryCatalog :: new ( file_io, Some ( warehouse_location. clone ( ) ) ) ;
228
+ let schema = table. metadata ( ) . current_schema ( ) . clone ( ) ;
229
+ create_table ( & mut memory_catalog, schema, & warehouse_location) . await ;
230
+
231
+ let mut tx = Transaction :: new ( & table) ;
232
+ let mut action = tx. update_schema ( ) ;
233
+ action = action. drop_column ( "x" . to_string ( ) ) ;
234
+ tx = action. apply ( tx) . unwrap ( ) ;
235
+
236
+ let res = tx. commit ( & memory_catalog) . await ;
209
237
assert ! ( res. is_err( ) ) ;
210
238
}
211
239
212
- #[ test]
213
- fn test_delete_non_identifier_column ( ) {
240
+ // Test empty columns drop with memory catalog.
241
+ #[ tokio:: test]
242
+ async fn test_drop_empty_columns_with_catalog ( ) {
243
+ let file_io = FileIOBuilder :: new_fs_io ( ) . build ( ) . unwrap ( ) ;
244
+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
245
+ let warehouse_location = temp_dir. path ( ) . to_str ( ) . unwrap ( ) . to_string ( ) ;
246
+
214
247
let table = make_v2_table ( ) ;
215
- let tx = Transaction :: new ( & table) ;
248
+ let mut memory_catalog = MemoryCatalog :: new ( file_io, Some ( warehouse_location. clone ( ) ) ) ;
249
+ let schema = table. metadata ( ) . current_schema ( ) . clone ( ) ;
250
+ create_table ( & mut memory_catalog, schema, & warehouse_location) . await ;
251
+
252
+ let mut tx = Transaction :: new ( & table) ;
253
+ let action = tx. update_schema ( ) ;
254
+ tx = action. apply ( tx) . unwrap ( ) ;
255
+
256
+ let table = tx. commit ( & memory_catalog) . await . unwrap ( ) ;
257
+ let schema = table. metadata ( ) . current_schema ( ) ;
258
+ assert ! ( schema. field_by_id( /*field_id=*/ 1 ) . is_some( ) ) ;
259
+ assert ! ( schema. field_by_id( /*field_id=*/ 2 ) . is_some( ) ) ;
260
+ assert ! ( schema. field_by_id( /*field_id=*/ 3 ) . is_some( ) ) ;
261
+ assert_eq ! ( schema. highest_field_id( ) , 3 ) ;
262
+ assert_eq ! ( schema. identifier_field_ids( ) . len( ) , 2 ) ;
263
+ }
264
+
265
+ // Test column drop with memory catalog.
266
+ #[ tokio:: test]
267
+ async fn test_drop_columns_with_catalog ( ) {
268
+ let file_io = FileIOBuilder :: new_fs_io ( ) . build ( ) . unwrap ( ) ;
269
+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
270
+ let warehouse_location = temp_dir. path ( ) . to_str ( ) . unwrap ( ) . to_string ( ) ;
271
+
272
+ let table = make_v2_table ( ) ;
273
+ let mut memory_catalog = MemoryCatalog :: new ( file_io, Some ( warehouse_location. clone ( ) ) ) ;
274
+ let schema = table. metadata ( ) . current_schema ( ) . clone ( ) ;
275
+ create_table ( & mut memory_catalog, schema, & warehouse_location) . await ;
276
+
277
+ let mut tx = Transaction :: new ( & table) ;
216
278
let mut action = tx. update_schema ( ) ;
217
- action. delete_column ( vec ! [ "z" . to_string( ) ] ) . unwrap ( ) ;
218
- assert_eq ! ( action. deletes, HashSet :: from( [ ( 3 ) ] ) ) ;
279
+ action = action. drop_column ( "z" . to_string ( ) ) ;
280
+ tx = action. apply ( tx) . unwrap ( ) ;
281
+
282
+ let table = tx. commit ( & memory_catalog) . await . unwrap ( ) ;
283
+ let schema = table. metadata ( ) . current_schema ( ) ;
284
+ assert ! ( schema. field_by_id( /*field_id=*/ 1 ) . is_some( ) ) ;
285
+ assert ! ( schema. field_by_id( /*field_id=*/ 2 ) . is_some( ) ) ;
286
+ assert ! ( schema. field_by_id( /*field_id=*/ 3 ) . is_none( ) ) ;
287
+ assert_eq ! ( schema. highest_field_id( ) , 2 ) ;
288
+ assert_eq ! ( schema. identifier_field_ids( ) . len( ) , 2 ) ;
219
289
}
220
290
221
- // Test column deletion with memory catalog.
291
+ // Test case-insensitive column drop with memory catalog.
222
292
#[ tokio:: test]
223
- async fn test_delete_columns_with_catalog ( ) {
293
+ async fn test_drop_case_insensitive_columns_with_catalog ( ) {
224
294
let file_io = FileIOBuilder :: new_fs_io ( ) . build ( ) . unwrap ( ) ;
225
295
let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
226
296
let warehouse_location = temp_dir. path ( ) . to_str ( ) . unwrap ( ) . to_string ( ) ;
@@ -232,7 +302,7 @@ mod tests {
232
302
233
303
let mut tx = Transaction :: new ( & table) ;
234
304
let mut action = tx. update_schema ( ) ;
235
- action. delete_column ( vec ! [ "z ". to_string( ) ] ) . unwrap ( ) ;
305
+ action = action . drop_column ( "Z ". to_string ( ) ) ;
236
306
tx = action. apply ( tx) . unwrap ( ) ;
237
307
238
308
let table = tx. commit ( & memory_catalog) . await . unwrap ( ) ;
0 commit comments