Skip to content

Commit 444e7d0

Browse files
committed
delete column implementation
1 parent fb6fef3 commit 444e7d0

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ mod snapshot;
6060
mod sort_order;
6161
mod update_location;
6262
mod update_properties;
63+
mod update_schema;
6364
mod update_statistics;
6465
mod upgrade_format_version;
6566

@@ -81,6 +82,7 @@ use crate::transaction::append::FastAppendAction;
8182
use crate::transaction::sort_order::ReplaceSortOrderAction;
8283
use crate::transaction::update_location::UpdateLocationAction;
8384
use crate::transaction::update_properties::UpdatePropertiesAction;
85+
use crate::transaction::update_schema::UpdateSchemaAction;
8486
use crate::transaction::update_statistics::UpdateStatisticsAction;
8587
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
8688
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -163,6 +165,12 @@ impl Transaction {
163165
UpdateStatisticsAction::new()
164166
}
165167

168+
/// Update schema of the table.
169+
pub fn update_schema(&self) -> UpdateSchemaAction {
170+
let current_schema = self.table.metadata().current_schema().clone();
171+
UpdateSchemaAction::new(current_schema)
172+
}
173+
166174
/// Commit transaction.
167175
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
168176
if self.actions.is_empty() {
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::{HashMap, HashSet};
19+
use std::sync::Arc;
20+
21+
use async_trait::async_trait;
22+
use uuid::Uuid;
23+
24+
use crate::error::{Error, ErrorKind, Result};
25+
use crate::spec::{Schema, SchemaBuilder, SchemaRef};
26+
use crate::table::Table;
27+
use crate::transaction::snapshot::SnapshotProducer;
28+
use crate::transaction::{ActionCommit, TransactionAction};
29+
use crate::{TableRequirement, TableUpdate};
30+
31+
/// UpdateSchemaAction is a transaction action for performing schema evolution to the table.
32+
pub struct UpdateSchemaAction {
33+
/// Schema update attributes.
34+
case_sensitive: bool,
35+
/// Current schema before update.
36+
schema: SchemaRef,
37+
/// Current field ids.
38+
identifier_field_ids: HashSet<i32>,
39+
/// Columns to drop on the table.
40+
deletes: HashSet<i32>,
41+
}
42+
43+
impl UpdateSchemaAction {
44+
pub(crate) fn new(schema: SchemaRef) -> Self {
45+
let identifier_field_ids = schema.identifier_field_ids().collect::<HashSet<i32>>();
46+
Self {
47+
case_sensitive: false,
48+
schema,
49+
identifier_field_ids,
50+
deletes: HashSet::new(),
51+
}
52+
}
53+
54+
/// Set case sensitivity when updating schema by column names.
55+
pub fn set_case_sensitivity(mut self, case_sensitivity: bool) -> Self {
56+
self.case_sensitive = case_sensitivity;
57+
self
58+
}
59+
60+
/// Deletes a column from a table.
61+
///
62+
/// # Arguments
63+
///
64+
/// * `column_name` - The path to the column.
65+
///
66+
/// # Returns
67+
///
68+
/// Returns the `UpdateSchema` with the delete operation staged.
69+
pub fn delete_column(&mut self, column_name: Vec<String>) -> Result<&mut Self> {
70+
let full_name = column_name.join(".");
71+
72+
// Get field id to drop.
73+
let field = if self.case_sensitive {
74+
self.schema.field_by_name(&full_name)
75+
} else {
76+
self.schema.field_by_name_case_insensitive(&full_name)
77+
}
78+
.ok_or_else(|| {
79+
Error::new(
80+
ErrorKind::DataInvalid,
81+
format!(
82+
"Delete column name,'{}' , doesn't exist in the schema",
83+
full_name
84+
),
85+
)
86+
})?;
87+
88+
// Validate columns to drop cannot be the table identifier.
89+
if self.identifier_field_ids.contains(&field.id) {
90+
return Err(Error::new(
91+
ErrorKind::PreconditionFailed,
92+
format!(
93+
"Column '{}' is the table identifier, which canot be dropped.",
94+
full_name
95+
),
96+
));
97+
}
98+
99+
self.deletes.insert(field.id);
100+
101+
Ok(self)
102+
}
103+
104+
/// Get updated schema.
105+
fn get_updated_schema(&self) -> Result<Schema> {
106+
let old_schema_id = self.schema.schema_id();
107+
let new_schema_id = old_schema_id + 1;
108+
109+
let mut new_fields = vec![];
110+
for (field_id, field) in self.schema.field_id_to_fields() {
111+
if self.deletes.contains(field_id) {
112+
continue;
113+
}
114+
new_fields.push(field.clone());
115+
}
116+
117+
let schema_builder = Schema::builder();
118+
let new_schema = schema_builder
119+
.with_schema_id(new_schema_id)
120+
.with_identifier_field_ids(self.identifier_field_ids.clone())
121+
.with_fields(new_fields)
122+
.build()?;
123+
Ok(new_schema)
124+
}
125+
}
126+
127+
#[async_trait]
128+
impl TransactionAction for UpdateSchemaAction {
129+
async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
130+
let mut updates: Vec<TableUpdate> = vec![];
131+
let requirements: Vec<TableRequirement> = vec![];
132+
if self.deletes.is_empty() {
133+
return Ok(ActionCommit::new(updates, requirements));
134+
}
135+
136+
let new_schema = self.get_updated_schema()?;
137+
let new_schema_id = new_schema.schema_id();
138+
updates.push(TableUpdate::AddSchema { schema: new_schema });
139+
updates.push(TableUpdate::SetCurrentSchema {
140+
schema_id: new_schema_id,
141+
});
142+
Ok(ActionCommit::new(updates, requirements))
143+
}
144+
}

0 commit comments

Comments
 (0)