Skip to content

Commit 05b9a5d

Browse files
committed
fix: remove block_on call in cuda_execute
Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 8f13bda commit 05b9a5d

File tree

3 files changed

+53
-5
lines changed

3 files changed

+53
-5
lines changed

vortex-cuda/src/dynamic_dispatch/plan_builder.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
1010
use std::sync::Arc;
1111

12-
use futures::executor::block_on;
1312
use vortex::array::ArrayRef;
1413
use vortex::array::DynArray;
1514
use vortex::array::ExecutionCtx;
@@ -324,13 +323,24 @@ impl PlanBuilderState<'_> {
324323
fn walk_primitive(&mut self, array: ArrayRef) -> VortexResult<Pipeline> {
325324
let prim = array.to_canonical()?.into_primitive();
326325
let PrimitiveArrayParts { buffer, .. } = prim.into_parts();
327-
let device_buf = block_on(self.ctx.ensure_on_device(buffer))?;
326+
327+
// TODO(0ax1): Optimize device buffer allocation and copying.
328+
//
329+
// Ideally, there would be a buffer pool of preallocated device memory
330+
// such that retrieving a device pointer is O(1) when building the
331+
// dynamic dispatch plan. In the current setup, we need to allocate the
332+
// buffer before we can get the device pointer. As the memory is
333+
// allocated via the global allocator, which does not pin the host
334+
// memory to physical addresses unlike `cudaHostAlloc`, the subsequent
335+
// memory copy from host to device is sync and cannot be pushed to the
336+
// CUDA stream as an async operation.
337+
let device_buf = self.ctx.ensure_on_device_sync(buffer)?;
328338
let ptr = device_buf.cuda_device_ptr()?;
329339
self.device_buffers.push(device_buf);
330340
Ok(Pipeline {
331341
source: SourceOp::load(),
332342
scalar_ops: vec![],
333-
input_ptr: ptr as u64,
343+
input_ptr: ptr,
334344
})
335345
}
336346

@@ -354,13 +364,13 @@ impl PlanBuilderState<'_> {
354364
vortex_bail!("Dynamic dispatch does not support BitPackedArray with patches");
355365
}
356366

357-
let device_buf = block_on(self.ctx.ensure_on_device(packed))?;
367+
let device_buf = self.ctx.ensure_on_device_sync(packed)?;
358368
let ptr = device_buf.cuda_device_ptr()?;
359369
self.device_buffers.push(device_buf);
360370
Ok(Pipeline {
361371
source: SourceOp::bitunpack(bit_width, offset),
362372
scalar_ops: vec![],
363-
input_ptr: ptr as u64,
373+
input_ptr: ptr,
364374
})
365375
}
366376

vortex-cuda/src/executor.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,22 @@ impl CudaExecutionCtx {
252252
self.stream.copy_to_device(host_buffer)?.await
253253
}
254254

255+
/// Synchronous variant of [`ensure_on_device`](Self::ensure_on_device).
256+
///
257+
/// Safe to call from within an async executor (no nested `block_on`).
258+
/// The copy is enqueued on the stream and completes before any subsequent
259+
/// work on the same stream.
260+
pub fn ensure_on_device_sync(&self, handle: BufferHandle) -> VortexResult<BufferHandle> {
261+
if handle.is_on_device() {
262+
return Ok(handle);
263+
}
264+
let host_buffer = handle
265+
.as_host_opt()
266+
.ok_or_else(|| vortex_err!("Buffer is not on host"))?
267+
.clone();
268+
self.stream.copy_to_device_sync(host_buffer.as_ref())
269+
}
270+
255271
/// Returns a reference to the underlying [`VortexCudaStream`].
256272
///
257273
/// Through [`Deref`][std::ops::Deref], this also provides access to the

vortex-cuda/src/stream.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ impl VortexCudaStream {
8989
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
9090
}))
9191
}
92+
93+
/// Synchronous variant of [`copy_to_device`](Self::copy_to_device).
94+
///
95+
/// Allocates device memory, enqueues the H2D copy on the stream, and
96+
/// returns immediately. The device pointer is valid as soon as this call
97+
/// returns; the copy completes before any later work on the same stream.
98+
///
99+
/// For **pageable** host memory (the common case), `memcpy_htod` stages
100+
/// the source into a driver-managed pinned buffer before returning, so
101+
/// the source data is safe to drop after this call.
102+
pub(crate) fn copy_to_device_sync<T>(&self, data: &[T]) -> VortexResult<BufferHandle>
103+
where
104+
T: DeviceRepr + Debug + Send + Sync + 'static,
105+
{
106+
let mut cuda_slice: CudaSlice<T> = self.device_alloc(data.len())?;
107+
108+
self.memcpy_htod(data, &mut cuda_slice)
109+
.map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?;
110+
111+
let cuda_buf = CudaDeviceBuffer::new(cuda_slice);
112+
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
113+
}
92114
}
93115

94116
/// Registers a callback and asynchronously waits for its completion.

0 commit comments

Comments
 (0)