Skip to content

Commit 0372bff

Browse files
committed
feat: remove redundant Handle from parse_url_opts
It looks like `StorageConfig` has everything that implementors of the trait should need, so just using that to pass information through! Signed-off-by: R. Tyler Croy <[email protected]>
1 parent 5356a11 commit 0372bff

File tree

12 files changed

+54
-67
lines changed

12 files changed

+54
-67
lines changed

crates/aws/src/storage.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use futures::stream::BoxStream;
2121
use futures::Future;
2222
use object_store::aws::AmazonS3;
2323
use object_store::client::SpawnedReqwestConnector;
24-
use tokio::runtime::Handle;
2524
use tracing::log::*;
2625
use url::Url;
2726

@@ -41,7 +40,6 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
4140
&self,
4241
url: &Url,
4342
config: &StorageConfig,
44-
handle: Option<Handle>,
4543
) -> DeltaResult<(ObjectStoreRef, Path)> {
4644
let options = self.with_env_s3(&config.raw);
4745

@@ -50,8 +48,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
5048
.with_url(url.to_string())
5149
.with_retry(config.retry.clone());
5250

53-
if let Some(handle) = handle {
54-
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
51+
if let Some(runtime) = &config.runtime {
52+
builder =
53+
builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
5554
}
5655

5756
for (key, value) in options.iter() {

crates/azure/src/lib.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use deltalake_core::{DeltaResult, DeltaTableError, Path};
1010
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
1111
use object_store::client::SpawnedReqwestConnector;
1212
use object_store::ObjectStoreScheme;
13-
use tokio::runtime::Handle;
1413
use url::Url;
1514

1615
mod config;
@@ -41,17 +40,17 @@ impl ObjectStoreFactory for AzureFactory {
4140
&self,
4241
url: &Url,
4342
config: &StorageConfig,
44-
handle: Option<Handle>,
4543
) -> DeltaResult<(ObjectStoreRef, Path)> {
4644
let mut builder = MicrosoftAzureBuilder::new()
4745
.with_url(url.to_string())
4846
.with_retry(config.retry.clone());
49-
let config = config::AzureConfigHelper::try_new(config.raw.as_azure_options())?.build()?;
50-
51-
if let Some(handle) = handle {
52-
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
47+
if let Some(runtime) = &config.runtime {
48+
builder =
49+
builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
5350
}
5451

52+
let config = config::AzureConfigHelper::try_new(config.raw.as_azure_options())?.build()?;
53+
5554
for (key, value) in config.iter() {
5655
builder = builder.with_config(*key, value.clone());
5756
}

crates/catalog-unity/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::collections::HashMap;
1717
use std::future::Future;
1818
use std::str::FromStr;
1919
use std::sync::Arc;
20-
use tokio::runtime::Handle;
2120

2221
use crate::credential::{
2322
AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
@@ -35,7 +34,7 @@ use deltalake_core::{
3534

3635
use crate::client::retry::*;
3736
use deltalake_core::logstore::{
38-
config::str_is_truthy, object_store_factories, IORuntime, ObjectStoreFactory, ObjectStoreRef,
37+
config::str_is_truthy, object_store_factories, ObjectStoreFactory, ObjectStoreRef,
3938
};
4039
pub mod client;
4140
pub mod credential;
@@ -840,7 +839,6 @@ impl ObjectStoreFactory for UnityCatalogFactory {
840839
&self,
841840
table_uri: &Url,
842841
config: &StorageConfig,
843-
handle: Option<Handle>,
844842
) -> DeltaResult<(ObjectStoreRef, Path)> {
845843
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
846844
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),

crates/core/src/logstore/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ mod tests {
345345
}
346346

347347
// Test StorageConfig parsing
348+
#[cfg(feature = "cloud")]
348349
#[test]
349350
fn test_storage_config_parsing() {
350351
let options = hashmap! {

crates/core/src/logstore/factories.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ pub trait ObjectStoreFactory: Send + Sync {
2828
&self,
2929
url: &Url,
3030
config: &StorageConfig,
31-
handle: Option<Handle>,
3231
) -> DeltaResult<(ObjectStoreRef, Path)>;
3332
}
3433

@@ -40,12 +39,12 @@ impl ObjectStoreFactory for DefaultObjectStoreFactory {
4039
&self,
4140
url: &Url,
4241
config: &StorageConfig,
43-
handle: Option<Handle>,
4442
) -> DeltaResult<(ObjectStoreRef, Path)> {
4543
let (mut store, path) = default_parse_url_opts(url, &config.raw)?;
4644

47-
if let Some(handle) = handle {
48-
store = Arc::new(DeltaIOStorageBackend::new(store, handle)) as Arc<DynObjectStore>;
45+
if let Some(runtime) = &config.runtime {
46+
store =
47+
Arc::new(DeltaIOStorageBackend::new(store, runtime.clone())) as Arc<DynObjectStore>;
4948
}
5049
Ok((store, path))
5150
}
@@ -89,7 +88,7 @@ where
8988
let scheme = Url::parse(&format!("{}://", url.scheme())).unwrap();
9089
let storage_config = StorageConfig::parse_options(options)?;
9190
if let Some(factory) = object_store_factories().get(&scheme) {
92-
let (store, _prefix) = factory.parse_url_opts(url, &storage_config, None)?;
91+
let (store, _prefix) = factory.parse_url_opts(url, &storage_config)?;
9392
let store = storage_config.decorate_store(store, url)?;
9493
Ok(Arc::new(store))
9594
} else {

crates/core/src/logstore/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,7 @@ pub fn logstore_for(location: Url, storage_config: StorageConfig) -> DeltaResult
180180

181181
if let Some(entry) = object_store_factories().get(&scheme) {
182182
debug!("Found a storage provider for {scheme} ({location})");
183-
let (root_store, _prefix) = entry.value().parse_url_opts(
184-
&location,
185-
&storage_config,
186-
storage_config.runtime.clone().map(|rt| rt.get_handle()),
187-
)?;
183+
let (root_store, _prefix) = entry.value().parse_url_opts(&location, &storage_config)?;
188184
return logstore_with(root_store, location, storage_config);
189185
}
190186

crates/core/src/logstore/storage/runtime.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,18 +108,15 @@ impl IORuntime {
108108
#[derive(Clone)]
109109
pub struct DeltaIOStorageBackend<T: ObjectStore + Clone> {
110110
pub inner: T,
111-
pub rt_handle: Handle,
111+
rt: IORuntime,
112112
}
113113

114114
impl<T> DeltaIOStorageBackend<T>
115115
where
116116
T: ObjectStore + Clone,
117117
{
118-
pub fn new(store: T, handle: Handle) -> Self {
119-
Self {
120-
inner: store,
121-
rt_handle: handle,
122-
}
118+
pub fn new(store: T, rt: IORuntime) -> Self {
119+
Self { inner: store, rt }
123120
}
124121
}
125122

@@ -136,7 +133,10 @@ impl<T: ObjectStore + Clone> DeltaIOStorageBackend<T> {
136133
O: Send + 'static,
137134
{
138135
let store = store.clone();
139-
let fut = self.rt_handle.spawn(async move { f(&store, &path).await });
136+
let fut = self
137+
.rt
138+
.get_handle()
139+
.spawn(async move { f(&store, &path).await });
140140
fut.unwrap_or_else(|e| match e.try_into_panic() {
141141
Ok(p) => std::panic::resume_unwind(p),
142142
Err(e) => Err(ObjectStoreError::JoinError { source: e }),
@@ -160,7 +160,8 @@ impl<T: ObjectStore + Clone> DeltaIOStorageBackend<T> {
160160
{
161161
let store = store.clone();
162162
let fut = self
163-
.rt_handle
163+
.rt
164+
.get_handle()
164165
.spawn(async move { f(&store, &from, &to).await });
165166
fut.unwrap_or_else(|e| match e.try_into_panic() {
166167
Ok(p) => std::panic::resume_unwind(p),
@@ -316,3 +317,13 @@ impl<T: ObjectStore + Clone> ObjectStore for DeltaIOStorageBackend<T> {
316317
.await
317318
}
318319
}
320+
321+
#[cfg(test)]
322+
mod tests {
323+
use super::*;
324+
325+
#[tokio::test]
326+
async fn test_ioruntime_default() {
327+
let _ = IORuntime::default();
328+
}
329+
}

crates/gcp/src/lib.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use deltalake_core::logstore::{
1010
};
1111
use deltalake_core::{DeltaResult, DeltaTableError, Path};
1212
use object_store::client::SpawnedReqwestConnector;
13-
use tokio::runtime::Handle;
1413
use url::Url;
1514

1615
mod config;
@@ -42,11 +41,14 @@ impl ObjectStoreFactory for GcpFactory {
4241
&self,
4342
url: &Url,
4443
config: &StorageConfig,
45-
handle: Option<Handle>,
4644
) -> DeltaResult<(ObjectStoreRef, Path)> {
4745
let mut builder = GoogleCloudStorageBuilder::new().with_url(url.to_string());
4846
builder = builder.with_retry(config.retry.clone());
4947

48+
if let Some(runtime) = &config.runtime {
49+
builder =
50+
builder.with_http_connector(SpawnedReqwestConnector::new(runtime.get_handle()));
51+
}
5052
let config = config::GcpConfigHelper::try_new(config.raw.as_gcp_options())?.build()?;
5153

5254
let (_, path) =
@@ -55,10 +57,6 @@ impl ObjectStoreFactory for GcpFactory {
5557
})?;
5658
let prefix = Path::parse(path)?;
5759

58-
if let Some(handle) = handle {
59-
builder = builder.with_http_connector(SpawnedReqwestConnector::new(handle));
60-
}
61-
6260
for (key, value) in config.iter() {
6361
builder = builder.with_config(*key, value.clone());
6462
}

crates/hdfs/src/lib.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use deltalake_core::logstore::{
77
use deltalake_core::logstore::{object_store_factories, ObjectStoreFactory, ObjectStoreRef};
88
use deltalake_core::{DeltaResult, Path};
99
use hdfs_native_object_store::HdfsObjectStore;
10-
use tokio::runtime::Handle;
1110
use url::Url;
1211

1312
#[derive(Clone, Default, Debug)]
@@ -18,16 +17,15 @@ impl ObjectStoreFactory for HdfsFactory {
1817
&self,
1918
url: &Url,
2019
config: &StorageConfig,
21-
handle: Option<Handle>,
2220
) -> DeltaResult<(ObjectStoreRef, Path)> {
2321
let mut store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config(
2422
url.as_str(),
2523
config.raw.clone(),
2624
)?);
2725

2826
// HDFS doesn't have the spawnService, so we still wrap it in the old io storage backend (not as optimal though)
29-
if let Some(handle) = handle {
30-
store = Arc::new(DeltaIOStorageBackend::new(store, handle));
27+
if let Some(runtime) = &config.runtime {
28+
store = Arc::new(DeltaIOStorageBackend::new(store, runtime.clone()));
3129
};
3230
let prefix = Path::parse(url.path())?;
3331
Ok((store, prefix))
@@ -71,7 +69,6 @@ mod tests {
7169
let _ = factory.parse_url_opts(
7270
&Url::parse("hdfs://localhost:9000").expect("Failed to parse hdfs://"),
7371
&StorageConfig::default(),
74-
None,
7572
)?;
7673
Ok(())
7774
}

crates/lakefs/src/logstore.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,22 +92,14 @@ impl LakeFSLogStore {
9292

9393
/// Build a new object store for an URL using the existing storage options. After
9494
/// branch creation a new object store needs to be created for the branch uri
95-
fn build_new_store(
96-
&self,
97-
url: &Url,
98-
io_runtime: Option<IORuntime>,
99-
) -> DeltaResult<ObjectStoreRef> {
95+
fn build_new_store(&self, url: &Url) -> DeltaResult<ObjectStoreRef> {
10096
// turn location into scheme
10197
let scheme = Url::parse(&format!("{}://", url.scheme()))
10298
.map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?;
10399

104100
if let Some(entry) = self.config().object_store_factory().get(&scheme) {
105101
debug!("Creating new storage with storage provider for {scheme} ({url})");
106-
let (store, _prefix) = entry.value().parse_url_opts(
107-
url,
108-
&self.config().options,
109-
io_runtime.map(|rt| rt.get_handle()),
110-
)?;
102+
let (store, _prefix) = entry.value().parse_url_opts(url, &self.config().options)?;
111103
return Ok(store);
112104
}
113105
Err(DeltaTableError::InvalidTableLocation(url.to_string()))
@@ -151,8 +143,7 @@ impl LakeFSLogStore {
151143
.await?;
152144

153145
// Build new object store store using the new lakefs url
154-
let txn_root_store =
155-
self.build_new_store(&lakefs_url, self.config().options.runtime.clone())?;
146+
let txn_root_store = self.build_new_store(&lakefs_url)?;
156147
let txn_store = Arc::new(
157148
self.config
158149
.decorate_store(txn_root_store.clone(), Some(&lakefs_url))?,

0 commit comments

Comments
 (0)