Skip to content

Commit 0bb67f4

Browse files
omkar-fossLiam Brannigan
authored andcommitted
feat(python): add capability to read unity catalog (uc://) uris
This adds capability to read directly from uc:// uris using the local catalog-unity crate. This also exposes the UC temporary credentials in storage_options of the `DeltaTable` instance so polars or similar readers can use it. Signed-off-by: Omkar P <[email protected]> Signed-off-by: Liam Brannigan <[email protected]>
1 parent cdf3d9f commit 0bb67f4

File tree

4 files changed

+146
-7
lines changed

4 files changed

+146
-7
lines changed

crates/catalog-unity/src/lib.rs

Lines changed: 140 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@ compile_error!(
77
for this crate to function properly."
88
);
99

10+
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
1011
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
12+
use reqwest::Url;
13+
use std::collections::HashMap;
1114
use std::str::FromStr;
15+
use std::sync::Arc;
1216

1317
use crate::credential::{
1418
AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
@@ -19,11 +23,13 @@ use crate::models::{
1923
};
2024

2125
use deltalake_core::data_catalog::DataCatalogResult;
22-
use deltalake_core::{DataCatalog, DataCatalogError};
26+
use deltalake_core::{DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, Path};
2327

2428
use crate::client::retry::*;
25-
use deltalake_core::storage::str_is_truthy;
26-
29+
use deltalake_core::storage::{
30+
factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse,
31+
StorageOptions,
32+
};
2733
pub mod client;
2834
pub mod credential;
2935
#[cfg(feature = "datafusion")]
@@ -201,6 +207,11 @@ pub enum UnityCatalogConfigKey {
201207
/// - `azure_use_azure_cli`
202208
/// - `use_azure_cli`
203209
UseAzureCli,
210+
211+
/// Allow http url (e.g. http://localhost:8080/api/2.1/...)
212+
/// Supported keys:
213+
/// - `unity_allow_http_url`
214+
AllowHttpUrl,
204215
}
205216

206217
impl FromStr for UnityCatalogConfigKey {
@@ -246,6 +257,7 @@ impl FromStr for UnityCatalogConfigKey {
246257
| "unity_workspace_url"
247258
| "databricks_workspace_url"
248259
| "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
260+
"allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
249261
_ => Err(DataCatalogError::UnknownConfigKey {
250262
catalog: "unity",
251263
key: s.to_string(),
@@ -259,6 +271,7 @@ impl AsRef<str> for UnityCatalogConfigKey {
259271
fn as_ref(&self) -> &str {
260272
match self {
261273
UnityCatalogConfigKey::AccessToken => "unity_access_token",
274+
UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
262275
UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
263276
UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
264277
UnityCatalogConfigKey::ClientId => "unity_client_id",
@@ -311,6 +324,9 @@ pub struct UnityCatalogBuilder {
311324
/// When set to true, azure cli has to be used for acquiring access token
312325
use_azure_cli: bool,
313326

327+
/// When set to true, http will be allowed in the catalog url
328+
allow_http_url: bool,
329+
314330
/// Retry config
315331
retry_config: RetryConfig,
316332

@@ -333,6 +349,9 @@ impl UnityCatalogBuilder {
333349
) -> DataCatalogResult<Self> {
334350
match UnityCatalogConfigKey::from_str(key.as_ref())? {
335351
UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
352+
UnityCatalogConfigKey::AllowHttpUrl => {
353+
self.allow_http_url = str_is_truthy(&value.into())
354+
}
336355
UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
337356
UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
338357
UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
@@ -431,6 +450,45 @@ impl UnityCatalogBuilder {
431450
self
432451
}
433452

453+
/// Returns the storage location and temporary token to be used with the
454+
/// Unity Catalog table.
455+
pub async fn get_uc_location_and_token(
456+
table_uri: &str,
457+
) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
458+
let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
459+
if uri_parts.len() != 3 {
460+
panic!("Invalid Unity Catalog URI: {}", table_uri);
461+
}
462+
463+
let catalog_id = uri_parts[0];
464+
let database_name = uri_parts[1];
465+
let table_name = uri_parts[2];
466+
467+
let unity_catalog = match UnityCatalogBuilder::from_env().build() {
468+
Ok(uc) => uc,
469+
Err(_e) => panic!("Unable to build Unity Catalog."),
470+
};
471+
let storage_location = match unity_catalog
472+
.get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
473+
.await
474+
{
475+
Ok(s) => s,
476+
Err(_e) => panic!("Unable to find the table's storage location."),
477+
};
478+
let temp_creds_res = unity_catalog
479+
.get_temp_table_credentials(catalog_id, database_name, table_name)
480+
.await?;
481+
let credentials = match temp_creds_res {
482+
TableTempCredentialsResponse::Success(temp_creds) => {
483+
temp_creds.get_credentials().unwrap()
484+
}
485+
TableTempCredentialsResponse::Error(_error) => {
486+
panic!("Unable to get temporary credentials from Unity Catalog.")
487+
}
488+
};
489+
Ok((storage_location, credentials))
490+
}
491+
434492
fn get_credential_provider(&self) -> Option<CredentialProvider> {
435493
if let Some(token) = self.bearer_token.as_ref() {
436494
return Some(CredentialProvider::BearerToken(token.clone()));
@@ -488,7 +546,12 @@ impl UnityCatalogBuilder {
488546
.trim_end_matches('/')
489547
.to_string();
490548

491-
let client = self.client_options.client()?;
549+
let client_options = if self.allow_http_url {
550+
self.client_options.with_allow_http(true)
551+
} else {
552+
self.client_options
553+
};
554+
let client = client_options.client()?;
492555

493556
Ok(UnityCatalog {
494557
client,
@@ -649,7 +712,7 @@ impl UnityCatalog {
649712
self.catalog_url(),
650713
catalog_id.as_ref(),
651714
database_name.as_ref(),
652-
table_name.as_ref()
715+
table_name.as_ref(),
653716
))
654717
.header(AUTHORIZATION, token)
655718
.send()
@@ -692,6 +755,67 @@ impl UnityCatalog {
692755
}
693756
}
694757

758+
#[derive(Clone, Default, Debug)]
759+
pub struct UnityCatalogFactory {}
760+
761+
impl RetryConfigParse for UnityCatalogFactory {}
762+
763+
impl ObjectStoreFactory for UnityCatalogFactory {
764+
fn parse_url_opts(
765+
&self,
766+
table_uri: &Url,
767+
options: &StorageOptions,
768+
) -> DeltaResult<(ObjectStoreRef, Path)> {
769+
use futures::executor::block_on;
770+
771+
let result = block_on(UnityCatalogBuilder::get_uc_location_and_token(
772+
table_uri.as_str(),
773+
));
774+
775+
let (table_path, temp_creds) = match result {
776+
Ok(tup) => tup,
777+
Err(_err) => panic!("Unable to get UC location and token."),
778+
};
779+
780+
let mut storage_options = options.0.clone();
781+
782+
if !temp_creds.is_empty() {
783+
storage_options.extend(temp_creds);
784+
}
785+
786+
let mut builder =
787+
DeltaTableBuilder::from_uri(&table_path).with_io_runtime(IORuntime::default());
788+
if !storage_options.is_empty() {
789+
builder = builder.with_storage_options(storage_options.clone());
790+
}
791+
792+
let prefix = Path::parse(table_uri.path())?;
793+
let store = builder.build()?.object_store();
794+
795+
Ok((store, prefix))
796+
}
797+
}
798+
799+
impl LogStoreFactory for UnityCatalogFactory {
800+
fn with_options(
801+
&self,
802+
store: ObjectStoreRef,
803+
location: &Url,
804+
options: &StorageOptions,
805+
) -> DeltaResult<Arc<dyn LogStore>> {
806+
Ok(default_logstore(store, location, options))
807+
}
808+
}
809+
810+
/// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes
811+
pub fn register_handlers(_additional_prefixes: Option<Url>) {
812+
let factory = Arc::new(UnityCatalogFactory::default());
813+
let scheme = "uc";
814+
let url = Url::parse(&format!("{}://", scheme)).unwrap();
815+
factories().insert(url.clone(), factory.clone());
816+
logstores().insert(url.clone(), factory.clone());
817+
}
818+
695819
#[async_trait::async_trait]
696820
impl DataCatalog for UnityCatalog {
697821
type Error = UnityCatalogError;
@@ -731,6 +855,7 @@ mod tests {
731855
use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
732856
use crate::models::*;
733857
use crate::UnityCatalogBuilder;
858+
use deltalake_core::DataCatalog;
734859
use httpmock::prelude::*;
735860

736861
#[tokio::test]
@@ -788,5 +913,15 @@ mod tests {
788913
get_table_response.unwrap(),
789914
GetTableResponse::Success(_)
790915
));
916+
917+
let storage_location = client
918+
.get_table_storage_location(
919+
Some("catalog_name".to_string()),
920+
"schema_name",
921+
"table_name",
922+
)
923+
.await
924+
.unwrap();
925+
assert!(storage_location.eq_ignore_ascii_case("string"));
791926
}
792927
}

crates/core/src/table/builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,9 @@ fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
361361
Ok(UriType::LocalPath(PathBuf::from(table_uri)))
362362
} else {
363363
Err(DeltaTableError::InvalidTableLocation(format!(
364-
"Unknown scheme: {}",
365-
scheme
364+
"Unknown scheme: {}. Known schemes: {}",
365+
scheme,
366+
known_schemes.join(",")
366367
)))
367368
}
368369
} else {

crates/deltalake/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ pub use deltalake_core::*;
77
pub use deltalake_aws as aws;
88
#[cfg(feature = "azure")]
99
pub use deltalake_azure as azure;
10+
#[cfg(feature = "unity-experimental")]
11+
pub use deltalake_catalog_unity as unity_catalog;
1012
#[cfg(feature = "gcs")]
1113
pub use deltalake_gcp as gcp;
1214
#[cfg(feature = "hdfs")]

python/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2461,6 +2461,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
24612461
deltalake::hdfs::register_handlers(None);
24622462
deltalake_mount::register_handlers(None);
24632463
deltalake::lakefs::register_handlers(None);
2464+
deltalake::unity_catalog::register_handlers(None);
24642465

24652466
let py = m.py();
24662467
m.add("DeltaError", py.get_type_bound::<DeltaError>())?;

0 commit comments

Comments
 (0)