Skip to content

Commit e2b067b

Browse files
committed
feat: utilize StorageConfig for the log store's parse_url_opts
The parse_url_opts trait method has been a bit of a thorn lately, since we have wanted to pass richer configuration options through it for some log store implementations. This change utilizes the StorageConfig struct, which already contains most of the necessary options and gives us a reasonable way to extend the struct depending on our future needs. This _does_, however, represent a public API change, so I have bumped all the necessary crate versions to reflect that. NOTE: The `handle` argument _might_ now be redundant since StorageConfig has a `runtime` field inside it. Closes #3589 Signed-off-by: R. Tyler Croy <[email protected]>
1 parent de62d46 commit e2b067b

File tree

23 files changed

+103
-106
lines changed

23 files changed

+103
-106
lines changed

crates/aws/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "deltalake-aws"
3-
version = "0.10.0"
3+
version = "0.11.0"
44
authors.workspace = true
55
keywords.workspace = true
66
readme.workspace = true
@@ -13,7 +13,7 @@ rust-version.workspace = true
1313

1414
[dependencies]
1515
# path dependencies
16-
deltalake-core = { version = "0.27.0", path = "../core" }
16+
deltalake-core = { version = "0.28.0", path = "../core" , features = ["cloud"]}
1717

1818
# workspace dependencies
1919
async-trait = { workspace = true }

crates/aws/src/storage.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ use deltalake_core::logstore::object_store::{
1313
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme,
1414
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
1515
};
16-
use deltalake_core::logstore::{config::str_is_truthy, ObjectStoreFactory, ObjectStoreRef};
16+
use deltalake_core::logstore::{
17+
config::str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
18+
};
1719
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
1820
use futures::stream::BoxStream;
1921
use futures::Future;
2022
use object_store::aws::AmazonS3;
2123
use object_store::client::SpawnedReqwestConnector;
22-
use object_store::RetryConfig;
2324
use tokio::runtime::Handle;
2425
use tracing::log::*;
2526
use url::Url;
@@ -39,16 +40,15 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
3940
fn parse_url_opts(
4041
&self,
4142
url: &Url,
42-
storage_options: &HashMap<String, String>,
43-
retry: &RetryConfig,
43+
config: &StorageConfig,
4444
handle: Option<Handle>,
4545
) -> DeltaResult<(ObjectStoreRef, Path)> {
46-
let options = self.with_env_s3(storage_options);
46+
let options = self.with_env_s3(&config.raw);
4747

4848
// All S3-likes should start their builder the same way
4949
let mut builder = AmazonS3Builder::new()
5050
.with_url(url.to_string())
51-
.with_retry(retry.clone());
51+
.with_retry(config.retry.clone());
5252

5353
if let Some(handle) = handle {
5454
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));

crates/azure/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "deltalake-azure"
3-
version = "0.10.0"
3+
version = "0.11.0"
44
authors.workspace = true
55
keywords.workspace = true
66
readme.workspace = true
@@ -12,7 +12,7 @@ repository.workspace = true
1212
rust-version.workspace = true
1313

1414
[dependencies]
15-
deltalake-core = { version = "0.27.0", path = "../core" }
15+
deltalake-core = { version = "0.28.0", path = "../core", features = ["cloud"] }
1616

1717
# workspace dependencies
1818
async-trait = { workspace = true }
@@ -26,7 +26,7 @@ regex = { workspace = true }
2626
url = { workspace = true }
2727

2828
[dev-dependencies]
29-
deltalake-core = { version = "0.27.0", path = "../core", features = [
29+
deltalake-core = { version = "0.28.0", path = "../core", features = [
3030
"datafusion",
3131
] }
3232
chrono = { workspace = true }

crates/azure/src/lib.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use deltalake_core::logstore::{
99
use deltalake_core::{DeltaResult, DeltaTableError, Path};
1010
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
1111
use object_store::client::SpawnedReqwestConnector;
12-
use object_store::{ObjectStoreScheme, RetryConfig};
12+
use object_store::ObjectStoreScheme;
1313
use tokio::runtime::Handle;
1414
use url::Url;
1515

@@ -40,15 +40,13 @@ impl ObjectStoreFactory for AzureFactory {
4040
fn parse_url_opts(
4141
&self,
4242
url: &Url,
43-
options: &HashMap<String, String>,
44-
retry: &RetryConfig,
43+
config: &StorageConfig,
4544
handle: Option<Handle>,
4645
) -> DeltaResult<(ObjectStoreRef, Path)> {
47-
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
48-
4946
let mut builder = MicrosoftAzureBuilder::new()
5047
.with_url(url.to_string())
51-
.with_retry(retry.clone());
48+
.with_retry(config.retry.clone());
49+
let config = config::AzureConfigHelper::try_new(config.raw.as_azure_options())?.build()?;
5250

5351
if let Some(handle) = handle {
5452
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
@@ -95,3 +93,21 @@ pub fn register_handlers(_additional_prefixes: Option<Url>) {
9593
logstore_factories().insert(url.clone(), factory.clone());
9694
}
9795
}
96+
97+
#[cfg(test)]
98+
mod tests {
99+
use super::*;
100+
use std::collections::HashMap;
101+
102+
#[test]
103+
fn test_as_azure_options() {
104+
use object_store::azure::AzureConfigKey;
105+
let mut options = HashMap::default();
106+
let key = "AZURE_STORAGE_ACCOUNT_KEY".to_string();
107+
let value = "value".to_string();
108+
options.insert(key, value.clone());
109+
110+
let converted = options.as_azure_options();
111+
assert_eq!(converted.get(&AzureConfigKey::AccessKey), Some(&value));
112+
}
113+
}

crates/catalog-glue/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "deltalake-catalog-glue"
3-
version = "0.11.0"
3+
version = "0.12.0"
44
authors.workspace = true
55
keywords.workspace = true
66
readme.workspace = true
@@ -15,7 +15,7 @@ rust-version.workspace = true
1515
async-trait = { workspace = true }
1616
aws-config = "1"
1717
aws-sdk-glue = "=1.78.0"
18-
deltalake-core = { version = "0.27.0", path = "../core" }
18+
deltalake-core = { version = "0.28.0", path = "../core" }
1919
thiserror = { workspace = true }
2020

2121
[dev-dependencies]

crates/catalog-unity/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "deltalake-catalog-unity"
3-
version = "0.11.0"
3+
version = "0.12.0"
44
authors.workspace = true
55
keywords.workspace = true
66
readme.workspace = true
@@ -20,10 +20,10 @@ thiserror.workspace = true
2020
futures.workspace = true
2121
chrono.workspace = true
2222
tracing.workspace = true
23-
deltalake-core = { version = "0.27.0", path = "../core" }
24-
deltalake-aws = { version = "0.10.0", path = "../aws", optional = true }
25-
deltalake-azure = { version = "0.10.0", path = "../azure", optional = true }
26-
deltalake-gcp = { version = "0.11.0", path = "../gcp", optional = true }
23+
deltalake-core = { version = "0.28.0", path = "../core" }
24+
deltalake-aws = { version = "0.11.0", path = "../aws", optional = true }
25+
deltalake-azure = { version = "0.11.0", path = "../azure", optional = true }
26+
deltalake-gcp = { version = "0.12.0", path = "../gcp", optional = true }
2727
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json", "http2"] }
2828
reqwest-retry = "0.7"
2929
reqwest-middleware = { version = "0.4.0", features = ["json"] }

crates/catalog-unity/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -839,24 +839,24 @@ impl ObjectStoreFactory for UnityCatalogFactory {
839839
fn parse_url_opts(
840840
&self,
841841
table_uri: &Url,
842-
options: &HashMap<String, String>,
843-
_retry: &RetryConfig,
842+
config: &StorageConfig,
844843
handle: Option<Handle>,
845844
) -> DeltaResult<(ObjectStoreRef, Path)> {
846845
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
847846
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),
848847
)??;
849848

850-
let mut storage_options = options.clone();
849+
let mut storage_options = config.raw.clone();
851850
storage_options.extend(temp_creds);
852851

853852
// TODO(roeap): we should not have to go through the table here.
854853
// ideally we just create the right storage ...
855854
let mut builder = DeltaTableBuilder::from_uri(&table_path);
856855

857-
if let Some(handle) = handle {
858-
builder = builder.with_io_runtime(IORuntime::RT(handle));
856+
if let Some(runtime) = &config.runtime {
857+
builder = builder.with_io_runtime(runtime.clone());
859858
}
859+
860860
if !storage_options.is_empty() {
861861
builder = builder.with_storage_options(storage_options.clone());
862862
}

crates/core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "deltalake-core"
3-
version = "0.27.0"
3+
version = "0.28.0"
44
authors.workspace = true
55
keywords.workspace = true
66
readme.workspace = true
@@ -15,7 +15,7 @@ rust-version.workspace = true
1515
features = ["datafusion", "json"]
1616

1717
[dependencies]
18-
deltalake-derive = { version = "0.27.0", path = "../derive" }
18+
deltalake-derive = { version = "0.28.0", path = "../derive" }
1919

2020
delta_kernel.workspace = true
2121

crates/core/src/logstore/config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,13 @@ pub fn str_is_truthy(val: &str) -> bool {
300300
mod tests {
301301
use super::*;
302302
use maplit::hashmap;
303-
use object_store::RetryConfig;
304303
use std::time::Duration;
305304

306305
// Test retry config parsing
306+
#[cfg(feature = "cloud")]
307307
#[test]
308308
fn test_retry_config_from_options() {
309+
use object_store::RetryConfig;
309310
let options = hashmap! {
310311
"max_retries".to_string() => "100".to_string() ,
311312
"retry_timeout".to_string() => "300s".to_string() ,
@@ -324,8 +325,10 @@ mod tests {
324325
}
325326

326327
// Test ParseResult functionality
328+
#[cfg(feature = "cloud")]
327329
#[test]
328330
fn test_parse_result_handling() {
331+
use object_store::RetryConfig;
329332
let options = hashmap! {
330333
"retry_timeout".to_string() => "300s".to_string(),
331334
"max_retries".to_string() => "not_a_number".to_string(),

crates/core/src/logstore/factories.rs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ use std::{
44
};
55

66
use dashmap::DashMap;
7-
#[cfg(feature = "cloud")]
8-
use object_store::RetryConfig;
97
use object_store::{path::Path, DynObjectStore};
108
use tokio::runtime::Handle;
119
use url::Url;
@@ -26,20 +24,10 @@ pub trait ObjectStoreFactory: Send + Sync {
2624
/// corresponding to the path segment of the URL.
2725
///
2826
/// The store should __NOT__ apply the decorations via the passed `config`
29-
#[cfg(feature = "cloud")]
3027
fn parse_url_opts(
3128
&self,
3229
url: &Url,
33-
options: &HashMap<String, String>,
34-
retry: &RetryConfig,
35-
handle: Option<Handle>,
36-
) -> DeltaResult<(ObjectStoreRef, Path)>;
37-
38-
#[cfg(not(feature = "cloud"))]
39-
fn parse_url_opts(
40-
&self,
41-
url: &Url,
42-
options: &HashMap<String, String>,
30+
config: &StorageConfig,
4331
handle: Option<Handle>,
4432
) -> DeltaResult<(ObjectStoreRef, Path)>;
4533
}
@@ -51,11 +39,10 @@ impl ObjectStoreFactory for DefaultObjectStoreFactory {
5139
fn parse_url_opts(
5240
&self,
5341
url: &Url,
54-
options: &HashMap<String, String>,
55-
#[cfg(feature = "cloud")] _retry: &RetryConfig,
42+
config: &StorageConfig,
5643
handle: Option<Handle>,
5744
) -> DeltaResult<(ObjectStoreRef, Path)> {
58-
let (mut store, path) = default_parse_url_opts(url, options)?;
45+
let (mut store, path) = default_parse_url_opts(url, &config.raw)?;
5946

6047
if let Some(handle) = handle {
6148
store = Arc::new(DeltaIOStorageBackend::new(store, handle)) as Arc<DynObjectStore>;
@@ -102,11 +89,7 @@ where
10289
let scheme = Url::parse(&format!("{}://", url.scheme())).unwrap();
10390
let storage_config = StorageConfig::parse_options(options)?;
10491
if let Some(factory) = object_store_factories().get(&scheme) {
105-
#[cfg(feature = "cloud")]
106-
let (store, _prefix) =
107-
factory.parse_url_opts(url, &storage_config.raw, &storage_config.retry, None)?;
108-
#[cfg(not(feature = "cloud"))]
109-
let (store, _prefix) = factory.parse_url_opts(url, &storage_config.raw, None)?;
92+
let (store, _prefix) = factory.parse_url_opts(url, &storage_config, None)?;
11093
let store = storage_config.decorate_store(store, url)?;
11194
Ok(Arc::new(store))
11295
} else {
@@ -176,3 +159,6 @@ pub fn logstore_factories() -> LogStoreFactoryRegistry {
176159
})
177160
.clone()
178161
}
162+
163+
#[cfg(test)]
164+
mod tests {}

0 commit comments

Comments
 (0)