Skip to content

Commit 87acf3c

Browse files
CTTY and Xuanwo authored
feat(catalog): Implement update_table for S3TablesCatalog (apache#1594)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Addresses the S3Table part of apache#1389 ## What changes are included in this PR? - Implemented update_table for S3TablesCatalog <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? added a test <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> --------- Signed-off-by: Xuanwo <[email protected]> Co-authored-by: Xuanwo <[email protected]>
1 parent b20a255 commit 87acf3c

File tree

3 files changed

+167
-34
lines changed

3 files changed

+167
-34
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog/s3tables/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ license = { workspace = true }
2929
repository = { workspace = true }
3030

3131
[dependencies]
32+
anyhow = { workspace = true }
3233
async-trait = { workspace = true }
3334
aws-config = { workspace = true }
3435
aws-sdk-s3tables = { workspace = true }

crates/catalog/s3tables/src/catalog.rs

Lines changed: 165 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
2323
use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
2424
use aws_sdk_s3tables::operation::get_table::GetTableOutput;
2525
use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
26+
use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
2627
use aws_sdk_s3tables::types::OpenTableFormat;
2728
use iceberg::io::{FileIO, FileIOBuilder};
2829
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
@@ -189,6 +190,39 @@ impl S3TablesCatalog {
189190
file_io,
190191
})
191192
}
193+
194+
async fn load_table_with_version_token(
195+
&self,
196+
table_ident: &TableIdent,
197+
) -> Result<(Table, String)> {
198+
let req = self
199+
.s3tables_client
200+
.get_table()
201+
.table_bucket_arn(self.config.table_bucket_arn.clone())
202+
.namespace(table_ident.namespace().to_url_string())
203+
.name(table_ident.name());
204+
let resp: GetTableOutput = req.send().await.map_err(from_aws_sdk_error)?;
205+
206+
// when a table is created, it's possible that the metadata location is not set.
207+
let metadata_location = resp.metadata_location().ok_or_else(|| {
208+
Error::new(
209+
ErrorKind::Unexpected,
210+
format!(
211+
"Table {} does not have metadata location",
212+
table_ident.name()
213+
),
214+
)
215+
})?;
216+
let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
217+
218+
let table = Table::builder()
219+
.identifier(table_ident.clone())
220+
.metadata(metadata)
221+
.metadata_location(metadata_location)
222+
.file_io(self.file_io.clone())
223+
.build()?;
224+
Ok((table, resp.version_token))
225+
}
192226
}
193227

194228
#[async_trait]
@@ -477,33 +511,7 @@ impl Catalog for S3TablesCatalog {
477511
/// - Errors from the underlying database query process, converted using
478512
/// `from_aws_sdk_error`.
479513
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
    // Delegate to the shared loader; only the table half of its result is needed here.
    let (table, _version_token) = self.load_table_with_version_token(table_ident).await?;
    Ok(table)
}
508516

509517
/// Drops an existing table from the s3tables catalog.
@@ -589,13 +597,50 @@ impl Catalog for S3TablesCatalog {
589597
}
590598

591599
/// Updates an existing table within the s3tables catalog.
592-
///
593-
/// This function is still in development and will always return an error.
594-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
595-
Err(Error::new(
596-
ErrorKind::FeatureUnsupported,
597-
"Updating a table is not supported yet",
598-
))
600+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
601+
let table_ident = commit.identifier().clone();
602+
let table_namespace = table_ident.namespace();
603+
let (current_table, version_token) =
604+
self.load_table_with_version_token(&table_ident).await?;
605+
606+
let staged_table = commit.apply(current_table)?;
607+
let staged_metadata_location = staged_table.metadata_location_result()?;
608+
609+
staged_table
610+
.metadata()
611+
.write_to(staged_table.file_io(), staged_metadata_location)
612+
.await?;
613+
614+
let builder = self
615+
.s3tables_client
616+
.update_table_metadata_location()
617+
.table_bucket_arn(&self.config.table_bucket_arn)
618+
.namespace(table_namespace.to_url_string())
619+
.name(table_ident.name())
620+
.version_token(version_token)
621+
.metadata_location(staged_metadata_location);
622+
623+
let _ = builder.send().await.map_err(|e| {
624+
let error = e.into_service_error();
625+
match error {
626+
UpdateTableMetadataLocationError::ConflictException(_) => Error::new(
627+
ErrorKind::CatalogCommitConflicts,
628+
format!("Commit conflicted for table: {table_ident}"),
629+
)
630+
.with_retryable(true),
631+
UpdateTableMetadataLocationError::NotFoundException(_) => Error::new(
632+
ErrorKind::TableNotFound,
633+
format!("Table {table_ident} is not found"),
634+
),
635+
_ => Error::new(
636+
ErrorKind::Unexpected,
637+
"Operation failed for hitting aws sdk error",
638+
),
639+
}
640+
.with_source(anyhow::Error::msg(format!("aws sdk error: {:?}", error)))
641+
})?;
642+
643+
Ok(staged_table)
599644
}
600645
}
601646

@@ -611,6 +656,7 @@ where T: std::fmt::Debug {
611656
#[cfg(test)]
612657
mod tests {
613658
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
659+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
614660

615661
use super::*;
616662

@@ -737,6 +783,91 @@ mod tests {
737783
catalog.drop_namespace(&namespace).await.unwrap();
738784
}
739785

786+
#[tokio::test]
787+
async fn test_s3tables_update_table() {
788+
let catalog = match load_s3tables_catalog_from_env().await {
789+
Ok(Some(catalog)) => catalog,
790+
Ok(None) => return,
791+
Err(e) => panic!("Error loading catalog: {}", e),
792+
};
793+
794+
// Create a test namespace and table
795+
let namespace = NamespaceIdent::new("test_s3tables_update_table".to_string());
796+
let table_ident =
797+
TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());
798+
799+
// Clean up any existing resources from previous test runs
800+
catalog.drop_table(&table_ident).await.ok();
801+
catalog.drop_namespace(&namespace).await.ok();
802+
803+
// Create namespace and table
804+
catalog
805+
.create_namespace(&namespace, HashMap::new())
806+
.await
807+
.unwrap();
808+
809+
let creation = {
810+
let schema = Schema::builder()
811+
.with_schema_id(0)
812+
.with_fields(vec![
813+
NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
814+
NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
815+
])
816+
.build()
817+
.unwrap();
818+
TableCreation::builder()
819+
.name(table_ident.name().to_string())
820+
.properties(HashMap::new())
821+
.schema(schema)
822+
.build()
823+
};
824+
825+
let table = catalog.create_table(&namespace, creation).await.unwrap();
826+
827+
// Create a transaction to update the table
828+
let tx = Transaction::new(&table);
829+
830+
// Store the original metadata location for comparison
831+
let original_metadata_location = table.metadata_location();
832+
833+
// Update table properties using the transaction
834+
let tx = tx
835+
.update_table_properties()
836+
.set("test_property".to_string(), "test_value".to_string())
837+
.apply(tx)
838+
.unwrap();
839+
840+
// Commit the transaction to the catalog
841+
let updated_table = tx.commit(&catalog).await.unwrap();
842+
843+
// Verify the update was successful
844+
assert_eq!(
845+
updated_table.metadata().properties().get("test_property"),
846+
Some(&"test_value".to_string())
847+
);
848+
849+
// Verify the metadata location has been updated
850+
assert_ne!(
851+
updated_table.metadata_location(),
852+
original_metadata_location,
853+
"Metadata location should be updated after commit"
854+
);
855+
856+
// Load the table again from the catalog to verify changes were persisted
857+
let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
858+
859+
// Verify the reloaded table matches the updated table
860+
assert_eq!(
861+
reloaded_table.metadata().properties().get("test_property"),
862+
Some(&"test_value".to_string())
863+
);
864+
assert_eq!(
865+
reloaded_table.metadata_location(),
866+
updated_table.metadata_location(),
867+
"Reloaded table should have the same metadata location as the updated table"
868+
);
869+
}
870+
740871
#[tokio::test]
741872
async fn test_builder_load_missing_bucket_arn() {
742873
let builder = S3TablesCatalogBuilder::default();

0 commit comments

Comments
 (0)