@@ -7,13 +7,13 @@ use pco::wrapped::{ChunkDecompressor, FileDecompressor};
77use vortex_array:: pipeline:: {
88 BindContext , BitView , Kernel , KernelCtx , N , PipelineInputs , PipelinedNode ,
99} ;
10- use vortex_buffer:: { BufferMut , ByteBuffer } ;
10+ use vortex_buffer:: ByteBuffer ;
1111use vortex_compute:: expand:: Expand ;
12- use vortex_dtype:: { NativePType , half} ;
12+ use vortex_dtype:: { NativePType , PTypeDowncastExt , half} ;
1313use vortex_error:: { VortexResult , VortexUnwrap , vortex_err} ;
1414use vortex_mask:: MaskMut ;
1515use vortex_vector:: primitive:: PVectorMut ;
16- use vortex_vector:: { VectorMut , VectorMutOps } ;
16+ use vortex_vector:: { Vector , VectorMutOps , VectorOps } ;
1717
1818use crate :: array:: { number_type_from_dtype, vortex_err_from_pco} ;
1919use crate :: { PcoArray , PcoMetadata } ;
@@ -136,12 +136,7 @@ impl<T: Number + NativePType> PcoKernel<T> {
136136}
137137
138138impl < T : Number + NativePType > Kernel for PcoKernel < T > {
139- fn step (
140- & mut self ,
141- _ctx : & mut KernelCtx ,
142- selection : & BitView ,
143- out : VectorMut ,
144- ) -> VortexResult < VectorMut > {
139+ fn step ( & mut self , _ctx : & KernelCtx , selection : & BitView , out : Vector ) -> VortexResult < Vector > {
145140 let remaining_validity = self . validity . split_off ( N . min ( self . validity . len ( ) ) ) ;
146141 let step_validity = std:: mem:: take ( & mut self . validity ) . freeze ( ) ;
147142 let step_true_count = step_validity. true_count ( ) ;
@@ -152,27 +147,28 @@ impl<T: Number + NativePType> Kernel for PcoKernel<T> {
152147 return Ok ( out) ;
153148 }
154149
155- // PCO only stores valid values, not nulls. Therefore, we decompress `true_count` number of elements.
156- let mut decompressed = BufferMut :: < T > :: with_capacity ( step_true_count) ;
150+ let ( elements, _validity) = out. into_primitive ( ) . downcast :: < T > ( ) . into_parts ( ) ;
157151
158- while decompressed. len ( ) < step_true_count {
152+ let mut elements = elements. into_mut ( ) ;
153+
154+ while elements. len ( ) < step_true_count {
159155 // Ensure the page to read is decompressed.
160156 if self . page_buffer . is_empty ( ) {
161157 self . decompress_current_page ( ) ?;
162158 }
163159
164160 let remaining_in_page = self . page_buffer . len ( ) - self . page_position ;
165- let copy_count = ( step_true_count - decompressed . len ( ) ) . min ( remaining_in_page) ;
161+ let copy_count = ( step_true_count - elements . len ( ) ) . min ( remaining_in_page) ;
166162 let page_slice = & self . page_buffer [ self . page_position ..] [ ..copy_count] ;
167163
168164 // SAFETY: Sufficient capacity is pre-allocated.
169165 unsafe {
170166 std:: ptr:: copy_nonoverlapping (
171167 page_slice. as_ptr ( ) as _ ,
172- decompressed . spare_capacity_mut ( ) . as_mut_ptr ( ) ,
168+ elements . spare_capacity_mut ( ) . as_mut_ptr ( ) ,
173169 copy_count,
174170 ) ;
175- decompressed . set_len ( decompressed . len ( ) + copy_count) ;
171+ elements . set_len ( elements . len ( ) + copy_count) ;
176172 }
177173
178174 self . page_position += copy_count;
@@ -183,11 +179,12 @@ impl<T: Number + NativePType> Kernel for PcoKernel<T> {
183179 }
184180 }
185181
186- Ok ( PVectorMut :: new (
187- decompressed. expand ( & step_validity) ,
188- step_validity. into_mut ( ) ,
189- )
190- . into ( ) )
182+ let mut vec = PVectorMut :: new ( elements. expand ( & step_validity) , step_validity. into_mut ( ) ) ;
183+ if vec. len ( ) < N && vec. len ( ) > selection. true_count ( ) {
184+ vec. append_values ( T :: default ( ) , N - vec. len ( ) ) ;
185+ }
186+
187+ Ok ( vec. freeze ( ) . into ( ) )
191188 }
192189}
193190
0 commit comments