Skip to content

Commit 1d217d4

Browse files
Add support for custom credential loader for S3 FileIO (#1528)
## Which issue does this PR close? - Closes #1527 ## What changes are included in this PR? Adds the ability to provide custom extensions to the `FileIOBuilder`. Currently the only supported extension is `CustomAwsCredentialLoader` which is a newtype around [`AwsCredentialLoad`](https://docs.rs/reqsign/0.16.3/reqsign/trait.AwsCredentialLoad.html), which is what OpenDAL expects. I've added extensions to the `RestCatalog` as well, and when its constructing `FileIO` for table operations, passes along any defined extensions into the `FileIOBuilder`, which then get passed into the underlying OpenDAL constructors. ## Are these changes tested? Yes, tests added in `crates/iceberg/tests/file_io_s3_test.rs` that verify the extension is working.
1 parent 560a124 commit 1d217d4

File tree

7 files changed

+291
-15
lines changed

7 files changed

+291
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/catalog/rest/src/catalog.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717

1818
//! This module contains the iceberg REST catalog implementation.
1919
20+
use std::any::Any;
2021
use std::collections::HashMap;
2122
use std::str::FromStr;
2223

2324
use async_trait::async_trait;
24-
use iceberg::io::FileIO;
25+
use iceberg::io::{self, FileIO};
2526
use iceberg::table::Table;
2627
use iceberg::{
2728
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
@@ -240,6 +241,8 @@ pub struct RestCatalog {
240241
/// It's could be different from the config fetched from the server and used at runtime.
241242
user_config: RestCatalogConfig,
242243
ctx: OnceCell<RestContext>,
244+
/// Extensions for the FileIOBuilder.
245+
file_io_extensions: io::Extensions,
243246
}
244247

245248
impl RestCatalog {
@@ -248,9 +251,16 @@ impl RestCatalog {
248251
Self {
249252
user_config: config,
250253
ctx: OnceCell::new(),
254+
file_io_extensions: io::Extensions::default(),
251255
}
252256
}
253257

258+
/// Add an extension to the file IO builder.
259+
pub fn with_file_io_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
260+
self.file_io_extensions.add(ext);
261+
self
262+
}
263+
254264
/// Gets the [`RestContext`] from the catalog.
255265
async fn context(&self) -> Result<&RestContext> {
256266
self.ctx
@@ -307,7 +317,10 @@ impl RestCatalog {
307317
};
308318

309319
let file_io = match warehouse_path.or(metadata_location) {
310-
Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
320+
Some(url) => FileIO::from_path(url)?
321+
.with_props(props)
322+
.with_extensions(self.file_io_extensions.clone())
323+
.build()?,
311324
None => {
312325
return Err(Error::new(
313326
ErrorKind::Unexpected,

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ storage-fs = ["opendal/services-fs"]
3737
storage-gcs = ["opendal/services-gcs"]
3838
storage-memory = ["opendal/services-memory"]
3939
storage-oss = ["opendal/services-oss"]
40-
storage-s3 = ["opendal/services-s3"]
40+
storage-s3 = ["opendal/services-s3", "reqsign"]
4141

4242
async-std = ["dep:async-std"]
4343
tokio = ["tokio/rt-multi-thread"]
@@ -76,6 +76,7 @@ ordered-float = { workspace = true }
7676
parquet = { workspace = true, features = ["async"] }
7777
rand = { workspace = true }
7878
reqwest = { workspace = true }
79+
reqsign = { version = "0.16.3", optional = true, default-features = false }
7980
roaring = { workspace = true }
8081
rust_decimal = { workspace = true }
8182
serde = { workspace = true }

crates/iceberg/src/io/file_io.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::any::{Any, TypeId};
1819
use std::collections::HashMap;
1920
use std::ops::Range;
2021
use std::sync::Arc;
@@ -167,6 +168,31 @@ impl FileIO {
167168
}
168169
}
169170

171+
/// Container for storing type-safe extensions used to configure underlying FileIO behavior.
172+
#[derive(Clone, Debug, Default)]
173+
pub struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);
174+
175+
impl Extensions {
176+
/// Add an extension.
177+
pub fn add<T: Any + Send + Sync>(&mut self, ext: T) {
178+
self.0.insert(TypeId::of::<T>(), Arc::new(ext));
179+
}
180+
181+
/// Extends the current set of extensions with another set of extensions.
182+
pub fn extend(&mut self, extensions: Extensions) {
183+
self.0.extend(extensions.0);
184+
}
185+
186+
/// Fetch an extension.
187+
pub fn get<T>(&self) -> Option<Arc<T>>
188+
where T: 'static + Send + Sync + Clone {
189+
let type_id = TypeId::of::<T>();
190+
self.0
191+
.get(&type_id)
192+
.and_then(|arc_any| Arc::clone(arc_any).downcast::<T>().ok())
193+
}
194+
}
195+
170196
/// Builder for [`FileIO`].
171197
#[derive(Clone, Debug)]
172198
pub struct FileIOBuilder {
@@ -176,6 +202,8 @@ pub struct FileIOBuilder {
176202
scheme_str: Option<String>,
177203
/// Arguments for operator.
178204
props: HashMap<String, String>,
205+
/// Optional extensions to configure the underlying FileIO behavior.
206+
extensions: Extensions,
179207
}
180208

181209
impl FileIOBuilder {
@@ -185,6 +213,7 @@ impl FileIOBuilder {
185213
Self {
186214
scheme_str: Some(scheme_str.to_string()),
187215
props: HashMap::default(),
216+
extensions: Extensions::default(),
188217
}
189218
}
190219

@@ -193,14 +222,19 @@ impl FileIOBuilder {
193222
Self {
194223
scheme_str: None,
195224
props: HashMap::default(),
225+
extensions: Extensions::default(),
196226
}
197227
}
198228

199229
/// Fetch the scheme string.
200230
///
201231
/// The scheme_str will be empty if it's None.
202-
pub fn into_parts(self) -> (String, HashMap<String, String>) {
203-
(self.scheme_str.unwrap_or_default(), self.props)
232+
pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
233+
(
234+
self.scheme_str.unwrap_or_default(),
235+
self.props,
236+
self.extensions,
237+
)
204238
}
205239

206240
/// Add argument for operator.
@@ -219,6 +253,24 @@ impl FileIOBuilder {
219253
self
220254
}
221255

256+
/// Add an extension to the file IO builder.
257+
pub fn with_extension<T: Any + Send + Sync>(mut self, ext: T) -> Self {
258+
self.extensions.add(ext);
259+
self
260+
}
261+
262+
/// Adds multiple extensions to the file IO builder.
263+
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
264+
self.extensions.extend(extensions);
265+
self
266+
}
267+
268+
/// Fetch an extension from the file IO builder.
269+
pub fn extension<T>(&self) -> Option<Arc<T>>
270+
where T: 'static + Send + Sync + Clone {
271+
self.extensions.get::<T>()
272+
}
273+
222274
/// Builds [`FileIO`].
223275
pub fn build(self) -> Result<FileIO> {
224276
let storage = Storage::build(self.clone())?;

crates/iceberg/src/io/storage.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ use opendal::{Operator, Scheme};
3131
#[cfg(feature = "storage-azdls")]
3232
use super::AzureStorageScheme;
3333
use super::FileIOBuilder;
34+
#[cfg(feature = "storage-s3")]
35+
use crate::io::CustomAwsCredentialLoader;
3436
use crate::{Error, ErrorKind};
3537

3638
/// The storage carries all supported storage services in iceberg
@@ -47,6 +49,7 @@ pub(crate) enum Storage {
4749
/// Storing the scheme string here to return the correct path.
4850
configured_scheme: String,
4951
config: Arc<S3Config>,
52+
customized_credential_load: Option<CustomAwsCredentialLoader>,
5053
},
5154
#[cfg(feature = "storage-gcs")]
5255
Gcs { config: Arc<GcsConfig> },
@@ -67,7 +70,7 @@ pub(crate) enum Storage {
6770
impl Storage {
6871
/// Convert iceberg config to opendal config.
6972
pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
70-
let (scheme_str, props) = file_io_builder.into_parts();
73+
let (scheme_str, props, extensions) = file_io_builder.into_parts();
7174
let scheme = Self::parse_scheme(&scheme_str)?;
7275

7376
match scheme {
@@ -79,6 +82,9 @@ impl Storage {
7982
Scheme::S3 => Ok(Self::S3 {
8083
configured_scheme: scheme_str,
8184
config: super::s3_config_parse(props)?.into(),
85+
customized_credential_load: extensions
86+
.get::<CustomAwsCredentialLoader>()
87+
.map(Arc::unwrap_or_clone),
8288
}),
8389
#[cfg(feature = "storage-gcs")]
8490
Scheme::Gcs => Ok(Self::Gcs {
@@ -144,8 +150,9 @@ impl Storage {
144150
Storage::S3 {
145151
configured_scheme,
146152
config,
153+
customized_credential_load,
147154
} => {
148-
let op = super::s3_config_build(config, path)?;
155+
let op = super::s3_config_build(config, customized_credential_load, path)?;
149156
let op_info = op.info();
150157

151158
// Check prefix of s3 path.

crates/iceberg/src/io/storage_s3.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
// under the License.
1717

1818
use std::collections::HashMap;
19+
use std::sync::Arc;
1920

21+
use async_trait::async_trait;
2022
use opendal::services::S3Config;
2123
use opendal::{Configurator, Operator};
24+
pub use reqsign::{AwsCredential, AwsCredentialLoad};
25+
use reqwest::Client;
2226
use url::Url;
2327

2428
use crate::io::is_truthy;
@@ -151,7 +155,11 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config
151155
}
152156

153157
/// Build new opendal operator from give path.
154-
pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
158+
pub(crate) fn s3_config_build(
159+
cfg: &S3Config,
160+
customized_credential_load: &Option<CustomAwsCredentialLoader>,
161+
path: &str,
162+
) -> Result<Operator> {
155163
let url = Url::parse(path)?;
156164
let bucket = url.host_str().ok_or_else(|| {
157165
Error::new(
@@ -160,11 +168,49 @@ pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result<Operator> {
160168
)
161169
})?;
162170

163-
let builder = cfg
171+
let mut builder = cfg
164172
.clone()
165173
.into_builder()
166174
// Set bucket name.
167175
.bucket(bucket);
168176

177+
if let Some(customized_credential_load) = customized_credential_load {
178+
builder = builder
179+
.customized_credential_load(customized_credential_load.clone().into_opendal_loader());
180+
}
181+
169182
Ok(Operator::new(builder)?.finish())
170183
}
184+
185+
/// Custom AWS credential loader.
186+
/// This can be used to load credentials from a custom source, such as the AWS SDK.
187+
///
188+
/// This should be set as an extension on `FileIOBuilder`.
189+
#[derive(Clone)]
190+
pub struct CustomAwsCredentialLoader(Arc<dyn AwsCredentialLoad>);
191+
192+
impl std::fmt::Debug for CustomAwsCredentialLoader {
193+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194+
f.debug_struct("CustomAwsCredentialLoader")
195+
.finish_non_exhaustive()
196+
}
197+
}
198+
199+
impl CustomAwsCredentialLoader {
200+
/// Create a new custom AWS credential loader.
201+
pub fn new(loader: Arc<dyn AwsCredentialLoad>) -> Self {
202+
Self(loader)
203+
}
204+
205+
/// Convert this loader into an opendal compatible loader for customized AWS credentials.
206+
pub fn into_opendal_loader(self) -> Box<dyn AwsCredentialLoad> {
207+
Box::new(self)
208+
}
209+
}
210+
211+
#[async_trait]
212+
impl AwsCredentialLoad for CustomAwsCredentialLoader {
213+
async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
214+
self.0.load_credential(client).await
215+
}
216+
}

0 commit comments

Comments
 (0)