Skip to content

feat(catalog): Implement update_table for GlueCatalog #1584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
use std::collections::HashMap;
use std::fmt::Debug;

use anyhow::anyhow;
use async_trait::async_trait;
use aws_sdk_glue::operation::update_table::UpdateTableError;
use aws_sdk_glue::types::TableInput;
use iceberg::io::{
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
Expand Down Expand Up @@ -635,10 +637,56 @@ impl Catalog for GlueCatalog {
))
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
let table_namespace = validate_namespace(table_ident.namespace())?;
let current_table = self.load_table(&table_ident).await?;
let current_metadata_location = current_table.metadata_location_result()?.to_string();

let staged_table = commit.apply(current_table)?;
let staged_metadata_location = staged_table.metadata_location_result()?;

// Write new metadata
staged_table
.metadata()
.write_to(staged_table.file_io(), staged_metadata_location)
.await?;

// Persist staged table to Glue
let builder = self
.client
.0
.update_table()
.database_name(table_namespace)
.set_skip_archive(Some(true)) // todo make this configurable
.table_input(convert_to_glue_table(
table_ident.name(),
staged_metadata_location.to_string(),
staged_table.metadata(),
staged_table.metadata().properties(),
Some(current_metadata_location),
)?);
let builder = with_catalog_id!(builder, self.config);
let _ = builder.send().await.map_err(|e| {
let error = e.into_service_error();
match error {
UpdateTableError::EntityNotFoundException(_) => Error::new(
ErrorKind::TableNotFound,
format!("Table {table_ident} is not found"),
),
UpdateTableError::ConcurrentModificationException(_) => Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit failed for table: {table_ident}"),
)
.with_retryable(true),
_ => Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting aws sdk error",
),
}
.with_source(anyhow!("aws sdk error: {:?}", error))
})?;

Ok(staged_table)
}
}
67 changes: 67 additions & 0 deletions crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::RwLock;
use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg_catalog_glue::{
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog, GlueCatalogConfig,
Expand Down Expand Up @@ -367,3 +368,69 @@ async fn test_list_namespace() -> Result<()> {

Ok(())
}

/// Creates a table, commits a property update through a transaction, and
/// checks that the change is visible both on the returned table and on a
/// fresh load from the catalog, and that the metadata location has moved.
#[tokio::test]
async fn test_update_table() -> Result<()> {
    const PROPERTY_KEY: &str = "test_property";
    const PROPERTY_VALUE: &str = "test_value";

    let catalog = get_catalog().await;
    let creation = set_table_creation(None, "my_table")?;
    let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into()));
    let namespace_ident = namespace.name();

    catalog
        .create_namespace(namespace_ident, HashMap::new())
        .await?;

    let expected = catalog.create_table(namespace_ident, creation).await?;

    // Sanity check: a freshly loaded table matches what create_table returned.
    let lookup_ident = TableIdent::new(namespace_ident.clone(), "my_table".to_string());
    let table = catalog.load_table(&lookup_ident).await?;

    assert_eq!(table.identifier(), expected.identifier());
    assert_eq!(table.metadata_location(), expected.metadata_location());
    assert_eq!(table.metadata(), expected.metadata());

    // Remember where the metadata lived before the commit.
    let original_metadata_location = table.metadata_location();

    // Stage a single property change and attach it to a transaction.
    let tx = Transaction::new(&table);
    let set_property = tx
        .update_table_properties()
        .set(PROPERTY_KEY.to_string(), PROPERTY_VALUE.to_string());
    let tx = set_property.apply(tx)?;

    // Commit the transaction to the catalog
    let updated_table = tx.commit(&catalog).await?;

    // The table returned by the commit carries the new property.
    let expected_value = PROPERTY_VALUE.to_string();
    assert_eq!(
        updated_table.metadata().properties().get(PROPERTY_KEY),
        Some(&expected_value)
    );

    // The commit must have produced a new metadata location.
    assert_ne!(
        updated_table.metadata_location(),
        original_metadata_location,
        "Metadata location should be updated after commit"
    );

    // A fresh load from the catalog must see the persisted change.
    let reloaded_table = catalog.load_table(table.identifier()).await?;

    assert_eq!(
        reloaded_table.metadata().properties().get(PROPERTY_KEY),
        Some(&expected_value)
    );
    assert_eq!(
        reloaded_table.metadata_location(),
        updated_table.metadata_location(),
        "Reloaded table should have the same metadata location as the updated table"
    );

    Ok(())
}
2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ impl Catalog for RestCatalog {
StatusCode::OK => deserialize_catalog_response(http_response).await?,
StatusCode::NOT_FOUND => {
return Err(Error::new(
ErrorKind::Unexpected,
ErrorKind::TableNotFound,
"Tried to update a table that does not exist",
));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ErrorKind {
/// Iceberg table already exists at creation.
TableAlreadyExists,

/// Iceberg namespace already exists at creation.
/// Iceberg namespace does not exist.
NamespaceNotFound,

/// Iceberg table does not exist.
Expand Down
Loading