Skip to content

Commit 904cb50

Browse files
authored
feat: e2e flow for unity requests via SQL (#14)
* feat: setup simple configuration * feat: scaffold query planning
1 parent 8757b39 commit 904cb50

File tree

13 files changed

+1180
-25
lines changed

13 files changed

+1180
-25
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,9 @@ delta_kernel = { version = "0.14.0", features = [
1818
"arrow",
1919
"internal-api",
2020
] }
21+
# unitycatalog-common = { path = "../unitycatalog-rs/crates/common", default-features = false, features = [
22+
# "rest-client",
23+
# ] }
24+
unitycatalog-common = { git = "https://github.com/unitycatalog-incubator/unitycatalog-rs", rev = "b840bd195c2c1c3ec5c692e245acef794f677190", default-features = false, features = [
25+
"rest-client",
26+
] }

crates/datafusion/Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,25 @@ version.workspace = true
1111

1212
[dependencies]
1313
delta_kernel = { workspace = true }
14+
unitycatalog-common = { workspace = true }
1415

1516
datafusion = { version = "49", features = ["avro"] }
16-
datafusion-functions = { version = "49" }
1717
datafusion-session = { version = "49" }
1818

19+
arrow = "55"
1920
async-trait = "0.1"
2021
bytes = "1.6.0"
2122
chrono = "0.4.40"
2223
dashmap = "6.0.1"
2324
futures = "0.3"
2425
itertools = "0.14"
2526
parking_lot = "0.12"
27+
serde = "1.0"
28+
serde_json = "1.0"
29+
sqlparser = { version = "0.55.0", default-features = false, features = [
30+
"std",
31+
"visitor",
32+
] }
2633
tokio = { version = "1", features = ["full"] }
2734
tracing = "0.1"
2835
url = "2.5.4"

crates/datafusion/examples/simple_read.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use datafusion::arrow::util::pretty::print_batches;
44
use datafusion::catalog::CatalogProvider;
55
use datafusion::catalog::MemoryCatalogProvider;
66
use datafusion::execution::context::SessionContext;
7+
use deltalake_datafusion::config::OpenLakehouseConfig;
78
use deltalake_datafusion::{DeltaLakeSchemaProvider, KernelContextExt as _};
89
use url::Url;
910

@@ -14,7 +15,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1415
let base = Url::from_directory_path(std::fs::canonicalize(PATH)?).unwrap();
1516
let table_url = base.join("column_mapping/delta")?;
1617

17-
let ctx = SessionContext::new().enable_delta_kernel(None);
18+
let config = OpenLakehouseConfig::session_config().set_str(
19+
"lakehouse.unity.uri",
20+
"http://localhost:8080/api/2.1/unity-catalog/",
21+
);
22+
23+
let ctx = SessionContext::new_with_config(config).enable_delta_kernel(None);
1824

1925
// register and read a single table
2026
// TODO: right now the table provider does not internally refresh the snapshot
@@ -35,5 +41,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3541
let df = df.collect().await?;
3642
print_batches(&df)?;
3743

44+
create_catalog(&ctx).await?;
45+
46+
Ok(())
47+
}
48+
49+
async fn create_catalog(ctx: &SessionContext) -> Result<(), Box<dyn std::error::Error>> {
50+
let df = ctx.sql_delta("CREATE CATALOG hello_from_df").await?;
51+
let df = df.collect().await?;
52+
print_batches(&df)?;
53+
54+
let df = ctx.sql_delta("DROP CATALOG hello_from_df").await?;
55+
let df = df.collect().await?;
56+
print_batches(&df)?;
57+
3858
Ok(())
3959
}

crates/datafusion/src/config.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use datafusion::common::extensions_options;
2+
use datafusion::config::ConfigExtension;
3+
use datafusion::prelude::SessionConfig;
4+
5+
extensions_options! {
6+
/// Configuration for integration with Unity Catalog
7+
pub struct UnityCatalogConfig {
8+
/// Server Url for Unity Catalog
9+
pub uri: Option<String>, default = None
10+
/// Access Token (PAT) for Unity Catalog server
11+
pub token: Option<String>, default = None
12+
}
13+
}
14+
15+
impl ConfigExtension for UnityCatalogConfig {
16+
const PREFIX: &'static str = "unity";
17+
}
18+
19+
extensions_options! {
20+
pub struct DeltaLakeConfig {
21+
pub enable_caching: bool, default = false
22+
}
23+
}
24+
25+
impl ConfigExtension for DeltaLakeConfig {
26+
const PREFIX: &'static str = "delta";
27+
}
28+
29+
extensions_options! {
30+
pub struct OpenLakehouseConfig {
31+
/// Configuration to connect to unity catalog server
32+
pub unity: UnityCatalogConfig, default = UnityCatalogConfig::default()
33+
/// Configuration for interacting with delta tables
34+
pub delta: DeltaLakeConfig, default = DeltaLakeConfig::default()
35+
}
36+
}
37+
38+
impl ConfigExtension for OpenLakehouseConfig {
39+
const PREFIX: &'static str = "lakehouse";
40+
}
41+
42+
impl OpenLakehouseConfig {
43+
pub fn session_config() -> SessionConfig {
44+
SessionConfig::new().with_option_extension(OpenLakehouseConfig::default())
45+
}
46+
}
47+
48+
#[cfg(test)]
49+
mod tests {
50+
use datafusion::config::{ConfigOptions, Extensions};
51+
52+
use super::*;
53+
54+
#[test]
55+
fn test_delta_lake_config() {
56+
let mut extensions = Extensions::new();
57+
extensions.insert(OpenLakehouseConfig::default());
58+
59+
let mut config = ConfigOptions::new().with_extensions(extensions);
60+
config
61+
.set("lakehouse.unity.uri", "http://example.com")
62+
.unwrap();
63+
config.set("lakehouse.unity.token", "token").unwrap();
64+
65+
let lakehouse_config = config.extensions.get::<OpenLakehouseConfig>().unwrap();
66+
67+
assert_eq!(
68+
lakehouse_config.unity.uri,
69+
Some("http://example.com".to_string())
70+
);
71+
assert_eq!(lakehouse_config.unity.token, Some("token".to_string()));
72+
}
73+
}

crates/datafusion/src/engine/expressions/to_datafusion.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use datafusion::common::scalar::ScalarStructBuilder;
22
use datafusion::common::{DataFusionError, Result as DFResult, ScalarValue, not_impl_err};
3+
use datafusion::functions::core::expr_ext::FieldAccessor;
4+
use datafusion::functions::expr_fn::named_struct;
35
use datafusion::logical_expr::{Expr, col, lit};
4-
use datafusion_functions::core::expr_ext::FieldAccessor;
5-
use datafusion_functions::expr_fn::named_struct;
66
use delta_kernel::Predicate;
77
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
88
use delta_kernel::engine::arrow_conversion::TryIntoArrow;

crates/datafusion/src/exec.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use std::{any::Any, fmt, sync::Arc};
2+
3+
use arrow::array::RecordBatch;
4+
use datafusion::{
5+
common::{DFSchemaRef, Statistics, internal_err},
6+
error::Result,
7+
execution::{SendableRecordBatchStream, TaskContext},
8+
physical_expr::EquivalenceProperties,
9+
physical_plan::{
10+
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
11+
execution_plan::{Boundedness, EmissionType},
12+
stream::RecordBatchStreamAdapter,
13+
},
14+
};
15+
use unitycatalog_common::client::UnityCatalogClient;
16+
17+
use crate::KernelTaskContextExt;
18+
19+
#[async_trait::async_trait]
20+
pub trait ExecutableUnityCatalogStement: std::fmt::Debug + Send + Sync + 'static {
21+
fn name(&self) -> &str;
22+
async fn execute(&self, client: UnityCatalogClient) -> Result<RecordBatch>;
23+
fn return_schema(&self) -> &DFSchemaRef;
24+
}
25+
26+
pub struct UnityCatalogRequestExec {
27+
request: Arc<dyn ExecutableUnityCatalogStement>,
28+
cache: PlanProperties,
29+
}
30+
31+
impl UnityCatalogRequestExec {
32+
pub fn new(request: Arc<dyn ExecutableUnityCatalogStement>) -> Self {
33+
Self {
34+
cache: PlanProperties::new(
35+
EquivalenceProperties::new(Arc::new(request.return_schema().as_arrow().clone())),
36+
Partitioning::UnknownPartitioning(1),
37+
EmissionType::Incremental,
38+
Boundedness::Bounded,
39+
),
40+
request,
41+
}
42+
}
43+
}
44+
45+
impl std::fmt::Debug for UnityCatalogRequestExec {
46+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
47+
f.debug_struct("UnityCatalogRequestExec")
48+
.field("request", &self.request)
49+
.finish()
50+
}
51+
}
52+
53+
impl DisplayAs for UnityCatalogRequestExec {
54+
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
55+
match t {
56+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
57+
write!(f, "ExecuteUCStatement: statement={:?}", self.request)
58+
}
59+
DisplayFormatType::TreeRender => {
60+
// TODO: collect info
61+
write!(f, "")
62+
}
63+
}
64+
}
65+
}
66+
67+
#[async_trait::async_trait]
68+
impl ExecutionPlan for UnityCatalogRequestExec {
69+
fn name(&self) -> &'static str {
70+
Self::static_name()
71+
}
72+
73+
/// Return a reference to Any that can be used for downcasting
74+
fn as_any(&self) -> &dyn Any {
75+
self
76+
}
77+
78+
fn properties(&self) -> &PlanProperties {
79+
&self.cache
80+
}
81+
82+
fn required_input_distribution(&self) -> Vec<Distribution> {
83+
vec![Distribution::SinglePartition]
84+
}
85+
86+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
87+
vec![]
88+
}
89+
90+
fn with_new_children(
91+
self: Arc<Self>,
92+
_: Vec<Arc<dyn ExecutionPlan>>,
93+
) -> Result<Arc<dyn ExecutionPlan>> {
94+
Ok(self)
95+
}
96+
97+
/// Execute one partition and return an iterator over RecordBatch
98+
fn execute(
99+
&self,
100+
partition: usize,
101+
context: Arc<TaskContext>,
102+
) -> Result<SendableRecordBatchStream> {
103+
if 0 != partition {
104+
return internal_err!("CreateCatalogExec invalid partition {partition}");
105+
}
106+
107+
let uc_client = context.kernel_ext()?.unity_catalog_client()?;
108+
let request = self.request.clone();
109+
110+
Ok(Box::pin(RecordBatchStreamAdapter::new(
111+
self.schema(),
112+
Box::pin(futures::stream::once(async move {
113+
request.execute(uc_client).await
114+
})),
115+
)))
116+
}
117+
118+
fn statistics(&self) -> Result<Statistics> {
119+
Ok(Statistics::new_unknown(&self.schema()))
120+
}
121+
}

crates/datafusion/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
// Datafusion error will trigger this warning.
22
#![allow(clippy::result_large_err)]
33

4+
pub mod config;
45
mod engine;
56
mod error;
7+
mod exec;
68
mod log_table_provider;
9+
mod planner;
710
mod schema_provider;
811
mod session;
12+
pub mod sql;
913
mod table_provider;
1014
mod utils;
1115

0 commit comments

Comments (0)