Skip to content

Commit 79f9787

Browse files
CTTYYiyang-C
authored and committed
feat(catalog): Implement update_table for GlueCatalog (apache#1584)
## Which issue does this PR close?

- Addresses the Glue part of apache#1389

## What changes are included in this PR?

- Implemented `update_table` for `GlueCatalog`
- Added a test for `GlueCatalog::update_table`
- Fixed the error kind returned by `RestCatalog` when updating a missing table
- Fixed a typo in the `ErrorKind` documentation

## Are these changes tested?

Yes, a test was added.
1 parent 1619e57 commit 79f9787

File tree

4 files changed

+122
-7
lines changed

4 files changed

+122
-7
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use std::collections::HashMap;
1919
use std::fmt::Debug;
2020

21+
use anyhow::anyhow;
2122
use async_trait::async_trait;
23+
use aws_sdk_glue::operation::update_table::UpdateTableError;
2224
use aws_sdk_glue::types::TableInput;
2325
use iceberg::io::{
2426
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
@@ -769,10 +771,56 @@ impl Catalog for GlueCatalog {
769771
}
770772
}
771773

772-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
773-
Err(Error::new(
774-
ErrorKind::FeatureUnsupported,
775-
"Updating a table is not supported yet",
776-
))
774+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
775+
let table_ident = commit.identifier().clone();
776+
let table_namespace = validate_namespace(table_ident.namespace())?;
777+
let current_table = self.load_table(&table_ident).await?;
778+
let current_metadata_location = current_table.metadata_location_result()?.to_string();
779+
780+
let staged_table = commit.apply(current_table)?;
781+
let staged_metadata_location = staged_table.metadata_location_result()?;
782+
783+
// Write new metadata
784+
staged_table
785+
.metadata()
786+
.write_to(staged_table.file_io(), staged_metadata_location)
787+
.await?;
788+
789+
// Persist staged table to Glue
790+
let builder = self
791+
.client
792+
.0
793+
.update_table()
794+
.database_name(table_namespace)
795+
.set_skip_archive(Some(true)) // todo make this configurable
796+
.table_input(convert_to_glue_table(
797+
table_ident.name(),
798+
staged_metadata_location.to_string(),
799+
staged_table.metadata(),
800+
staged_table.metadata().properties(),
801+
Some(current_metadata_location),
802+
)?);
803+
let builder = with_catalog_id!(builder, self.config);
804+
let _ = builder.send().await.map_err(|e| {
805+
let error = e.into_service_error();
806+
match error {
807+
UpdateTableError::EntityNotFoundException(_) => Error::new(
808+
ErrorKind::TableNotFound,
809+
format!("Table {table_ident} is not found"),
810+
),
811+
UpdateTableError::ConcurrentModificationException(_) => Error::new(
812+
ErrorKind::CatalogCommitConflicts,
813+
format!("Commit failed for table: {table_ident}"),
814+
)
815+
.with_retryable(true),
816+
_ => Error::new(
817+
ErrorKind::Unexpected,
818+
format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
819+
),
820+
}
821+
.with_source(anyhow!("aws sdk error: {:?}", error))
822+
})?;
823+
824+
Ok(staged_table)
777825
}
778826
}

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::sync::RwLock;
2424
use ctor::{ctor, dtor};
2525
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
2626
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
27+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2728
use iceberg::{
2829
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent,
2930
};
@@ -431,3 +432,69 @@ async fn test_register_table() -> Result<()> {
431432

432433
Ok(())
433434
}
435+
436+
#[tokio::test]
437+
async fn test_update_table() -> Result<()> {
438+
let catalog = get_catalog().await;
439+
let creation = set_table_creation(None, "my_table")?;
440+
let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into()));
441+
442+
catalog
443+
.create_namespace(namespace.name(), HashMap::new())
444+
.await?;
445+
446+
let expected = catalog.create_table(namespace.name(), creation).await?;
447+
448+
let table = catalog
449+
.load_table(&TableIdent::new(
450+
namespace.name().clone(),
451+
"my_table".to_string(),
452+
))
453+
.await?;
454+
455+
assert_eq!(table.identifier(), expected.identifier());
456+
assert_eq!(table.metadata_location(), expected.metadata_location());
457+
assert_eq!(table.metadata(), expected.metadata());
458+
459+
// Store the original metadata location for comparison
460+
let original_metadata_location = table.metadata_location();
461+
462+
// Update table properties using the transaction
463+
let tx = Transaction::new(&table);
464+
let tx = tx
465+
.update_table_properties()
466+
.set("test_property".to_string(), "test_value".to_string())
467+
.apply(tx)?;
468+
469+
// Commit the transaction to the catalog
470+
let updated_table = tx.commit(&catalog).await?;
471+
472+
// Verify the update was successful
473+
assert_eq!(
474+
updated_table.metadata().properties().get("test_property"),
475+
Some(&"test_value".to_string())
476+
);
477+
478+
// Verify the metadata location has been updated
479+
assert_ne!(
480+
updated_table.metadata_location(),
481+
original_metadata_location,
482+
"Metadata location should be updated after commit"
483+
);
484+
485+
// Load the table again from the catalog to verify changes were persisted
486+
let reloaded_table = catalog.load_table(table.identifier()).await?;
487+
488+
// Verify the reloaded table matches the updated table
489+
assert_eq!(
490+
reloaded_table.metadata().properties().get("test_property"),
491+
Some(&"test_value".to_string())
492+
);
493+
assert_eq!(
494+
reloaded_table.metadata_location(),
495+
updated_table.metadata_location(),
496+
"Reloaded table should have the same metadata location as the updated table"
497+
);
498+
499+
Ok(())
500+
}

crates/catalog/rest/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ impl Catalog for RestCatalog {
917917
StatusCode::OK => deserialize_catalog_response(http_response).await?,
918918
StatusCode::NOT_FOUND => {
919919
return Err(Error::new(
920-
ErrorKind::Unexpected,
920+
ErrorKind::TableNotFound,
921921
"Tried to update a table that does not exist",
922922
));
923923
}

crates/iceberg/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub enum ErrorKind {
5050
/// Iceberg table already exists at creation.
5151
TableAlreadyExists,
5252

53-
/// Iceberg namespace already exists at creation.
53+
/// Iceberg namespace does not exist.
5454
NamespaceNotFound,
5555

5656
/// Iceberg table does not exist.

0 commit comments

Comments
 (0)