
Commit 0d06b5b

Merge branch 'main' into abhi/new-benchmarks
2 parents 874c1b9 + ad91db7 commit 0d06b5b

37 files changed (+971, -473 lines)

Cargo.toml

Lines changed: 15 additions & 15 deletions
@@ -34,24 +34,24 @@ delta_kernel = { version = "0.16.0", features = [


 # arrow
-arrow = { version = "56.0.0" }
-arrow-arith = { version = "56.0.0" }
-arrow-array = { version = "56.0.0", features = ["chrono-tz"] }
-arrow-buffer = { version = "56.0.0" }
-arrow-cast = { version = "56.0.0" }
-arrow-ipc = { version = "56.0.0" }
-arrow-json = { version = "56.0.0" }
-arrow-ord = { version = "56.0.0" }
-arrow-row = { version = "56.0.0" }
-arrow-schema = { version = "56.0.0" }
-arrow-select = { version = "56.0.0" }
+arrow = { version = "56.2" }
+arrow-arith = { version = "56.2" }
+arrow-array = { version = "56.2", features = ["chrono-tz"] }
+arrow-buffer = { version = "56.2" }
+arrow-cast = { version = "56.2" }
+arrow-ipc = { version = "56.2" }
+arrow-json = { version = "56.2" }
+arrow-ord = { version = "56.2" }
+arrow-row = { version = "56.2" }
+arrow-schema = { version = "56.2" }
+arrow-select = { version = "56.2" }
 object_store = { version = "0.12.1" }
-parquet = { version = "56.0.0" }
+parquet = { version = "56.2" }

 # datafusion
-datafusion = "50.0.0"
-datafusion-ffi = "50.0.0"
-datafusion-proto = "50.0.0"
+datafusion = "50.2"
+datafusion-ffi = "50.2"
+datafusion-proto = "50.2"

 # serde
 serde = { version = "1.0.194", features = ["derive"] }

crates/aws/tests/repair_s3_rename_test.rs

Lines changed: 2 additions & 2 deletions
@@ -9,7 +9,7 @@ use deltalake_core::logstore::object_store::{
 use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
 use deltalake_test::utils::IntegrationContext;
 use futures::stream::BoxStream;
-use object_store::{MultipartUpload, PutMultipartOpts, PutPayload};
+use object_store::{MultipartUpload, PutMultipartOptions, PutPayload};
 use serial_test::serial;
 use std::ops::Range;
 use std::sync::{Arc, Mutex};
@@ -237,7 +237,7 @@ impl ObjectStore for DelayedObjectStore {
     async fn put_multipart_opts(
         &self,
         location: &Path,
-        options: PutMultipartOpts,
+        options: PutMultipartOptions,
     ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
         self.inner.put_multipart_opts(location, options).await
     }

crates/catalog-unity/src/datafusion.rs

Lines changed: 0 additions & 1 deletion
@@ -6,7 +6,6 @@ use datafusion::catalog::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
 use datafusion::common::DataFusionError;
 use datafusion::datasource::TableProvider;
-use futures::FutureExt;
 use moka::future::Cache;
 use moka::Expiry;
 use std::any::Any;
Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
+use std::sync::Arc;
+
+use dashmap::{mapref::one::Ref, DashMap};
+use datafusion::execution::{
+    object_store::{ObjectStoreRegistry, ObjectStoreUrl},
+    TaskContext,
+};
+use delta_kernel::engine::parse_json as arrow_parse_json;
+use delta_kernel::{
+    engine::default::{
+        executor::tokio::{TokioBackgroundExecutor, TokioMultiThreadExecutor},
+        json::DefaultJsonHandler,
+        parquet::DefaultParquetHandler,
+    },
+    error::DeltaResult as KernelResult,
+    schema::SchemaRef,
+    EngineData, FileDataReadResultIterator, FileMeta, JsonHandler, ParquetHandler, PredicateRef,
+};
+use itertools::Itertools;
+use tokio::runtime::{Handle, RuntimeFlavor};
+
+use super::storage::{group_by_store, AsObjectStoreUrl};
+
+#[derive(Clone)]
+pub struct DataFusionFileFormatHandler {
+    ctx: Arc<TaskContext>,
+    pq_registry: Arc<DashMap<ObjectStoreUrl, Arc<dyn ParquetHandler>>>,
+    json_registry: Arc<DashMap<ObjectStoreUrl, Arc<dyn JsonHandler>>>,
+    handle: Handle,
+}
+
+impl DataFusionFileFormatHandler {
+    /// Create a new [`DataFusionFileFormatHandler`] instance.
+    pub fn new(ctx: Arc<TaskContext>, handle: Handle) -> Self {
+        Self {
+            ctx,
+            pq_registry: DashMap::new().into(),
+            json_registry: DashMap::new().into(),
+            handle,
+        }
+    }
+
+    fn registry(&self) -> Arc<dyn ObjectStoreRegistry> {
+        self.ctx.runtime_env().object_store_registry.clone()
+    }
+
+    fn get_or_create_pq(
+        &self,
+        url: ObjectStoreUrl,
+    ) -> KernelResult<Ref<'_, ObjectStoreUrl, Arc<dyn ParquetHandler>>> {
+        if let Some(handler) = self.pq_registry.get(&url) {
+            return Ok(handler);
+        }
+        let store = self
+            .registry()
+            .get_store(url.as_ref())
+            .map_err(delta_kernel::Error::generic_err)?;
+
+        let handler: Arc<dyn ParquetHandler> = match self.handle.runtime_flavor() {
+            RuntimeFlavor::MultiThread => Arc::new(DefaultParquetHandler::new(
+                store,
+                Arc::new(TokioMultiThreadExecutor::new(self.handle.clone())),
+            )),
+            RuntimeFlavor::CurrentThread => Arc::new(DefaultParquetHandler::new(
+                store,
+                Arc::new(TokioBackgroundExecutor::new()),
+            )),
+            _ => panic!("unsupported runtime flavor"),
+        };
+
+        self.pq_registry.insert(url.clone(), handler);
+        Ok(self.pq_registry.get(&url).unwrap())
+    }
+
+    fn get_or_create_json(
+        &self,
+        url: ObjectStoreUrl,
+    ) -> KernelResult<Ref<'_, ObjectStoreUrl, Arc<dyn JsonHandler>>> {
+        if let Some(handler) = self.json_registry.get(&url) {
+            return Ok(handler);
+        }
+        let store = self
+            .registry()
+            .get_store(url.as_ref())
+            .map_err(delta_kernel::Error::generic_err)?;
+
+        let handler: Arc<dyn JsonHandler> = match self.handle.runtime_flavor() {
+            RuntimeFlavor::MultiThread => Arc::new(DefaultJsonHandler::new(
+                store,
+                Arc::new(TokioMultiThreadExecutor::new(self.handle.clone())),
+            )),
+            RuntimeFlavor::CurrentThread => Arc::new(DefaultJsonHandler::new(
+                store,
+                Arc::new(TokioBackgroundExecutor::new()),
+            )),
+            _ => panic!("unsupported runtime flavor"),
+        };
+
+        self.json_registry.insert(url.clone(), handler);
+        Ok(self.json_registry.get(&url).unwrap())
+    }
+}
+
+impl ParquetHandler for DataFusionFileFormatHandler {
+    fn read_parquet_files(
+        &self,
+        files: &[FileMeta],
+        physical_schema: SchemaRef,
+        predicate: Option<PredicateRef>,
+    ) -> KernelResult<FileDataReadResultIterator> {
+        let grouped_files = group_by_store(files.to_vec());
+        Ok(Box::new(
+            grouped_files
+                .into_iter()
+                .map(|(url, files)| {
+                    self.get_or_create_pq(url)?.read_parquet_files(
+                        &files.to_vec(),
+                        physical_schema.clone(),
+                        predicate.clone(),
+                    )
+                })
+                // TODO: this should not do any blocking operations, since this should
+                // happen when the iterators are polled and we are just creating a vec of iterators.
+                // Is this correct?
+                .try_collect::<_, Vec<_>, _>()?
+                .into_iter()
+                .flatten(),
+        ))
+    }
+}
+
+impl JsonHandler for DataFusionFileFormatHandler {
+    fn parse_json(
+        &self,
+        json_strings: Box<dyn EngineData>,
+        output_schema: SchemaRef,
+    ) -> KernelResult<Box<dyn EngineData>> {
+        arrow_parse_json(json_strings, output_schema)
+    }
+
+    fn read_json_files(
+        &self,
+        files: &[FileMeta],
+        physical_schema: SchemaRef,
+        predicate: Option<PredicateRef>,
+    ) -> KernelResult<FileDataReadResultIterator> {
+        let grouped_files = group_by_store(files.to_vec());
+        Ok(Box::new(
+            grouped_files
+                .into_iter()
+                .map(|(url, files)| {
+                    self.get_or_create_json(url)?.read_json_files(
+                        &files.to_vec(),
+                        physical_schema.clone(),
+                        predicate.clone(),
+                    )
+                })
+                // TODO: this should not do any blocking operations, since this should
+                // happen when the iterators are polled and we are just creating a vec of iterators.
+                // Is this correct?
+                .try_collect::<_, Vec<_>, _>()?
+                .into_iter()
+                .flatten(),
+        ))
+    }
+
+    fn write_json_file(
+        &self,
+        path: &url::Url,
+        data: Box<dyn Iterator<Item = KernelResult<Box<dyn EngineData>>> + Send + '_>,
+        overwrite: bool,
+    ) -> KernelResult<()> {
+        self.get_or_create_json(path.as_object_store_url())?
+            .write_json_file(path, data, overwrite)
+    }
+}
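
Editor's note on the handler above: it lazily builds one delta-kernel ParquetHandler or JsonHandler per object store and caches it in a DashMap keyed by ObjectStoreUrl, so repeated reads against the same store reuse the same handler. Below is a minimal, self-contained sketch of that get-or-create pattern only; the StoreUrl, Handler, and HandlerCache names are illustrative placeholders, not types from this commit.

use std::sync::Arc;

use dashmap::{mapref::one::Ref, DashMap};

// Illustrative stand-ins; the real code keys by ObjectStoreUrl and stores
// Arc<dyn ParquetHandler> / Arc<dyn JsonHandler>.
type StoreUrl = String;
trait Handler: Send + Sync {}

struct HandlerCache {
    registry: Arc<DashMap<StoreUrl, Arc<dyn Handler>>>,
}

impl HandlerCache {
    // Return the cached handler for `url`, creating and inserting it on first use.
    // Two threads may race to create a handler; the later insert simply wins, which
    // is benign because both handlers are equivalent.
    fn get_or_create(
        &self,
        url: StoreUrl,
        make: impl FnOnce() -> Arc<dyn Handler>,
    ) -> Ref<'_, StoreUrl, Arc<dyn Handler>> {
        if let Some(handler) = self.registry.get(&url) {
            return handler;
        }
        self.registry.insert(url.clone(), make());
        self.registry.get(&url).expect("handler was just inserted")
    }
}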
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+use std::sync::Arc;
+
+use datafusion::catalog::Session;
+use datafusion::execution::TaskContext;
+use delta_kernel::{Engine, EvaluationHandler, JsonHandler, ParquetHandler, StorageHandler};
+use tokio::runtime::Handle;
+
+use self::file_formats::DataFusionFileFormatHandler;
+use self::storage::DataFusionStorageHandler;
+use crate::kernel::ARROW_HANDLER;
+
+mod file_formats;
+mod storage;
+
+/// A Datafusion based Kernel Engine
+#[derive(Clone)]
+pub struct DataFusionEngine {
+    storage: Arc<DataFusionStorageHandler>,
+    formats: Arc<DataFusionFileFormatHandler>,
+}
+
+impl DataFusionEngine {
+    pub fn new_from_session(session: &dyn Session) -> Arc<Self> {
+        Self::new(session.task_ctx(), Handle::current()).into()
+    }
+
+    pub fn new(ctx: Arc<TaskContext>, handle: Handle) -> Self {
+        let storage = Arc::new(DataFusionStorageHandler::new(ctx.clone(), handle.clone()));
+        let formats = Arc::new(DataFusionFileFormatHandler::new(ctx, handle));
+        Self { storage, formats }
+    }
+}
+
+impl Engine for DataFusionEngine {
+    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
+        ARROW_HANDLER.clone()
+    }
+
+    fn storage_handler(&self) -> Arc<dyn StorageHandler> {
+        self.storage.clone()
+    }
+
+    fn json_handler(&self) -> Arc<dyn JsonHandler> {
+        self.formats.clone()
+    }
+
+    fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
+        self.formats.clone()
+    }
+}
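
Editor's note on how the new engine is constructed: `new_from_session` pulls a TaskContext out of a DataFusion Session and pairs it with the current Tokio runtime handle, while `new` takes both explicitly. The following is a rough usage sketch under stated assumptions: the import path for DataFusionEngine is not visible in this diff, the example relies on SessionState implementing the Session trait, and nothing here covers how the rest of delta-rs consumes the engine.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use tokio::runtime::Handle;

// Assumes `DataFusionEngine` (added in this commit) is in scope; its module
// path is not shown in this diff.
#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();

    // Build from a DataFusion Session (SessionState implements Session).
    let engine: Arc<DataFusionEngine> = DataFusionEngine::new_from_session(&ctx.state());

    // Or build directly from a TaskContext plus the current runtime handle.
    let engine_direct = Arc::new(DataFusionEngine::new(ctx.task_ctx(), Handle::current()));

    let _ = (engine, engine_direct);
}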
