33
44use std:: collections:: BTreeSet ;
55use std:: ops:: { BitAnd , Range } ;
6- use std:: sync:: { Arc , OnceLock } ;
6+ use std:: sync:: { Arc , LazyLock , OnceLock } ;
77
88use arrow_buffer:: BooleanBufferBuilder ;
99use futures:: future:: { BoxFuture , Shared } ;
@@ -20,10 +20,10 @@ use vortex_expr::{ExprRef, root};
2020use vortex_mask:: Mask ;
2121use vortex_utils:: aliases:: dash_map:: DashMap ;
2222
23- use crate :: LayoutReader ;
2423use crate :: layouts:: zoned:: ZonedLayout ;
2524use crate :: layouts:: zoned:: zone_map:: ZoneMap ;
2625use crate :: segments:: SegmentSource ;
26+ use crate :: { LayoutReader , LayoutReaderRef , LazyReaderChildren } ;
2727
2828type SharedZoneMap = Shared < BoxFuture < ' static , SharedVortexResult < ZoneMap > > > ;
2929type SharedPruningResult = Shared < BoxFuture < ' static , SharedVortexResult < Arc < PruningResult > > > > ;
@@ -32,21 +32,17 @@ type PredicateCache = Arc<OnceLock<Option<ExprRef>>>;
3232pub struct ZonedReader {
3333 layout : ZonedLayout ,
3434 name : Arc < str > ,
35-
36- /// Data layout reader
37- data_child : Arc < dyn LayoutReader > ,
38- /// Zone map layout reader.
39- zones_child : Arc < dyn LayoutReader > ,
35+ lazy_children : LazyReaderChildren ,
4036
4137 /// A cache of expr -> optional pruning result (applying the pruning expr to the zone map)
42- pruning_result : DashMap < ExprRef , Option < SharedPruningResult > > ,
38+ pruning_result : LazyLock < DashMap < ExprRef , Option < SharedPruningResult > > > ,
4339
4440 /// Shared zone map
4541 zone_map : OnceLock < SharedZoneMap > ,
4642
4743 /// A cache of expr -> optional pruning predicate.
4844 /// This also uses the present_stats from the `ZonedLayout`
49- pruning_predicates : Arc < DashMap < ExprRef , PredicateCache > > ,
45+ pruning_predicates : LazyLock < Arc < DashMap < ExprRef , PredicateCache > > > ,
5046}
5147
5248impl ZonedReader {
@@ -55,24 +51,24 @@ impl ZonedReader {
5551 name : Arc < str > ,
5652 segment_source : Arc < dyn SegmentSource > ,
5753 ) -> VortexResult < Self > {
58- let data_child = layout
59- . data
60- . new_reader ( name. clone ( ) , segment_source. clone ( ) ) ?;
61- let zones_child = layout
62- . zones
63- . new_reader ( format ! ( "{name}.zones" ) . into ( ) , segment_source) ?;
54+ let lazy_children =
55+ LazyReaderChildren :: new ( layout. children . clone ( ) , segment_source. clone ( ) ) ;
6456
6557 Ok ( Self {
6658 layout,
6759 name,
68- data_child,
69- zones_child,
60+ lazy_children,
7061 pruning_result : Default :: default ( ) ,
7162 zone_map : Default :: default ( ) ,
7263 pruning_predicates : Default :: default ( ) ,
7364 } )
7465 }
7566
67+ #[ inline]
68+ fn data_child ( & self ) -> VortexResult < & LayoutReaderRef > {
69+ self . lazy_children . get ( 0 , self . layout . dtype ( ) , & self . name )
70+ }
71+
7672 /// Get or create the pruning predicate for a given expression.
7773 fn pruning_predicate ( & self , expr : ExprRef ) -> Option < ExprRef > {
7874 self . pruning_predicates
@@ -101,7 +97,16 @@ impl ZonedReader {
10197 let present_stats = self . layout . present_stats . clone ( ) ;
10298
10399 let zones_eval = self
104- . zones_child
100+ . lazy_children
101+ . get (
102+ 1 ,
103+ & ZoneMap :: dtype_for_stats_table (
104+ self . layout . dtype ( ) ,
105+ self . layout . present_stats ( ) ,
106+ ) ,
107+ & format ! ( "{}.zones" , self . name) . into ( ) ,
108+ )
109+ . vortex_expect ( "failed to get zone child" )
105110 . projection_evaluation (
106111 & ( 0 ..nzones as u64 ) ,
107112 & root ( ) ,
@@ -178,11 +183,11 @@ impl LayoutReader for ZonedReader {
178183 }
179184
180185 fn dtype ( & self ) -> & DType {
181- self . data_child . dtype ( )
186+ self . layout . dtype ( )
182187 }
183188
184189 fn row_count ( & self ) -> Precision < u64 > {
185- self . data_child . row_count ( )
190+ Precision :: exact ( self . layout . row_count ( ) )
186191 }
187192
188193 fn register_splits (
@@ -191,7 +196,7 @@ impl LayoutReader for ZonedReader {
191196 row_offset : u64 ,
192197 splits : & mut BTreeSet < u64 > ,
193198 ) -> VortexResult < ( ) > {
194- self . data_child
199+ self . data_child ( ) ?
195200 . register_splits ( field_mask, row_offset, splits)
196201 }
197202
@@ -203,7 +208,7 @@ impl LayoutReader for ZonedReader {
203208 ) -> VortexResult < MaskFuture > {
204209 log:: debug!( "Stats pruning evaluation: {} - {}" , & self . name, expr) ;
205210 let data_eval = self
206- . data_child
211+ . data_child ( ) ?
207212 . pruning_evaluation ( row_range, expr, mask. clone ( ) ) ?;
208213
209214 let Some ( pruning_mask_future) = self . pruning_mask_future ( expr. clone ( ) ) else {
@@ -236,10 +241,10 @@ impl LayoutReader for ZonedReader {
236241 Ok ( MaskFuture :: new ( mask. len ( ) , async move {
237242 log:: debug!( "Invoking stats pruning evaluation {}: {}" , name, expr) ;
238243
239- let pruning_mask = pruning_mask_future. clone ( ) . await ?. mask ( ) ?;
244+ let pruning_mask = pruning_mask_future. await ?. mask ( ) ?;
240245
241246 let mut builder = BooleanBufferBuilder :: new ( mask. len ( ) ) ;
242- for ( zone_idx, & zone_length) in zone_range. clone ( ) . zip_eq ( & zone_lengths) {
247+ for ( zone_idx, & zone_length) in zone_range. zip_eq ( & zone_lengths) {
243248 builder. append_n ( zone_length, !pruning_mask. value ( usize:: try_from ( zone_idx) ?) ) ;
244249 }
245250
@@ -273,7 +278,7 @@ impl LayoutReader for ZonedReader {
273278 expr : & ExprRef ,
274279 mask : MaskFuture ,
275280 ) -> VortexResult < MaskFuture > {
276- self . data_child . filter_evaluation ( row_range, expr, mask)
281+ self . data_child ( ) ? . filter_evaluation ( row_range, expr, mask)
277282 }
278283
279284 fn projection_evaluation (
@@ -284,7 +289,8 @@ impl LayoutReader for ZonedReader {
284289 ) -> VortexResult < BoxFuture < ' static , VortexResult < ArrayRef > > > {
285290 // TODO(ngates): there are some projection expressions that we may also be able to
286291 // short-circuit with statistics.
287- self . data_child . projection_evaluation ( row_range, expr, mask)
292+ self . data_child ( ) ?
293+ . projection_evaluation ( row_range, expr, mask)
288294 }
289295}
290296
0 commit comments