|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright the Vortex contributors |
| 3 | + |
| 4 | +use std::marker::PhantomData; |
| 5 | +use std::sync::Arc; |
| 6 | + |
| 7 | +use num_traits::ToPrimitive; |
| 8 | +use vortex_buffer::{Buffer, BufferMut, ByteBuffer}; |
| 9 | +use vortex_compute::filter::Filter; |
| 10 | +use vortex_dtype::{DType, PTypeDowncastExt, match_each_integer_ptype}; |
| 11 | +use vortex_error::{VortexExpect, VortexResult}; |
| 12 | +use vortex_vector::Vector; |
| 13 | +use vortex_vector::binaryview::{ |
| 14 | + BinaryType, BinaryView, BinaryViewType, BinaryViewVector, StringType, |
| 15 | +}; |
| 16 | + |
| 17 | +use crate::ArrayRef; |
| 18 | +use crate::arrays::{VarBinArray, VarBinVTable}; |
| 19 | +use crate::execution::{BatchKernel, BatchKernelRef, BindCtx, MaskExecution}; |
| 20 | +use crate::vtable::{OperatorVTable, ValidityHelper}; |
| 21 | + |
| 22 | +impl OperatorVTable<VarBinVTable> for VarBinVTable { |
| 23 | + fn bind( |
| 24 | + array: &VarBinArray, |
| 25 | + selection: Option<&ArrayRef>, |
| 26 | + ctx: &mut dyn BindCtx, |
| 27 | + ) -> VortexResult<BatchKernelRef> { |
| 28 | + let mask = ctx.bind_selection(array.len(), selection)?; |
| 29 | + let validity = ctx.bind_validity(array.validity(), array.len(), selection)?; |
| 30 | + let offsets = ctx.bind(array.offsets(), None)?; |
| 31 | + |
| 32 | + match array.dtype() { |
| 33 | + DType::Utf8(_) => Ok(Box::new(VarBinKernel::<StringType>::new( |
| 34 | + offsets, |
| 35 | + array.bytes().clone(), |
| 36 | + validity, |
| 37 | + mask, |
| 38 | + ))), |
| 39 | + DType::Binary(_) => Ok(Box::new(VarBinKernel::<BinaryType>::new( |
| 40 | + offsets, |
| 41 | + array.bytes().clone(), |
| 42 | + validity, |
| 43 | + mask, |
| 44 | + ))), |
| 45 | + _ => unreachable!("invalid DType for VarBinArray {}", array.dtype()), |
| 46 | + } |
| 47 | + } |
| 48 | +} |
| 49 | + |
| 50 | +struct VarBinKernel<V> { |
| 51 | + offsets: BatchKernelRef, |
| 52 | + bytes: ByteBuffer, |
| 53 | + validity: MaskExecution, |
| 54 | + selection: MaskExecution, |
| 55 | + _type: PhantomData<V>, |
| 56 | +} |
| 57 | + |
| 58 | +impl<V> VarBinKernel<V> { |
| 59 | + fn new( |
| 60 | + offsets: BatchKernelRef, |
| 61 | + bytes: ByteBuffer, |
| 62 | + validity: MaskExecution, |
| 63 | + selection: MaskExecution, |
| 64 | + ) -> Self { |
| 65 | + Self { |
| 66 | + offsets, |
| 67 | + bytes, |
| 68 | + validity, |
| 69 | + selection, |
| 70 | + _type: PhantomData, |
| 71 | + } |
| 72 | + } |
| 73 | +} |
| 74 | + |
| 75 | +impl<V: BinaryViewType> BatchKernel for VarBinKernel<V> { |
| 76 | + fn execute(self: Box<Self>) -> VortexResult<Vector> { |
| 77 | + let offsets = self.offsets.execute()?.into_primitive(); |
| 78 | + |
| 79 | + match_each_integer_ptype!(offsets.ptype(), |T| { |
| 80 | + let pvec = offsets.downcast::<T>(); |
| 81 | + // NOTE: discard the validity because offsets must be non-nullable |
| 82 | + let (offsets, _) = pvec.into_parts(); |
| 83 | + let first = offsets[0]; |
| 84 | + |
| 85 | + let lens: Buffer<u32> = offsets |
| 86 | + .iter() |
| 87 | + .copied() |
| 88 | + .skip(1) |
| 89 | + .scan(first, |prev, next| { |
| 90 | + let len = (next - *prev) |
| 91 | + .to_u32() |
| 92 | + .vortex_expect("offset must map to u32"); |
| 93 | + *prev = next; |
| 94 | + Some(len) |
| 95 | + }) |
| 96 | + .collect(); |
| 97 | + |
| 98 | + let mut views = BufferMut::with_capacity(lens.len()); |
| 99 | + |
| 100 | + for (offset, len) in std::iter::zip(offsets, lens) { |
| 101 | + let offset = offset.to_u32().vortex_expect("offset must fit in u32"); |
| 102 | + let bytes = &self.bytes[offset as usize..(offset + len) as usize]; |
| 103 | + let view = if len as usize <= BinaryView::MAX_INLINED_SIZE { |
| 104 | + BinaryView::new_inlined(bytes) |
| 105 | + } else { |
| 106 | + BinaryView::make_view(bytes, 0, offset) |
| 107 | + }; |
| 108 | + views.push(view); |
| 109 | + } |
| 110 | + |
| 111 | + let selection = self.selection.execute()?; |
| 112 | + let validity = self.validity.execute()?; |
| 113 | + |
| 114 | + let views = views.freeze().filter(&selection); |
| 115 | + |
| 116 | + Ok(Vector::from(BinaryViewVector::<V>::new( |
| 117 | + views, |
| 118 | + Arc::new([self.bytes.clone()]), |
| 119 | + validity, |
| 120 | + ))) |
| 121 | + }) |
| 122 | + } |
| 123 | +} |
| 124 | + |
#[cfg(test)]
mod tests {
    use rstest::{fixture, rstest};
    use vortex_dtype::{DType, Nullability};

    use crate::IntoArray;
    use crate::arrays::builder::VarBinBuilder;
    use crate::arrays::{BoolArray, VarBinArray};

    /// Five-element nullable UTF-8 array: one short value, one null, and three
    /// longer values.
    #[fixture]
    fn strings() -> VarBinArray {
        let mut builder = VarBinBuilder::<u32>::with_capacity(5);
        builder.append_value("inlined");
        builder.append_null();
        for value in ["large string 1", "large string 2", "large string 3"] {
            builder.append_value(value);
        }
        builder.finish(DType::Utf8(Nullability::Nullable))
    }

    /// Binding without a selection returns every element unchanged.
    #[rstest]
    fn test_bind(strings: VarBinArray) {
        // Attempt to bind with a full selection.
        let kernel = strings.bind(None, &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [
            Some("inlined"),
            None,
            Some("large string 1"),
            Some("large string 2"),
            Some("large string 3"),
        ];
        for (idx, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get(idx), want);
        }
    }

    /// Binding with a selection keeps only the selected elements, in order.
    #[rstest]
    fn test_bind_with_selection(strings: VarBinArray) {
        let selection = BoolArray::from_iter([false, true, false, true, true]).into_array();
        let kernel = strings.bind(Some(&selection), &mut ()).unwrap();
        let strings_vec = kernel.execute().unwrap().into_string();

        let expected = [None, Some("large string 2"), Some("large string 3")];
        for (idx, want) in expected.into_iter().enumerate() {
            assert_eq!(strings_vec.get(idx), want);
        }
    }
}
0 commit comments