Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
use std::collections::HashMap;
use std::fmt::Debug;

use anyhow::anyhow;
use async_trait::async_trait;
use aws_sdk_glue::operation::update_table::UpdateTableError;
use aws_sdk_glue::types::TableInput;
use iceberg::io::{
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
Expand Down Expand Up @@ -709,10 +711,56 @@ impl Catalog for GlueCatalog {
))
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
/// Applies `commit` to the table it identifies and records the result in Glue.
///
/// The sequence is: load the current table, apply the commit to obtain a
/// staged table, write the staged metadata through the staged table's
/// `FileIO`, then send a Glue `UpdateTable` request whose table input is
/// built from the staged metadata. On success the staged table is returned.
///
/// # Errors
///
/// Propagates any error from namespace validation, loading the table,
/// resolving either metadata location, applying the commit, writing the
/// metadata, or building the Glue table input. A failed Glue request is
/// mapped as follows, with the service error attached as the source:
///
/// * `EntityNotFoundException` -> `ErrorKind::TableNotFound`
/// * `ConcurrentModificationException` -> `ErrorKind::CatalogCommitConflicts`,
///   flagged as retryable
/// * anything else -> `ErrorKind::Unexpected`
///
/// NOTE(review): the metadata file is written before the Glue request is
/// sent and nothing here removes it if that request fails.
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
// Clone the identifier up front: `commit` is consumed by `apply` below, but
// the identifier is still needed for the Glue request and error messages.
let table_ident = commit.identifier().clone();
let table_namespace = validate_namespace(table_ident.namespace())?;
let current_table = self.load_table(&table_ident).await?;
// Copy the location into an owned String because `current_table` is moved
// into `commit.apply` on the next statement.
let current_metadata_location = current_table.metadata_location_result()?.to_string();

let staged_table = commit.apply(current_table)?;
let staged_metadata_location = staged_table.metadata_location_result()?;

// Write new metadata
staged_table
.metadata()
.write_to(staged_table.file_io(), staged_metadata_location)
.await?;

// Persist staged table to Glue
let builder = self
.client
.0
.update_table()
.database_name(table_namespace)
.set_skip_archive(Some(true)) // TODO: make this configurable
.table_input(convert_to_glue_table(
table_ident.name(),
staged_metadata_location.to_string(),
staged_table.metadata(),
staged_table.metadata().properties(),
// The pre-commit metadata location is passed as the final argument.
Some(current_metadata_location),
)?);
// NOTE(review): presumably sets the catalog id on the request when one is
// configured; the macro definition is not visible here, so confirm.
let builder = with_catalog_id!(builder, self.config);
// The response body is not used; only success or failure matters.
let _ = builder.send().await.map_err(|e| {
let error = e.into_service_error();
// Translate the service error into this crate's error kinds.
match error {
UpdateTableError::EntityNotFoundException(_) => Error::new(
ErrorKind::TableNotFound,
format!("Table {table_ident} is not found"),
),
UpdateTableError::ConcurrentModificationException(_) => Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit failed for table: {table_ident}"),
)
.with_retryable(true),
_ => Error::new(
ErrorKind::Unexpected,
format!("Operation failed for table: {table_ident} for hitting aws sdk error"),
),
}
.with_source(anyhow!("aws sdk error: {:?}", error))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.with_source(anyhow!("aws sdk error: {:?}", error))
.with_source(anyhow!("AWS SDK error: {:?}", error))

nit

})?;

Ok(staged_table)
}
}
67 changes: 67 additions & 0 deletions crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::RwLock;
use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent,
};
Expand Down Expand Up @@ -402,3 +403,69 @@ async fn test_list_namespace() -> Result<()> {

Ok(())
}

// Happy-path integration test for `update_table`: creates a namespace and a
// table, commits a property change through a `Transaction`, and checks that
// both the returned table and a freshly loaded copy show the new property
// and a new metadata location.
// NOTE(review): `get_catalog` and `set_table_creation` are defined outside
// this view; presumably they provide the test catalog and a `TableCreation`
// named "my_table" - confirm against the helpers.
#[tokio::test]
async fn test_update_table() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be tests for non-happy paths?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good point, but I didn't test conflicts, because currently it's tricky to forge conflicts that are non-retryable without more complicated transaction actions like overwrite or rewrite

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there isn't a way to mock the response?

let catalog = get_catalog().await;
let creation = set_table_creation(None, "my_table")?;
// The namespace is named after the test itself.
let namespace = Namespace::new(NamespaceIdent::new("test_update_table".into()));

catalog
.create_namespace(namespace.name(), HashMap::new())
.await?;

let expected = catalog.create_table(namespace.name(), creation).await?;

let table = catalog
.load_table(&TableIdent::new(
namespace.name().clone(),
"my_table".to_string(),
))
.await?;

// Sanity check: the loaded table matches what `create_table` returned.
assert_eq!(table.identifier(), expected.identifier());
assert_eq!(table.metadata_location(), expected.metadata_location());
assert_eq!(table.metadata(), expected.metadata());

// Store the original metadata location for comparison
let original_metadata_location = table.metadata_location();

// Update table properties using the transaction
let tx = Transaction::new(&table);
let tx = tx
.update_table_properties()
.set("test_property".to_string(), "test_value".to_string())
.apply(tx)?;

// Commit the transaction to the catalog
let updated_table = tx.commit(&catalog).await?;

// Verify the update was successful
assert_eq!(
updated_table.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);

// Verify the metadata location has been updated
assert_ne!(
updated_table.metadata_location(),
original_metadata_location,
"Metadata location should be updated after commit"
);

// Load the table again from the catalog to verify changes were persisted
let reloaded_table = catalog.load_table(table.identifier()).await?;

// Verify the reloaded table matches the updated table
assert_eq!(
reloaded_table.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);
assert_eq!(
reloaded_table.metadata_location(),
updated_table.metadata_location(),
"Reloaded table should have the same metadata location as the updated table"
);

Ok(())
}
2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ impl Catalog for RestCatalog {
StatusCode::OK => deserialize_catalog_response(http_response).await?,
StatusCode::NOT_FOUND => {
return Err(Error::new(
ErrorKind::Unexpected,
ErrorKind::TableNotFound,
"Tried to update a table that does not exist",
));
}
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ErrorKind {
/// Iceberg table already exists at creation.
TableAlreadyExists,

/// Iceberg namespace already exists at creation.
/// Iceberg namespace does not exist.
NamespaceNotFound,

/// Iceberg table does not exist.
Expand Down
Loading