Skip to content

Commit 46b17e5

Browse files
Merge branch 'develop' into operators-bench
2 parents 7d3125c + c93b6a7 commit 46b17e5

File tree

4 files changed

+59
-39
lines changed

4 files changed

+59
-39
lines changed

vortex-array/src/arrays/dict/vtable/rules.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ use crate::arrays::DictArray;
1515
use crate::arrays::DictVTable;
1616
use crate::arrays::ScalarFnArray;
1717
use crate::builtins::ArrayBuiltins;
18+
use crate::expr::Pack;
1819
use crate::optimizer::ArrayOptimizer;
1920
use crate::optimizer::rules::ArrayParentReduceRule;
2021
use crate::optimizer::rules::ParentRuleSet;
@@ -44,6 +45,12 @@ impl ArrayParentReduceRule<DictVTable> for DictionaryScalarFnValuesPushDownRule
4445
// Check that the scalar function can actually be pushed down.
4546
let sig = parent.scalar_fn().signature();
4647

48+
// Don't push down pack expressions since we might want to unpack them in exporters
49+
// later.
50+
if parent.scalar_fn().is::<Pack>() {
51+
return Ok(None);
52+
}
53+
4754
// If the scalar function is fallible, we cannot push it down since it may fail over a
4855
// value that isn't referenced by any code.
4956
if !array.all_values_referenced && sig.is_fallible() {

vortex-duckdb/src/scan.rs

Lines changed: 26 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -40,10 +40,8 @@ use vortex::error::vortex_bail;
4040
use vortex::error::vortex_err;
4141
use vortex::expr::Expression;
4242
use vortex::expr::Pack;
43-
use vortex::expr::and;
4443
use vortex::expr::and_collect;
4544
use vortex::expr::col;
46-
use vortex::expr::lit;
4745
use vortex::expr::root;
4846
use vortex::expr::select;
4947
use vortex::file::OpenOptionsSessionExt;
@@ -53,6 +51,7 @@ use vortex::io::runtime::BlockingRuntime;
5351
use vortex::io::runtime::current::ThreadSafeIterator;
5452
use vortex::layout::layouts::USE_VORTEX_OPERATORS;
5553
use vortex::session::VortexSession;
54+
use vortex_utils::aliases::hash_set::HashSet;
5655

5756
use crate::RUNTIME;
5857
use crate::SESSION;
@@ -179,38 +178,32 @@ fn extract_table_filter_expr(
179178
init: &TableInitInput<VortexTableFunction>,
180179
column_ids: &[u64],
181180
) -> VortexResult<Option<Expression>> {
182-
let table_filter_expr = init
183-
.table_filter_set()
184-
.and_then(|filter| {
185-
filter
186-
.into_iter()
187-
.map(|(idx, ex)| {
188-
let idx_u: usize = idx.as_();
189-
let col_idx: usize = column_ids[idx_u].as_();
190-
let name = init
191-
.bind_data()
192-
.column_names
193-
.get(col_idx)
194-
.vortex_expect("exists");
195-
try_from_table_filter(
196-
&ex,
197-
&col(name.as_str()),
198-
init.bind_data().first_file.dtype(),
199-
)
200-
})
201-
.reduce(|l, r| l?.zip(r?).map(|(l, r)| Ok(and(l, r))).transpose())
202-
})
203-
.transpose()?
204-
.flatten();
205-
206-
let complex_filter_expr = and_collect(init.bind_data().filter_exprs.clone());
207-
let filter_expr = complex_filter_expr
208-
.into_iter()
209-
.chain(table_filter_expr)
210-
.reduce(and)
211-
.unwrap_or_else(|| lit(true));
181+
let mut table_filter_exprs: HashSet<Expression> = if let Some(filter) = init.table_filter_set()
182+
{
183+
filter
184+
.into_iter()
185+
.map(|(idx, ex)| {
186+
let idx_u: usize = idx.as_();
187+
let col_idx: usize = column_ids[idx_u].as_();
188+
let name = init
189+
.bind_data()
190+
.column_names
191+
.get(col_idx)
192+
.vortex_expect("exists");
193+
try_from_table_filter(
194+
&ex,
195+
&col(name.as_str()),
196+
init.bind_data().first_file.dtype(),
197+
)
198+
})
199+
.collect::<VortexResult<Option<HashSet<_>>>>()?
200+
.unwrap_or_else(HashSet::new)
201+
} else {
202+
HashSet::new()
203+
};
212204

213-
Ok(Some(filter_expr))
205+
table_filter_exprs.extend(init.bind_data().filter_exprs.clone());
206+
Ok(and_collect(table_filter_exprs.into_iter().collect_vec()))
214207
}
215208

216209
/// Helper function to open a Vortex file from either a local or S3 URL

vortex-scan/src/arrow.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use arrow_schema::SchemaRef;
1010
use futures::Stream;
1111
use futures::TryStreamExt;
1212
use vortex_array::ArrayRef;
13+
use vortex_array::arrow::ArrowArrayExecutor;
1314
use vortex_array::arrow::IntoArrowArray;
1415
use vortex_error::VortexResult;
1516
use vortex_io::runtime::BlockingRuntime;
17+
use vortex_layout::layouts::USE_VORTEX_OPERATORS;
18+
use vortex_session::VortexSession;
1619

1720
use crate::ScanBuilder;
1821

@@ -28,9 +31,10 @@ impl ScanBuilder<ArrayRef> {
2831
runtime: &B,
2932
) -> VortexResult<impl RecordBatchReader + 'static> {
3033
let data_type = DataType::Struct(schema.fields().clone());
34+
let session = self.session().clone();
3135

3236
let iter = self
33-
.map(move |chunk| to_record_batch(chunk, &data_type))
37+
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
3438
.into_iter(runtime)?
3539
.map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
3640

@@ -42,19 +46,29 @@ impl ScanBuilder<ArrayRef> {
4246
schema: SchemaRef,
4347
) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
4448
let data_type = DataType::Struct(schema.fields().clone());
49+
let session = self.session().clone();
4550

4651
let stream = self
47-
.map(move |chunk| to_record_batch(chunk, &data_type))
52+
.map(move |chunk| to_record_batch(chunk, &data_type, &session))
4853
.into_stream()?
4954
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
5055

5156
Ok(stream)
5257
}
5358
}
5459

55-
fn to_record_batch(chunk: ArrayRef, data_type: &DataType) -> VortexResult<RecordBatch> {
56-
let arrow = chunk.into_arrow(data_type)?;
57-
Ok(RecordBatch::from(arrow.as_struct().clone()))
60+
fn to_record_batch(
61+
chunk: ArrayRef,
62+
data_type: &DataType,
63+
session: &VortexSession,
64+
) -> VortexResult<RecordBatch> {
65+
if *USE_VORTEX_OPERATORS {
66+
let arrow = chunk.execute_arrow(data_type, session)?;
67+
Ok(RecordBatch::from(arrow.as_struct().clone()))
68+
} else {
69+
let arrow = chunk.into_arrow(data_type)?;
70+
Ok(RecordBatch::from(arrow.as_struct().clone()))
71+
}
5872
}
5973

6074
/// We create an adapter for record batch iterators that supports clone.
@@ -114,6 +128,7 @@ mod tests {
114128
use vortex_error::VortexResult;
115129

116130
use super::*;
131+
use crate::test::SESSION;
117132

118133
fn create_test_struct_array() -> VortexResult<ArrayRef> {
119134
// Create Arrow arrays
@@ -152,7 +167,7 @@ mod tests {
152167
let schema = create_arrow_schema();
153168
let data_type = DataType::Struct(schema.fields().clone());
154169

155-
let batch = to_record_batch(vortex_array, &data_type)?;
170+
let batch = to_record_batch(vortex_array, &data_type, &SESSION)?;
156171
assert_eq!(batch.num_columns(), 2);
157172
assert_eq!(batch.num_rows(), 4);
158173

vortex-scan/src/scan_builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ impl<A: 'static + Send> ScanBuilder<A> {
180180
self.projection.return_dtype(self.layout_reader.dtype())
181181
}
182182

183+
/// The session used by the scan.
184+
pub fn session(&self) -> &VortexSession {
185+
&self.session
186+
}
187+
183188
/// Map each split of the scan. The function will be run on the spawned task.
184189
pub fn map<B: 'static>(
185190
self,

0 commit comments

Comments
 (0)