Skip to content

Commit 1619e57

Browse files
lliangyu-linliurenjie1024
authored andcommitted
feat(catalog): Implement catalog loader for hms (apache#1612)
## Which issue does this PR close? - Closes apache#1257 ## What changes are included in this PR? * Implement CatalogBuilder for hms catalog ## Are these changes tested? Yes, updated existing `hms_catalog_test` --------- Co-authored-by: Renjie Liu <[email protected]>
1 parent 4c8c58d commit 1619e57

File tree

8 files changed

+195
-39
lines changed

8 files changed

+195
-39
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ iceberg = { version = "0.6.0", path = "./crates/iceberg" }
8080
iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" }
8181
iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" }
8282
iceberg-catalog-s3tables = { version = "0.6.0", path = "./crates/catalog/s3tables" }
83+
iceberg-catalog-hms = { version = "0.6.0", path = "./crates/catalog/hms" }
8384
iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" }
8485
indicatif = "0.17"
8586
itertools = "0.13"

crates/catalog/hms/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ pilota = { workspace = true }
3838
serde_json = { workspace = true }
3939
tokio = { workspace = true }
4040
tracing = { workspace = true }
41-
typed-builder = { workspace = true }
4241
volo-thrift = { workspace = true }
4342

4443
# Transitive dependencies below

crates/catalog/hms/src/catalog.rs

Lines changed: 98 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,106 @@ use iceberg::io::FileIO;
2929
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
3030
use iceberg::table::Table;
3131
use iceberg::{
32-
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
33-
TableCreation, TableIdent,
32+
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33+
TableCommit, TableCreation, TableIdent,
3434
};
35-
use typed_builder::TypedBuilder;
3635
use volo_thrift::MaybeException;
3736

3837
use super::utils::*;
3938
use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};
4039

40+
/// HMS catalog address
41+
pub const HMS_CATALOG_PROP_URI: &str = "uri";
42+
43+
/// HMS Catalog thrift transport
44+
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
45+
/// HMS Catalog framed thrift transport
46+
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
47+
/// HMS Catalog buffered thrift transport
48+
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
49+
50+
/// HMS Catalog warehouse location
51+
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
52+
53+
/// Builder for [`RestCatalog`].
54+
#[derive(Debug)]
55+
pub struct HmsCatalogBuilder(HmsCatalogConfig);
56+
57+
impl Default for HmsCatalogBuilder {
58+
fn default() -> Self {
59+
Self(HmsCatalogConfig {
60+
name: None,
61+
address: "".to_string(),
62+
thrift_transport: HmsThriftTransport::default(),
63+
warehouse: "".to_string(),
64+
props: HashMap::new(),
65+
})
66+
}
67+
}
68+
69+
impl CatalogBuilder for HmsCatalogBuilder {
70+
type C = HmsCatalog;
71+
72+
fn load(
73+
mut self,
74+
name: impl Into<String>,
75+
props: HashMap<String, String>,
76+
) -> impl Future<Output = Result<Self::C>> + Send {
77+
self.0.name = Some(name.into());
78+
79+
if props.contains_key(HMS_CATALOG_PROP_URI) {
80+
self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
81+
}
82+
83+
if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
84+
self.0.thrift_transport = match tt.to_lowercase().as_str() {
85+
THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
86+
THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
87+
_ => HmsThriftTransport::default(),
88+
};
89+
}
90+
91+
if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
92+
self.0.warehouse = props
93+
.get(HMS_CATALOG_PROP_WAREHOUSE)
94+
.cloned()
95+
.unwrap_or_default();
96+
}
97+
98+
self.0.props = props
99+
.into_iter()
100+
.filter(|(k, _)| {
101+
k != HMS_CATALOG_PROP_URI
102+
&& k != HMS_CATALOG_PROP_THRIFT_TRANSPORT
103+
&& k != HMS_CATALOG_PROP_WAREHOUSE
104+
})
105+
.collect();
106+
107+
let result = {
108+
if self.0.name.is_none() {
109+
Err(Error::new(
110+
ErrorKind::DataInvalid,
111+
"Catalog name is required",
112+
))
113+
} else if self.0.address.is_empty() {
114+
Err(Error::new(
115+
ErrorKind::DataInvalid,
116+
"Catalog address is required",
117+
))
118+
} else if self.0.warehouse.is_empty() {
119+
Err(Error::new(
120+
ErrorKind::DataInvalid,
121+
"Catalog warehouse is required",
122+
))
123+
} else {
124+
HmsCatalog::new(self.0)
125+
}
126+
};
127+
128+
std::future::ready(result)
129+
}
130+
}
131+
41132
/// Which variant of the thrift transport to communicate with HMS
42133
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
43134
#[derive(Debug, Default)]
@@ -50,12 +141,12 @@ pub enum HmsThriftTransport {
50141
}
51142

52143
/// Hive metastore Catalog configuration.
53-
#[derive(Debug, TypedBuilder)]
54-
pub struct HmsCatalogConfig {
144+
#[derive(Debug)]
145+
pub(crate) struct HmsCatalogConfig {
146+
name: Option<String>,
55147
address: String,
56148
thrift_transport: HmsThriftTransport,
57149
warehouse: String,
58-
#[builder(default)]
59150
props: HashMap<String, String>,
60151
}
61152

@@ -78,7 +169,7 @@ impl Debug for HmsCatalog {
78169

79170
impl HmsCatalog {
80171
/// Create a new hms catalog.
81-
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
172+
fn new(config: HmsCatalogConfig) -> Result<Self> {
82173
let address = config
83174
.address
84175
.as_str()

crates/catalog/hms/src/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,35 @@
1616
// under the License.
1717

1818
//! Iceberg Hive Metastore Catalog implementation.
19+
//!
20+
//! To build a hive metastore with configurations
21+
//! # Example
22+
//!
23+
//! ```rust, no_run
24+
//! use std::collections::HashMap;
25+
//!
26+
//! use iceberg::CatalogBuilder;
27+
//! use iceberg_catalog_hms::{
28+
//! HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalogBuilder,
29+
//! };
30+
//!
31+
//! #[tokio::main]
32+
//! async fn main() {
33+
//! let catalog = HmsCatalogBuilder::default()
34+
//! .load(
35+
//! "hms",
36+
//! HashMap::from([
37+
//! (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()),
38+
//! (
39+
//! HMS_CATALOG_PROP_WAREHOUSE.to_string(),
40+
//! "s3://warehouse".to_string(),
41+
//! ),
42+
//! ]),
43+
//! )
44+
//! .await
45+
//! .unwrap();
46+
//! }
47+
//! ```
1948
2049
#![deny(missing_docs)]
2150

crates/catalog/hms/tests/hms_catalog_test.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ use std::sync::RwLock;
2424
use ctor::{ctor, dtor};
2525
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
2626
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
27-
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
28-
use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
27+
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
28+
use iceberg_catalog_hms::{
29+
HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE,
30+
HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED,
31+
};
2932
use iceberg_test_utils::docker::DockerCompose;
3033
use iceberg_test_utils::{normalize_test_name, set_up};
3134
use port_scanner::scan_port_addr;
@@ -79,6 +82,18 @@ async fn get_catalog() -> HmsCatalog {
7982
}
8083

8184
let props = HashMap::from([
85+
(
86+
HMS_CATALOG_PROP_URI.to_string(),
87+
hms_socket_addr.to_string(),
88+
),
89+
(
90+
HMS_CATALOG_PROP_THRIFT_TRANSPORT.to_string(),
91+
THRIFT_TRANSPORT_BUFFERED.to_string(),
92+
),
93+
(
94+
HMS_CATALOG_PROP_WAREHOUSE.to_string(),
95+
"s3a://warehouse/hive".to_string(),
96+
),
8297
(
8398
S3_ENDPOINT.to_string(),
8499
format!("http://{}", minio_socket_addr),
@@ -106,14 +121,10 @@ async fn get_catalog() -> HmsCatalog {
106121
retries += 1;
107122
}
108123

109-
let config = HmsCatalogConfig::builder()
110-
.address(hms_socket_addr.to_string())
111-
.thrift_transport(HmsThriftTransport::Buffered)
112-
.warehouse("s3a://warehouse/hive".to_string())
113-
.props(props)
114-
.build();
115-
116-
HmsCatalog::new(config).unwrap()
124+
HmsCatalogBuilder::default()
125+
.load("hms", props)
126+
.await
127+
.unwrap()
117128
}
118129

119130
async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent) -> Result<()> {

crates/catalog/loader/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,6 @@ iceberg = { workspace = true }
3333
iceberg-catalog-rest = { workspace = true }
3434
iceberg-catalog-glue = { workspace = true }
3535
iceberg-catalog-s3tables = { workspace = true }
36+
iceberg-catalog-hms = { workspace = true }
3637
tokio = { workspace = true }
3738
async-trait = { workspace = true }

crates/catalog/loader/src/lib.rs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::sync::Arc;
2121
use async_trait::async_trait;
2222
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
2323
use iceberg_catalog_glue::GlueCatalogBuilder;
24+
use iceberg_catalog_hms::HmsCatalogBuilder;
2425
use iceberg_catalog_rest::RestCatalogBuilder;
2526
use iceberg_catalog_s3tables::S3TablesCatalogBuilder;
2627

@@ -32,6 +33,7 @@ static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[
3233
("rest", || Box::new(RestCatalogBuilder::default())),
3334
("glue", || Box::new(GlueCatalogBuilder::default())),
3435
("s3tables", || Box::new(S3TablesCatalogBuilder::default())),
36+
("hms", || Box::new(HmsCatalogBuilder::default())),
3537
];
3638

3739
/// Return the list of supported catalog types.
@@ -109,17 +111,22 @@ mod tests {
109111
use crate::{CatalogLoader, load};
110112

111113
#[tokio::test]
112-
async fn test_load_glue_catalog() {
113-
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
114+
async fn test_load_unsupported_catalog() {
115+
let result = load("unsupported");
116+
assert!(result.is_err());
117+
}
114118

115-
let catalog_loader = load("glue").unwrap();
116-
let catalog = catalog_loader
119+
#[tokio::test]
120+
async fn test_catalog_loader_pattern() {
121+
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
122+
123+
let catalog = CatalogLoader::from("rest")
117124
.load(
118-
"glue".to_string(),
125+
"rest".to_string(),
119126
HashMap::from([
120127
(
121-
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
122-
"s3://test".to_string(),
128+
REST_CATALOG_PROP_URI.to_string(),
129+
"http://localhost:8080".to_string(),
123130
),
124131
("key".to_string(), "value".to_string()),
125132
]),
@@ -130,7 +137,7 @@ mod tests {
130137
}
131138

132139
#[tokio::test]
133-
async fn test_load_rest_catalog() {
140+
async fn test_catalog_loader_pattern_rest_catalog() {
134141
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
135142

136143
let catalog_loader = load("rest").unwrap();
@@ -151,22 +158,17 @@ mod tests {
151158
}
152159

153160
#[tokio::test]
154-
async fn test_load_unsupported_catalog() {
155-
let result = load("unsupported");
156-
assert!(result.is_err());
157-
}
158-
159-
#[tokio::test]
160-
async fn test_catalog_loader_pattern() {
161-
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
161+
async fn test_catalog_loader_pattern_glue_catalog() {
162+
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
162163

163-
let catalog = CatalogLoader::from("rest")
164+
let catalog_loader = load("glue").unwrap();
165+
let catalog = catalog_loader
164166
.load(
165-
"rest".to_string(),
167+
"glue".to_string(),
166168
HashMap::from([
167169
(
168-
REST_CATALOG_PROP_URI.to_string(),
169-
"http://localhost:8080".to_string(),
170+
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
171+
"s3://test".to_string(),
170172
),
171173
("key".to_string(), "value".to_string()),
172174
]),
@@ -196,6 +198,28 @@ mod tests {
196198
assert!(catalog.is_ok());
197199
}
198200

201+
#[tokio::test]
202+
async fn test_catalog_loader_pattern_hms_catalog() {
203+
use iceberg_catalog_hms::{HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE};
204+
205+
let catalog_loader = load("hms").unwrap();
206+
let catalog = catalog_loader
207+
.load(
208+
"hms".to_string(),
209+
HashMap::from([
210+
(HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()),
211+
(
212+
HMS_CATALOG_PROP_WAREHOUSE.to_string(),
213+
"s3://warehouse".to_string(),
214+
),
215+
("key".to_string(), "value".to_string()),
216+
]),
217+
)
218+
.await;
219+
220+
assert!(catalog.is_ok());
221+
}
222+
199223
#[tokio::test]
200224
async fn test_error_message_includes_supported_types() {
201225
let err = match load("does-not-exist") {

0 commit comments

Comments
 (0)