Skip to content

Commit 3cc0eaa

Browse files
committed
Integrate FileIO with Storage trait
1 parent b24ab63 commit 3cc0eaa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+785
-668
lines changed

bindings/python/src/datafusion_table_provider.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::sync::Arc;
2222
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2323
use datafusion_ffi::table_provider::FFI_TableProvider;
2424
use iceberg::TableIdent;
25-
use iceberg::io::FileIO;
25+
use iceberg::io::{FileIOBuilder, OpenDalStorageFactory, StorageFactory};
2626
use iceberg::table::StaticTable;
2727
use iceberg_datafusion::table::IcebergStaticTableProvider;
2828
use pyo3::exceptions::{PyRuntimeError, PyValueError};
@@ -31,6 +31,29 @@ use pyo3::types::{PyAny, PyCapsule};
3131

3232
use crate::runtime::runtime;
3333

34+
/// Parse the scheme from a URL and return the appropriate StorageFactory.
35+
fn storage_factory_from_path(path: &str) -> PyResult<Arc<dyn StorageFactory>> {
36+
let scheme = path
37+
.split("://")
38+
.next()
39+
.ok_or_else(|| PyRuntimeError::new_err(format!("Invalid path, missing scheme: {path}")))?;
40+
41+
let factory: Arc<dyn StorageFactory> = match scheme {
42+
"file" | "" => Arc::new(OpenDalStorageFactory::Fs),
43+
"s3" | "s3a" => Arc::new(OpenDalStorageFactory::S3 {
44+
customized_credential_load: None,
45+
}),
46+
"memory" => Arc::new(OpenDalStorageFactory::Memory),
47+
_ => {
48+
return Err(PyRuntimeError::new_err(format!(
49+
"Unsupported storage scheme: {scheme}"
50+
)));
51+
}
52+
};
53+
54+
Ok(factory)
55+
}
56+
3457
pub(crate) fn validate_pycapsule(capsule: &Bound<PyCapsule>, name: &str) -> PyResult<()> {
3558
let capsule_name = capsule.name()?;
3659
if capsule_name.is_none() {
@@ -85,16 +108,15 @@ impl PyIcebergDataFusionTable {
85108
let table_ident = TableIdent::from_strs(identifier)
86109
.map_err(|e| PyRuntimeError::new_err(format!("Invalid table identifier: {e}")))?;
87110

88-
let mut builder = FileIO::from_path(&metadata_location)
89-
.map_err(|e| PyRuntimeError::new_err(format!("Failed to init FileIO: {e}")))?;
111+
let factory = storage_factory_from_path(&metadata_location)?;
112+
113+
let mut builder = FileIOBuilder::new(factory);
90114

91115
if let Some(props) = file_io_properties {
92116
builder = builder.with_props(props);
93117
}
94118

95-
let file_io = builder
96-
.build()
97-
.map_err(|e| PyRuntimeError::new_err(format!("Failed to build FileIO: {e}")))?;
119+
let file_io = builder.build();
98120

99121
let static_table =
100122
StaticTable::from_metadata_file(&metadata_location, table_ident, file_io)

crates/catalog/glue/src/catalog.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
use std::collections::HashMap;
1919
use std::fmt::Debug;
20+
use std::sync::Arc;
2021

2122
use anyhow::anyhow;
2223
use async_trait::async_trait;
2324
use aws_sdk_glue::operation::create_table::CreateTableError;
2425
use aws_sdk_glue::operation::update_table::UpdateTableError;
2526
use aws_sdk_glue::types::TableInput;
2627
use iceberg::io::{
27-
FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN,
28+
FileIO, FileIOBuilder, LocalFsStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
29+
S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory,
2830
};
2931
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
3032
use iceberg::table::Table;
@@ -51,47 +53,58 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
5153

5254
/// Builder for [`GlueCatalog`].
5355
#[derive(Debug)]
54-
pub struct GlueCatalogBuilder(GlueCatalogConfig);
56+
pub struct GlueCatalogBuilder {
57+
config: GlueCatalogConfig,
58+
storage_factory: Option<Arc<dyn StorageFactory>>,
59+
}
5560

5661
impl Default for GlueCatalogBuilder {
5762
fn default() -> Self {
58-
Self(GlueCatalogConfig {
59-
name: None,
60-
uri: None,
61-
catalog_id: None,
62-
warehouse: "".to_string(),
63-
props: HashMap::new(),
64-
})
63+
Self {
64+
config: GlueCatalogConfig {
65+
name: None,
66+
uri: None,
67+
catalog_id: None,
68+
warehouse: "".to_string(),
69+
props: HashMap::new(),
70+
},
71+
storage_factory: None,
72+
}
6573
}
6674
}
6775

6876
impl CatalogBuilder for GlueCatalogBuilder {
6977
type C = GlueCatalog;
7078

79+
fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80+
self.storage_factory = Some(storage_factory);
81+
self
82+
}
83+
7184
fn load(
7285
mut self,
7386
name: impl Into<String>,
7487
props: HashMap<String, String>,
7588
) -> impl Future<Output = Result<Self::C>> + Send {
76-
self.0.name = Some(name.into());
89+
self.config.name = Some(name.into());
7790

7891
if props.contains_key(GLUE_CATALOG_PROP_URI) {
79-
self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
92+
self.config.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
8093
}
8194

8295
if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
83-
self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
96+
self.config.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
8497
}
8598

8699
if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
87-
self.0.warehouse = props
100+
self.config.warehouse = props
88101
.get(GLUE_CATALOG_PROP_WAREHOUSE)
89102
.cloned()
90103
.unwrap_or_default();
91104
}
92105

93106
// Collect other remaining properties
94-
self.0.props = props
107+
self.config.props = props
95108
.into_iter()
96109
.filter(|(k, _)| {
97110
k != GLUE_CATALOG_PROP_URI
@@ -101,20 +114,20 @@ impl CatalogBuilder for GlueCatalogBuilder {
101114
.collect();
102115

103116
async move {
104-
if self.0.name.is_none() {
117+
if self.config.name.is_none() {
105118
return Err(Error::new(
106119
ErrorKind::DataInvalid,
107120
"Catalog name is required",
108121
));
109122
}
110-
if self.0.warehouse.is_empty() {
123+
if self.config.warehouse.is_empty() {
111124
return Err(Error::new(
112125
ErrorKind::DataInvalid,
113126
"Catalog warehouse is required",
114127
));
115128
}
116129

117-
GlueCatalog::new(self.0).await
130+
GlueCatalog::new(self.config, self.storage_factory).await
118131
}
119132
}
120133
}
@@ -148,7 +161,10 @@ impl Debug for GlueCatalog {
148161

149162
impl GlueCatalog {
150163
/// Create a new glue catalog
151-
async fn new(config: GlueCatalogConfig) -> Result<Self> {
164+
async fn new(
165+
config: GlueCatalogConfig,
166+
storage_factory: Option<Arc<dyn StorageFactory>>,
167+
) -> Result<Self> {
152168
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
153169
let mut file_io_props = config.props.clone();
154170
if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
@@ -182,9 +198,11 @@ impl GlueCatalog {
182198

183199
let client = aws_sdk_glue::Client::new(&sdk_config);
184200

185-
let file_io = FileIO::from_path(&config.warehouse)?
201+
// Use provided factory or default to LocalFsStorageFactory
202+
let factory = storage_factory.unwrap_or_else(|| Arc::new(LocalFsStorageFactory));
203+
let file_io = FileIOBuilder::new(factory)
186204
.with_props(file_io_props)
187-
.build()?;
205+
.build();
188206

189207
Ok(GlueCatalog {
190208
config,

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@
2121
//! Each test uses unique namespaces based on module path to avoid conflicts.
2222
2323
use std::collections::HashMap;
24+
use std::sync::Arc;
2425

25-
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
26+
use iceberg::io::{
27+
FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
28+
S3_SECRET_ACCESS_KEY,
29+
};
2630
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
2731
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2832
use iceberg::{
@@ -59,11 +63,11 @@ async fn get_catalog() -> GlueCatalog {
5963
]);
6064

6165
// Wait for bucket to actually exist
62-
let file_io = iceberg::io::FileIO::from_path("s3a://")
63-
.unwrap()
64-
.with_props(props.clone())
65-
.build()
66-
.unwrap();
66+
let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
67+
customized_credential_load: None,
68+
}))
69+
.with_props(props.clone())
70+
.build();
6771

6872
let mut retries = 0;
6973
while retries < 30 {

crates/catalog/hms/src/catalog.rs

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
use std::collections::HashMap;
1919
use std::fmt::{Debug, Formatter};
2020
use std::net::ToSocketAddrs;
21+
use std::sync::Arc;
2122

2223
use anyhow::anyhow;
2324
use async_trait::async_trait;
2425
use hive_metastore::{
2526
ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
2627
ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
2728
};
28-
use iceberg::io::FileIO;
29+
use iceberg::io::{FileIO, FileIOBuilder, LocalFsStorageFactory, StorageFactory};
2930
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
3031
use iceberg::table::Table;
3132
use iceberg::{
@@ -50,52 +51,63 @@ pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";
5051
/// HMS Catalog warehouse location
5152
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
5253

53-
/// Builder for [`RestCatalog`].
54+
/// Builder for [`HmsCatalog`].
5455
#[derive(Debug)]
55-
pub struct HmsCatalogBuilder(HmsCatalogConfig);
56+
pub struct HmsCatalogBuilder {
57+
config: HmsCatalogConfig,
58+
storage_factory: Option<Arc<dyn StorageFactory>>,
59+
}
5660

5761
impl Default for HmsCatalogBuilder {
5862
fn default() -> Self {
59-
Self(HmsCatalogConfig {
60-
name: None,
61-
address: "".to_string(),
62-
thrift_transport: HmsThriftTransport::default(),
63-
warehouse: "".to_string(),
64-
props: HashMap::new(),
65-
})
63+
Self {
64+
config: HmsCatalogConfig {
65+
name: None,
66+
address: "".to_string(),
67+
thrift_transport: HmsThriftTransport::default(),
68+
warehouse: "".to_string(),
69+
props: HashMap::new(),
70+
},
71+
storage_factory: None,
72+
}
6673
}
6774
}
6875

6976
impl CatalogBuilder for HmsCatalogBuilder {
7077
type C = HmsCatalog;
7178

79+
fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
80+
self.storage_factory = Some(storage_factory);
81+
self
82+
}
83+
7284
fn load(
7385
mut self,
7486
name: impl Into<String>,
7587
props: HashMap<String, String>,
7688
) -> impl Future<Output = Result<Self::C>> + Send {
77-
self.0.name = Some(name.into());
89+
self.config.name = Some(name.into());
7890

7991
if props.contains_key(HMS_CATALOG_PROP_URI) {
80-
self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
92+
self.config.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default();
8193
}
8294

8395
if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
84-
self.0.thrift_transport = match tt.to_lowercase().as_str() {
96+
self.config.thrift_transport = match tt.to_lowercase().as_str() {
8597
THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
8698
THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
8799
_ => HmsThriftTransport::default(),
88100
};
89101
}
90102

91103
if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) {
92-
self.0.warehouse = props
104+
self.config.warehouse = props
93105
.get(HMS_CATALOG_PROP_WAREHOUSE)
94106
.cloned()
95107
.unwrap_or_default();
96108
}
97109

98-
self.0.props = props
110+
self.config.props = props
99111
.into_iter()
100112
.filter(|(k, _)| {
101113
k != HMS_CATALOG_PROP_URI
@@ -105,23 +117,23 @@ impl CatalogBuilder for HmsCatalogBuilder {
105117
.collect();
106118

107119
let result = {
108-
if self.0.name.is_none() {
120+
if self.config.name.is_none() {
109121
Err(Error::new(
110122
ErrorKind::DataInvalid,
111123
"Catalog name is required",
112124
))
113-
} else if self.0.address.is_empty() {
125+
} else if self.config.address.is_empty() {
114126
Err(Error::new(
115127
ErrorKind::DataInvalid,
116128
"Catalog address is required",
117129
))
118-
} else if self.0.warehouse.is_empty() {
130+
} else if self.config.warehouse.is_empty() {
119131
Err(Error::new(
120132
ErrorKind::DataInvalid,
121133
"Catalog warehouse is required",
122134
))
123135
} else {
124-
HmsCatalog::new(self.0)
136+
HmsCatalog::new(self.config, self.storage_factory)
125137
}
126138
};
127139

@@ -169,7 +181,10 @@ impl Debug for HmsCatalog {
169181

170182
impl HmsCatalog {
171183
/// Create a new hms catalog.
172-
fn new(config: HmsCatalogConfig) -> Result<Self> {
184+
fn new(
185+
config: HmsCatalogConfig,
186+
storage_factory: Option<Arc<dyn StorageFactory>>,
187+
) -> Result<Self> {
173188
let address = config
174189
.address
175190
.as_str()
@@ -194,9 +209,11 @@ impl HmsCatalog {
194209
.build(),
195210
};
196211

197-
let file_io = FileIO::from_path(&config.warehouse)?
212+
// Use provided factory or default to LocalFsStorageFactory
213+
let factory = storage_factory.unwrap_or_else(|| Arc::new(LocalFsStorageFactory));
214+
let file_io = FileIOBuilder::new(factory)
198215
.with_props(&config.props)
199-
.build()?;
216+
.build();
200217

201218
Ok(Self {
202219
config,

0 commit comments

Comments
 (0)