From 32260bb3574f1209d4c85669aabf46026ef5a900 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Wed, 6 Aug 2025 13:56:07 -0700 Subject: [PATCH 1/2] Make warehouse optional and initialize file io in create or load table --- crates/catalog/glue/src/catalog.rs | 79 ++++++++++++------- crates/catalog/glue/src/utils.rs | 31 +++++--- .../catalog/glue/tests/glue_catalog_test.rs | 68 ++++++++-------- 3 files changed, 104 insertions(+), 74 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index fb4bd36b8d..93cb97fc20 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -47,7 +47,10 @@ pub struct GlueCatalogConfig { uri: Option, #[builder(default, setter(strip_option(fallback = catalog_id_opt)))] catalog_id: Option, - warehouse: String, + #[builder(default, setter(strip_option(fallback = warehouse_opt)))] + warehouse: Option, + #[builder(default, setter(strip_option(fallback = io_impl_opt)))] + io_impl: Option, #[builder(default)] props: HashMap, } @@ -58,7 +61,7 @@ struct GlueClient(aws_sdk_glue::Client); pub struct GlueCatalog { config: GlueCatalogConfig, client: GlueClient, - file_io: FileIO, + catalog_props: HashMap, } impl Debug for GlueCatalog { @@ -73,51 +76,65 @@ impl GlueCatalog { /// Create a new glue catalog pub async fn new(config: GlueCatalogConfig) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; - let mut file_io_props = config.props.clone(); - if !file_io_props.contains_key(S3_ACCESS_KEY_ID) { - if let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID) { - file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string()); + let mut catalog_props = config.props.clone(); + if !catalog_props.contains_key(S3_ACCESS_KEY_ID) { + if let Some(access_key_id) = catalog_props.get(AWS_ACCESS_KEY_ID) { + catalog_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string()); } } - if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY) { - if let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY) { - file_io_props.insert( + if !catalog_props.contains_key(S3_SECRET_ACCESS_KEY) { + if let Some(secret_access_key) = catalog_props.get(AWS_SECRET_ACCESS_KEY) { + catalog_props.insert( S3_SECRET_ACCESS_KEY.to_string(), secret_access_key.to_string(), ); } } - if !file_io_props.contains_key(S3_REGION) { - if let Some(region) = file_io_props.get(AWS_REGION_NAME) { - file_io_props.insert(S3_REGION.to_string(), region.to_string()); + if !catalog_props.contains_key(S3_REGION) { + if let Some(region) = catalog_props.get(AWS_REGION_NAME) { + catalog_props.insert(S3_REGION.to_string(), region.to_string()); } } - if !file_io_props.contains_key(S3_SESSION_TOKEN) { - if let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN) { - file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string()); + if !catalog_props.contains_key(S3_SESSION_TOKEN) { + if let Some(session_token) = catalog_props.get(AWS_SESSION_TOKEN) { + catalog_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string()); } } - if !file_io_props.contains_key(S3_ENDPOINT) { + if !catalog_props.contains_key(S3_ENDPOINT) { if let Some(aws_endpoint) = config.uri.as_ref() { - file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string()); + catalog_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string()); } } let client = aws_sdk_glue::Client::new(&sdk_config); - let file_io = FileIO::from_path(&config.warehouse)? - .with_props(file_io_props) - .build()?; - Ok(GlueCatalog { config, client: GlueClient(client), - file_io, + catalog_props, }) } - /// Get the catalogs `FileIO` - pub fn file_io(&self) -> FileIO { - self.file_io.clone() + + /// Load the `FileIO` based on io type, metadata location, and warehouse precedence + async fn load_file_io(&self, metadata_location: Option<&str>) -> Result { + if let Some(io_impl) = self.config.io_impl.as_ref() { + Ok(FileIO::from_path(io_impl)? + .with_props(&self.catalog_props) + .build()?) + } else if let Some(metadata_path) = metadata_location.as_ref() { + Ok(FileIO::from_path(metadata_path)? + .with_props(&self.catalog_props) + .build()?) + } else if let Some(warehouse_path) = self.config.warehouse.as_ref() { + Ok(FileIO::from_path(warehouse_path)? + .with_props(&self.catalog_props) + .build()?) + } else { + Err(Error::new( + ErrorKind::PreconditionFailed, + "Unable to infer file io from io impl type, metadata location, or warehouse" + )) + } } } @@ -385,7 +402,7 @@ impl Catalog for GlueCatalog { None => { let ns = self.get_namespace(namespace).await?; let location = - get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse); + get_default_table_location(&ns, &db_name, &table_name, self.config.warehouse.as_ref())?; creation.location = Some(location.clone()); location } @@ -396,7 +413,8 @@ impl Catalog for GlueCatalog { let metadata_location = MetadataLocation::new_with_table_location(location.clone()).to_string(); - metadata.write_to(&self.file_io, &metadata_location).await?; + let file_io = self.load_file_io(Some(&metadata_location)).await?; + metadata.write_to(&file_io, &metadata_location).await?; let glue_table = convert_to_glue_table( &table_name, @@ -417,7 +435,7 @@ impl Catalog for GlueCatalog { builder.send().await.map_err(from_aws_sdk_error)?; Table::builder() - .file_io(self.file_io()) + .file_io(file_io) .metadata_location(metadata_location) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) @@ -461,10 +479,11 @@ impl Catalog for GlueCatalog { Some(table) => { let metadata_location = get_metadata_location(&table.parameters)?; - let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; + let file_io = self.load_file_io(Some(metadata_location.as_str())).await?; + let metadata = TableMetadata::read_from(&file_io, &metadata_location).await?; Table::builder() - .file_io(self.file_io()) + .file_io(file_io) .metadata_location(metadata_location) .metadata(metadata) .identifier(TableIdent::new( diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs index d2c21b4d74..fa87829717 100644 --- a/crates/catalog/glue/src/utils.rs +++ b/crates/catalog/glue/src/utils.rs @@ -208,21 +208,26 @@ pub(crate) fn get_default_table_location( namespace: &Namespace, db_name: impl AsRef, table_name: impl AsRef, - warehouse: impl AsRef, -) -> String { + warehouse: Option>, +) -> Result { let properties = namespace.properties(); match properties.get(LOCATION) { - Some(location) => format!("{}/{}", location, table_name.as_ref()), + Some(location) => Ok(format!("{}/{}", location, table_name.as_ref())), None => { - let warehouse_location = warehouse.as_ref().trim_end_matches('/'); - - format!( - "{}/{}.db/{}", - warehouse_location, - db_name.as_ref(), - table_name.as_ref() - ) + if let Some(warehouse) = warehouse { + let warehouse_location = warehouse.as_ref().trim_end_matches('/'); + Ok(format!( + "{}/{}.db/{}", + warehouse_location, + db_name.as_ref(), + table_name.as_ref())) + } else { + Err(Error::new( + ErrorKind::PreconditionFailed, + "Cannot derive default warehouse location, warehouse path must not be null or empty" + )) + } } } } @@ -358,7 +363,7 @@ mod tests { let expected = "db_location/my_table"; let result = - get_default_table_location(&namespace, db_name, table_name, "warehouse_location"); + get_default_table_location(&namespace, db_name, table_name, Some("warehouse_location"))?; assert_eq!(expected, result); @@ -373,7 +378,7 @@ mod tests { let expected = "warehouse_location/default.db/my_table"; let result = - get_default_table_location(&namespace, db_name, table_name, "warehouse_location"); + get_default_table_location(&namespace, db_name, table_name, Some("warehouse_location"))?; assert_eq!(expected, result); diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index bec9494fe9..74097107c9 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -18,9 +18,8 @@ //! Integration tests for glue catalog. use std::collections::HashMap; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::RwLock; - use ctor::{ctor, dtor}; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; @@ -55,19 +54,23 @@ fn after_all() { guard.take(); } -async fn get_catalog() -> GlueCatalog { +async fn get_glue_endpoint() -> IpAddr { + let guard = DOCKER_COMPOSE_ENV.read().unwrap(); + let docker_compose = guard.as_ref().unwrap(); + docker_compose.get_container_ip("moto") +} + +async fn get_s3_endpoint() -> IpAddr { + let guard = DOCKER_COMPOSE_ENV.read().unwrap(); + let docker_compose = guard.as_ref().unwrap(); + docker_compose.get_container_ip("minio") +} + +async fn get_catalog(config: Option) -> GlueCatalog { set_up(); - let (glue_catalog_ip, minio_ip) = { - let guard = DOCKER_COMPOSE_ENV.read().unwrap(); - let docker_compose = guard.as_ref().unwrap(); - ( - docker_compose.get_container_ip("moto"), - docker_compose.get_container_ip("minio"), - ) - }; - let glue_socket_addr = SocketAddr::new(glue_catalog_ip, GLUE_CATALOG_PORT); - let minio_socket_addr = SocketAddr::new(minio_ip, MINIO_PORT); + let glue_socket_addr = SocketAddr::new(get_glue_endpoint().await, GLUE_CATALOG_PORT); + let minio_socket_addr = SocketAddr::new(get_s3_endpoint().await, MINIO_PORT); while !scan_port_addr(glue_socket_addr) { info!("Waiting for 1s glue catalog to ready..."); sleep(std::time::Duration::from_millis(1000)).await; @@ -89,11 +92,14 @@ async fn get_catalog() -> GlueCatalog { (S3_REGION.to_string(), "us-east-1".to_string()), ]); - let config = GlueCatalogConfig::builder() - .uri(format!("http://{}", glue_socket_addr)) - .warehouse("s3a://warehouse/hive".to_string()) - .props(props.clone()) - .build(); + let config = match config { + Some(glue_config) => glue_config, + None => GlueCatalogConfig::builder() + .uri(format!("http://{}", glue_socket_addr)) + .warehouse("s3a://warehouse/hive".to_string()) + .props(props.clone()) + .build() + }; GlueCatalog::new(config).await.unwrap() } @@ -125,7 +131,7 @@ fn set_table_creation(location: Option, name: impl ToString) -> Result Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let creation = set_table_creation(None, "my_table")?; let namespace = Namespace::new(NamespaceIdent::new("test_rename_table".into())); @@ -152,7 +158,7 @@ async fn test_rename_table() -> Result<()> { #[tokio::test] async fn test_table_exists() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let creation = set_table_creation(None, "my_table")?; let namespace = Namespace::new(NamespaceIdent::new("test_table_exists".into())); @@ -176,7 +182,7 @@ async fn test_table_exists() -> Result<()> { #[tokio::test] async fn test_drop_table() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let creation = set_table_creation(None, "my_table")?; let namespace = Namespace::new(NamespaceIdent::new("test_drop_table".into())); @@ -197,7 +203,7 @@ async fn test_drop_table() -> Result<()> { #[tokio::test] async fn test_load_table() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let creation = set_table_creation(None, "my_table")?; let namespace = Namespace::new(NamespaceIdent::new("test_load_table".into())); @@ -223,7 +229,7 @@ async fn test_load_table() -> Result<()> { #[tokio::test] async fn test_create_table() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_create_table".to_string()); set_test_namespace(&catalog, &namespace).await?; // inject custom location, ignore the namespace prefix @@ -237,7 +243,7 @@ async fn test_create_table() -> Result<()> { .is_some_and(|location| location.starts_with("s3a://warehouse/hive/metadata/00000-")) ); assert!( - catalog + result .file_io() .exists("s3a://warehouse/hive/metadata/") .await? @@ -248,7 +254,7 @@ async fn test_create_table() -> Result<()> { #[tokio::test] async fn test_list_tables() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_list_tables".to_string()); set_test_namespace(&catalog, &namespace).await?; @@ -262,7 +268,7 @@ async fn test_list_tables() -> Result<()> { #[tokio::test] async fn test_drop_namespace() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_drop_namespace".to_string()); set_test_namespace(&catalog, &namespace).await?; @@ -279,7 +285,7 @@ async fn test_drop_namespace() -> Result<()> { #[tokio::test] async fn test_update_namespace() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_update_namespace".into()); set_test_namespace(&catalog, &namespace).await?; @@ -302,7 +308,7 @@ async fn test_update_namespace() -> Result<()> { #[tokio::test] async fn test_namespace_exists() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_namespace_exists".into()); @@ -319,7 +325,7 @@ async fn test_namespace_exists() -> Result<()> { #[tokio::test] async fn test_get_namespace() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_get_namespace".into()); @@ -338,7 +344,7 @@ async fn test_get_namespace() -> Result<()> { #[tokio::test] async fn test_create_namespace() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let properties = HashMap::new(); let namespace = NamespaceIdent::new("test_create_namespace".into()); @@ -354,7 +360,7 @@ async fn test_create_namespace() -> Result<()> { #[tokio::test] async fn test_list_namespace() -> Result<()> { - let catalog = get_catalog().await; + let catalog = get_catalog(None).await; let namespace = NamespaceIdent::new("test_list_namespace".to_string()); set_test_namespace(&catalog, &namespace).await?; From 9a902eda9a3cc2f1213d85de6b60f9c61e2f63da Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Wed, 6 Aug 2025 16:44:45 -0700 Subject: [PATCH 2/2] Fix file io --- crates/catalog/glue/src/catalog.rs | 2 +- crates/iceberg/src/io/file_io.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 93cb97fc20..dac593aa80 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -118,7 +118,7 @@ impl GlueCatalog { /// Load the `FileIO` based on io type, metadata location, and warehouse precedence async fn load_file_io(&self, metadata_location: Option<&str>) -> Result { if let Some(io_impl) = self.config.io_impl.as_ref() { - Ok(FileIO::from_path(io_impl)? + Ok(FileIO::from_scheme(io_impl)? .with_props(&self.catalog_props) .build()?) } else if let Some(metadata_path) = metadata_location.as_ref() { diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 6e2d152ed7..811611b836 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -60,6 +60,11 @@ impl FileIO { self.builder } + /// Create file io scheme from scheme type. See [`FileIO`] for supported schemes. + pub fn from_scheme(scheme: impl AsRef) -> Result { + Ok(FileIOBuilder::new(scheme.as_ref())) + } + /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes. /// /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.