Skip to content

Commit bdb44ae

Browse files
authored
feat(catalog): Implement update_table for SqlCatalog (#1911)
## Which issue does this PR close? - Closes the SQL catalog part of #1389 ## What changes are included in this PR? - Implement `update_table()` for SQL catalog - Add corresponding `test_update_table` test ## Are these changes tested? Yes. Covered by new `test_update_table` test.
1 parent 2944ccb commit bdb44ae

File tree

1 file changed

+102
-5
lines changed

1 file changed

+102
-5
lines changed

crates/catalog/sql/src/catalog.rs

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -917,11 +917,55 @@ impl Catalog for SqlCatalog {
917917
.build()?)
918918
}
919919

920-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
921-
Err(Error::new(
922-
ErrorKind::FeatureUnsupported,
923-
"Updating a table is not supported yet",
924-
))
920+
/// Updates an existing table within the SQL catalog.
921+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
922+
let table_ident = commit.identifier().clone();
923+
let current_table = self.load_table(&table_ident).await?;
924+
let current_metadata_location = current_table.metadata_location_result()?.to_string();
925+
926+
let staged_table = commit.apply(current_table)?;
927+
let staged_metadata_location = staged_table.metadata_location_result()?;
928+
929+
staged_table
930+
.metadata()
931+
.write_to(staged_table.file_io(), &staged_metadata_location)
932+
.await?;
933+
934+
let update_result = self
935+
.execute(
936+
&format!(
937+
"UPDATE {CATALOG_TABLE_NAME}
938+
SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
939+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
940+
AND {CATALOG_FIELD_TABLE_NAME} = ?
941+
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
942+
AND (
943+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
944+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
945+
)
946+
AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
947+
),
948+
vec![
949+
Some(staged_metadata_location),
950+
Some(current_metadata_location.as_str()),
951+
Some(&self.name),
952+
Some(table_ident.name()),
953+
Some(&table_ident.namespace().join(".")),
954+
Some(current_metadata_location.as_str()),
955+
],
956+
None,
957+
)
958+
.await?;
959+
960+
if update_result.rows_affected() == 0 {
961+
return Err(Error::new(
962+
ErrorKind::CatalogCommitConflicts,
963+
format!("Commit conflicted for table: {table_ident}"),
964+
)
965+
.with_retryable(true));
966+
}
967+
968+
Ok(staged_table)
925969
}
926970
}
927971

@@ -932,6 +976,7 @@ mod tests {
932976

933977
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
934978
use iceberg::table::Table;
979+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
935980
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
936981
use itertools::Itertools;
937982
use regex::Regex;
@@ -2293,4 +2338,56 @@ mod tests {
22932338
assert_eq!(table.identifier(), expected_table.identifier());
22942339
assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
22952340
}
2341+
2342+
#[tokio::test]
2343+
async fn test_update_table() {
2344+
let warehouse_loc = temp_path();
2345+
let catalog = new_sql_catalog(warehouse_loc).await;
2346+
2347+
// Create a test namespace and table
2348+
let namespace_ident = NamespaceIdent::new("ns1".into());
2349+
create_namespace(&catalog, &namespace_ident).await;
2350+
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
2351+
create_table(&catalog, &table_ident).await;
2352+
2353+
let table = catalog.load_table(&table_ident).await.unwrap();
2354+
2355+
// Store the original metadata location for comparison
2356+
let original_metadata_location = table.metadata_location().unwrap().to_string();
2357+
2358+
// Create a transaction to update the table
2359+
let tx = Transaction::new(&table);
2360+
let tx = tx
2361+
.update_table_properties()
2362+
.set("test_property".to_string(), "test_value".to_string())
2363+
.apply(tx)
2364+
.unwrap();
2365+
2366+
// Commit the transaction to the catalog
2367+
let updated_table = tx.commit(&catalog).await.unwrap();
2368+
2369+
// Verify the update was successful
2370+
assert_eq!(
2371+
updated_table.metadata().properties().get("test_property"),
2372+
Some(&"test_value".to_string())
2373+
);
2374+
// Verify the metadata location has been updated
2375+
assert_ne!(
2376+
updated_table.metadata_location().unwrap(),
2377+
original_metadata_location.as_str()
2378+
);
2379+
2380+
// Load the table again from the catalog to verify changes were persisted
2381+
let reloaded = catalog.load_table(&table_ident).await.unwrap();
2382+
2383+
// Verify the reloaded table matches the updated table
2384+
assert_eq!(
2385+
reloaded.metadata().properties().get("test_property"),
2386+
Some(&"test_value".to_string())
2387+
);
2388+
assert_eq!(
2389+
reloaded.metadata_location(),
2390+
updated_table.metadata_location()
2391+
);
2392+
}
22962393
}

0 commit comments

Comments
 (0)