Skip to content

feat(catalog): Implement update_table for S3TablesCatalog #1594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 165 additions & 34 deletions crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
use aws_sdk_s3tables::operation::get_table::GetTableOutput;
use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
use aws_sdk_s3tables::types::OpenTableFormat;
use iceberg::io::{FileIO, FileIOBuilder};
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
Expand Down Expand Up @@ -80,6 +81,39 @@ impl S3TablesCatalog {
file_io,
})
}

async fn load_table_with_version_token(
&self,
table_ident: &TableIdent,
) -> Result<(Table, String)> {
let req = self
.s3tables_client
.get_table()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(table_ident.namespace().to_url_string())
.name(table_ident.name());
let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;

// when a table is created, it's possible that the metadata location is not set.
let metadata_location = resp.metadata_location().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!(
"Table {} does not have metadata location",
table_ident.name()
),
)
})?;
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;

let table = Table::builder()
.identifier(table_ident.clone())
.metadata(metadata)
.metadata_location(metadata_location)
.file_io(self.file_io.clone())
.build()?;
Ok((table, resp.version_token))
}
}

#[async_trait]
Expand Down Expand Up @@ -368,33 +402,7 @@ impl Catalog for S3TablesCatalog {
/// - Errors from the underlying database query process, converted using
/// `from_aws_sdk_error`.
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
let req = self
.s3tables_client
.get_table()
.table_bucket_arn(self.config.table_bucket_arn.clone())
.namespace(table_ident.namespace().to_url_string())
.name(table_ident.name());
let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;

// when a table is created, it's possible that the metadata location is not set.
let metadata_location = resp.metadata_location().ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!(
"Table {} does not have metadata location",
table_ident.name()
),
)
})?;
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;

let table = Table::builder()
.identifier(table_ident.clone())
.metadata(metadata)
.metadata_location(metadata_location)
.file_io(self.file_io.clone())
.build()?;
Ok(table)
Ok(self.load_table_with_version_token(table_ident).await?.0)
}

/// Drops an existing table from the s3tables catalog.
Expand Down Expand Up @@ -480,13 +488,50 @@ impl Catalog for S3TablesCatalog {
}

/// Updates an existing table within the s3tables catalog.
///
/// This function is still in development and will always return an error.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
let table_namespace = table_ident.namespace();
let (current_table, version_token) =
self.load_table_with_version_token(&table_ident).await?;

let staged_table = commit.apply(current_table)?;
let staged_metadata_location = staged_table.metadata_location_result()?;

staged_table
.metadata()
.write_to(staged_table.file_io(), staged_metadata_location)
.await?;

let builder = self
.s3tables_client
.update_table_metadata_location()
.table_bucket_arn(&self.config.table_bucket_arn)
.namespace(table_namespace.to_url_string())
.name(table_ident.name())
.version_token(version_token)
.metadata_location(staged_metadata_location);

let _ = builder.send().await.map_err(|e| {
let error = e.into_service_error();
match error {
UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit conflicted for table: {table_ident}"),
)
.with_retryable(true),
UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
ErrorKind::TableNotFound,
format!("Table {table_ident} is not found"),
),
_ => Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting aws sdk error",
),
}
.with_source(anyhow!("aws sdk error: {:?}", error))
})?;

Ok(staged_table)
}
}

Expand All @@ -503,6 +548,7 @@ where T: std::fmt::Debug {
#[cfg(test)]
mod tests {
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};

use super::*;

Expand Down Expand Up @@ -624,4 +670,89 @@ mod tests {
assert!(!catalog.table_exists(&table_ident).await.unwrap());
catalog.drop_namespace(&namespace).await.unwrap();
}

#[tokio::test]
async fn test_s3tables_update_table() {
let catalog = match load_s3tables_catalog_from_env().await {
Ok(Some(catalog)) => catalog,
Ok(None) => return,
Err(e) => panic!("Error loading catalog: {}", e),
};

// Create a test namespace and table
let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
let table_ident =
TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());

// Clean up any existing resources from previous test runs
catalog.drop_table(&table_ident).await.ok();
catalog.drop_namespace(&namespace).await.ok();

// Create namespace and table
catalog
.create_namespace(&namespace, HashMap::new())
.await
.unwrap();

let creation = {
let schema = Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap();
TableCreation::builder()
.name(table_ident.name().to_string())
.properties(HashMap::new())
.schema(schema)
.build()
};

let table = catalog.create_table(&namespace, creation).await.unwrap();

// Create a transaction to update the table
let tx = Transaction::new(&table);

// Store the original metadata location for comparison
let original_metadata_location = table.metadata_location();

// Update table properties using the transaction
let tx = tx
.update_table_properties()
.set("test_property".to_string(), "test_value".to_string())
.apply(tx)
.unwrap();

// Commit the transaction to the catalog
let updated_table = tx.commit(&catalog).await.unwrap();

// Verify the update was successful
assert_eq!(
updated_table.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);

// Verify the metadata location has been updated
assert_ne!(
updated_table.metadata_location(),
original_metadata_location,
"Metadata location should be updated after commit"
);

// Load the table again from the catalog to verify changes were persisted
let reloaded_table = catalog.load_table(&table_ident).await.unwrap();

// Verify the reloaded table matches the updated table
assert_eq!(
reloaded_table.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);
assert_eq!(
reloaded_table.metadata_location(),
updated_table.metadata_location(),
"Reloaded table should have the same metadata location as the updated table"
);
}
}
Loading