Skip to content

Commit 645f015

Browse files
authored
feat: sign storage URLs (#31)
1 parent 9c6a522 commit 645f015

File tree

10 files changed

+235
-21
lines changed

10 files changed

+235
-21
lines changed

crates/datafusion/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@ unitycatalog-common = { workspace = true }
1717

1818
datafusion = { version = "49", features = ["avro"] }
1919
datafusion-session = { version = "49" }
20+
datafusion-macros = { version = "49" }
21+
datafusion-doc = { version = "49" }
2022

2123
arrow = "55"
2224
async-trait = "0.1"
2325
bytes = "1.6.0"
2426
chrono = "0.4.40"
2527
dashmap = "6.0.1"
2628
futures = "0.3"
29+
http = "1.2.0"
2730
itertools = "0.14"
2831
object_store = "0.12.3"
2932
ordered-float = { version = "5.0" }

crates/datafusion/src/commands/vacuum/logical.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ pub(crate) static VACUUM_RETURN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
2828
pub(crate) static VACUUM_RETURN_SCHEMA_DF: LazyLock<DFSchemaRef> =
2929
LazyLock::new(|| DFSchemaRef::new(DFSchema::try_from(VACUUM_RETURN_SCHEMA.clone()).unwrap()));
3030

31-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
31+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, Default)]
3232
pub enum VacuumMode {
33+
#[default]
3334
Full,
3435
Lite,
3536
}
@@ -43,12 +44,6 @@ impl fmt::Display for VacuumMode {
4344
}
4445
}
4546

46-
impl Default for VacuumMode {
47-
fn default() -> Self {
48-
VacuumMode::Full
49-
}
50-
}
51-
5247
#[derive(Debug, Clone, PartialEq, PartialOrd)]
5348
pub struct VacuumStatement {
5449
pub name: ObjectName,

crates/datafusion/src/commands/vacuum/physical.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl ExecutionPlan for VacuumExec {
114114
.column(0)
115115
.as_string::<i32>()
116116
.iter()
117-
.filter_map(|s| s.map(|p| Path::from(p)))
117+
.filter_map(|s| s.map(Path::from))
118118
.map(Ok);
119119
let delete_files = futures::stream::iter(path_iter).boxed();
120120
let results = s

crates/datafusion/src/execution/directory_listing.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,9 @@ where
209209

210210
Poll::Ready(Some(Ok(batch)))
211211
}
212-
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
213-
Poll::Ready(None) => return Poll::Ready(None),
214-
Poll::Pending => return Poll::Pending,
212+
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
213+
Poll::Ready(None) => Poll::Ready(None),
214+
Poll::Pending => Poll::Pending,
215215
}
216216
}
217217

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod sign_url;
2+
3+
pub use sign_url::SignStorageUrl;
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
use std::any::Any;
2+
use std::sync::Arc;
3+
use std::time::Duration;
4+
5+
use arrow::array::{ArrayRef, AsArray, LargeStringArray, StringArray, StringViewArray};
6+
use arrow::datatypes::DataType;
7+
use datafusion::common::{Result, exec_err, not_impl_err, plan_datafusion_err, plan_err};
8+
use datafusion::config::ConfigOptions;
9+
use datafusion::execution::object_store::ObjectStoreUrl;
10+
use datafusion::logical_expr::{
11+
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
12+
async_udf::AsyncScalarUDFImpl,
13+
};
14+
use datafusion_macros::user_doc;
15+
use datafusion_session::SessionStore;
16+
use http::Method;
17+
use itertools::Itertools;
18+
use object_store::ObjectStore;
19+
use object_store::aws::AmazonS3;
20+
use object_store::azure::MicrosoftAzure;
21+
use object_store::gcp::GoogleCloudStorage;
22+
use object_store::local::LocalFileSystem;
23+
use object_store::memory::InMemory;
24+
use object_store::path::Path;
25+
use object_store::signer::Signer;
26+
use url::Url;
27+
28+
/// SignStorageUrl is a scalar user-defined function that signs a URL.
29+
///
30+
/// The resulting URL can be used to access the signed resource (file, directory/key).
31+
#[user_doc(
32+
doc_section(label = "String Functions"),
33+
description = "Sign a URL",
34+
syntax_example = "sign_storage_url('https://example.com')"
35+
)]
36+
#[derive(Debug)]
37+
pub struct SignStorageUrl {
38+
signature: Signature,
39+
session_store: Arc<SessionStore>,
40+
}
41+
42+
impl SignStorageUrl {
43+
pub fn new(session_store: Arc<SessionStore>) -> Self {
44+
Self {
45+
session_store,
46+
signature: Signature::uniform(
47+
1,
48+
vec![DataType::Utf8, DataType::Utf8View, DataType::LargeUtf8],
49+
Volatility::Volatile,
50+
),
51+
}
52+
}
53+
}
54+
55+
/// Implement the ScalarUDFImpl trait for AddOne
56+
impl ScalarUDFImpl for SignStorageUrl {
57+
fn as_any(&self) -> &dyn Any {
58+
self
59+
}
60+
61+
fn name(&self) -> &str {
62+
"sign_storage_url"
63+
}
64+
65+
fn signature(&self) -> &Signature {
66+
&self.signature
67+
}
68+
69+
fn return_type(&self, args: &[DataType]) -> Result<DataType> {
70+
if !matches!(
71+
args.first(),
72+
Some(&DataType::Utf8) | Some(&DataType::LargeUtf8) | Some(&DataType::Utf8View)
73+
) {
74+
return plan_err!("sign_storage_url only accepts string-like arguments");
75+
}
76+
// safety: we just checked above that the argument is a Some(..) variant
77+
Ok(args.first().unwrap().clone())
78+
}
79+
80+
// The actual implementation would add one to the argument
81+
fn invoke_with_args(&self, _: ScalarFunctionArgs) -> Result<ColumnarValue> {
82+
not_impl_err!("SignStorageUrl can only be called from async contexts")
83+
}
84+
85+
fn documentation(&self) -> Option<&Documentation> {
86+
self.doc()
87+
}
88+
}
89+
90+
#[async_trait::async_trait]
91+
impl AsyncScalarUDFImpl for SignStorageUrl {
92+
async fn invoke_async_with_args(
93+
&self,
94+
args: ScalarFunctionArgs,
95+
_options: &ConfigOptions,
96+
) -> Result<ArrayRef> {
97+
let args = ColumnarValue::values_to_arrays(&args.args)?;
98+
99+
// we parse the url and split it into the base URL for the storage bucket
100+
// and the path within the bucket. We track the indices of the original values
101+
// in order to send only valid values to the storage provider and later
102+
// reconstruct the proper response.
103+
let parse_str = |(idx, value): (usize, Option<&str>)| {
104+
value.and_then(|v| {
105+
url::Url::parse(v).ok().and_then(|url| {
106+
Some((
107+
ObjectStoreUrl::parse(&url[..url::Position::BeforePath]).ok()?,
108+
(idx, Path::from_url_path(url.path()).ok()?),
109+
))
110+
})
111+
})
112+
};
113+
let urls: Vec<_> = if let Some(vals) = args[0].as_string_opt::<i32>() {
114+
vals.iter().enumerate().flat_map(parse_str).collect()
115+
} else if let Some(vals) = args[0].as_string_opt::<i64>() {
116+
vals.iter().enumerate().flat_map(parse_str).collect()
117+
} else if let Some(vals) = args[0].as_string_view_opt() {
118+
vals.iter().enumerate().flat_map(parse_str).collect()
119+
} else {
120+
return plan_err!("sign_storage_url only accepts string arguments");
121+
};
122+
123+
let registry = self
124+
.session_store
125+
.get_session()
126+
.upgrade()
127+
.ok_or_else(|| plan_datafusion_err!("session store is not available"))?
128+
.read()
129+
.runtime_env()
130+
.object_store_registry
131+
.clone();
132+
// TODO: allow passing the desired duration as method argument
133+
let expires_in = Duration::new(60 * 60, 0);
134+
135+
// we group all valid urls by their store and generate signed urls
136+
// for all urls under that store. The signers usually need to communicate
137+
// with the storage service only once to generate the signing key, after which
138+
// they can sign any number of urls without further communication.
139+
let store_map = urls.into_iter().into_group_map();
140+
let mut result_buffer = Vec::with_capacity(args[0].len());
141+
for (store_url, paths_and_idx) in store_map {
142+
let store = registry.get_store(store_url.as_ref())?;
143+
let (indices, paths): (Vec<_>, Vec<_>) = paths_and_idx.into_iter().unzip();
144+
let signed_urls =
145+
signed_urls(&store_url, store, &paths, Method::GET, expires_in).await?;
146+
result_buffer.extend(indices.into_iter().zip(signed_urls));
147+
}
148+
149+
// construct a result vector from the individual store results.
150+
let mut results = vec![None; args[0].len()];
151+
for (i, url) in result_buffer.into_iter() {
152+
results[i] = Some(url.to_string());
153+
}
154+
155+
// return the results as the same data type as the input array
156+
match args[0].data_type() {
157+
DataType::Utf8 => Ok(Arc::new(StringArray::from(results))),
158+
DataType::LargeUtf8 => Ok(Arc::new(LargeStringArray::from(results))),
159+
DataType::Utf8View => Ok(Arc::new(StringViewArray::from(results))),
160+
// safety: We limited the data types when we are reading the paths from the array.
161+
_ => unreachable!(),
162+
}
163+
}
164+
}
165+
166+
// auxiliary trait to cast `ObjectStore` to `Any`.
167+
trait DowncastableStore: ObjectStore + Any {
168+
fn as_any(&self) -> &dyn Any;
169+
}
170+
171+
impl<T: ObjectStore + Any> DowncastableStore for T {
172+
fn as_any(&self) -> &dyn Any {
173+
self
174+
}
175+
}
176+
177+
async fn signed_urls(
178+
store_url: &ObjectStoreUrl,
179+
store: Arc<dyn ObjectStore>,
180+
paths: &[Path],
181+
method: Method,
182+
expires_in: Duration,
183+
) -> Result<Vec<Url>> {
184+
if let Some(signer) = store.as_any().downcast_ref::<MicrosoftAzure>() {
185+
return Ok(signer.signed_urls(method, paths, expires_in).await?);
186+
}
187+
188+
if let Some(signer) = store.as_any().downcast_ref::<AmazonS3>() {
189+
return Ok(signer.signed_urls(method, paths, expires_in).await?);
190+
}
191+
192+
if let Some(signer) = store.as_any().downcast_ref::<GoogleCloudStorage>() {
193+
return Ok(signer.signed_urls(method, paths, expires_in).await?);
194+
}
195+
196+
if store.as_any().downcast_ref::<LocalFileSystem>().is_some() {
197+
return Ok(paths
198+
.iter()
199+
.map(|path| AsRef::<Url>::as_ref(store_url).join(path.as_ref()).unwrap())
200+
.collect());
201+
}
202+
203+
if store.as_any().downcast_ref::<InMemory>().is_some() {
204+
return Ok(paths
205+
.iter()
206+
.map(|path| AsRef::<Url>::as_ref(store_url).join(path.as_ref()).unwrap())
207+
.collect());
208+
}
209+
210+
exec_err!("not a signing store")
211+
}

crates/datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod config;
66
mod engine;
77
mod error;
88
mod execution;
9+
pub mod functions;
910
mod planner;
1011
mod schema_provider;
1112
mod session;

crates/datafusion/src/planner.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use sqlparser::dialect::dialect_from_str;
1212
use tracing::debug;
1313

1414
use crate::{
15-
KernelSessionExt,
1615
commands::{VacuumPlanNode, plan_vacuum},
1716
sql::{ExecuteUnityCatalogPlanNode, HFParserBuilder, Statement, uc_statement_to_plan},
1817
unity::UnityCatalogRequestExec,
@@ -30,7 +29,7 @@ impl QueryPlanner for OpenLakehouseQueryPlanner {
3029
logical_plan: &LogicalPlan,
3130
session_state: &SessionState,
3231
) -> Result<Arc<dyn ExecutionPlan>> {
33-
// Teach the default physical planner how to plan TopK nodes.
32+
// Teach the default physical planner how to plan open lakehouse nodes.
3433
let physical_planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
3534
OpenLakehousePlanner {},
3635
)]);

crates/datafusion/src/session.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::{Arc, OnceLock, Weak};
33
use datafusion::common::{DataFusionError, Result as DFResult, TableReference};
44
use datafusion::execution::TaskContext;
55
use datafusion::execution::object_store::ObjectStoreRegistry;
6+
use datafusion::logical_expr::async_udf::AsyncScalarUDF;
67
use datafusion::prelude::{DataFrame, SessionContext};
78
use datafusion_session::{Session, SessionStore};
89
use delta_kernel::engine::default::executor::tokio::{
@@ -17,6 +18,7 @@ use url::Url;
1718

1819
use crate::config::OpenLakehouseConfig;
1920
use crate::engine::DataFusionEngine;
21+
use crate::functions::SignStorageUrl;
2022
use crate::planner::{OpenLakehouseQueryPlanner, SessionStateExt};
2123
use crate::table_provider::{DeltaTableProvider, DeltaTableSnapshot, TableSnapshot};
2224
use crate::utils::AsObjectStoreUrl;
@@ -404,17 +406,22 @@ fn with_kernel_extension(
404406
let extension = KernelExtension {
405407
engine,
406408
object_store_factory,
407-
session_store,
409+
session_store: session_store.clone(),
408410
uc_client: Default::default(),
409411
};
410412
if let Some(uc_client) = uc_client {
411413
let _ = extension.uc_client.set(uc_client);
412414
}
413415
new_config.set_extension(Arc::new(extension));
416+
417+
let sign_storage_url = AsyncScalarUDF::new(Arc::new(SignStorageUrl::new(session_store)));
418+
let scalar_functions = vec![Arc::new(sign_storage_url.into_scalar_udf())];
419+
414420
let ctx: SessionContext = ctx
415421
.into_state_builder()
416422
.with_session_id(session_id)
417423
.with_config(new_config)
424+
.with_scalar_functions(scalar_functions)
418425
.with_query_planner(Arc::new(OpenLakehouseQueryPlanner {}))
419426
.build()
420427
.into();

crates/datafusion/src/sql/commands/vacuum.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ pub(crate) static VACUUM_RETURN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
2828
pub(crate) static VACUUM_RETURN_SCHEMA_DF: LazyLock<DFSchemaRef> =
2929
LazyLock::new(|| DFSchemaRef::new(DFSchema::try_from(VACUUM_RETURN_SCHEMA.clone()).unwrap()));
3030

31-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
31+
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Hash)]
3232
pub enum Mode {
33+
#[default]
3334
Full,
3435
Lite,
3536
}
@@ -43,12 +44,6 @@ impl fmt::Display for Mode {
4344
}
4445
}
4546

46-
impl Default for Mode {
47-
fn default() -> Self {
48-
Mode::Full
49-
}
50-
}
51-
5247
#[derive(Debug, Clone, PartialEq, PartialOrd)]
5348
pub struct VacuumStatement {
5449
pub name: ObjectName,

0 commit comments

Comments
 (0)