
Commit 6bbaf67

Add HyperLogLog add_if_async host API (#784)

This PR adds a new `add_if_async` API that lets users conditionally perform add operations without writing custom kernels. It also resolves a small type-deduction issue when deriving the hash value type with `decltype`, ensuring that references and `const` qualifiers are properly stripped. Required by rapidsai/cudf#20735.
1 parent c5b9c66 commit 6bbaf67
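
For orientation, here is a minimal host-side sketch of the new conditional-add path. It assumes the public `cuco::distinct_count_estimator` wrapper forwards to the `add_if_async` shown in the diffs below (only three of the eleven changed files appear on this page); the data and names are illustrative:

```cpp
// Hedged sketch, not taken from this diff: estimate the number of distinct
// keys among only the "valid" rows, using a boolean stencil plus the
// identity predicate.
#include <cuco/distinct_count_estimator.cuh>

#include <thrust/device_vector.h>

#include <cuda/std/functional>
#include <cuda/stream_ref>

int main()
{
  thrust::device_vector<int> keys{1, 2, 2, 3, 4, 4};
  thrust::device_vector<bool> valid{true, false, true, true, false, true};

  cudaStream_t stream{};  // default stream
  cuco::distinct_count_estimator<int> estimator{};
  // *(keys.begin() + i) is added iff valid[i] is true; no custom kernel needed.
  estimator.add_if_async(
    keys.begin(), keys.end(), valid.begin(), cuda::std::identity{}, cuda::stream_ref{stream});

  auto const estimate = estimator.estimate(cuda::stream_ref{stream});  // synchronizes
  return estimate > 0 ? 0 : 1;
}
```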

File tree

11 files changed (+498, -142 lines); three of them shown below.

CMakeLists.txt (1 addition & 1 deletion)

```diff
@@ -18,7 +18,7 @@ cmake_minimum_required(VERSION 3.23.1 FATAL_ERROR)
 set(rapids-cmake-version 25.12)
 if(NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/CUCO_RAPIDS.cmake)
   file(DOWNLOAD
-    https://raw.githubusercontent.com/rapidsai/rapids-cmake/branch-${rapids-cmake-version}/RAPIDS.cmake
+    https://raw.githubusercontent.com/rapidsai/rapids-cmake/release/${rapids-cmake-version}/RAPIDS.cmake
     ${CMAKE_CURRENT_BINARY_DIR}/CUCO_RAPIDS.cmake)
 endif()
 include(${CMAKE_CURRENT_BINARY_DIR}/CUCO_RAPIDS.cmake)
```

include/cuco/detail/hyperloglog/hyperloglog.inl (32 additions & 1 deletion)

```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -49,6 +49,22 @@ constexpr hyperloglog<T, Scope, Hash, Allocator>::hyperloglog(
   this->clear_async(stream);
 }
 
+template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
+constexpr hyperloglog<T, Scope, Hash, Allocator>::hyperloglog(cuco::precision precision,
+                                                              Hash const& hash,
+                                                              Allocator const& alloc,
+                                                              cuda::stream_ref stream)
+  : allocator_{alloc},
+    sketch_{
+      allocator_.allocate(sketch_bytes(precision) / sizeof(register_type), stream),
+      detail::custom_deleter{sketch_bytes(precision) / sizeof(register_type), allocator_, stream}},
+    ref_{
+      cuda::std::span{reinterpret_cast<cuda::std::byte*>(sketch_.get()), sketch_bytes(precision)},
+      hash}
+{
+  this->clear_async(stream);
+}
+
 template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
 constexpr void hyperloglog<T, Scope, Hash, Allocator>::clear_async(cuda::stream_ref stream) noexcept
 {
@@ -79,6 +95,14 @@ constexpr void hyperloglog<T, Scope, Hash, Allocator>::add(InputIt first,
   ref_.add(first, last, stream);
 }
 
+template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
+template <class InputIt, class StencilIt, class Predicate>
+constexpr void hyperloglog<T, Scope, Hash, Allocator>::add_if_async(
+  InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
+{
+  ref_.add_if_async(first, last, stencil, pred, stream);
+}
+
 template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
 template <cuda::thread_scope OtherScope, class OtherAllocator>
 constexpr void hyperloglog<T, Scope, Hash, Allocator>::merge_async(
@@ -158,6 +182,13 @@ constexpr size_t hyperloglog<T, Scope, Hash, Allocator>::sketch_bytes(
   return ref_type<>::sketch_bytes(standard_deviation);
 }
 
+template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
+constexpr size_t hyperloglog<T, Scope, Hash, Allocator>::sketch_bytes(
+  cuco::precision precision) noexcept
+{
+  return ref_type<>::sketch_bytes(precision);
+}
+
 template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
 constexpr size_t hyperloglog<T, Scope, Hash, Allocator>::sketch_alignment() noexcept
 {
```
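
The new constructor and `sketch_bytes` overload above let hosts size the estimator by HLL precision directly instead of by a kilobyte budget. A rough illustration of what that buys (hedged: it assumes the 4-byte `register_type` noted in the impl header below, and that the public wrapper exposes the same `cuco::precision` overloads):

```cpp
// Hypothetical usage sketch: precision p means 2^p registers, so
// cuco::precision{12} should yield a 4 B * 2^12 = 16 KiB sketch.
#include <cuco/distinct_count_estimator.cuh>

int main()
{
  cuco::distinct_count_estimator<long> estimator{cuco::precision{12}};
  return estimator.sketch_bytes() == 16384 ? 0 : 1;
}
```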

include/cuco/detail/hyperloglog/hyperloglog_impl.cuh (115 additions & 60 deletions)

```diff
@@ -19,19 +19,21 @@
 #include <cuco/detail/error.hpp>
 #include <cuco/detail/hyperloglog/finalizer.cuh>
 #include <cuco/detail/hyperloglog/kernels.cuh>
+#include <cuco/detail/utility/strong_type.cuh>
 #include <cuco/detail/utils.hpp>
 #include <cuco/hash_functions.cuh>
-#include <cuco/types.cuh>
 #include <cuco/utility/cuda_thread_scope.cuh>
 #include <cuco/utility/traits.hpp>
 
 #include <cuda/atomic>
+#include <cuda/functional>
 #include <cuda/std/__algorithm/max.h>  // TODO #include <cuda/std/algorithm> once available
 #include <cuda/std/bit>
 #include <cuda/std/cstddef>
 #include <cuda/std/span>
 #include <cuda/std/utility>
 #include <cuda/stream_ref>
+#include <thrust/iterator/constant_iterator.h>
 #include <thrust/type_traits/is_contiguous_iterator.h>
 
 #include <cooperative_groups.h>
```
```diff
@@ -40,6 +42,9 @@
 #include <vector>
 
 namespace cuco::detail {
+CUCO_DEFINE_STRONG_TYPE(sketch_size_kb, double);
+CUCO_DEFINE_STRONG_TYPE(standard_deviation, double);
+CUCO_DEFINE_STRONG_TYPE(precision, int32_t);
 
 /**
  * @brief A GPU-accelerated utility for approximating the number of distinct items in a multiset.
```
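
`CUCO_DEFINE_STRONG_TYPE` itself lives in the newly included `strong_type.cuh` and is not shown in this diff. Judging by how the code below writes `cuda::std::int32_t{precision}` and `sketch_size_kb * 1024`, a minimal wrapper in the same spirit (explicit construction, implicit unwrap) might look like this sketch:

```cpp
// Hypothetical expansion, for intuition only; the real macro lives in
// <cuco/detail/utility/strong_type.cuh>.
#include <cstdint>

struct precision {
  explicit constexpr precision(std::int32_t v) noexcept : value{v} {}
  constexpr operator std::int32_t() const noexcept { return value; }  // implicit unwrap
  std::int32_t value;
};

static_assert(std::int32_t{precision{10}} == 10);
```

The move from `cuco::` to `cuco::detail::` strong types (note the dropped `<cuco/types.cuh>` include) appears to decouple the implementation from the public type definitions.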
```diff
@@ -54,9 +59,9 @@ namespace cuco::detail {
 template <class T, cuda::thread_scope Scope, class Hash>
 class hyperloglog_impl {
   // We use `int` here since this is the smallest type that supports native `atomicMax` on GPUs
-  using fp_type = double;  ///< Floating point type used for reduction
-  using hash_value_type =
-    decltype(cuda::std::declval<Hash>()(cuda::std::declval<T>()));  ///< Hash value type
+  using fp_type         = double;  ///< Floating point type used for reduction
+  using hash_value_type = cuda::std::remove_cvref_t<decltype(cuda::std::declval<Hash>()(
+    cuda::std::declval<T>()))>;  ///< Hash value type
 
  public:
   static constexpr auto thread_scope = Scope;  ///< CUDA thread scope
```

```diff
@@ -82,9 +87,9 @@ class hyperloglog_impl {
   __host__ __device__ constexpr hyperloglog_impl(cuda::std::span<cuda::std::byte> sketch_span,
                                                  Hash const& hash)
     : hash_{hash},
-      precision_{cuda::std::countr_zero(
-        sketch_bytes(cuco::sketch_size_kb(static_cast<double>(sketch_span.size() / 1024.0))) /
-        sizeof(register_type))},
+      precision_{cuda::std::countr_zero(sketch_bytes(cuco::detail::sketch_size_kb(
+                                          static_cast<double>(sketch_span.size() / 1024.0))) /
+                                        sizeof(register_type))},
       sketch_{reinterpret_cast<register_type*>(sketch_span.data()),
               this->sketch_bytes() / sizeof(register_type)}
   {
```
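
The reshuffled `precision_` initializer only changes formatting and the strong-type namespace; the arithmetic is unchanged. As a worked instance (my numbers, assuming 4-byte registers): a 16 KiB sketch holds 16384 / 4 = 4096 = 2^12 registers, and `countr_zero` recovers p = 12.

```cpp
// Worked check of the precision recovery above (illustrative values;
// assumes the 4-byte int register_type stated in the class comment).
#include <cuda/std/bit>
#include <cuda/std/cstddef>

static_assert(cuda::std::countr_zero(cuda::std::size_t{16384} / sizeof(int)) == 12);
```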
```diff
@@ -172,6 +177,60 @@ class hyperloglog_impl {
    */
   template <class InputIt>
   __host__ constexpr void add_async(InputIt first, InputIt last, cuda::stream_ref stream)
+  {
+    this->add_if_async(
+      first, last, thrust::constant_iterator<bool>{true}, cuda::std::identity{}, stream);
+  }
+
+  /**
+   * @brief Adds the items to be counted to the estimator.
+   *
+   * @note This function synchronizes the given stream. For asynchronous execution use
+   * `add_async`.
+   *
+   * @tparam InputIt Device accessible random access input iterator where
+   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
+   * T></tt> is `true`
+   *
+   * @param first Beginning of the sequence of items
+   * @param last End of the sequence of items
+   * @param stream CUDA stream this operation is executed in
+   */
+  template <class InputIt>
+  __host__ constexpr void add(InputIt first, InputIt last, cuda::stream_ref stream)
+  {
+    this->add_async(first, last, stream);
+#if CCCL_MAJOR_VERSION > 3 || (CCCL_MAJOR_VERSION == 3 && CCCL_MINOR_VERSION >= 1)
+    stream.sync();
+#else
+    stream.wait();
+#endif
+  }
+
+  /**
+   * @brief Asynchronously adds items in the range `[first, last)` if `pred` applied to the
+   * corresponding stencil element returns true.
+   *
+   * @note The item `*(first + i)` is added if `pred( *(stencil + i) )` returns true.
+   *
+   * @tparam InputIt Device accessible random access input iterator where
+   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
+   * T></tt> is `true`
+   * @tparam StencilIt Device accessible random access iterator whose value_type is
+   * convertible to Predicate's argument type
+   * @tparam Predicate Unary predicate callable whose return type must be convertible to `bool` and
+   * argument type is convertible from <tt>std::iterator_traits<StencilIt>::value_type</tt>
+   *
+   * @param first Beginning of the sequence of items
+   * @param last End of the sequence of items
+   * @param stencil Beginning of the stencil sequence
+   * @param pred Predicate to test on every element in the range `[stencil, stencil +
+   * std::distance(first, last))`
+   * @param stream CUDA stream this operation is executed in
+   */
+  template <class InputIt, class StencilIt, class Predicate>
+  __host__ constexpr void add_if_async(
+    InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
   {
     auto const num_items = cuco::detail::distance(first, last);
     if (num_items == 0) { return; }
```
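
The `@note` above pins down the semantics: `*(first + i)` is admitted exactly when `pred(*(stencil + i))` is true, and `add_async` is now just the always-true special case via `thrust::constant_iterator` plus `cuda::std::identity`. A sketch of a non-trivial predicate, where the stencil simply aliases the input range (the estimator type is assumed from context):

```cpp
// Hedged sketch: count only even keys by pointing the stencil at the keys
// themselves and supplying a custom predicate.
#include <cuda/stream_ref>

#include <thrust/device_vector.h>

struct is_even {
  __host__ __device__ bool operator()(int x) const noexcept { return x % 2 == 0; }
};

template <class Estimator>
void add_even_keys(Estimator& estimator,
                   thrust::device_vector<int> const& keys,
                   cuda::stream_ref stream)
{
  estimator.add_if_async(keys.begin(), keys.end(), keys.begin(), is_even{}, stream);
}
```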
```diff
@@ -181,8 +240,6 @@ class hyperloglog_impl {
     int const shmem_bytes = sketch_bytes();
     void const* kernel    = nullptr;
 
-    // In case the input iterator represents a contiguous memory segment we can employ efficient
-    // vectorized loads
     if constexpr (thrust::is_contiguous_iterator_v<InputIt>) {
       auto const ptr                  = thrust::raw_pointer_cast(&first[0]);
       auto constexpr max_vector_bytes = 32;
```
```diff
@@ -193,54 +250,60 @@ class hyperloglog_impl {
       switch (vector_size) {
         case 2:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<2, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<2, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 4:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<4, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<4, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 8:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<8, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<8, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 16:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<16, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<16, StencilIt, Predicate, hyperloglog_impl>);
           break;
       };
     }
 
     if (kernel != nullptr and this->try_reserve_shmem(kernel, shmem_bytes)) {
       if constexpr (thrust::is_contiguous_iterator_v<InputIt>) {
-        // We make use of the occupancy calculator to get the minimum number of blocks which still
-        // saturates the GPU. This reduces the shmem initialization overhead and atomic contention
-        // on the final register array during the merge phase.
         CUCO_CUDA_TRY(
           cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, shmem_bytes));
 
         auto const ptr = thrust::raw_pointer_cast(&first[0]);
-        void* kernel_args[] = {
-          (void*)(&ptr),  // TODO can't use reinterpret_cast since it can't cast away const
-          (void*)(&num_items),
-          reinterpret_cast<void*>(this)};
+        void* kernel_args[] = {(void*)(&ptr),
+                               (void*)(&num_items),
+                               (void*)(&stencil),
+                               (void*)(&pred),
+                               reinterpret_cast<void*>(this)};
         CUCO_CUDA_TRY(
           cudaLaunchKernel(kernel, grid_size, block_size, kernel_args, shmem_bytes, stream.get()));
       }
     } else {
       kernel = reinterpret_cast<void const*>(
-        cuco::hyperloglog_ns::detail::add_shmem<InputIt, hyperloglog_impl>);
-      void* kernel_args[] = {(void*)(&first), (void*)(&num_items), reinterpret_cast<void*>(this)};
+        cuco::hyperloglog_ns::detail::
+          add_if_shmem<InputIt, StencilIt, Predicate, hyperloglog_impl>);
+      void* kernel_args[] = {(void*)(&first),
+                             (void*)(&num_items),
+                             (void*)(&stencil),
+                             (void*)(&pred),
+                             reinterpret_cast<void*>(this)};
       if (this->try_reserve_shmem(kernel, shmem_bytes)) {
         CUCO_CUDA_TRY(
           cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, shmem_bytes));
 
         CUCO_CUDA_TRY(
           cudaLaunchKernel(kernel, grid_size, block_size, kernel_args, shmem_bytes, stream.get()));
       } else {
-        // Computes sketch directly in global memory. (Fallback path in case there is not enough
-        // shared memory avalable)
         kernel = reinterpret_cast<void const*>(
-          cuco::hyperloglog_ns::detail::add_gmem<InputIt, hyperloglog_impl>);
+          cuco::hyperloglog_ns::detail::
+            add_if_gmem<InputIt, StencilIt, Predicate, hyperloglog_impl>);
 
         CUCO_CUDA_TRY(cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, 0));
 
```

```diff
@@ -250,31 +313,6 @@ class hyperloglog_impl {
     }
   }
 
-  /**
-   * @brief Adds to be counted items to the estimator.
-   *
-   * @note This function synchronizes the given stream. For asynchronous execution use
-   * `add_async`.
-   *
-   * @tparam InputIt Device accessible random access input iterator where
-   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
-   * T></tt> is `true`
-   *
-   * @param first Beginning of the sequence of items
-   * @param last End of the sequence of items
-   * @param stream CUDA stream this operation is executed in
-   */
-  template <class InputIt>
-  __host__ constexpr void add(InputIt first, InputIt last, cuda::stream_ref stream)
-  {
-    this->add_async(first, last, stream);
-#if CCCL_MAJOR_VERSION > 3 || (CCCL_MAJOR_VERSION == 3 && CCCL_MINOR_VERSION >= 1)
-    stream.sync();
-#else
-    stream.wait();
-#endif
-  }
-
   /**
    * @brief Merges the result of `other` estimator reference into `*this` estimator reference.
    *
```
```diff
@@ -484,12 +522,13 @@ class hyperloglog_impl {
    *
    * @return The number of bytes required for the sketch
    */
-  [[nodiscard]] __host__ __device__ static constexpr size_t sketch_bytes(
-    cuco::sketch_size_kb sketch_size_kb) noexcept
+  [[nodiscard]] __host__ __device__ static constexpr cuda::std::size_t sketch_bytes(
+    cuco::detail::sketch_size_kb sketch_size_kb) noexcept
   {
     // minimum precision is 4 or 64 bytes
-    return cuda::std::max(static_cast<size_t>(sizeof(register_type) * 1ull << 4),
-                          cuda::std::bit_floor(static_cast<size_t>(sketch_size_kb * 1024)));
+    return cuda::std::max(
+      static_cast<cuda::std::size_t>(sizeof(register_type) * 1ull << 4),
+      cuda::std::bit_floor(static_cast<cuda::std::size_t>(sketch_size_kb * 1024)));
   }
 
   /**
```
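
The kilobyte-budget overload rounds the request down to a power of two and enforces the 64-byte floor (precision 4 with 4-byte registers). Worked numbers (mine, not from the diff):

```cpp
// A 3 KB request rounds down to a 2 KB sketch; the floor is 2^4 registers.
#include <cuda/std/bit>
#include <cuda/std/cstddef>

static_assert(cuda::std::bit_floor(cuda::std::size_t{3 * 1024}) == 2048);
static_assert((sizeof(int) * 1ull << 4) == 64);
```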
```diff
@@ -499,16 +538,16 @@ class hyperloglog_impl {
    *
    * @return The number of bytes required for the sketch
    */
-  [[nodiscard]] __host__ __device__ static constexpr std::size_t sketch_bytes(
-    cuco::standard_deviation standard_deviation) noexcept
+  [[nodiscard]] __host__ __device__ static constexpr cuda::std::size_t sketch_bytes(
+    cuco::detail::standard_deviation standard_deviation) noexcept
   {
     // implementation taken from
     // https://github.com/apache/spark/blob/6a27789ad7d59cd133653a49be0bb49729542abe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala#L43
 
     // minimum precision is 4 or 64 bytes
     auto const precision = cuda::std::max(
-      static_cast<int32_t>(4),
-      static_cast<int32_t>(
+      static_cast<cuda::std::int32_t>(4),
+      static_cast<cuda::std::int32_t>(
         cuda::std::ceil(2.0 * cuda::std::log(1.106 / standard_deviation) / cuda::std::log(2.0))));
 
     // inverse of this function (omitting the minimum precision constraint) is
```
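
For reference, this Spark-derived rule inverts the usual HLL++ relationship between target standard deviation and precision. A worked instance (my arithmetic, not from the diff):

$$p = \left\lceil \frac{2\ln(1.106/\sigma)}{\ln 2} \right\rceil = \left\lceil 2\log_2\frac{1.106}{\sigma} \right\rceil, \qquad \sigma = 0.01 \;\Rightarrow\; p = \lceil 2\log_2 110.6 \rceil = \lceil 13.58 \rceil = 14,$$

i.e. a sketch of $4\,\mathrm{B} \cdot 2^{14} = 64\,\mathrm{KiB}$.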
```diff
@@ -517,14 +556,30 @@ class hyperloglog_impl {
     return sizeof(register_type) * (1ull << precision);
   }
 
+  /**
+   * @brief Gets the number of bytes required for the sketch storage.
+   *
+   * @param precision HyperLogLog precision parameter
+   *
+   * @return The number of bytes required for the sketch
+   */
+  [[nodiscard]] __host__ __device__ static constexpr cuda::std::size_t sketch_bytes(
+    cuco::detail::precision precision) noexcept
+  {
+    // minimum precision is 4 or 64 bytes
+    auto const clamped_precision =
+      cuda::std::max(cuda::std::int32_t{4}, cuda::std::int32_t{precision});
+    return cuda::std::size_t{sizeof(register_type) * (1ull << clamped_precision)};
+  }
+
   /**
    * @brief Gets the alignment required for the sketch storage.
    *
    * @return The required alignment
    */
-  [[nodiscard]] __host__ __device__ static constexpr size_t sketch_alignment() noexcept
+  [[nodiscard]] __host__ __device__ static constexpr cuda::std::size_t sketch_alignment() noexcept
   {
-    return alignof(register_type);
+    return cuda::std::size_t{alignof(register_type)};
   }
 
  private:
```
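
A worked check of the new overload and its clamp (assuming the 4-byte `register_type` stated in the class comment):

```cpp
// precision 12 -> 4 B * 2^12 = 16 KiB; anything below 4 clamps to the
// 64-byte minimum of 2^4 registers.
static_assert(sizeof(int) * (1ull << 12) == 16384);
static_assert(sizeof(int) * (1ull << 4) == 64);
```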
