Skip to content

Commit 092707f

Browse files
authored
Cache partitioned expressions in StructLayoutReader (#1947)
They're not trivially cheap to build
1 parent dfdeaf0 commit 092707f

File tree

4 files changed

+56
-7
lines changed

4 files changed

+56
-7
lines changed

vortex-array/src/stats/statsset.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
1-
use enum_iterator::all;
1+
use enum_iterator::{all, Sequence};
22
use itertools::{EitherOrBoth, Itertools};
33
use vortex_dtype::DType;
44
use vortex_error::{vortex_panic, VortexError};
55
use vortex_scalar::Scalar;
66

77
use crate::stats::Stat;
88

9-
#[derive(Debug, Clone, Default, PartialEq, Eq)]
9+
#[derive(Debug, Clone, PartialEq, Eq)]
1010
pub struct StatsSet {
1111
values: Vec<(Stat, Scalar)>,
1212
}
1313

14+
impl Default for StatsSet {
15+
fn default() -> Self {
16+
Self {
17+
values: Vec::with_capacity(Stat::CARDINALITY),
18+
}
19+
}
20+
}
21+
1422
impl StatsSet {
1523
/// Create new StatSet without validating uniqueness of all the entries
1624
///

vortex-file/src/v2/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl<I: IoDriver> VortexFile<I> {
6262
.reader(segment_channel.reader(), self.ctx.clone())?;
6363

6464
// Now we give one end of the channel to the layout reader...
65-
log::info!("Starting scan with {} splits", self.splits.len());
65+
log::debug!("Starting scan with {} splits", self.splits.len());
6666
let exec_stream = stream::iter(ArcIter::new(self.splits.clone()))
6767
.map(move |row_range| scan.clone().range_scan(row_range))
6868
.map(move |range_scan| match range_scan {

vortex-layout/src/layouts/struct_/eval_expr.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use vortex_array::array::StructArray;
55
use vortex_array::validity::Validity;
66
use vortex_array::{ArrayData, IntoArrayData};
77
use vortex_error::VortexResult;
8-
use vortex_expr::transform::partition::partition;
98
use vortex_expr::ExprRef;
109
use vortex_scan::RowMask;
1110

@@ -16,7 +15,7 @@ use crate::ExprEvaluator;
1615
impl ExprEvaluator for StructReader {
1716
async fn evaluate_expr(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
1817
// Partition the expression into expressions that can be evaluated over individual fields
19-
let partitioned = partition(expr.clone(), self.struct_dtype())?;
18+
let partitioned = self.partition_expr(expr.clone())?;
2019
let field_readers: Vec<_> = partitioned
2120
.partitions
2221
.iter()

vortex-layout/src/layouts/struct_/reader.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
use std::sync::{Arc, OnceLock};
1+
use std::hash::Hash;
2+
use std::sync::{Arc, OnceLock, RwLock};
23

3-
use vortex_array::aliases::hash_map::HashMap;
4+
use vortex_array::aliases::hash_map::{Entry, HashMap};
45
use vortex_array::ContextRef;
56
use vortex_dtype::{DType, FieldName, StructDType};
67
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
8+
use vortex_expr::transform::partition::{partition, PartitionedExpr};
9+
use vortex_expr::ExprRef;
710

811
use crate::layouts::struct_::StructLayout;
912
use crate::segments::AsyncSegmentReader;
@@ -18,6 +21,8 @@ pub struct StructReader {
1821

1922
field_readers: Arc<[OnceLock<Arc<dyn LayoutReader>>]>,
2023
field_lookup: HashMap<FieldName, usize>,
24+
25+
expr_cache: Arc<RwLock<HashMap<ExactExpr, Arc<PartitionedExpr>>>>,
2126
}
2227

2328
impl StructReader {
@@ -52,6 +57,7 @@ impl StructReader {
5257
segments,
5358
field_readers,
5459
field_lookup,
60+
expr_cache: Arc::new(Default::default()),
5561
})
5662
}
5763

@@ -77,10 +83,46 @@ impl StructReader {
7783
child_layout.reader(self.segments.clone(), self.ctx.clone())
7884
})
7985
}
86+
87+
/// Utility for partitioning an expression over the fields of a struct.
88+
pub(crate) fn partition_expr(&self, expr: ExprRef) -> VortexResult<Arc<PartitionedExpr>> {
89+
Ok(
90+
match self
91+
.expr_cache
92+
.write()
93+
.map_err(|_| vortex_err!("poisoned lock"))?
94+
.entry(ExactExpr(expr.clone()))
95+
{
96+
Entry::Occupied(entry) => entry.get().clone(),
97+
Entry::Vacant(entry) => entry
98+
.insert(Arc::new(partition(expr, self.struct_dtype())?))
99+
.clone(),
100+
},
101+
)
102+
}
80103
}
81104

82105
impl LayoutReader for StructReader {
83106
fn layout(&self) -> &LayoutData {
84107
&self.layout
85108
}
86109
}
110+
111+
/// An expression wrapper that performs pointer equality.
112+
/// NOTE(ngates): we should consider if this should live in vortex-expr crate?
113+
#[derive(Clone)]
114+
struct ExactExpr(ExprRef);
115+
116+
impl PartialEq for ExactExpr {
117+
fn eq(&self, other: &Self) -> bool {
118+
Arc::ptr_eq(&self.0, &other.0)
119+
}
120+
}
121+
122+
impl Eq for ExactExpr {}
123+
124+
impl Hash for ExactExpr {
125+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
126+
Arc::as_ptr(&self.0).hash(state)
127+
}
128+
}

0 commit comments

Comments
 (0)