|
1 | 1 | // SPDX-License-Identifier: Apache-2.0 |
2 | 2 | // SPDX-FileCopyrightText: Copyright the Vortex contributors |
3 | 3 |
|
| 4 | +use std::sync::Arc; |
| 5 | + |
| 6 | +use vortex_mask::{Mask, MaskIter, MaskMut}; |
4 | 7 | use vortex_vector::fixed_size_list::{FixedSizeListVector, FixedSizeListVectorMut}; |
| 8 | +use vortex_vector::{Vector, VectorMut, VectorMutOps, VectorOps}; |
5 | 9 |
|
6 | 10 | use crate::filter::Filter; |
7 | 11 |
|
8 | | -// TODO(aduffy): there really isn't a cheap way to implement these is there. |
| 12 | +// TODO(connor): Implement filtering for the other mask types (`BitView`). |
| 13 | + |
/// Mask density that decides how a selection mask is iterated while it is being expanded.
///
/// Masks whose density falls below this value are expanded from their indices representation;
/// all other masks are expanded from their slices representation.
///
/// The exact value was chosen somewhat arbitrarily.
const MASK_EXPANSION_DENSITY_THRESHOLD: f64 = 0.05;
9 | 20 |
|
10 | | -impl<M> Filter<M> for &FixedSizeListVector { |
| 21 | +impl<M> Filter<M> for &FixedSizeListVector |
| 22 | +where |
| 23 | + for<'a> &'a Mask: Filter<M, Output = Mask>, |
| 24 | + for<'a> &'a Vector: Filter<Mask, Output = Vector>, |
| 25 | +{ |
11 | 26 | type Output = FixedSizeListVector; |
12 | 27 |
|
13 | | - fn filter(self, _selection: &M) -> Self::Output { |
14 | | - // We need to spread the mask out to point to offsets from |
15 | | - // the inner vector type |
16 | | - todo!() |
| 28 | + fn filter(self, selection: &M) -> Self::Output { |
| 29 | + let list_size = self.list_size(); |
| 30 | + let filtered_validity = self.validity().filter(selection); |
| 31 | + |
| 32 | + let filtered_elements = if list_size != 0 { |
| 33 | + // Expand the mask to cover all elements within selected lists. |
| 34 | + let elements_mask = compute_fsl_elements_mask(&filtered_validity, list_size as usize); |
| 35 | + |
| 36 | + // Filter the child elements vector. |
| 37 | + self.elements().as_ref().filter(&elements_mask) |
| 38 | + } else { |
| 39 | + debug_assert!( |
| 40 | + self.elements().is_empty(), |
| 41 | + "degenerate FixedSizeListVector is invalid, it should have no elements" |
| 42 | + ); |
| 43 | + |
| 44 | + self.elements().as_ref().clone() |
| 45 | + }; |
| 46 | + |
| 47 | + // SAFETY: We have verified that: |
| 48 | + // - The case when `list_size == 0` is safe (elements is empty and stays empty). |
| 49 | + // - The `filtered_elements` is guaranteed to have length that is a multiple of `list_size`. |
| 50 | + // - `filtered_validity` has the correct length because we filter with the same |
| 51 | + // `selection` mask. |
| 52 | + unsafe { |
| 53 | + FixedSizeListVector::new_unchecked( |
| 54 | + Arc::new(filtered_elements), |
| 55 | + list_size, |
| 56 | + filtered_validity, |
| 57 | + ) |
| 58 | + } |
17 | 59 | } |
18 | 60 | } |
19 | 61 |
|
20 | | -impl<M> Filter<M> for &mut FixedSizeListVectorMut { |
| 62 | +impl<M> Filter<M> for &mut FixedSizeListVectorMut |
| 63 | +where |
| 64 | + for<'a> &'a mut MaskMut: Filter<M, Output = ()>, |
| 65 | + for<'a> &'a mut VectorMut: Filter<Mask, Output = ()>, |
| 66 | +{ |
21 | 67 | type Output = (); |
22 | 68 |
|
23 | | - fn filter(self, _selection: &M) -> Self::Output { |
24 | | - // We need to spread the mask out to point to offsets from |
25 | | - // the inner vector type |
26 | | - todo!() |
| 69 | + fn filter(self, selection: &M) -> Self::Output { |
| 70 | + let list_size = self.list_size(); |
| 71 | + |
| 72 | + // Filter validity first to get the new length. |
| 73 | + // SAFETY: We will ensure the elements vector is filtered with an appropriately sized mask |
| 74 | + // to maintain the invariant `elements.len() == len * list_size`. |
| 75 | + unsafe { |
| 76 | + self.validity_mut().filter(selection); |
| 77 | + self.set_len(self.validity().len()); |
| 78 | + } |
| 79 | + |
| 80 | + if list_size != 0 { |
| 81 | + // Expand the mask to cover all elements within selected lists. |
| 82 | + // We need to freeze a copy of the validity to get a Mask for the computation. |
| 83 | + let validity_frozen = self.validity().clone().freeze(); |
| 84 | + let elements_mask = compute_fsl_elements_mask(&validity_frozen, list_size as usize); |
| 85 | + |
| 86 | + // Filter the elements vector with the expanded mask. |
| 87 | + // SAFETY: The expanded mask has the correct length (`validity.len() * list_size`), |
| 88 | + // which maintains the invariant after filtering. |
| 89 | + unsafe { |
| 90 | + self.elements_mut().filter(&elements_mask); |
| 91 | + } |
| 92 | + |
| 93 | + debug_assert_eq!( |
| 94 | + self.elements().len(), |
| 95 | + self.len() * list_size as usize, |
| 96 | + "elements length must equal len * list_size after filtering" |
| 97 | + ); |
| 98 | + } else { |
| 99 | + debug_assert!( |
| 100 | + self.elements().is_empty(), |
| 101 | + "degenerate FixedSizeListVector is invalid, it should have no elements" |
| 102 | + ); |
| 103 | + } |
27 | 104 | } |
28 | 105 | } |
| 106 | + |
| 107 | +impl<M> Filter<M> for FixedSizeListVector |
| 108 | +where |
| 109 | + for<'a> &'a FixedSizeListVector: Filter<M, Output = FixedSizeListVector>, |
| 110 | + for<'a> &'a mut FixedSizeListVectorMut: Filter<M, Output = ()>, |
| 111 | +{ |
| 112 | + type Output = Self; |
| 113 | + |
| 114 | + fn filter(self, selection: &M) -> Self { |
| 115 | + match self.try_into_mut() { |
| 116 | + // If we have exclusive access, we can perform the filter in place. |
| 117 | + Ok(mut vector_mut) => { |
| 118 | + (&mut vector_mut).filter(selection); |
| 119 | + vector_mut.freeze() |
| 120 | + } |
| 121 | + // Otherwise, allocate a new vector and fill it in (delegate to the |
| 122 | + // `&FixedSizeListVector` impl). |
| 123 | + Err(vector) => (&vector).filter(selection), |
| 124 | + } |
| 125 | + } |
| 126 | +} |
| 127 | + |
| 128 | +/// Given a mask for a fixed-size list array, creates a new mask for the underlying elements. |
| 129 | +/// |
| 130 | +/// This function simply "expands" out the input `selection_mask` by duplicating each bit |
| 131 | +/// `list_size` times. |
| 132 | +/// |
| 133 | +/// The output [`Mask`] is guaranteed to have a length equal to `selection_mask.len() * list_size`. |
| 134 | +fn compute_fsl_elements_mask(selection_mask: &Mask, list_size: usize) -> Mask { |
| 135 | + let expanded_len = selection_mask.len() * list_size; |
| 136 | + |
| 137 | + let values = match selection_mask { |
| 138 | + Mask::AllTrue(_) => return Mask::AllTrue(expanded_len), |
| 139 | + Mask::AllFalse(_) => return Mask::AllFalse(expanded_len), |
| 140 | + Mask::Values(values) => values, |
| 141 | + }; |
| 142 | + |
| 143 | + // Use threshold_iter to choose the optimal representation based on density. |
| 144 | + let expanded_slices = match values.threshold_iter(MASK_EXPANSION_DENSITY_THRESHOLD) { |
| 145 | + MaskIter::Slices(slices) => { |
| 146 | + // Expand a dense mask (represented as slices) by scaling each slice by `list_size`. |
| 147 | + slices |
| 148 | + .iter() |
| 149 | + .map(|&(start, end)| (start * list_size, end * list_size)) |
| 150 | + .collect() |
| 151 | + } |
| 152 | + MaskIter::Indices(indices) => { |
| 153 | + // Expand a sparse mask (represented as indices) by duplicating each index `list_size` |
| 154 | + // times. |
| 155 | + // |
| 156 | + // Note that in the worst case, it is possible that we create only a few slices with a |
| 157 | + // small range (for example, when list_size <= 2). This could be further optimized, |
| 158 | + // but we choose simplicity for now. |
| 159 | + indices |
| 160 | + .iter() |
| 161 | + .map(|&idx| { |
| 162 | + let start = idx * list_size; |
| 163 | + let end = (idx + 1) * list_size; |
| 164 | + (start, end) |
| 165 | + }) |
| 166 | + .collect() |
| 167 | + } |
| 168 | + }; |
| 169 | + |
| 170 | + Mask::from_slices(expanded_len, expanded_slices) |
| 171 | +} |
0 commit comments