Skip to content

Commit 7fde148

Browse files
authored
feat(catalog): Implement update_table for GlueCatalog (#1584)
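## Which issue does this PR close?

- Addresses the Glue part of #1389

## What changes are included in this PR?

- Implemented `update_table` for `GlueCatalog`
- Added a test for `GlueCatalog::update_table`
- Fixed the error kind returned by `RestCatalog` when updating a table that does not exist (`ErrorKind::TableNotFound` instead of `ErrorKind::Unexpected`)
- Fixed an incorrect doc comment on `ErrorKind::NamespaceNotFound`

## Are these changes tested?

Yes, a test was added.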
1 parent 979cc07 commit 7fde148

File tree

4 files changed

+122
-7
lines changed

4 files changed

+122
-7
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
use std::collections::HashMap;
1919
use std::fmt::Debug;
2020

21+
use anyhow::anyhow;
2122
use async_trait::async_trait;
23+
use aws_sdk_glue::operation::update_table::UpdateTableError;
2224
use aws_sdk_glue::types::TableInput;
2325
use iceberg::io::{
2426
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
@@ -709,10 +711,56 @@ impl Catalog for GlueCatalog {
709711
))
710712
}
711713

712-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
713-
Err(Error::new(
714-
ErrorKind::FeatureUnsupported,
715-
"Updating a table is not supported yet",
716-
))
714+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
715+
let table_ident = commit.identifier().clone();
716+
let table_namespace = validate_namespace(table_ident.namespace())?;
717+
let current_table = self.load_table(&table_ident).await?;
718+
let current_metadata_location = current_table.metadata_location_result()?.to_string();
719+
720+
let staged_table = commit.apply(current_table)?;
721+
let staged_metadata_location = staged_table.metadata_location_result()?;
722+
723+
// Write new metadata
724+
staged_table
725+
.metadata()
726+
.write_to(staged_table.file_io(), staged_metadata_location)
727+
.await?;
728+
729+
// Persist staged table to Glue
730+
let builder = self
731+
.client
732+
.0
733+
.update_table()
734+
.database_name(table_namespace)
735+
.set_skip_archive(Some(true)) // todo make this configurable
736+
.table_input(convert_to_glue_table(
737+
table_ident.name(),
738+
staged_metadata_location.to_string(),
739+
staged_table.metadata(),
740+
staged_table.metadata().properties(),
741+
Some(current_metadata_location),
742+
)?);
743+
let builder = with_catalog_id!(builder, self.config);
744+
let _ = builder.send().await.map_err(|e| {
745+
let error = e.into_service_error();
746+
match error {
747+
UpdateTableError::EntityNotFoundException(_) => Error::new(
748+
ErrorKind::TableNotFound,
749+
format!("Table {table_ident} is not found"),
750+
),
751+
UpdateTableError::ConcurrentModificationException(_) => Error::new(
752+
ErrorKind::CatalogCommitConflicts,
753+
format!("Commit failed for table: {table_ident}"),
754+
)
755+
.with_retryable(true),
756+
_ => Error::new(
757+
ErrorKind::Unexpected,
758+
format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
759+
),
760+
}
761+
.with_source(anyhow!("aws sdk error: {:?}", error))
762+
})?;
763+
764+
Ok(staged_table)
717765
}
718766
}

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::sync::RwLock;
2424
use ctor::{ctor, dtor};
2525
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
2626
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
27+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2728
use iceberg::{
2829
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent,
2930
};
@@ -402,3 +403,69 @@ async fn test_list_namespace() -> Result<()> {
402403

403404
Ok(())
404405
}
406+
407+
#[tokio::test]
408+
async fn test_update_table() -> Result<()> {
409+
let catalog = get_catalog().await;
410+
let creation = set_table_creation(None, "my_table")?;
411+
let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into()));
412+
413+
catalog
414+
.create_namespace(namespace.name(), HashMap::new())
415+
.await?;
416+
417+
let expected = catalog.create_table(namespace.name(), creation).await?;
418+
419+
let table = catalog
420+
.load_table(&TableIdent::new(
421+
namespace.name().clone(),
422+
"my_table".to_string(),
423+
))
424+
.await?;
425+
426+
assert_eq!(table.identifier(), expected.identifier());
427+
assert_eq!(table.metadata_location(), expected.metadata_location());
428+
assert_eq!(table.metadata(), expected.metadata());
429+
430+
// Store the original metadata location for comparison
431+
let original_metadata_location = table.metadata_location();
432+
433+
// Update table properties using the transaction
434+
let tx = Transaction::new(&table);
435+
let tx = tx
436+
.update_table_properties()
437+
.set("test_property".to_string(), "test_value".to_string())
438+
.apply(tx)?;
439+
440+
// Commit the transaction to the catalog
441+
let updated_table = tx.commit(&catalog).await?;
442+
443+
// Verify the update was successful
444+
assert_eq!(
445+
updated_table.metadata().properties().get("test_property"),
446+
Some(&"test_value".to_string())
447+
);
448+
449+
// Verify the metadata location has been updated
450+
assert_ne!(
451+
updated_table.metadata_location(),
452+
original_metadata_location,
453+
"Metadata location should be updated after commit"
454+
);
455+
456+
// Load the table again from the catalog to verify changes were persisted
457+
let reloaded_table = catalog.load_table(table.identifier()).await?;
458+
459+
// Verify the reloaded table matches the updated table
460+
assert_eq!(
461+
reloaded_table.metadata().properties().get("test_property"),
462+
Some(&"test_value".to_string())
463+
);
464+
assert_eq!(
465+
reloaded_table.metadata_location(),
466+
updated_table.metadata_location(),
467+
"Reloaded table should have the same metadata location as the updated table"
468+
);
469+
470+
Ok(())
471+
}

crates/catalog/rest/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,7 @@ impl Catalog for RestCatalog {
917917
StatusCode::OK => deserialize_catalog_response(http_response).await?,
918918
StatusCode::NOT_FOUND => {
919919
return Err(Error::new(
920-
ErrorKind::Unexpected,
920+
ErrorKind::TableNotFound,
921921
"Tried to update a table that does not exist",
922922
));
923923
}

crates/iceberg/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub enum ErrorKind {
5050
/// Iceberg table already exists at creation.
5151
TableAlreadyExists,
5252

53-
/// Iceberg namespace already exists at creation.
53+
/// Iceberg namespace does not exist.
5454
NamespaceNotFound,
5555

5656
/// Iceberg table does not exist.

0 commit comments

Comments
 (0)