Skip to content

Commit fed92df

Browse files
authored
Shuffle datafusion provider (#1312)
Moves files around into a `memory` module
1 parent 2940ac9 commit fed92df

File tree

8 files changed

+180
-364
lines changed

8 files changed

+180
-364
lines changed

vortex-datafusion/src/datatype.rs

Lines changed: 0 additions & 204 deletions
This file was deleted.

vortex-datafusion/src/lib.rs

Lines changed: 5 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,23 @@
22
33
#![allow(clippy::nonminimal_bool)]
44

5-
use std::any::Any;
6-
use std::fmt::{Debug, Formatter};
7-
use std::pin::Pin;
85
use std::sync::Arc;
9-
use std::task::{Context, Poll};
106

11-
use arrow_array::RecordBatch;
12-
use arrow_schema::{DataType, Schema, SchemaRef};
13-
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
7+
use arrow_schema::{DataType, Schema};
148
use datafusion::prelude::{DataFrame, SessionContext};
15-
use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, Statistics};
9+
use datafusion_common::Result as DFResult;
1610
use datafusion_execution::object_store::ObjectStoreUrl;
1711
use datafusion_expr::{Expr, Operator};
18-
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
19-
use futures::Stream;
20-
use memory::{VortexMemTable, VortexMemTableOptions};
2112
use persistent::config::VortexTableOptions;
2213
use persistent::provider::VortexFileTableProvider;
23-
use vortex_array::array::ChunkedArray;
24-
use vortex_array::{Array, ArrayDType, IntoArrayVariant};
25-
use vortex_dtype::field::Field;
26-
use vortex_error::{vortex_err, VortexResult};
14+
use vortex_array::{Array, ArrayDType};
15+
use vortex_error::vortex_err;
2716

28-
use crate::statistics::chunked_array_df_stats;
17+
use crate::memory::{VortexMemTable, VortexMemTableOptions};
2918

3019
pub mod memory;
3120
pub mod persistent;
3221

33-
mod plans;
34-
mod statistics;
35-
3622
const SUPPORTED_BINARY_OPS: &[Operator] = &[
3723
Operator::Eq,
3824
Operator::NotEq,
@@ -172,141 +158,3 @@ fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool {
172158
_ => false,
173159
}
174160
}
175-
176-
/// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
177-
#[derive(Clone)]
178-
struct VortexScanExec {
179-
array: ChunkedArray,
180-
scan_projection: Vec<usize>,
181-
plan_properties: PlanProperties,
182-
statistics: Statistics,
183-
}
184-
185-
impl VortexScanExec {
186-
pub fn try_new(
187-
array: ChunkedArray,
188-
scan_projection: Vec<usize>,
189-
plan_properties: PlanProperties,
190-
) -> VortexResult<Self> {
191-
let statistics = chunked_array_df_stats(&array, &scan_projection)?;
192-
Ok(Self {
193-
array,
194-
scan_projection,
195-
plan_properties,
196-
statistics,
197-
})
198-
}
199-
}
200-
201-
impl Debug for VortexScanExec {
202-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203-
f.debug_struct("VortexScanExec")
204-
.field("array_length", &self.array.len())
205-
.field("array_dtype", &self.array.dtype())
206-
.field("scan_projection", &self.scan_projection)
207-
.field("plan_properties", &self.plan_properties)
208-
.finish_non_exhaustive()
209-
}
210-
}
211-
212-
impl DisplayAs for VortexScanExec {
213-
fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
214-
Debug::fmt(self, f)
215-
}
216-
}
217-
218-
pub(crate) struct VortexRecordBatchStream {
219-
schema_ref: SchemaRef,
220-
221-
idx: usize,
222-
num_chunks: usize,
223-
chunks: ChunkedArray,
224-
225-
projection: Vec<Field>,
226-
}
227-
228-
impl Stream for VortexRecordBatchStream {
229-
type Item = DFResult<RecordBatch>;
230-
231-
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
232-
if self.idx >= self.num_chunks {
233-
return Poll::Ready(None);
234-
}
235-
236-
// Grab next chunk, project and convert to Arrow.
237-
let chunk = self.chunks.chunk(self.idx)?;
238-
self.idx += 1;
239-
240-
let struct_array = chunk
241-
.into_struct()
242-
.map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;
243-
244-
let projected_struct = struct_array
245-
.project(&self.projection)
246-
.map_err(|vortex_err| {
247-
exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
248-
})?;
249-
250-
Poll::Ready(Some(Ok(projected_struct.try_into()?)))
251-
}
252-
253-
fn size_hint(&self) -> (usize, Option<usize>) {
254-
(self.num_chunks, Some(self.num_chunks))
255-
}
256-
}
257-
258-
impl RecordBatchStream for VortexRecordBatchStream {
259-
fn schema(&self) -> SchemaRef {
260-
Arc::clone(&self.schema_ref)
261-
}
262-
}
263-
264-
impl ExecutionPlan for VortexScanExec {
265-
fn name(&self) -> &str {
266-
VortexScanExec::static_name()
267-
}
268-
269-
fn as_any(&self) -> &dyn Any {
270-
self
271-
}
272-
273-
fn properties(&self) -> &PlanProperties {
274-
&self.plan_properties
275-
}
276-
277-
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
278-
// Leaf node
279-
vec![]
280-
}
281-
282-
fn with_new_children(
283-
self: Arc<Self>,
284-
_: Vec<Arc<dyn ExecutionPlan>>,
285-
) -> DFResult<Arc<dyn ExecutionPlan>> {
286-
Ok(self)
287-
}
288-
289-
fn execute(
290-
&self,
291-
_partition: usize,
292-
_context: Arc<TaskContext>,
293-
) -> DFResult<SendableRecordBatchStream> {
294-
// Send back a stream of RecordBatch that returns the next element of the chunk each time.
295-
Ok(Box::pin(VortexRecordBatchStream {
296-
schema_ref: self.schema().clone(),
297-
idx: 0,
298-
num_chunks: self.array.nchunks(),
299-
chunks: self.array.clone(),
300-
projection: self
301-
.scan_projection
302-
.iter()
303-
.copied()
304-
.map(Field::from)
305-
.collect(),
306-
}))
307-
}
308-
309-
fn statistics(&self) -> DFResult<Statistics> {
310-
Ok(self.statistics.clone())
311-
}
312-
}

0 commit comments

Comments
 (0)