Skip to content

Commit 1d36132

Browse files
authored
Docs for DataFusion crate (#2064)
Also removes the unused option to disable pushdown. Part of #1905.
1 parent ebcfa20 commit 1d36132

File tree

9 files changed

+36
-176
lines changed

9 files changed

+36
-176
lines changed

bench-vortex/benches/datafusion.rs

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
2323
use vortex::sampling_compressor::compressors::CompressorRef;
2424
use vortex::sampling_compressor::SamplingCompressor;
2525
use vortex::{ArrayData, Context};
26-
use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions};
26+
use vortex_datafusion::memory::VortexMemTable;
2727

2828
pub static CTX: LazyLock<Context> = LazyLock::new(|| {
2929
Context::default().with_encodings([
@@ -157,51 +157,29 @@ fn bench_arrow<M: Measurement>(mut group: BenchmarkGroup<M>, session: &SessionCo
157157
fn bench_vortex<M: Measurement>(
158158
mut group: BenchmarkGroup<M>,
159159
session: &SessionContext,
160-
enable_pushdown: bool,
161160
compress: bool,
162161
) {
163162
let vortex_dataset = toy_dataset_vortex(compress);
164-
let vortex_table = Arc::new(VortexMemTable::new(
165-
vortex_dataset,
166-
VortexMemTableOptions::default().with_pushdown(enable_pushdown),
167-
));
163+
let vortex_table = Arc::new(VortexMemTable::new(vortex_dataset));
168164

169165
measure_provider(&mut group, session, vortex_table);
170166
}
171167

172168
fn bench_datafusion(c: &mut Criterion) {
173169
bench_arrow(c.benchmark_group("arrow"), &SessionContext::new());
174170

175-
// compress=true, pushdown enabled
171+
// compress=true
176172
bench_vortex(
177-
c.benchmark_group("vortex-pushdown-compressed"),
173+
c.benchmark_group("vortex-compressed"),
178174
&SessionContext::new(),
179175
true,
180-
true,
181-
);
182-
183-
// compress=false, pushdown enabled
184-
bench_vortex(
185-
c.benchmark_group("vortex-pushdown-uncompressed"),
186-
&SessionContext::new(),
187-
true,
188-
false,
189176
);
190177

191-
// compress=true, pushdown disabled
178+
// compress=false
192179
bench_vortex(
193-
c.benchmark_group("vortex-nopushdown-compressed"),
180+
c.benchmark_group("vortex-uncompressed"),
194181
&SessionContext::new(),
195182
false,
196-
true,
197-
);
198-
199-
// compress=false, pushdown disabled
200-
bench_vortex(
201-
c.benchmark_group("vortex-nopushdown-uncompressed"),
202-
&SessionContext::new(),
203-
false,
204-
false,
205183
);
206184
}
207185

bench-vortex/benches/tpch.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,7 @@ fn benchmark(c: &mut Criterion) {
1515
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();
1616

1717
let vortex_ctx = runtime
18-
.block_on(load_datasets(
19-
&data_dir,
20-
Format::InMemoryVortex {
21-
enable_pushdown: true,
22-
},
23-
false,
24-
))
18+
.block_on(load_datasets(&data_dir, Format::InMemoryVortex, false))
2519
.unwrap();
2620
let arrow_ctx = runtime
2721
.block_on(load_datasets(&data_dir, Format::Arrow, false))
@@ -50,9 +44,7 @@ fn benchmark(c: &mut Criterion) {
5044
&vortex_ctx,
5145
&sql_queries,
5246
q,
53-
Format::InMemoryVortex {
54-
enable_pushdown: true,
55-
},
47+
Format::InMemoryVortex,
5648
)
5749
.await;
5850
assert_eq!(expected_row_count, row_count, "Mismatched row count {row_count} instead of {expected_row_count} in query {q} for in memory pushdown format");

bench-vortex/src/lib.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub enum Format {
6161
Csv,
6262
Arrow,
6363
Parquet,
64-
InMemoryVortex { enable_pushdown: bool },
64+
InMemoryVortex,
6565
OnDiskVortex { enable_compression: bool },
6666
}
6767

@@ -71,8 +71,8 @@ impl std::fmt::Display for Format {
7171
Format::Csv => write!(f, "csv"),
7272
Format::Arrow => write!(f, "arrow"),
7373
Format::Parquet => write!(f, "parquet"),
74-
Format::InMemoryVortex { enable_pushdown } => {
75-
write!(f, "in_memory_vortex(pushdown={enable_pushdown})")
74+
Format::InMemoryVortex => {
75+
write!(f, "in_memory_vortex")
7676
}
7777
Format::OnDiskVortex { enable_compression } => {
7878
write!(f, "on_disk_vortex(compressed={enable_compression})")
@@ -87,12 +87,7 @@ impl Format {
8787
Format::Csv => "csv".to_string(),
8888
Format::Arrow => "arrow".to_string(),
8989
Format::Parquet => "parquet".to_string(),
90-
Format::InMemoryVortex { enable_pushdown } => if *enable_pushdown {
91-
"vortex-in-memory-pushdown"
92-
} else {
93-
"vortex-in-memory"
94-
}
95-
.to_string(),
90+
Format::InMemoryVortex => "vortex-in-memory".to_string(),
9691
Format::OnDiskVortex { enable_compression } => if *enable_compression {
9792
"vortex-file-compressed"
9893
} else {

bench-vortex/src/tpch/mod.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use vortex::file::{VortexWriteOptions, VORTEX_FILE_EXTENSION};
2020
use vortex::sampling_compressor::SamplingCompressor;
2121
use vortex::variants::StructArrayTrait;
2222
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
23-
use vortex_datafusion::memory::VortexMemTableOptions;
2423
use vortex_datafusion::persistent::VortexFormat;
2524
use vortex_datafusion::SessionContextExt;
2625

@@ -64,17 +63,8 @@ pub async fn load_datasets<P: AsRef<Path>>(
6463
Format::Parquet => {
6564
register_parquet(&context, stringify!($name), &$name, $schema).await
6665
}
67-
Format::InMemoryVortex {
68-
enable_pushdown, ..
69-
} => {
70-
register_vortex(
71-
&context,
72-
stringify!($name),
73-
&$name,
74-
$schema,
75-
enable_pushdown,
76-
)
77-
.await
66+
Format::InMemoryVortex => {
67+
register_vortex(&context, stringify!($name), &$name, $schema).await
7868
}
7969
Format::OnDiskVortex { enable_compression } => {
8070
register_vortex_file(
@@ -305,7 +295,6 @@ async fn register_vortex(
305295
name: &str,
306296
file: &Path,
307297
schema: &Schema,
308-
enable_pushdown: bool,
309298
) -> anyhow::Result<()> {
310299
let record_batches = session
311300
.read_csv(
@@ -330,11 +319,7 @@ async fn register_vortex(
330319
let dtype = chunks[0].dtype().clone();
331320
let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array();
332321

333-
session.register_mem_vortex_opts(
334-
name,
335-
chunked_array,
336-
VortexMemTableOptions::default().with_pushdown(enable_pushdown),
337-
)?;
322+
session.register_mem_vortex(name, chunked_array)?;
338323

339324
Ok(())
340325
}

vortex-datafusion/src/lib.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! Connectors to enable DataFusion to read Vortex data.
2-
2+
#![deny(missing_docs)]
33
#![allow(clippy::nonminimal_bool)]
44
#![allow(clippy::cast_possible_truncation)]
55

@@ -12,7 +12,7 @@ use datafusion_expr::{Expr, Operator};
1212
use vortex_array::{ArrayDType, ArrayData};
1313
use vortex_error::vortex_err;
1414

15-
use crate::memory::{VortexMemTable, VortexMemTableOptions};
15+
use crate::memory::VortexMemTable;
1616

1717
pub mod memory;
1818
pub mod persistent;
@@ -50,36 +50,17 @@ fn supported_data_types(dt: DataType) -> bool {
5050
is_supported
5151
}
5252

53+
/// Extension trait for the DataFusion [`SessionContext`] that adds methods for registering Vortex tables.
5354
pub trait SessionContextExt {
54-
fn register_mem_vortex<S: AsRef<str>>(&self, name: S, array: ArrayData) -> DFResult<()> {
55-
self.register_mem_vortex_opts(name, array, VortexMemTableOptions::default())
56-
}
55+
/// Register an in-memory Vortex [`ArrayData`] as a DataFusion table.
56+
fn register_mem_vortex<S: AsRef<str>>(&self, name: S, array: ArrayData) -> DFResult<()>;
5757

58-
fn register_mem_vortex_opts<S: AsRef<str>>(
59-
&self,
60-
name: S,
61-
array: ArrayData,
62-
options: VortexMemTableOptions,
63-
) -> DFResult<()>;
64-
65-
fn read_mem_vortex(&self, array: ArrayData) -> DFResult<DataFrame> {
66-
self.read_mem_vortex_opts(array, VortexMemTableOptions::default())
67-
}
68-
69-
fn read_mem_vortex_opts(
70-
&self,
71-
array: ArrayData,
72-
options: VortexMemTableOptions,
73-
) -> DFResult<DataFrame>;
58+
/// Read an in-memory Vortex [`ArrayData`] into a DataFusion [`DataFrame`].
59+
fn read_mem_vortex(&self, array: ArrayData) -> DFResult<DataFrame>;
7460
}
7561

7662
impl SessionContextExt for SessionContext {
77-
fn register_mem_vortex_opts<S: AsRef<str>>(
78-
&self,
79-
name: S,
80-
array: ArrayData,
81-
options: VortexMemTableOptions,
82-
) -> DFResult<()> {
63+
fn register_mem_vortex<S: AsRef<str>>(&self, name: S, array: ArrayData) -> DFResult<()> {
8364
if !array.dtype().is_struct() {
8465
return Err(vortex_err!(
8566
"Vortex arrays must have struct type, found {}",
@@ -88,16 +69,12 @@ impl SessionContextExt for SessionContext {
8869
.into());
8970
}
9071

91-
let vortex_table = VortexMemTable::new(array, options);
72+
let vortex_table = VortexMemTable::new(array);
9273
self.register_table(name.as_ref(), Arc::new(vortex_table))
9374
.map(|_| ())
9475
}
9576

96-
fn read_mem_vortex_opts(
97-
&self,
98-
array: ArrayData,
99-
options: VortexMemTableOptions,
100-
) -> DFResult<DataFrame> {
77+
fn read_mem_vortex(&self, array: ArrayData) -> DFResult<DataFrame> {
10178
if !array.dtype().is_struct() {
10279
return Err(vortex_err!(
10380
"Vortex arrays must have struct type, found {}",
@@ -106,7 +83,7 @@ impl SessionContextExt for SessionContext {
10683
.into());
10784
}
10885

109-
let vortex_table = VortexMemTable::new(array, options);
86+
let vortex_table = VortexMemTable::new(array);
11087

11188
self.read_table(Arc::new(vortex_table))
11289
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
//! In-memory implementation of a Vortex table provider.
12
mod exec;
23
mod plans;
34
mod provider;
45
mod statistics;
56
mod stream;
67

7-
pub use provider::{VortexMemTable, VortexMemTableOptions};
8+
pub use provider::VortexMemTable;

vortex-datafusion/src/memory/provider.rs

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use crate::memory::plans::{RowSelectorExec, TakeRowsExec};
3232
pub struct VortexMemTable {
3333
array: ChunkedArray,
3434
schema_ref: SchemaRef,
35-
options: VortexMemTableOptions,
3635
}
3736

3837
impl VortexMemTable {
@@ -41,7 +40,7 @@ impl VortexMemTable {
4140
/// # Panics
4241
///
4342
/// Creation will panic if the provided array is not of `DType::Struct` type.
44-
pub fn new(array: ArrayData, options: VortexMemTableOptions) -> Self {
43+
pub fn new(array: ArrayData) -> Self {
4544
let arrow_schema = infer_schema(array.dtype()).vortex_expect("schema is inferable");
4645
let schema_ref = SchemaRef::new(arrow_schema);
4746

@@ -54,11 +53,7 @@ impl VortexMemTable {
5453
}
5554
};
5655

57-
Self {
58-
array,
59-
schema_ref,
60-
options,
61-
}
56+
Self { array, schema_ref }
6257
}
6358
}
6459

@@ -141,15 +136,6 @@ impl TableProvider for VortexMemTable {
141136
&self,
142137
filters: &[&Expr],
143138
) -> DFResult<Vec<TableProviderFilterPushDown>> {
144-
// In the case the caller has configured this provider with filter pushdown disabled,
145-
// do not attempt to apply any filters at scan time.
146-
if !self.options.enable_pushdown {
147-
return Ok(filters
148-
.iter()
149-
.map(|_| TableProviderFilterPushDown::Unsupported)
150-
.collect());
151-
}
152-
153139
filters
154140
.iter()
155141
.map(|expr| {
@@ -163,27 +149,6 @@ impl TableProvider for VortexMemTable {
163149
}
164150
}
165151

166-
/// Optional configurations to pass when loading a [VortexMemTable].
167-
#[derive(Debug, Clone)]
168-
pub struct VortexMemTableOptions {
169-
pub enable_pushdown: bool,
170-
}
171-
172-
impl Default for VortexMemTableOptions {
173-
fn default() -> Self {
174-
Self {
175-
enable_pushdown: true,
176-
}
177-
}
178-
}
179-
180-
impl VortexMemTableOptions {
181-
pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self {
182-
self.enable_pushdown = enable_pushdown;
183-
self
184-
}
185-
}
186-
187152
/// Construct an operator plan that executes in two stages.
188153
///
189154
/// The first plan stage only materializes the columns related to the provided set of filter
@@ -220,7 +185,6 @@ mod test {
220185
use vortex_array::array::{PrimitiveArray, StructArray, VarBinViewArray};
221186
use vortex_array::{ArrayData, IntoArrayData};
222187

223-
use crate::memory::VortexMemTableOptions;
224188
use crate::{can_be_pushed_down, SessionContextExt as _};
225189

226190
fn presidents_array() -> ArrayData {
@@ -271,44 +235,6 @@ mod test {
271235
);
272236
}
273237

274-
#[tokio::test]
275-
#[cfg_attr(miri, ignore)]
276-
async fn test_datafusion_no_pushdown() {
277-
let ctx = SessionContext::new();
278-
279-
let df = ctx
280-
.read_mem_vortex_opts(
281-
presidents_array(),
282-
// Disable pushdown. We run this test to make sure that the naive codepath also
283-
// produces correct results and does not panic anywhere.
284-
VortexMemTableOptions::default().with_pushdown(false),
285-
)
286-
.unwrap();
287-
288-
let distinct_names = df
289-
.filter(col("term_start").gt_eq(lit(1795)))
290-
.unwrap()
291-
.filter(col("term_start").lt(lit(2000)))
292-
.unwrap()
293-
.aggregate(vec![], vec![count_distinct(col("president"))])
294-
.unwrap()
295-
.collect()
296-
.await
297-
.unwrap();
298-
299-
assert_eq!(distinct_names.len(), 1);
300-
301-
assert_eq!(
302-
*distinct_names[0]
303-
.column(0)
304-
.as_primitive::<Int64Type>()
305-
.values()
306-
.first()
307-
.unwrap(),
308-
4i64
309-
);
310-
}
311-
312238
#[test]
313239
fn test_can_be_pushed_down0() {
314240
let e = BinaryExpr {

0 commit comments

Comments
 (0)