Patch for crates/catalog/s3tables/src/catalog.rs (S3TablesCatalog).

* Adds a private helper `load_table_with_version_token` that sends the
  GetTable request, reads the table metadata from the returned metadata
  location, and returns the built `Table` together with the response's
  `version_token`.
* `load_table` now delegates to that helper and discards the token.
* `update_table` no longer returns `FeatureUnsupported`: it loads the current
  table and version token, applies the commit, writes the staged metadata to
  the staged metadata location, and calls UpdateTableMetadataLocation with the
  token. `ConflictException` is mapped to a retryable `CatalogCommitConflicts`
  error, `NotFoundException` to `TableNotFound`, and every other service error
  to `Unexpected`; the SDK error is attached as the source in all cases.
* Adds the test `test_s3tables_update_table`, which returns early when
  `load_s3tables_catalog_from_env` yields `Ok(None)`.

NOTE(review): line breaks and indentation in this patch were restored from a
whitespace-collapsed copy (hunk line counts were re-checked against the `@@`
headers). Context-line indentation follows rustfmt conventions and should be
confirmed against the target file before applying.

diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index ccf2b3e5d1..42d8a9cab4 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -23,6 +23,7 @@ use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
 use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
 use aws_sdk_s3tables::operation::get_table::GetTableOutput;
 use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
+use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
 use aws_sdk_s3tables::types::OpenTableFormat;
 use iceberg::io::{FileIO, FileIOBuilder};
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
@@ -80,6 +81,39 @@ impl S3TablesCatalog {
             file_io,
         })
     }
+
+    async fn load_table_with_version_token(
+        &self,
+        table_ident: &TableIdent,
+    ) -> Result<(Table, String)> {
+        let req = self
+            .s3tables_client
+            .get_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(table_ident.namespace().to_url_string())
+            .name(table_ident.name());
+        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
+
+        // when a table is created, it's possible that the metadata location is not set.
+        let metadata_location = resp.metadata_location().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table {} does not have metadata location",
+                    table_ident.name()
+                ),
+            )
+        })?;
+        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
+
+        let table = Table::builder()
+            .identifier(table_ident.clone())
+            .metadata(metadata)
+            .metadata_location(metadata_location)
+            .file_io(self.file_io.clone())
+            .build()?;
+        Ok((table, resp.version_token))
+    }
 }

 #[async_trait]
@@ -368,33 +402,7 @@ impl Catalog for S3TablesCatalog {
     /// - Errors from the underlying database query process, converted using
     ///   `from_aws_sdk_error`.
     async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
-        let req = self
-            .s3tables_client
-            .get_table()
-            .table_bucket_arn(self.config.table_bucket_arn.clone())
-            .namespace(table_ident.namespace().to_url_string())
-            .name(table_ident.name());
-        let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
-
-        // when a table is created, it's possible that the metadata location is not set.
-        let metadata_location = resp.metadata_location().ok_or_else(|| {
-            Error::new(
-                ErrorKind::Unexpected,
-                format!(
-                    "Table {} does not have metadata location",
-                    table_ident.name()
-                ),
-            )
-        })?;
-        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
-
-        let table = Table::builder()
-            .identifier(table_ident.clone())
-            .metadata(metadata)
-            .metadata_location(metadata_location)
-            .file_io(self.file_io.clone())
-            .build()?;
-        Ok(table)
+        Ok(self.load_table_with_version_token(table_ident).await?.0)
     }

     /// Drops an existing table from the s3tables catalog.
@@ -480,13 +488,50 @@ impl Catalog for S3TablesCatalog {
     }

     /// Updates an existing table within the s3tables catalog.
-    ///
-    /// This function is still in development and will always return an error.
-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Updating a table is not supported yet",
-        ))
+    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
+        let table_ident = commit.identifier().clone();
+        let table_namespace = table_ident.namespace();
+        let (current_table, version_token) =
+            self.load_table_with_version_token(&table_ident).await?;
+
+        let staged_table = commit.apply(current_table)?;
+        let staged_metadata_location = staged_table.metadata_location_result()?;
+
+        staged_table
+            .metadata()
+            .write_to(staged_table.file_io(), staged_metadata_location)
+            .await?;
+
+        let builder = self
+            .s3tables_client
+            .update_table_metadata_location()
+            .table_bucket_arn(&self.config.table_bucket_arn)
+            .namespace(table_namespace.to_url_string())
+            .name(table_ident.name())
+            .version_token(version_token)
+            .metadata_location(staged_metadata_location);
+
+        let _ = builder.send().await.map_err(|e| {
+            let error = e.into_service_error();
+            match error {
+                UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
+                    ErrorKind::CatalogCommitConflicts,
+                    format!("Commit conflicted for table: {table_ident}"),
+                )
+                .with_retryable(true),
+                UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
+                    ErrorKind::TableNotFound,
+                    format!("Table {table_ident} is not found"),
+                ),
+                _ => Error::new(
+                    ErrorKind::Unexpected,
+                    "Operation failed for hitting aws sdk error",
+                ),
+            }
+            .with_source(anyhow!("aws sdk error: {:?}", error))
+        })?;
+
+        Ok(staged_table)
     }
 }

@@ -503,6 +548,7 @@ where T: std::fmt::Debug {
 #[cfg(test)]
 mod tests {
     use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+    use iceberg::transaction::{ApplyTransactionAction, Transaction};

     use super::*;

@@ -624,4 +670,89 @@ mod tests {
         assert!(!catalog.table_exists(&table_ident).await.unwrap());
         catalog.drop_namespace(&namespace).await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_s3tables_update_table() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        // Create a test namespace and table
+        let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
+        let table_ident =
+            TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
+
+        // Clean up any existing resources from previous test runs
+        catalog.drop_table(&table_ident).await.ok();
+        catalog.drop_namespace(&namespace).await.ok();
+
+        // Create namespace and table
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+
+        let creation = {
+            let schema = Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap();
+            TableCreation::builder()
+                .name(table_ident.name().to_string())
+                .properties(HashMap::new())
+                .schema(schema)
+                .build()
+        };
+
+        let table = catalog.create_table(&namespace, creation).await.unwrap();
+
+        // Create a transaction to update the table
+        let tx = Transaction::new(&table);
+
+        // Store the original metadata location for comparison
+        let original_metadata_location = table.metadata_location();
+
+        // Update table properties using the transaction
+        let tx = tx
+            .update_table_properties()
+            .set("test_property".to_string(), "test_value".to_string())
+            .apply(tx)
+            .unwrap();
+
+        // Commit the transaction to the catalog
+        let updated_table = tx.commit(&catalog).await.unwrap();
+
+        // Verify the update was successful
+        assert_eq!(
+            updated_table.metadata().properties().get("test_property"),
+            Some(&"test_value".to_string())
+        );
+
+        // Verify the metadata location has been updated
+        assert_ne!(
+            updated_table.metadata_location(),
+            original_metadata_location,
+            "Metadata location should be updated after commit"
+        );
+
+        // Load the table again from the catalog to verify changes were persisted
+        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
+
+        // Verify the reloaded table matches the updated table
+        assert_eq!(
+            reloaded_table.metadata().properties().get("test_property"),
+            Some(&"test_value".to_string())
+        );
+        assert_eq!(
+            reloaded_table.metadata_location(),
+            updated_table.metadata_location(),
+            "Reloaded table should have the same metadata location as the updated table"
+        );
+    }
 }