Skip to content

Commit 5f119a0

Browse files
omkar-fossion-elgreco
authored andcommitted
fix: add dedicated function to execute futures
Signed-off-by: Omkar P <[email protected]>
1 parent 2919ac1 commit 5f119a0

File tree

1 file changed

+47
-6
lines changed
  • crates/catalog-unity/src

1 file changed

+47
-6
lines changed

crates/catalog-unity/src/lib.rs

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFa
1212
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
1313
use reqwest::Url;
1414
use std::collections::HashMap;
15+
use std::future::Future;
1516
use std::str::FromStr;
1617
use std::sync::Arc;
1718

@@ -25,7 +26,8 @@ use crate::models::{
2526

2627
use deltalake_core::data_catalog::DataCatalogResult;
2728
use deltalake_core::{
28-
DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError, Path,
29+
DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, DeltaTableError,
30+
ObjectStoreError, Path,
2931
};
3032

3133
use crate::client::retry::*;
@@ -35,6 +37,8 @@ use deltalake_core::storage::{
3537
};
3638
pub mod client;
3739
pub mod credential;
40+
41+
const STORE_NAME: &str = "UnityCatalogObjectStore";
3842
#[cfg(feature = "datafusion")]
3943
pub mod datafusion;
4044
pub mod models;
@@ -105,6 +109,10 @@ pub enum UnityCatalogError {
105109
#[error("Datafusion error: {0}")]
106110
DatafusionError(#[from] datafusion_common::DataFusionError),
107111

112+
/// Cannot initialize DynamoDbConfiguration due to some sort of threading issue
113+
#[error("Unable to initialize Unity Catalog, potentially a threading issue")]
114+
InitializationError,
115+
108116
/// A generic error from a source
109117
#[error("An error occurred in catalog: {source}")]
110118
Generic {
@@ -487,6 +495,41 @@ impl UnityCatalogBuilder {
487495
self
488496
}
489497

498+
fn execute_uc_future<F, T>(future: F) -> DeltaResult<T>
499+
where
500+
T: Send,
501+
F: Future<Output = T> + Send,
502+
{
503+
match tokio::runtime::Handle::try_current() {
504+
Ok(handle) => match handle.runtime_flavor() {
505+
tokio::runtime::RuntimeFlavor::MultiThread => {
506+
Ok(tokio::task::block_in_place(move || handle.block_on(future)))
507+
}
508+
_ => {
509+
let mut cfg: Option<T> = None;
510+
std::thread::scope(|scope| {
511+
scope.spawn(|| {
512+
cfg = Some(handle.block_on(future));
513+
});
514+
});
515+
cfg.ok_or(DeltaTableError::ObjectStore {
516+
source: ObjectStoreError::Generic {
517+
store: STORE_NAME,
518+
source: Box::new(UnityCatalogError::InitializationError),
519+
},
520+
})
521+
}
522+
},
523+
Err(_) => {
524+
let runtime = tokio::runtime::Builder::new_current_thread()
525+
.enable_all()
526+
.build()
527+
.expect("a tokio runtime is required by the Unity Catalog Builder");
528+
Ok(runtime.block_on(future))
529+
}
530+
}
531+
}
532+
490533
/// Returns the storage location and temporary token to be used with the
491534
/// Unity Catalog table.
492535
pub async fn get_uc_location_and_token(
@@ -800,11 +843,9 @@ impl ObjectStoreFactory for UnityCatalogFactory {
800843
table_uri: &Url,
801844
options: &StorageOptions,
802845
) -> DeltaResult<(ObjectStoreRef, Path)> {
803-
use futures::executor::block_on;
804-
805-
let (table_path, temp_creds) = block_on(UnityCatalogBuilder::get_uc_location_and_token(
806-
table_uri.as_str(),
807-
))?;
846+
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
847+
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),
848+
)?.map_err(UnityCatalogError::from)?;
808849

809850
let mut storage_options = options.0.clone();
810851
storage_options.extend(temp_creds);

0 commit comments

Comments
 (0)