diff --git a/vortex-gpu/kernels/dict_take.cu b/vortex-gpu/kernels/dict_take.cu
index 3f3b1b0a551..1f29b729a8b 100644
--- a/vortex-gpu/kernels/dict_take.cu
+++ b/vortex-gpu/kernels/dict_take.cu
@@ -12,7 +12,7 @@ __device__ void dict_take(
     ValueT *__restrict values_out
 ) {
     auto i = threadIdx.x;
-    auto block_offset = (blockIdx.x * 1024);
+    auto block_offset = blockIdx.x * 1024;
 
     auto codes = codes_array + block_offset;
     auto out = values_out + block_offset;
@@ -20,8 +20,8 @@ __device__ void dict_take(
     const int thread_ops = 32;
 
     for (auto j = 0; j < thread_ops; j++) {
-    auto idx = i * thread_ops + j;
-    out[idx] = values[codes[idx]];
+        auto idx = i * thread_ops + j;
+        out[idx] = values[codes[idx]];
     }
 }
@@ -33,8 +33,8 @@ __device__ void dict_take_masked(
     ValueT *__restrict values_out
 ) {
     auto i = threadIdx.x;
-    auto block_offset = (blockIdx.x * 1024);
-    auto mask_block_offset = (blockIdx.x * (1024 / 32));
+    auto block_offset = blockIdx.x * 1024;
+    auto mask_block_offset = blockIdx.x * (1024 / 32);
 
     auto codes = codes_array + block_offset;
     auto mask = mask_array + mask_block_offset;
diff --git a/vortex-gpu/kernels/rle_decompress.cu b/vortex-gpu/kernels/rle_decompress.cu
new file mode 100644
index 00000000000..77686515976
--- /dev/null
+++ b/vortex-gpu/kernels/rle_decompress.cu
@@ -0,0 +1,143 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+#include <cstdint>
+
+template <typename IndicesT, typename ValueT, typename OffsetsT>
+__device__ void rle_decompress(
+    const IndicesT *__restrict indices_array,
+    const ValueT *__restrict values_array,
+    const OffsetsT *__restrict offsets,
+    ValueT *__restrict values_out
+) {
+    auto i = threadIdx.x;
+    auto block_offset = blockIdx.x * 1024;
+
+    auto indices = indices_array + block_offset;
+    auto out = values_out + block_offset;
+    auto values = values_array + offsets[blockIdx.x];
+
+    const int thread_ops = 32;
+
+    for (auto j = 0; j < thread_ops; j++) {
+        auto idx = i * thread_ops + j;
+        out[idx] = values[indices[idx]];
+    }
+}
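+
+// Illustrative layout: each block decodes a fixed 1024-element slice with 32
+// threads, each writing 32 consecutive outputs. E.g. thread 5 of block 2 has
+// block_offset 2048 and writes block-local slots 160..191, i.e. absolute
+// outputs 2208..2239.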
+
+// Macro to generate the extern "C" wrapper for each type combination
+#define GENERATE_KERNEL(indices_suffix, value_suffix, offsets_suffix, IndicesType, ValueType, OffsetsType) \
+extern "C" __global__ void rle_decompress_i##indices_suffix##_v##value_suffix##_o##offsets_suffix( \
+    const IndicesType *__restrict indices_array, \
+    const ValueType *__restrict values_array, \
+    const OffsetsType *__restrict offsets, \
+    ValueType *__restrict values_out \
+) { \
+    rle_decompress<IndicesType, ValueType, OffsetsType>(indices_array, values_array, offsets, values_out); \
+}
+
+// Generate all combinations
+// Unsigned types
+GENERATE_KERNEL(u8, u8, u8, uint8_t, uint8_t, uint8_t)
+GENERATE_KERNEL(u8, u8, u16, uint8_t, uint8_t, uint16_t)
+GENERATE_KERNEL(u8, u8, u32, uint8_t, uint8_t, uint32_t)
+GENERATE_KERNEL(u8, u8, u64, uint8_t, uint8_t, uint64_t)
+
+GENERATE_KERNEL(u8, u16, u8, uint8_t, uint16_t, uint8_t)
+GENERATE_KERNEL(u8, u16, u16, uint8_t, uint16_t, uint16_t)
+GENERATE_KERNEL(u8, u16, u32, uint8_t, uint16_t, uint32_t)
+GENERATE_KERNEL(u8, u16, u64, uint8_t, uint16_t, uint64_t)
+
+GENERATE_KERNEL(u8, u32, u8, uint8_t, uint32_t, uint8_t)
+GENERATE_KERNEL(u8, u32, u16, uint8_t, uint32_t, uint16_t)
+GENERATE_KERNEL(u8, u32, u32, uint8_t, uint32_t, uint32_t)
+GENERATE_KERNEL(u8, u32, u64, uint8_t, uint32_t, uint64_t)
+
+GENERATE_KERNEL(u8, u64, u8, uint8_t, uint64_t, uint8_t)
+GENERATE_KERNEL(u8, u64, u16, uint8_t, uint64_t, uint16_t)
+GENERATE_KERNEL(u8, u64, u32, uint8_t, uint64_t, uint32_t)
+GENERATE_KERNEL(u8, u64, u64, uint8_t, uint64_t, uint64_t)
+
+GENERATE_KERNEL(u16, u8, u8, uint16_t, uint8_t, uint8_t)
+GENERATE_KERNEL(u16, u8, u16, uint16_t, uint8_t, uint16_t)
+GENERATE_KERNEL(u16, u8, u32, uint16_t, uint8_t, uint32_t)
+GENERATE_KERNEL(u16, u8, u64, uint16_t, uint8_t, uint64_t)
+
+GENERATE_KERNEL(u16, u16, u8, uint16_t, uint16_t, uint8_t)
+GENERATE_KERNEL(u16, u16, u16, uint16_t, uint16_t, uint16_t)
+GENERATE_KERNEL(u16, u16, u32, uint16_t, uint16_t, uint32_t)
+GENERATE_KERNEL(u16, u16, u64, uint16_t, uint16_t, uint64_t)
+
+GENERATE_KERNEL(u16, u32, u8, uint16_t, uint32_t, uint8_t)
+GENERATE_KERNEL(u16, u32, u16, uint16_t, uint32_t, uint16_t)
+GENERATE_KERNEL(u16, u32, u32, uint16_t, uint32_t, uint32_t)
+GENERATE_KERNEL(u16, u32, u64, uint16_t, uint32_t, uint64_t)
+
+GENERATE_KERNEL(u16, u64, u8, uint16_t, uint64_t, uint8_t)
+GENERATE_KERNEL(u16, u64, u16, uint16_t, uint64_t, uint16_t)
+GENERATE_KERNEL(u16, u64, u32, uint16_t, uint64_t, uint32_t)
+GENERATE_KERNEL(u16, u64, u64, uint16_t, uint64_t, uint64_t)
+
+// Signed types
+GENERATE_KERNEL(u8, i8, u8, uint8_t, int8_t, uint8_t)
+GENERATE_KERNEL(u8, i8, u16, uint8_t, int8_t, uint16_t)
+GENERATE_KERNEL(u8, i8, u32, uint8_t, int8_t, uint32_t)
+GENERATE_KERNEL(u8, i8, u64, uint8_t, int8_t, uint64_t)
+
+GENERATE_KERNEL(u8, i16, u8, uint8_t, int16_t, uint8_t)
+GENERATE_KERNEL(u8, i16, u16, uint8_t, int16_t, uint16_t)
+GENERATE_KERNEL(u8, i16, u32, uint8_t, int16_t, uint32_t)
+GENERATE_KERNEL(u8, i16, u64, uint8_t, int16_t, uint64_t)
+
+GENERATE_KERNEL(u8, i32, u8, uint8_t, int32_t, uint8_t)
+GENERATE_KERNEL(u8, i32, u16, uint8_t, int32_t, uint16_t)
+GENERATE_KERNEL(u8, i32, u32, uint8_t, int32_t, uint32_t)
+GENERATE_KERNEL(u8, i32, u64, uint8_t, int32_t, uint64_t)
+
+GENERATE_KERNEL(u8, i64, u8, uint8_t, int64_t, uint8_t)
+GENERATE_KERNEL(u8, i64, u16, uint8_t, int64_t, uint16_t)
+GENERATE_KERNEL(u8, i64, u32, uint8_t, int64_t, uint32_t)
+GENERATE_KERNEL(u8, i64, u64, uint8_t, int64_t, uint64_t)
+
+GENERATE_KERNEL(u16, i8, u8, uint16_t, int8_t, uint8_t)
+GENERATE_KERNEL(u16, i8, u16, uint16_t, int8_t, uint16_t)
+GENERATE_KERNEL(u16, i8, u32, uint16_t, int8_t, uint32_t)
+GENERATE_KERNEL(u16, i8, u64, uint16_t, int8_t, uint64_t)
+
+GENERATE_KERNEL(u16, i16, u8, uint16_t, int16_t, uint8_t)
+GENERATE_KERNEL(u16, i16, u16, uint16_t, int16_t, uint16_t)
+GENERATE_KERNEL(u16, i16, u32, uint16_t, int16_t, uint32_t)
+GENERATE_KERNEL(u16, i16, u64, uint16_t, int16_t, uint64_t)
+
+GENERATE_KERNEL(u16, i32, u8, uint16_t, int32_t, uint8_t)
+GENERATE_KERNEL(u16, i32, u16, uint16_t, int32_t, uint16_t)
+GENERATE_KERNEL(u16, i32, u32, uint16_t, int32_t, uint32_t)
+GENERATE_KERNEL(u16, i32, u64, uint16_t, int32_t, uint64_t)
+
+GENERATE_KERNEL(u16, i64, u8, uint16_t, int64_t, uint8_t)
+GENERATE_KERNEL(u16, i64, u16, uint16_t, int64_t, uint16_t)
+GENERATE_KERNEL(u16, i64, u32, uint16_t, int64_t, uint32_t)
+GENERATE_KERNEL(u16, i64, u64, uint16_t, int64_t, uint64_t)
+
+// Float types
+GENERATE_KERNEL(u8, f32, u8, uint8_t, float, uint8_t)
+GENERATE_KERNEL(u8, f32, u16, uint8_t, float, uint16_t)
+GENERATE_KERNEL(u8, f32, u32, uint8_t, float, uint32_t)
+GENERATE_KERNEL(u8, f32, u64, uint8_t, float, uint64_t)
+
+GENERATE_KERNEL(u8, f64, u8, uint8_t, double, uint8_t)
+GENERATE_KERNEL(u8, f64, u16, uint8_t, double, uint16_t)
+GENERATE_KERNEL(u8, f64, u32, uint8_t, double, uint32_t)
+GENERATE_KERNEL(u8, f64, u64, uint8_t, double, uint64_t)
+
+GENERATE_KERNEL(u16, f32, u8, uint16_t, float, uint8_t)
+GENERATE_KERNEL(u16, f32, u16, uint16_t, float, uint16_t)
+GENERATE_KERNEL(u16, f32, u32, uint16_t, float, uint32_t)
+GENERATE_KERNEL(u16, f32, u64, uint16_t, float, uint64_t)
+
+GENERATE_KERNEL(u16, f64, u8, uint16_t, double, uint8_t)
+GENERATE_KERNEL(u16, f64, u16, uint16_t, double, uint16_t)
+GENERATE_KERNEL(u16, f64, u32, uint16_t, double, uint32_t)
+GENERATE_KERNEL(u16, f64, u64, uint16_t, double, uint64_t)
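+
+// In total: 2 index types x 10 value types x 4 offset types = 80 entry points.
+// For example, GENERATE_KERNEL(u16, f32, u32, uint16_t, float, uint32_t) expands
+// to an extern "C" kernel named rle_decompress_iu16_vf32_ou32 that forwards to
+// rle_decompress<uint16_t, float, uint32_t>.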
diff --git a/vortex-gpu/src/bit_unpack.rs b/vortex-gpu/src/bit_unpack.rs
index 86f3d54a007..e69388f5263 100644
--- a/vortex-gpu/src/bit_unpack.rs
+++ b/vortex-gpu/src/bit_unpack.rs
@@ -4,7 +4,7 @@
 // This code is only exercised on CI with cuda and linux
 #![allow(dead_code)]
 
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 use std::time::Duration;
 
 use cudarc::driver::sys::CUevent_flags::CU_EVENT_DEFAULT;
@@ -13,7 +13,6 @@ use cudarc::driver::{
     PushKernelArg,
 };
 use cudarc::nvrtc::Ptx;
-use parking_lot::RwLock;
 use vortex_array::Canonical;
 use vortex_array::arrays::PrimitiveArray;
 use vortex_array::validity::Validity;
@@ -21,7 +20,6 @@ use vortex_buffer::{Buffer, BufferMut};
 use vortex_dtype::{NativePType, PType, match_each_unsigned_integer_ptype};
 use vortex_error::{VortexExpect, VortexResult, vortex_err};
 use vortex_fastlanes::BitPackedArray;
-use vortex_utils::aliases::hash_map::HashMap;
 
 use crate::task::GPUTask;
 
@@ -40,16 +38,10 @@ impl UnpackKernelId {
     }
 }
 
-static CUDA_KERNELS: LazyLock<RwLock<HashMap<UnpackKernelId, CudaFunction>>> =
-    LazyLock::new(|| RwLock::new(HashMap::new()));
-
 fn cuda_bit_unpack_kernel(
     kernel_id: UnpackKernelId,
     ctx: Arc<CudaContext>,
 ) -> VortexResult<CudaFunction> {
-    if let Some(kernel) = CUDA_KERNELS.read().get(&kernel_id) {
-        return Ok(kernel.clone());
-    }
     let module = ctx
         .load_module(Ptx::from_file(format!(
             "kernels/gen/fls_{}_bit_unpack.ptx",
@@ -57,7 +49,7 @@ fn cuda_bit_unpack_kernel(
         )))
         .map_err(|e| vortex_err!("Failed to load kernel module: {e}"))?;
 
-    let kernel_func = module
+    module
         .load_function(
             format!(
                 "fls_unpack_{}bw_{}ow_{}t",
@@ -71,9 +63,7 @@ fn cuda_bit_unpack_kernel(
             )
             .as_ref(),
         )
-        .map_err(|e| vortex_err!("Failed to load function: {e}"))?;
-    CUDA_KERNELS.write().insert(kernel_id, kernel_func.clone());
-    Ok(kernel_func)
+        .map_err(|e| vortex_err!("Failed to load function: {e}"))
 }
 
 pub fn cuda_bit_unpack(
diff --git a/vortex-gpu/src/for_.rs b/vortex-gpu/src/for_.rs
index 68bdfd1da53..a935a786f66 100644
--- a/vortex-gpu/src/for_.rs
+++ b/vortex-gpu/src/for_.rs
@@ -181,12 +181,5 @@ mod tests {
             primitive_array.as_slice::<T>(),
             unpacked.as_slice::<T>()
         );
-        for i in 0..primitive_array.len() {
-            assert_eq!(
-                primitive_array.as_slice::<T>()[i],
-                unpacked.as_slice::<T>()[i],
-                "i {i}"
-            );
-        }
     }
 }
diff --git a/vortex-gpu/src/lib.rs b/vortex-gpu/src/lib.rs
index 0af2782f515..b7ec66c9978 100644
--- a/vortex-gpu/src/lib.rs
+++ b/vortex-gpu/src/lib.rs
@@ -4,6 +4,7 @@
 pub mod bit_unpack;
 pub mod for_;
 mod for_bp;
+mod rle_decompress;
 mod take;
 mod task;
diff --git a/vortex-gpu/src/rle_decompress.rs b/vortex-gpu/src/rle_decompress.rs
new file mode 100644
index 00000000000..cb00d29b0c4
--- /dev/null
+++ b/vortex-gpu/src/rle_decompress.rs
@@ -0,0 +1,241 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+#![allow(dead_code)]
+
+use std::sync::Arc;
+
+use cudarc::driver::{
+    CudaContext, CudaFunction, CudaSlice, CudaStream, CudaViewMut, DeviceRepr, LaunchConfig,
+    PushKernelArg, ValidAsZeroBits,
+};
+use cudarc::nvrtc::Ptx;
+use vortex_array::arrays::PrimitiveArray;
+use vortex_array::validity::Validity;
+use vortex_array::{Canonical, ToCanonical};
+use vortex_buffer::BufferMut;
+use vortex_dtype::{
+    NativePType, PType, UnsignedPType, match_each_native_ptype,
+    match_each_unsigned_integer_ptype,
+};
+use vortex_error::{VortexExpect, VortexResult, vortex_err, vortex_panic};
+use vortex_fastlanes::RLEArray;
+
+use crate::task::GPUTask;
+
+struct RLETask<Values, Indices, Offsets> {
+    values: CudaSlice<Values>,
+    indices: CudaSlice<Indices>,
+    offsets: CudaSlice<Offsets>,
+    output: CudaSlice<Values>,
+    func: CudaFunction,
+    launch_config: LaunchConfig,
+    stream: Arc<CudaStream>,
+    len: usize,
+}
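+
+// Values/Indices/Offsets mirror the ValueT/IndicesT/OffsetsT template parameters
+// of kernels/rle_decompress.cu; the matching entry point is resolved by name when
+// the task is constructed.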
+
+impl<Values: NativePType + DeviceRepr, Indices: DeviceRepr, Offsets: DeviceRepr> GPUTask
+    for RLETask<Values, Indices, Offsets>
+{
+    fn launch_task(&mut self) -> VortexResult<()> {
+        let mut launch = self.stream.launch_builder(&self.func);
+        launch.arg(&self.indices);
+        launch.arg(&self.values);
+        launch.arg(&self.offsets);
+        launch.arg(&mut self.output);
+        unsafe { launch.launch(self.launch_config) }
+            .map_err(|e| vortex_err!("Failed to launch: {e}"))
+            .map(|_| ())
+    }
+
+    fn export_result(&mut self) -> VortexResult<Canonical> {
+        let rounded_len = self.len.next_multiple_of(1024);
+        let mut buffer = BufferMut::<Values>::with_capacity(rounded_len);
+        unsafe { buffer.set_len(rounded_len) }
+
+        self.stream
+            .memcpy_dtoh(&self.output, &mut buffer)
+            .map_err(|e| vortex_err!("Failed to copy to host: {e}"))?;
+        self.stream
+            .synchronize()
+            .map_err(|e| vortex_err!("Failed to synchronize: {e}"))?;
+
+        Ok(Canonical::Primitive(PrimitiveArray::new(
+            buffer.freeze().slice(0..self.len),
+            Validity::NonNullable,
+        )))
+    }
+
+    fn output(&mut self) -> CudaViewMut<'_, u8> {
+        unsafe {
+            self.output
+                .transmute_mut(self.len() * size_of::<Values>())
+                .vortex_expect("Failed to transmute")
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.len
+    }
+}
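+
+// The kernel writes whole 1024-element blocks, so both the device output and the
+// host staging buffer are rounded up with next_multiple_of(1024); export_result
+// trims the result back to the logical length with slice(0..self.len).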
+
+#[allow(clippy::cognitive_complexity)]
+pub fn new_task(
+    rle: &RLEArray,
+    ctx: Arc<CudaContext>,
+    stream: Arc<CudaStream>,
+) -> VortexResult<Box<dyn GPUTask>> {
+    assert_eq!(rle.offset(), 0);
+    assert_eq!(
+        rle.values_idx_offsets()
+            .scalar_at(0)
+            .as_primitive()
+            .as_::<u64>()
+            .vortex_expect("non null offset"),
+        0u64
+    );
+
+    match_each_native_ptype!(rle.values().dtype().as_ptype(), |V| {
+        match_each_unsigned_integer_ptype!(rle.values_idx_offsets().dtype().as_ptype(), |O| {
+            // RLE indices are always u16 (or u8 if downcast).
+            match rle.indices().dtype().as_ptype() {
+                PType::U8 => cuda_rle_task(
+                    rle.indices().to_primitive().as_slice::<u8>(),
+                    rle.values().to_primitive().as_slice::<V>(),
+                    rle.values_idx_offsets().to_primitive().as_slice::<O>(),
+                    rle.len(),
+                    ctx,
+                    stream,
+                )
+                .map(|t| Box::new(t) as Box<dyn GPUTask>),
+                PType::U16 => cuda_rle_task(
+                    rle.indices().to_primitive().as_slice::<u16>(),
+                    rle.values().to_primitive().as_slice::<V>(),
+                    rle.values_idx_offsets().to_primitive().as_slice::<O>(),
+                    rle.len(),
+                    ctx,
+                    stream,
+                )
+                .map(|t| Box::new(t) as Box<dyn GPUTask>),
+                _ => vortex_panic!(
+                    "Unsupported index type for RLE decoding: {}",
+                    rle.indices().dtype().as_ptype()
+                ),
+            }
+        })
+    })
+}
+
+fn cuda_rle_task<Values, Indices, Offsets>(
+    indices: &[Indices],
+    values: &[Values],
+    offsets: &[Offsets],
+    len: usize,
+    ctx: Arc<CudaContext>,
+    stream: Arc<CudaStream>,
+) -> VortexResult<RLETask<Values, Indices, Offsets>>
+where
+    Values: NativePType + DeviceRepr + ValidAsZeroBits,
+    Indices: UnsignedPType + DeviceRepr,
+    Offsets: UnsignedPType + DeviceRepr,
+{
+    let kernel_func = cuda_rle_kernel::<Indices, Values, Offsets>(ctx)?;
+    let num_chunks =
+        u32::try_from(indices.len().div_ceil(1024)).vortex_expect("num chunks overflow");
+
+    let cu_indices = stream
+        .memcpy_stod(indices)
+        .map_err(|e| vortex_err!("Failed to copy to device: {e}"))?;
+    let cu_values = stream
+        .memcpy_stod(values)
+        .map_err(|e| vortex_err!("Failed to copy to device: {e}"))?;
+    let cu_offsets = stream
+        .memcpy_stod(offsets)
+        .map_err(|e| vortex_err!("Failed to copy to device: {e}"))?;
+
+    let output_len = len.next_multiple_of(1024);
+    let cu_out = unsafe {
+        stream
+            .alloc::<Values>(output_len)
+            .map_err(|e| vortex_err!("Failed to allocate device memory: {e}"))?
+    };
+
+    Ok(RLETask {
+        values: cu_values,
+        indices: cu_indices,
+        offsets: cu_offsets,
+        output: cu_out,
+        func: kernel_func,
+        launch_config: LaunchConfig {
+            grid_dim: (num_chunks, 1, 1),
+            block_dim: (32, 1, 1),
+            shared_mem_bytes: 0,
+        },
+        stream,
+        len,
+    })
+}
+
+fn cuda_rle_kernel<Indices, Values, Offsets>(ctx: Arc<CudaContext>) -> VortexResult<CudaFunction>
+where
+    Indices: UnsignedPType,
+    Values: NativePType,
+    Offsets: UnsignedPType,
+{
+    let module = ctx
+        .load_module(Ptx::from_file("kernels/rle_decompress.ptx"))
+        .map_err(|e| vortex_err!("Failed to load kernel module: {e}"))?;
+
+    let kernel_name = format!(
+        "rle_decompress_i{}_v{}_o{}",
+        &Indices::PTYPE,
+        &Values::PTYPE,
+        &Offsets::PTYPE,
+    );
+
+    module
+        .load_function(&kernel_name)
+        .map_err(|e| vortex_err!("Failed to load function: {e}"))
+}
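+
+// Assuming PType's Display prints the short type name (e.g. "u16", "f32"), the
+// combination Indices = u16, Values = f32, Offsets = u32 resolves to
+// "rle_decompress_iu16_vf32_ou32", one of the entry points generated in
+// kernels/rle_decompress.cu.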
+
+pub fn cuda_rle_decompress(
+    array: &RLEArray,
+    ctx: Arc<CudaContext>,
+) -> VortexResult<PrimitiveArray> {
+    let stream = ctx.default_stream();
+    let mut task = new_task(array, ctx, stream)?;
+    task.launch_task()?;
+    task.export_result().map(|c| c.into_primitive())
+}
+
+#[cfg(all(target_os = "linux", feature = "cuda"))]
+#[cfg(test)]
+mod tests {
+    use cudarc::driver::CudaContext;
+    use rstest::rstest;
+    use vortex_array::arrays::PrimitiveArray;
+    use vortex_array::validity::Validity;
+    use vortex_buffer::Buffer;
+    use vortex_dtype::NativePType;
+    use vortex_error::VortexUnwrap;
+    use vortex_fastlanes::RLEArray;
+
+    use crate::rle_decompress::cuda_rle_decompress;
+
+    #[rstest]
+    #[case::u8((0u8..100).collect::<Buffer<u8>>())]
+    #[case::u16((0u16..2000).collect::<Buffer<u16>>())]
+    #[case::u32((0u32..2000).collect::<Buffer<u32>>())]
+    #[case::u64((0u64..2000).collect::<Buffer<u64>>())]
+    #[case::i8((-100i8..100).collect::<Buffer<i8>>())]
+    #[case::i16((-2000i16..2000).collect::<Buffer<i16>>())]
+    #[case::i32((-2000i32..2000).collect::<Buffer<i32>>())]
+    #[case::i64((-2000i64..2000).collect::<Buffer<i64>>())]
+    #[case::f32((-2000..2000).map(|i| i as f32).collect::<Buffer<f32>>())]
+    #[case::f64((-2000..2000).map(|i| i as f64).collect::<Buffer<f64>>())]
+    fn test_cuda_rle_decompress<T: NativePType>(#[case] values: Buffer<T>) {
+        let primitive_array = PrimitiveArray::new(values, Validity::NonNullable);
+        let array = RLEArray::encode(&primitive_array).vortex_unwrap();
+        let ctx = CudaContext::new(0).unwrap();
+        ctx.set_blocking_synchronize().unwrap();
+        let unpacked = cuda_rle_decompress(&array, ctx).unwrap();
+        assert_eq!(primitive_array.as_slice::<T>(), unpacked.as_slice::<T>());
+    }
+}
diff --git a/vortex-gpu/src/take.rs b/vortex-gpu/src/take.rs
index 4e55ff3a95b..7b1886241d2 100644
--- a/vortex-gpu/src/take.rs
+++ b/vortex-gpu/src/take.rs
@@ -41,12 +41,12 @@ pub fn cuda_take_masked(
     let values = dict.values().to_primitive();
     let codes = dict.codes().to_primitive();
 
-    let result = match_each_native_ptype!(values.ptype(), |V| {
+    match_each_native_ptype!(values.ptype(), |V| {
         match_each_unsigned_integer_ptype!(codes.ptype(), |C| {
             cuda_take_impl::<V, C>(codes, values, mask, ctx)
         })
-    });
-    result.map(Some)
+    })
+    .map(Some)
 }
 
 fn cuda_take_impl(
@@ -75,10 +75,9 @@ where
     let cu_codes = stream
         .memcpy_stod(codes_sl)
         .map_err(|e| vortex_err!("Failed to copy to device: {e}"))?;
-    let mut cu_out = {
-        // TODO(joe): use uninit memory
+    let mut cu_out = unsafe {
         stream
-            .alloc_zeros::<V>(codes.len().next_multiple_of(1024))
+            .alloc::<V>(codes.len().next_multiple_of(1024))
             .map_err(|e| vortex_err!("Failed to allocate stream: {e}"))?
     };