Skip to content

fix(catalog): Make glue warehouse optional and initialize file io in create or load table #1586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ pub struct GlueCatalogConfig {
uri: Option<String>,
#[builder(default, setter(strip_option(fallback = catalog_id_opt)))]
catalog_id: Option<String>,
warehouse: String,
#[builder(default, setter(strip_option(fallback = warehouse_opt)))]
warehouse: Option<String>,
#[builder(default, setter(strip_option(fallback = io_impl_opt)))]
io_impl: Option<String>,
#[builder(default)]
props: HashMap<String, String>,
}
Expand All @@ -58,7 +61,7 @@ struct GlueClient(aws_sdk_glue::Client);
pub struct GlueCatalog {
config: GlueCatalogConfig,
client: GlueClient,
file_io: FileIO,
catalog_props: HashMap<String, String>,
}

impl Debug for GlueCatalog {
Expand All @@ -73,51 +76,65 @@ impl GlueCatalog {
/// Create a new glue catalog
pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
let mut file_io_props = config.props.clone();
if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
if let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID) {
file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
let mut catalog_props = config.props.clone();
if !catalog_props.contains_key(S3_ACCESS_KEY_ID) {
if let Some(access_key_id) = catalog_props.get(AWS_ACCESS_KEY_ID) {
catalog_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
}
}
if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY) {
if let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY) {
file_io_props.insert(
if !catalog_props.contains_key(S3_SECRET_ACCESS_KEY) {
if let Some(secret_access_key) = catalog_props.get(AWS_SECRET_ACCESS_KEY) {
catalog_props.insert(
S3_SECRET_ACCESS_KEY.to_string(),
secret_access_key.to_string(),
);
}
}
if !file_io_props.contains_key(S3_REGION) {
if let Some(region) = file_io_props.get(AWS_REGION_NAME) {
file_io_props.insert(S3_REGION.to_string(), region.to_string());
if !catalog_props.contains_key(S3_REGION) {
if let Some(region) = catalog_props.get(AWS_REGION_NAME) {
catalog_props.insert(S3_REGION.to_string(), region.to_string());
}
}
if !file_io_props.contains_key(S3_SESSION_TOKEN) {
if let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN) {
file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
if !catalog_props.contains_key(S3_SESSION_TOKEN) {
if let Some(session_token) = catalog_props.get(AWS_SESSION_TOKEN) {
catalog_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
}
}
if !file_io_props.contains_key(S3_ENDPOINT) {
if !catalog_props.contains_key(S3_ENDPOINT) {
if let Some(aws_endpoint) = config.uri.as_ref() {
file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
catalog_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
}
}

let client = aws_sdk_glue::Client::new(&sdk_config);

let file_io = FileIO::from_path(&config.warehouse)?
.with_props(file_io_props)
.build()?;

Ok(GlueCatalog {
config,
client: GlueClient(client),
file_io,
catalog_props,
})
}
/// Get the catalogs `FileIO`
pub fn file_io(&self) -> FileIO {
self.file_io.clone()

/// Load the `FileIO` based on io type, metadata location, and warehouse precedence
async fn load_file_io(&self, metadata_location: Option<&str>) -> Result<FileIO> {
if let Some(io_impl) = self.config.io_impl.as_ref() {
Ok(FileIO::from_scheme(io_impl)?
.with_props(&self.catalog_props)
.build()?)
} else if let Some(metadata_path) = metadata_location.as_ref() {
Ok(FileIO::from_path(metadata_path)?
.with_props(&self.catalog_props)
.build()?)
} else if let Some(warehouse_path) = self.config.warehouse.as_ref() {
Ok(FileIO::from_path(warehouse_path)?
.with_props(&self.catalog_props)
.build()?)
} else {
Err(Error::new(
ErrorKind::PreconditionFailed,
"Unable to infer file io from io impl type, metadata location, or warehouse"
))
}
}
}

Expand Down Expand Up @@ -385,7 +402,7 @@ impl Catalog for GlueCatalog {
None => {
let ns = self.get_namespace(namespace).await?;
let location =
get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse);
get_default_table_location(&ns, &db_name, &table_name, self.config.warehouse.as_ref())?;
creation.location = Some(location.clone());
location
}
Expand All @@ -396,7 +413,8 @@ impl Catalog for GlueCatalog {
let metadata_location =
MetadataLocation::new_with_table_location(location.clone()).to_string();

metadata.write_to(&self.file_io, &metadata_location).await?;
let file_io = self.load_file_io(Some(&metadata_location)).await?;
metadata.write_to(&file_io, &metadata_location).await?;

let glue_table = convert_to_glue_table(
&table_name,
Expand All @@ -417,7 +435,7 @@ impl Catalog for GlueCatalog {
builder.send().await.map_err(from_aws_sdk_error)?;

Table::builder()
.file_io(self.file_io())
.file_io(file_io)
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
Expand Down Expand Up @@ -461,10 +479,11 @@ impl Catalog for GlueCatalog {
Some(table) => {
let metadata_location = get_metadata_location(&table.parameters)?;

let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
let file_io = self.load_file_io(Some(metadata_location.as_str())).await?;
let metadata = TableMetadata::read_from(&file_io, &metadata_location).await?;

Table::builder()
.file_io(self.file_io())
.file_io(file_io)
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
Expand Down
31 changes: 18 additions & 13 deletions crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,21 +208,26 @@ pub(crate) fn get_default_table_location(
namespace: &Namespace,
db_name: impl AsRef<str>,
table_name: impl AsRef<str>,
warehouse: impl AsRef<str>,
) -> String {
warehouse: Option<impl AsRef<str>>,
) -> Result<String> {
let properties = namespace.properties();

match properties.get(LOCATION) {
Some(location) => format!("{}/{}", location, table_name.as_ref()),
Some(location) => Ok(format!("{}/{}", location, table_name.as_ref())),
None => {
let warehouse_location = warehouse.as_ref().trim_end_matches('/');

format!(
"{}/{}.db/{}",
warehouse_location,
db_name.as_ref(),
table_name.as_ref()
)
if let Some(warehouse) = warehouse {
let warehouse_location = warehouse.as_ref().trim_end_matches('/');
Ok(format!(
"{}/{}.db/{}",
warehouse_location,
db_name.as_ref(),
table_name.as_ref()))
} else {
Err(Error::new(
ErrorKind::PreconditionFailed,
"Cannot derive default warehouse location, warehouse path must not be null or empty"
))
}
}
}
}
Expand Down Expand Up @@ -358,7 +363,7 @@ mod tests {

let expected = "db_location/my_table";
let result =
get_default_table_location(&namespace, db_name, table_name, "warehouse_location");
get_default_table_location(&namespace, db_name, table_name, Some("warehouse_location"))?;

assert_eq!(expected, result);

Expand All @@ -373,7 +378,7 @@ mod tests {

let expected = "warehouse_location/default.db/my_table";
let result =
get_default_table_location(&namespace, db_name, table_name, "warehouse_location");
get_default_table_location(&namespace, db_name, table_name, Some("warehouse_location"))?;

assert_eq!(expected, result);

Expand Down
68 changes: 37 additions & 31 deletions crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
//! Integration tests for glue catalog.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::RwLock;

use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
Expand Down Expand Up @@ -55,19 +54,23 @@ fn after_all() {
guard.take();
}

async fn get_catalog() -> GlueCatalog {
async fn get_glue_endpoint() -> IpAddr {
let guard = DOCKER_COMPOSE_ENV.read().unwrap();
let docker_compose = guard.as_ref().unwrap();
docker_compose.get_container_ip("moto")
}

async fn get_s3_endpoint() -> IpAddr {
let guard = DOCKER_COMPOSE_ENV.read().unwrap();
let docker_compose = guard.as_ref().unwrap();
docker_compose.get_container_ip("minio")
}

async fn get_catalog(config: Option<GlueCatalogConfig>) -> GlueCatalog {
set_up();

let (glue_catalog_ip, minio_ip) = {
let guard = DOCKER_COMPOSE_ENV.read().unwrap();
let docker_compose = guard.as_ref().unwrap();
(
docker_compose.get_container_ip("moto"),
docker_compose.get_container_ip("minio"),
)
};
let glue_socket_addr = SocketAddr::new(glue_catalog_ip, GLUE_CATALOG_PORT);
let minio_socket_addr = SocketAddr::new(minio_ip, MINIO_PORT);
let glue_socket_addr = SocketAddr::new(get_glue_endpoint().await, GLUE_CATALOG_PORT);
let minio_socket_addr = SocketAddr::new(get_s3_endpoint().await, MINIO_PORT);
while !scan_port_addr(glue_socket_addr) {
info!("Waiting for 1s glue catalog to ready...");
sleep(std::time::Duration::from_millis(1000)).await;
Expand All @@ -89,11 +92,14 @@ async fn get_catalog() -> GlueCatalog {
(S3_REGION.to_string(), "us-east-1".to_string()),
]);

let config = GlueCatalogConfig::builder()
.uri(format!("http://{}", glue_socket_addr))
.warehouse("s3a://warehouse/hive".to_string())
.props(props.clone())
.build();
let config = match config {
Some(glue_config) => glue_config,
None => GlueCatalogConfig::builder()
.uri(format!("http://{}", glue_socket_addr))
.warehouse("s3a://warehouse/hive".to_string())
.props(props.clone())
.build()
};

GlueCatalog::new(config).await.unwrap()
}
Expand Down Expand Up @@ -125,7 +131,7 @@ fn set_table_creation(location: Option<String>, name: impl ToString) -> Result<T

#[tokio::test]
async fn test_rename_table() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let creation = set_table_creation(None, "my_table")?;
let namespace = Namespace::new(NamespaceIdent::new("test_rename_table".into()));

Expand All @@ -152,7 +158,7 @@ async fn test_rename_table() -> Result<()> {

#[tokio::test]
async fn test_table_exists() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let creation = set_table_creation(None, "my_table")?;
let namespace = Namespace::new(NamespaceIdent::new("test_table_exists".into()));

Expand All @@ -176,7 +182,7 @@ async fn test_table_exists() -> Result<()> {

#[tokio::test]
async fn test_drop_table() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let creation = set_table_creation(None, "my_table")?;
let namespace = Namespace::new(NamespaceIdent::new("test_drop_table".into()));

Expand All @@ -197,7 +203,7 @@ async fn test_drop_table() -> Result<()> {

#[tokio::test]
async fn test_load_table() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let creation = set_table_creation(None, "my_table")?;
let namespace = Namespace::new(NamespaceIdent::new("test_load_table".into()));

Expand All @@ -223,7 +229,7 @@ async fn test_load_table() -> Result<()> {

#[tokio::test]
async fn test_create_table() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let namespace = NamespaceIdent::new("test_create_table".to_string());
set_test_namespace(&catalog, &namespace).await?;
// inject custom location, ignore the namespace prefix
Expand All @@ -237,7 +243,7 @@ async fn test_create_table() -> Result<()> {
.is_some_and(|location| location.starts_with("s3a://warehouse/hive/metadata/00000-"))
);
assert!(
catalog
result
.file_io()
.exists("s3a://warehouse/hive/metadata/")
.await?
Expand All @@ -248,7 +254,7 @@ async fn test_create_table() -> Result<()> {

#[tokio::test]
async fn test_list_tables() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let namespace = NamespaceIdent::new("test_list_tables".to_string());
set_test_namespace(&catalog, &namespace).await?;

Expand All @@ -262,7 +268,7 @@ async fn test_list_tables() -> Result<()> {

#[tokio::test]
async fn test_drop_namespace() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let namespace = NamespaceIdent::new("test_drop_namespace".to_string());
set_test_namespace(&catalog, &namespace).await?;

Expand All @@ -279,7 +285,7 @@ async fn test_drop_namespace() -> Result<()> {

#[tokio::test]
async fn test_update_namespace() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;
let namespace = NamespaceIdent::new("test_update_namespace".into());
set_test_namespace(&catalog, &namespace).await?;

Expand All @@ -302,7 +308,7 @@ async fn test_update_namespace() -> Result<()> {

#[tokio::test]
async fn test_namespace_exists() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;

let namespace = NamespaceIdent::new("test_namespace_exists".into());

Expand All @@ -319,7 +325,7 @@ async fn test_namespace_exists() -> Result<()> {

#[tokio::test]
async fn test_get_namespace() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;

let namespace = NamespaceIdent::new("test_get_namespace".into());

Expand All @@ -338,7 +344,7 @@ async fn test_get_namespace() -> Result<()> {

#[tokio::test]
async fn test_create_namespace() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;

let properties = HashMap::new();
let namespace = NamespaceIdent::new("test_create_namespace".into());
Expand All @@ -354,7 +360,7 @@ async fn test_create_namespace() -> Result<()> {

#[tokio::test]
async fn test_list_namespace() -> Result<()> {
let catalog = get_catalog().await;
let catalog = get_catalog(None).await;

let namespace = NamespaceIdent::new("test_list_namespace".to_string());
set_test_namespace(&catalog, &namespace).await?;
Expand Down
Loading
Loading