24 changes: 18 additions & 6 deletions encodings/alp/src/alp/array.rs
@@ -3,7 +3,6 @@

use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Range;

use vortex_array::Array;
use vortex_array::ArrayBufferVisitor;
@@ -18,6 +17,7 @@ use vortex_array::IntoArray;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::SerializeMetadata;
use vortex_array::arrays::SliceVTable;
use vortex_array::buffer::BufferHandle;
use vortex_array::patches::Patches;
use vortex_array::patches::PatchesMetadata;
@@ -174,9 +174,18 @@ impl VTable for ALPVTable {
)?))
}

fn slice(array: &Self::Array, range: Range<usize>) -> VortexResult<Option<ArrayRef>> {
Contributor: can you add an execute_parent here so we don't CPU regress?

Contributor Author: done

Ok(Some(
ALPArray::new(
fn execute_parent(
array: &Self::Array,
parent: &ArrayRef,
_child_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Canonical>> {
// CPU-only: if parent is SliceArray, perform slicing of the buffer and any patches
// Note that this triggers compute (binary searching Patches) which we cannot do when the
// buffers live in GPU memory.
if let Some(slice_array) = parent.as_opt::<SliceVTable>() {
let range = slice_array.slice_range().clone();
let sliced_alp = ALPArray::new(
array.encoded().slice(range.clone())?,
array.exponents(),
array
@@ -185,8 +194,11 @@
.transpose()?
.flatten(),
)
.into_array(),
))
.into_array();
return Ok(Some(sliced_alp.execute::<Canonical>(ctx)?));
}

Ok(None)
}
}
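
A minimal, self-contained sketch of the pattern above (plain Rust; `AlpLike` and its methods are hypothetical stand-ins, not the Vortex API): when the parent is a slice, slicing the encoded buffer first and decoding only that range yields the same values as decoding everything and slicing afterwards, while avoiding the extra decode work the reviewer flagged.

```rust
use std::ops::Range;

// Hypothetical ALP-like encoding: floats stored as scaled integers, decoded by dividing
// back by 10^exponent. Stand-in for the real ALPArray, purely for illustration.
struct AlpLike {
    encoded: Vec<i64>,
    exponent: u32,
}

impl AlpLike {
    // Canonicalize the whole array.
    fn decode_all(&self) -> Vec<f64> {
        let scale = 10f64.powi(self.exponent as i32);
        self.encoded.iter().map(|&e| e as f64 / scale).collect()
    }

    // Handle a parent slice on the encoded form: slice the encoded buffer, then decode
    // only the requested range (what execute_parent does for a SliceArray parent above).
    fn slice_then_decode(&self, range: Range<usize>) -> Vec<f64> {
        AlpLike {
            encoded: self.encoded[range].to_vec(),
            exponent: self.exponent,
        }
        .decode_all()
    }
}

fn main() {
    let arr = AlpLike { encoded: vec![105, 210, 315, 420], exponent: 2 };
    // Same result either way; the second form never decodes elements outside 1..3.
    assert_eq!(arr.decode_all()[1..3].to_vec(), arr.slice_then_decode(1..3));
}
```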

10 changes: 10 additions & 0 deletions vortex-array/src/array/mod.rs
@@ -845,4 +845,14 @@ impl<V: VTable> ArrayVisitor for ArrayAdapter<V> {
Ok(metadata) => Debug::fmt(&metadata, f),
}
}

fn is_host(&self) -> bool {
for array in self.depth_first_traversal() {
if !array.buffer_handles().iter().all(BufferHandle::is_on_host) {
return false;
}
}

true
}
}
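
A toy version of the check above (hypothetical `Node` type, not the Vortex `ArrayVisitor`): an array counts as host-resident only when every buffer of every node in its tree is on the host, so a single device buffer anywhere makes `is_host` return false.

```rust
// Hypothetical stand-in for an array node with its own buffers and child arrays.
struct Node {
    buffers_on_host: Vec<bool>,
    children: Vec<Node>,
}

fn is_host(node: &Node) -> bool {
    node.buffers_on_host.iter().all(|&b| b) && node.children.iter().all(is_host)
}

fn main() {
    let gpu_child = Node { buffers_on_host: vec![true, false], children: vec![] };
    let root = Node { buffers_on_host: vec![true], children: vec![gpu_child] };
    assert!(!is_host(&root)); // one device buffer anywhere => not host-resident
}
```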
9 changes: 9 additions & 0 deletions vortex-array/src/array/visitor.rs
@@ -49,6 +49,11 @@ pub trait ArrayVisitor {

/// Formats a human-readable metadata description.
fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result;

/// Checks if all buffers in the array tree are host-resident.
///
/// Returns `false` if any buffers of `self` or child arrays are GPU-resident.
fn is_host(&self) -> bool;
}

impl ArrayVisitor for Arc<dyn Array> {
@@ -95,6 +100,10 @@ impl ArrayVisitor for Arc<dyn Array> {
fn metadata_fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.as_ref().metadata_fmt(f)
}

fn is_host(&self) -> bool {
self.as_ref().is_host()
}
}

pub trait ArrayVisitorExt: Array {
45 changes: 45 additions & 0 deletions vortex-array/src/arrays/filter/rules.rs
@@ -8,13 +8,20 @@ use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::FilterArray;
use crate::arrays::FilterVTable;
use crate::arrays::StructArray;
use crate::arrays::StructArrayParts;
use crate::arrays::StructVTable;
use crate::matchers::Exact;
use crate::optimizer::rules::ArrayParentReduceRule;
use crate::optimizer::rules::ArrayReduceRule;
use crate::optimizer::rules::ParentRuleSet;
use crate::optimizer::rules::ReduceRuleSet;

pub(super) const PARENT_RULES: ParentRuleSet<FilterVTable> =
ParentRuleSet::new(&[ParentRuleSet::lift(&FilterFilterRule)]);

pub(super) const RULES: ReduceRuleSet<FilterVTable> = ReduceRuleSet::new(&[&FilterStructRule]);

/// A simple reduction rule that simplifies a [`FilterArray`] whose child is also a
/// [`FilterArray`].
#[derive(Debug)]
@@ -39,3 +46,41 @@ impl ArrayParentReduceRule<FilterVTable> for FilterFilterRule {
Ok(Some(new_array.into_array()))
}
}

/// A reduce rule that pushes a filter down into the fields of a StructArray.
#[derive(Debug)]
struct FilterStructRule;

impl ArrayReduceRule<FilterVTable> for FilterStructRule {
fn reduce(&self, array: &FilterArray) -> VortexResult<Option<ArrayRef>> {
let mask = array.filter_mask();
let Some(struct_array) = array.child().as_opt::<StructVTable>() else {
return Ok(None);
};

let len = mask.true_count();
let StructArrayParts {
fields,
struct_fields,
validity,
..
} = struct_array.clone().into_parts();

let filtered_validity = validity.filter(mask)?;

let filtered_fields = fields
.iter()
.map(|field| field.filter(mask.clone()))
.collect::<VortexResult<Vec<_>>>()?;

Ok(Some(
StructArray::new(
struct_fields.names().clone(),
filtered_fields,
len,
filtered_validity,
)
.into_array(),
))
}
}
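
The rule relies on filtering commuting with struct construction: filtering the struct row-wise with a mask gives the same rows as filtering each field column with that mask and re-zipping. A self-contained check with plain vectors (the `filter_col` helper is hypothetical, not the Vortex API):

```rust
// Apply a boolean mask to one column.
fn filter_col<T: Clone>(col: &[T], mask: &[bool]) -> Vec<T> {
    col.iter()
        .zip(mask)
        .filter_map(|(v, &keep)| keep.then(|| v.clone()))
        .collect()
}

fn main() {
    let a = vec![1, 2, 3, 4];
    let b = vec!["w", "x", "y", "z"];
    let mask = vec![true, false, true, false];

    // Filter the "struct" row-wise...
    let rows: Vec<(i32, &str)> = a
        .iter()
        .copied()
        .zip(b.iter().copied())
        .zip(mask.iter().copied())
        .filter_map(|((x, y), keep)| keep.then_some((x, y)))
        .collect();

    // ...or filter each field column independently and re-zip.
    let cols: Vec<(i32, &str)> = filter_col(&a, &mask)
        .into_iter()
        .zip(filter_col(&b, &mask))
        .collect();

    assert_eq!(rows, cols); // both yield [(1, "w"), (3, "y")]
}
```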
5 changes: 5 additions & 0 deletions vortex-array/src/arrays/filter/vtable.rs
@@ -25,6 +25,7 @@ use crate::arrays::filter::array::FilterArray;
use crate::arrays::filter::execute::execute_filter;
use crate::arrays::filter::execute::execute_filter_fast_paths;
use crate::arrays::filter::rules::PARENT_RULES;
use crate::arrays::filter::rules::RULES;
use crate::buffer::BufferHandle;
use crate::executor::ExecutionCtx;
use crate::serde::ArrayChildren;
@@ -140,6 +141,10 @@
) -> VortexResult<Option<ArrayRef>> {
PARENT_RULES.evaluate(array, parent, child_idx)
}

fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
RULES.evaluate(array)
}
}

impl BaseArrayVTable<FilterVTable> for FilterVTable {
13 changes: 13 additions & 0 deletions vortex-array/src/arrays/slice/array.rs
@@ -15,6 +15,11 @@ pub struct SliceArray {
pub(super) stats: ArrayStats,
}

pub struct SliceArrayParts {
pub child: ArrayRef,
pub range: Range<usize>,
}

impl SliceArray {
pub fn new(child: ArrayRef, range: Range<usize>) -> Self {
if range.end > child.len() {
@@ -40,4 +45,12 @@ impl SliceArray {
pub fn child(&self) -> &ArrayRef {
&self.child
}

/// Consume the slice array and return its components.
pub fn into_parts(self) -> SliceArrayParts {
SliceArrayParts {
child: self.child,
range: self.range,
}
}
}
3 changes: 2 additions & 1 deletion vortex-array/src/arrays/varbinview/vtable/array.rs
@@ -4,6 +4,7 @@
use std::hash::Hash;

use vortex_dtype::DType;
use vortex_vector::binaryview::BinaryView;

use crate::Precision;
use crate::arrays::varbinview::VarBinViewArray;
@@ -15,7 +16,7 @@ use crate::vtable::BaseArrayVTable;

impl BaseArrayVTable<VarBinViewVTable> for VarBinViewVTable {
fn len(array: &VarBinViewArray) -> usize {
array.views().len()
array.views_handle().len() / size_of::<BinaryView>()
Contributor: What is a little annoying

Contributor Author: Yea

}

fn dtype(array: &VarBinViewArray) -> &DType {
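
For reference, a standalone sketch of the length computation in this hunk (plain Rust; `View` is a hypothetical 16-byte stand-in for `BinaryView`): when a handle only exposes its byte length, the element count is that byte length divided by the element size.

```rust
use std::mem::size_of;

#[allow(dead_code)] // only its size matters here
struct View([u8; 16]); // hypothetical stand-in for a 16-byte BinaryView

fn len_from_byte_len(byte_len: usize) -> usize {
    debug_assert_eq!(byte_len % size_of::<View>(), 0);
    byte_len / size_of::<View>()
}

fn main() {
    assert_eq!(len_from_byte_len(160), 10); // a 160-byte views buffer holds 10 views
}
```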
15 changes: 15 additions & 0 deletions vortex-array/src/buffer.rs
@@ -83,6 +83,8 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash {

/// Create a new buffer that references a subrange of this buffer at the given
/// slice indices.
///
/// Note that slice indices are in byte units.
fn slice(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
Contributor: can we not make this of T?

Contributor: Did this change from the CPU version?

Contributor Author: you can't have generics on a dyn trait

Contributor Author (@a10y, Jan 30, 2026): BufferHandle has a slice_typed but the Arc<dyn DeviceBuffer> needs to use bytes

Contributor: You can do an ext trait

Contributor Author: I see

Contributor: don't need to now

Contributor Author: it's done


/// Return a buffer with the given alignment. Where possible, this will be zero-copy.
@@ -93,6 +95,19 @@
fn aligned(self: Arc<Self>, alignment: Alignment) -> VortexResult<Arc<dyn DeviceBuffer>>;
}

pub trait DeviceBufferExt: DeviceBuffer {
/// Slice a range of elements `T` out of the device buffer.
fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer>;
}

impl<B: DeviceBuffer> DeviceBufferExt for B {
fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Arc<dyn DeviceBuffer> {
let start_bytes = range.start * size_of::<T>();
let end_bytes = range.end * size_of::<T>();
self.slice(start_bytes..end_bytes)
}
}
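
A self-contained illustration of the thread above (toy `ByteBuf`/`ByteBufExt` names, not the real `DeviceBuffer` API): a generic `slice_typed<T>` cannot live on the object-safe trait itself, so it sits on an extension trait whose blanket impl converts the element range into the byte range that the dyn-compatible `slice` method expects.

```rust
use std::mem::size_of;
use std::ops::Range;
use std::sync::Arc;

// Object-safe trait: byte-level slicing only, so it can sit behind Arc<dyn ByteBuf>.
trait ByteBuf {
    fn slice_bytes(&self, range: Range<usize>) -> Vec<u8>;
}

// A generic fn slice_typed<T> directly on ByteBuf would make the trait non-dyn-compatible,
// so the typed convenience lives on an extension trait with a blanket impl instead.
trait ByteBufExt {
    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Vec<u8>;
}

impl<B: ByteBuf + ?Sized> ByteBufExt for B {
    fn slice_typed<T: Sized>(&self, range: Range<usize>) -> Vec<u8> {
        // Element range -> byte range, mirroring the conversion in the diff.
        self.slice_bytes(range.start * size_of::<T>()..range.end * size_of::<T>())
    }
}

struct HostBuf(Vec<u8>);

impl ByteBuf for HostBuf {
    fn slice_bytes(&self, range: Range<usize>) -> Vec<u8> {
        self.0[range].to_vec()
    }
}

fn main() {
    let buf: Arc<dyn ByteBuf> = Arc::new(HostBuf((0u8..32).collect()));
    // Elements 1..3 of a u32 view are bytes 4..12.
    assert_eq!(buf.slice_typed::<u32>(1..3), (4u8..12).collect::<Vec<u8>>());
}
```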

impl Hash for dyn DeviceBuffer {
fn hash<H: Hasher>(&self, state: &mut H) {
self.dyn_hash(state);
30 changes: 18 additions & 12 deletions vortex-array/src/patches.rs
@@ -36,6 +36,7 @@ use vortex_vector::primitive::PrimitiveVectorMut;

use crate::Array;
use crate::ArrayRef;
use crate::ArrayVisitor;
use crate::IntoArray;
use crate::ToCanonical;
use crate::arrays::PrimitiveArray;
@@ -171,25 +172,30 @@ impl Patches {
"Patch indices must be non-nullable unsigned integers, got {:?}",
indices.dtype()
);

vortex_ensure!(
indices.len() <= array_len,
"Patch indices must be shorter than the array length"
);
vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty");

let max = usize::try_from(&indices.scalar_at(indices.len() - 1)?)
.map_err(|_| vortex_err!("indices must be a number"))?;
vortex_ensure!(
max - offset < array_len,
"Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
);
// Perform validation of components when they are host-resident.
// This is not possible to do eagerly when the data is on GPU memory.
if indices.is_host() && values.is_host() {
let max = usize::try_from(&indices.scalar_at(indices.len() - 1)?)
.map_err(|_| vortex_err!("indices must be a number"))?;
vortex_ensure!(
max - offset < array_len,
"Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
);

debug_assert!(
is_sorted(indices.as_ref())
.unwrap_or(Some(false))
.unwrap_or(false),
"Patch indices must be sorted"
);
debug_assert!(
is_sorted(indices.as_ref())
.unwrap_or(Some(false))
.unwrap_or(false),
"Patch indices must be sorted"
);
}

Ok(Self {
array_len,
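
A minimal sketch of the gating introduced here (hypothetical types, not the Vortex `Patches` API): the data-touching checks (max-index bound, sortedness) run only when the index values are reachable from the CPU; device-resident indices skip eager validation rather than failing.

```rust
// Hypothetical stand-in for patch indices that may live on the host or on a device.
struct Indices {
    on_host: bool,
    host_values: Option<Vec<u64>>, // populated only when on_host is true
}

fn validate(indices: &Indices, array_len: u64, offset: u64) -> Result<(), String> {
    if indices.on_host {
        let values = indices.host_values.as_ref().ok_or("host indices missing values")?;
        let max = *values.last().ok_or("patch indices must not be empty")?;
        if max - offset >= array_len {
            return Err(format!(
                "patch index {max} (offset {offset}) out of bounds for length {array_len}"
            ));
        }
        debug_assert!(values.windows(2).all(|w| w[0] <= w[1]), "indices must be sorted");
    }
    // Device-resident indices: defer these checks until the data is back on the host.
    Ok(())
}

fn main() {
    let host = Indices { on_host: true, host_values: Some(vec![1, 4, 7]) };
    assert!(validate(&host, 10, 0).is_ok());

    let device = Indices { on_host: false, host_values: None };
    assert!(validate(&device, 10, 0).is_ok()); // validation deferred, not failed
}
```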
14 changes: 11 additions & 3 deletions vortex-cuda/benches/filter_cuda.rs
@@ -7,6 +7,7 @@
#![allow(clippy::cast_possible_truncation)]

use std::ffi::c_void;
use std::fmt::Debug;
use std::mem::size_of;
use std::time::Duration;

@@ -124,7 +125,14 @@
/// Benchmark filter for a specific type.
fn benchmark_filter_type<T>(c: &mut Criterion, type_name: &str)
where
T: CubFilterable + cudarc::driver::DeviceRepr + From<u8> + Clone + Send + Sync + 'static,
T: CubFilterable
+ cudarc::driver::DeviceRepr
+ From<u8>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
let mut group = c.benchmark_group(format!("Filter_cuda_{type_name}"));
group.sample_size(10);
@@ -161,7 +169,7 @@
let d_input = d_input_handle
.as_device()
.as_any()
.downcast_ref::<CudaDeviceBuffer<T>>()
.downcast_ref::<CudaDeviceBuffer>()
.unwrap();

// Copy bitmask to device
Expand All @@ -171,7 +179,7 @@ where
let d_bitmask = d_bitmask_handle
.as_device()
.as_any()
.downcast_ref::<CudaDeviceBuffer<u8>>()
.downcast_ref::<CudaDeviceBuffer>()
.unwrap();

// Allocate output and temp buffers
20 changes: 8 additions & 12 deletions vortex-cuda/src/canonical.rs
@@ -20,6 +20,7 @@ use vortex_array::arrays::StructArrayParts;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::VarBinViewArrayParts;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::BitBuffer;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexResult;
@@ -46,15 +47,10 @@
..
} = struct_array.into_parts();

let futures = fields
.iter()
.map(|field| field.to_canonical().map(|c| c.into_host()))
.collect::<VortexResult<Vec<_>>>()?;
let host_fields = try_join_all(futures).await?;
let host_fields = host_fields
.into_iter()
.map(Canonical::into_array)
.collect::<Vec<_>>();
let mut host_fields = vec![];
for field in fields.iter() {
host_fields.push(field.to_canonical()?.into_host().await?.into_array());
}

Ok(Canonical::Struct(StructArray::new(
struct_fields.names().clone(),
@@ -74,9 +70,9 @@
len,
..
} = bool.into_parts();
Ok(Canonical::Bool(BoolArray::new_handle(
bits, offset, len, validity,
)))

let bits = BitBuffer::new_with_offset(bits.try_into_host()?.await?, offset, len);
Ok(Canonical::Bool(BoolArray::new(bits, validity)))
}
Canonical::Primitive(prim) => {
let PrimitiveArrayParts {
Expand Down