diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index fb4bd36b8d..66f7f4c01a 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -18,7 +18,9 @@ use std::collections::HashMap; use std::fmt::Debug; +use anyhow::anyhow; use async_trait::async_trait; +use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; use iceberg::io::{ FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, @@ -635,10 +637,56 @@ impl Catalog for GlueCatalog { )) } - async fn update_table(&self, _commit: TableCommit) -> Result { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Updating a table is not supported yet", - )) + async fn update_table(&self, commit: TableCommit) -> Result
{ + let table_ident = commit.identifier().clone(); + let table_namespace = validate_namespace(table_ident.namespace())?; + let current_table = self.load_table(&table_ident).await?; + let current_metadata_location = current_table.metadata_location_result()?.to_string(); + + let staged_table = commit.apply(current_table)?; + let staged_metadata_location = staged_table.metadata_location_result()?; + + // Write new metadata + staged_table + .metadata() + .write_to(staged_table.file_io(), staged_metadata_location) + .await?; + + // Persist staged table to Glue + let builder = self + .client + .0 + .update_table() + .database_name(table_namespace) + .set_skip_archive(Some(true)) // todo make this configurable + .table_input(convert_to_glue_table( + table_ident.name(), + staged_metadata_location.to_string(), + staged_table.metadata(), + staged_table.metadata().properties(), + Some(current_metadata_location), + )?); + let builder = with_catalog_id!(builder, self.config); + let _ = builder.send().await.map_err(|e| { + let error = e.into_service_error(); + match error { + UpdateTableError::EntityNotFoundException(_) => Error::new( + ErrorKind::TableNotFound, + format!("Table {table_ident} is not found"), + ), + UpdateTableError::ConcurrentModificationException(_) => Error::new( + ErrorKind::CatalogCommitConflicts, + format!("Commit failed for table: {table_ident}"), + ) + .with_retryable(true), + _ => Error::new( + ErrorKind::Unexpected, + "Operation failed for hitting aws sdk error", + ), + } + .with_source(anyhow!("aws sdk error: {:?}", error)) + })?; + + Ok(staged_table) } } diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index bec9494fe9..76d8be8140 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -24,6 +24,7 @@ use std::sync::RwLock; use ctor::{ctor, dtor}; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent}; use iceberg_catalog_glue::{ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog, GlueCatalogConfig, @@ -367,3 +368,69 @@ async fn test_list_namespace() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_update_table() -> Result<()> { + let catalog = get_catalog().await; + let creation = set_table_creation(None, "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into())); + + catalog + .create_namespace(namespace.name(), HashMap::new()) + .await?; + + let expected = catalog.create_table(namespace.name(), creation).await?; + + let table = catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(table.identifier(), expected.identifier()); + assert_eq!(table.metadata_location(), expected.metadata_location()); + assert_eq!(table.metadata(), expected.metadata()); + + // Store the original metadata location for comparison + let original_metadata_location = table.metadata_location(); + + // Update table properties using the transaction + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set("test_property".to_string(), "test_value".to_string()) + .apply(tx)?; + + // Commit the transaction to the catalog + let updated_table = tx.commit(&catalog).await?; + + // Verify the update was successful + assert_eq!( + updated_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + + // Verify the metadata location has been updated + assert_ne!( + updated_table.metadata_location(), + original_metadata_location, + "Metadata location should be updated after commit" + ); + + // Load the table again from the catalog to verify changes were persisted + let reloaded_table = catalog.load_table(table.identifier()).await?; + + // Verify the reloaded table matches the updated table + assert_eq!( + reloaded_table.metadata().properties().get("test_property"), + Some(&"test_value".to_string()) + ); + assert_eq!( + reloaded_table.metadata_location(), + updated_table.metadata_location(), + "Reloaded table should have the same metadata location as the updated table" + ); + + Ok(()) +} diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index f6c6336247..0522f1971d 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -837,7 +837,7 @@ impl Catalog for RestCatalog { StatusCode::OK => deserialize_catalog_response(http_response).await?, StatusCode::NOT_FOUND => { return Err(Error::new( - ErrorKind::Unexpected, + ErrorKind::TableNotFound, "Tried to update a table that does not exist", )); } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 7ae01f1b51..6ab3a78c8b 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -50,7 +50,7 @@ pub enum ErrorKind { /// Iceberg table already exists at creation. TableAlreadyExists, - /// Iceberg namespace already exists at creation. + /// Iceberg namespace does not exist. NamespaceNotFound, /// Iceberg table does not exist.