
Commit d11d9a1

Add HLL add_if_async APIs
1 parent 22fb1db

File tree: 9 files changed (+357, -96 lines)


include/cuco/detail/hyperloglog/hyperloglog.inl

Lines changed: 9 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -79,6 +79,14 @@ constexpr void hyperloglog<T, Scope, Hash, Allocator>::add(InputIt first,
   ref_.add(first, last, stream);
 }
 
+template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
+template <class InputIt, class StencilIt, class Predicate>
+constexpr void hyperloglog<T, Scope, Hash, Allocator>::add_if_async(
+  InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
+{
+  ref_.add_if_async(first, last, stencil, pred, stream);
+}
+
 template <class T, cuda::thread_scope Scope, class Hash, class Allocator>
 template <cuda::thread_scope OtherScope, class OtherAllocator>
 constexpr void hyperloglog<T, Scope, Hash, Allocator>::merge_async(

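To make the new surface concrete, here is a minimal host-side usage sketch. It assumes an estimator object exposing the `add_if_async` overload added above; the container setup, the `count_flagged` helper, and the `is_positive` functor are illustrative, not part of this commit.

#include <cuda_runtime.h>
#include <thrust/device_vector.h>

#include <cuda/stream_ref>

// Illustrative device-side predicate; any unary callable whose argument type
// is convertible from the stencil's value_type works.
struct is_positive {
  __device__ bool operator()(int flag) const { return flag > 0; }
};

// `Estimator` stands for any type exposing the add_if_async signature above,
// e.g. the hyperloglog container this commit extends.
template <class Estimator>
void count_flagged(Estimator& estimator,
                   thrust::device_vector<int> const& items,
                   thrust::device_vector<int> const& flags,
                   cuda::stream_ref stream)
{
  // items[i] contributes to the estimate only if is_positive(flags[i]).
  estimator.add_if_async(items.begin(), items.end(), flags.begin(), is_positive{}, stream);
  // add_if_async does not synchronize; wait before querying the estimate.
  cudaStreamSynchronize(stream.get());
}
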
include/cuco/detail/hyperloglog/hyperloglog_impl.cuh

Lines changed: 78 additions & 43 deletions
@@ -26,12 +26,14 @@
 #include <cuco/utility/traits.hpp>
 
 #include <cuda/atomic>
+#include <cuda/functional>
 #include <cuda/std/__algorithm/max.h>  // TODO #include <cuda/std/algorithm> once available
 #include <cuda/std/bit>
 #include <cuda/std/cstddef>
 #include <cuda/std/span>
 #include <cuda/std/utility>
 #include <cuda/stream_ref>
+#include <thrust/iterator/constant_iterator.h>
 #include <thrust/type_traits/is_contiguous_iterator.h>
 
 #include <cooperative_groups.h>
@@ -172,6 +174,60 @@ class hyperloglog_impl {
    */
   template <class InputIt>
   __host__ constexpr void add_async(InputIt first, InputIt last, cuda::stream_ref stream)
+  {
+    auto const always_true = thrust::constant_iterator<bool>(true);
+    this->add_if_async(first, last, always_true, cuda::std::identity{}, stream);
+  }
+
+  /**
+   * @brief Adds to-be-counted items to the estimator.
+   *
+   * @note This function synchronizes the given stream. For asynchronous execution use
+   * `add_async`.
+   *
+   * @tparam InputIt Device accessible random access input iterator where
+   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
+   * T></tt> is `true`
+   *
+   * @param first Beginning of the sequence of items
+   * @param last End of the sequence of items
+   * @param stream CUDA stream this operation is executed in
+   */
+  template <class InputIt>
+  __host__ constexpr void add(InputIt first, InputIt last, cuda::stream_ref stream)
+  {
+    this->add_async(first, last, stream);
+#if CCCL_MAJOR_VERSION > 3 || (CCCL_MAJOR_VERSION == 3 && CCCL_MINOR_VERSION >= 1)
+    stream.sync();
+#else
+    stream.wait();
+#endif
+  }
+
+  /**
+   * @brief Asynchronously adds items in the range `[first, last)` if `pred` of the corresponding
+   * stencil returns true.
+   *
+   * @note The item `*(first + i)` is added if `pred( *(stencil + i) )` returns true.
+   *
+   * @tparam InputIt Device accessible random access input iterator where
+   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
+   * T></tt> is `true`
+   * @tparam StencilIt Device accessible random access iterator whose value_type is
+   * convertible to Predicate's argument type
+   * @tparam Predicate Unary predicate callable whose return type must be convertible to `bool` and
+   * argument type is convertible from <tt>std::iterator_traits<StencilIt>::value_type</tt>
+   *
+   * @param first Beginning of the sequence of items
+   * @param last End of the sequence of items
+   * @param stencil Beginning of the stencil sequence
+   * @param pred Predicate to test on every element in the range `[stencil, stencil +
+   * std::distance(first, last))`
+   * @param stream CUDA stream this operation is executed in
+   */
+  template <class InputIt, class StencilIt, class Predicate>
+  __host__ constexpr void add_if_async(
+    InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
   {
     auto const num_items = cuco::detail::distance(first, last);
     if (num_items == 0) { return; }
@@ -181,8 +237,6 @@ class hyperloglog_impl {
     int const shmem_bytes = sketch_bytes();
     void const* kernel    = nullptr;
 
-    // In case the input iterator represents a contiguous memory segment we can employ efficient
-    // vectorized loads
     if constexpr (thrust::is_contiguous_iterator_v<InputIt>) {
       auto const ptr = thrust::raw_pointer_cast(&first[0]);
       auto constexpr max_vector_bytes = 32;
@@ -193,54 +247,60 @@ class hyperloglog_impl {
       switch (vector_size) {
         case 2:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<2, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<2, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 4:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<4, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<4, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 8:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<8, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<8, StencilIt, Predicate, hyperloglog_impl>);
           break;
         case 16:
           kernel = reinterpret_cast<void const*>(
-            cuco::hyperloglog_ns::detail::add_shmem_vectorized<16, hyperloglog_impl>);
+            cuco::hyperloglog_ns::detail::
+              add_if_shmem_vectorized<16, StencilIt, Predicate, hyperloglog_impl>);
           break;
       };
     }
 
     if (kernel != nullptr and this->try_reserve_shmem(kernel, shmem_bytes)) {
       if constexpr (thrust::is_contiguous_iterator_v<InputIt>) {
-        // We make use of the occupancy calculator to get the minimum number of blocks which still
-        // saturates the GPU. This reduces the shmem initialization overhead and atomic contention
-        // on the final register array during the merge phase.
         CUCO_CUDA_TRY(
           cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, shmem_bytes));
 
         auto const ptr = thrust::raw_pointer_cast(&first[0]);
-        void* kernel_args[] = {
-          (void*)(&ptr),  // TODO can't use reinterpret_cast since it can't cast away const
-          (void*)(&num_items),
-          reinterpret_cast<void*>(this)};
+        void* kernel_args[] = {(void*)(&ptr),
+                               (void*)(&num_items),
+                               (void*)(&stencil),
+                               (void*)(&pred),
+                               reinterpret_cast<void*>(this)};
         CUCO_CUDA_TRY(
           cudaLaunchKernel(kernel, grid_size, block_size, kernel_args, shmem_bytes, stream.get()));
       }
     } else {
       kernel = reinterpret_cast<void const*>(
-        cuco::hyperloglog_ns::detail::add_shmem<InputIt, hyperloglog_impl>);
-      void* kernel_args[] = {(void*)(&first), (void*)(&num_items), reinterpret_cast<void*>(this)};
+        cuco::hyperloglog_ns::detail::
+          add_if_shmem<InputIt, StencilIt, Predicate, hyperloglog_impl>);
+      void* kernel_args[] = {(void*)(&first),
+                             (void*)(&num_items),
+                             (void*)(&stencil),
+                             (void*)(&pred),
+                             reinterpret_cast<void*>(this)};
       if (this->try_reserve_shmem(kernel, shmem_bytes)) {
         CUCO_CUDA_TRY(
          cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, shmem_bytes));
 
        CUCO_CUDA_TRY(
          cudaLaunchKernel(kernel, grid_size, block_size, kernel_args, shmem_bytes, stream.get()));
      } else {
-        // Computes sketch directly in global memory. (Fallback path in case there is not enough
-        // shared memory available)
        kernel = reinterpret_cast<void const*>(
-          cuco::hyperloglog_ns::detail::add_gmem<InputIt, hyperloglog_impl>);
+          cuco::hyperloglog_ns::detail::
+            add_if_gmem<InputIt, StencilIt, Predicate, hyperloglog_impl>);
 
        CUCO_CUDA_TRY(cudaOccupancyMaxPotentialBlockSize(&grid_size, &block_size, kernel, 0));
 
@@ -250,31 +310,6 @@ class hyperloglog_impl {
     }
   }
 
-  /**
-   * @brief Adds to-be-counted items to the estimator.
-   *
-   * @note This function synchronizes the given stream. For asynchronous execution use
-   * `add_async`.
-   *
-   * @tparam InputIt Device accessible random access input iterator where
-   * <tt>std::is_convertible<std::iterator_traits<InputIt>::value_type,
-   * T></tt> is `true`
-   *
-   * @param first Beginning of the sequence of items
-   * @param last End of the sequence of items
-   * @param stream CUDA stream this operation is executed in
-   */
-  template <class InputIt>
-  __host__ constexpr void add(InputIt first, InputIt last, cuda::stream_ref stream)
-  {
-    this->add_async(first, last, stream);
-#if CCCL_MAJOR_VERSION > 3 || (CCCL_MAJOR_VERSION == 3 && CCCL_MINOR_VERSION >= 1)
-    stream.sync();
-#else
-    stream.wait();
-#endif
-  }
-
   /**
    * @brief Merges the result of `other` estimator reference into `*this` estimator reference.
    *

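The refactor above collapses the unconditional path into the conditional one: `add_async` now forwards to `add_if_async` with a stencil that always reads `true` and the identity predicate. A minimal host-side sketch of that reduction in isolation; `for_each_if` and `for_each` are stand-in helpers with a plain loop for clarity, not cuco APIs (cuco dispatches to the CUDA kernels shown below instead):

#include <cstddef>

#include <thrust/iterator/constant_iterator.h>

#include <cuda/std/functional>

// Generic conditional bulk operation, mirroring the add_if_async contract:
// f(*(first + i)) runs only when pred(*(stencil + i)) is true.
template <class InputIt, class StencilIt, class Predicate, class F>
void for_each_if(InputIt first, InputIt last, StencilIt stencil, Predicate pred, F f)
{
  for (std::ptrdiff_t i = 0; first + i != last; ++i) {
    if (pred(*(stencil + i))) { f(*(first + i)); }
  }
}

// The unconditional variant costs nothing extra to express: a
// constant_iterator<bool>(true) stencil plus the identity predicate makes
// every element pass, exactly as add_async now does above.
template <class InputIt, class F>
void for_each(InputIt first, InputIt last, F f)
{
  for_each_if(first, last, thrust::constant_iterator<bool>(true), cuda::std::identity{}, f);
}
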
include/cuco/detail/hyperloglog/hyperloglog_ref.inl

Lines changed: 9 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -67,6 +67,14 @@ __host__ constexpr void hyperloglog_ref<T, Scope, Hash>::add(InputIt first,
   impl_.add(first, last, stream);
 }
 
+template <class T, cuda::thread_scope Scope, class Hash>
+template <class InputIt, class StencilIt, class Predicate>
+__host__ constexpr void hyperloglog_ref<T, Scope, Hash>::add_if_async(
+  InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
+{
+  impl_.add_if_async(first, last, stencil, pred, stream);
+}
+
 template <class T, cuda::thread_scope Scope, class Hash>
 template <class CG, cuda::thread_scope OtherScope>
 __device__ constexpr void hyperloglog_ref<T, Scope, Hash>::merge(

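As with the owning container, the ref's `add` overload synchronizes the stream while `add_if_async` only enqueues work. A hedged call-site sketch of that difference; `Ref` stands for `hyperloglog_ref` or any type with these two members, and the helper name is illustrative:

#include <cuda_runtime.h>

#include <cuda/stream_ref>

template <class Ref, class InputIt, class StencilIt, class Predicate>
void add_two_ways(
  Ref& ref, InputIt first, InputIt last, StencilIt stencil, Predicate pred, cuda::stream_ref stream)
{
  // Synchronous: returns only after the stream has executed the update.
  ref.add(first, last, stream);

  // Asynchronous: enqueues the filtered update and returns immediately.
  ref.add_if_async(first, last, stencil, pred, stream);

  // The caller synchronizes before consuming the estimate.
  cudaStreamSynchronize(stream.get());
}
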
include/cuco/detail/hyperloglog/kernels.cuh

Lines changed: 52 additions & 50 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -36,17 +36,55 @@ CUCO_KERNEL void clear(RefType ref)
   if (block.group_index().x == 0) { ref.clear(block); }
 }
 
-template <int32_t VectorSize, class RefType>
-CUCO_KERNEL void add_shmem_vectorized(typename RefType::value_type const* first,
-                                      cuco::detail::index_type n,
-                                      RefType ref)
+template <class InputIt, class StencilIt, class Predicate, class RefType>
+CUCO_KERNEL void add_if_gmem(
+  InputIt first, cuco::detail::index_type n, StencilIt stencil, Predicate pred, RefType ref)
+{
+  auto const loop_stride = cuco::detail::grid_stride();
+  auto idx               = cuco::detail::global_thread_id();
+
+  while (idx < n) {
+    if (pred(*(stencil + idx))) { ref.add(*(first + idx)); }
+    idx += loop_stride;
+  }
+}
+
+template <class InputIt, class StencilIt, class Predicate, class RefType>
+CUCO_KERNEL void add_if_shmem(
+  InputIt first, cuco::detail::index_type n, StencilIt stencil, Predicate pred, RefType ref)
+{
+  using local_ref_type = typename RefType::template with_scope<cuda::thread_scope_block>;
+
+  extern __shared__ cuda::std::byte local_sketch[];
+
+  auto const loop_stride = cuco::detail::grid_stride();
+  auto idx               = cuco::detail::global_thread_id();
+  auto const block       = cooperative_groups::this_thread_block();
+
+  local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, {});
+  local_ref.clear(block);
+  block.sync();
+
+  while (idx < n) {
+    if (pred(*(stencil + idx))) { local_ref.add(*(first + idx)); }
+    idx += loop_stride;
+  }
+  block.sync();
+
+  ref.merge(block, local_ref);
+}
+
+template <int32_t VectorSize, class StencilIt, class Predicate, class RefType>
+CUCO_KERNEL void add_if_shmem_vectorized(typename RefType::value_type const* first,
+                                         cuco::detail::index_type n,
+                                         StencilIt stencil,
+                                         Predicate pred,
+                                         RefType ref)
 {
   using value_type     = typename RefType::value_type;
   using vector_type    = cuda::std::array<value_type, VectorSize>;
   using local_ref_type = typename RefType::template with_scope<cuda::thread_scope_block>;
 
-  // Base address of dynamic shared memory is guaranteed to be aligned to at least 16 bytes which
-  // is sufficient for this purpose
   extern __shared__ cuda::std::byte local_sketch[];
 
   auto const loop_stride = cuco::detail::grid_stride();
@@ -58,29 +96,30 @@ CUCO_KERNEL void add_shmem_vectorized(typename RefType::value_type const* first,
   local_ref.clear(block);
   block.sync();
 
-  // each thread processes VectorSize-many items per iteration
   vector_type vec;
   while (idx < n / VectorSize) {
     vec = *reinterpret_cast<vector_type*>(
       __builtin_assume_aligned(first + idx * VectorSize, sizeof(vector_type)));
-    for (auto const& i : vec) {
-      local_ref.add(i);
+    for (auto i = 0; i < VectorSize; ++i) {
+      if (pred(*(stencil + idx * VectorSize + i))) { local_ref.add(vec[i]); }
     }
     idx += loop_stride;
   }
-  // a single thread processes the remaining items
+
 #if defined(CUCO_HAS_CG_INVOKE_ONE)
   cooperative_groups::invoke_one(grid, [&]() {
     auto const remainder = n % VectorSize;
     cuda::static_for<VectorSize>([&] __device__(auto i) {
-      if (i() < remainder) { local_ref.add(*(first + n - i() - 1)); }
+      auto const item_idx = n - i() - 1;
+      if (i() < remainder && pred(*(stencil + item_idx))) { local_ref.add(*(first + item_idx)); }
     });
   });
 #else
   if (grid.thread_rank() == 0) {
     auto const remainder = n % VectorSize;
     cuda::static_for<VectorSize>([&] __device__(auto i) {
-      if (i() < remainder) { local_ref.add(*(first + n - i() - 1)); }
+      auto const item_idx = n - i() - 1;
+      if (i() < remainder && pred(*(stencil + item_idx))) { local_ref.add(*(first + item_idx)); }
     });
   }
 #endif
@@ -89,43 +128,6 @@ CUCO_KERNEL void add_shmem_vectorized(typename RefType::value_type const* first,
   ref.merge(block, local_ref);
 }
 
-template <class InputIt, class RefType>
-CUCO_KERNEL void add_shmem(InputIt first, cuco::detail::index_type n, RefType ref)
-{
-  using local_ref_type = typename RefType::template with_scope<cuda::thread_scope_block>;
-
-  // TODO assert alignment
-  extern __shared__ cuda::std::byte local_sketch[];
-
-  auto const loop_stride = cuco::detail::grid_stride();
-  auto idx               = cuco::detail::global_thread_id();
-  auto const block       = cooperative_groups::this_thread_block();
-
-  local_ref_type local_ref(cuda::std::span{local_sketch, ref.sketch_bytes()}, {});
-  local_ref.clear(block);
-  block.sync();
-
-  while (idx < n) {
-    local_ref.add(*(first + idx));
-    idx += loop_stride;
-  }
-  block.sync();
-
-  ref.merge(block, local_ref);
-}
-
-template <class InputIt, class RefType>
-CUCO_KERNEL void add_gmem(InputIt first, cuco::detail::index_type n, RefType ref)
-{
-  auto const loop_stride = cuco::detail::grid_stride();
-  auto idx               = cuco::detail::global_thread_id();
-
-  while (idx < n) {
-    ref.add(*(first + idx));
-    idx += loop_stride;
-  }
-}
-
 template <class OtherRefType, class RefType>
 CUCO_KERNEL void merge(OtherRefType other_ref, RefType ref)
 {

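The `add_if_*` kernels above share one structure: a block-private accumulator in dynamic shared memory, a grid-stride loop that adds only items whose stencil entry passes the predicate, and a single per-block merge into the global result, which keeps contended global atomics proportional to the number of blocks rather than the number of items. As a standalone illustration, here is a hedged CUDA sketch of that pattern applied to a toy bitmap instead of HLL registers; all names are illustrative, not cuco APIs:

#include <cstddef>

#include <cooperative_groups.h>

// Toy analogue of add_if_shmem: a block-private bitmap in dynamic shared
// memory stands in for the block-local HLL sketch.
template <class Predicate>
__global__ void mark_if(int const* items,
                        std::size_t n,
                        int const* stencil,
                        Predicate pred,
                        unsigned* global_bitmap,
                        int num_words)
{
  extern __shared__ unsigned local_bitmap[];  // like local_sketch[] above
  auto const block = cooperative_groups::this_thread_block();

  // Cooperatively clear the block-private state (cf. local_ref.clear(block)).
  for (int w = block.thread_rank(); w < num_words; w += block.size()) { local_bitmap[w] = 0u; }
  block.sync();

  // Grid-stride loop: only items whose stencil entry passes the predicate
  // contribute (cf. if (pred(*(stencil + idx))) { local_ref.add(...); }).
  auto const stride = static_cast<std::size_t>(gridDim.x) * blockDim.x;
  for (std::size_t idx = static_cast<std::size_t>(blockIdx.x) * blockDim.x + threadIdx.x; idx < n;
       idx += stride) {
    if (pred(stencil[idx])) {
      auto const bit = static_cast<unsigned>(items[idx]) % (num_words * 32u);
      atomicOr(&local_bitmap[bit / 32u], 1u << (bit % 32u));
    }
  }
  block.sync();

  // One merge per block into the global state (cf. ref.merge(block, local_ref)).
  for (int w = block.thread_rank(); w < num_words; w += block.size()) {
    atomicOr(&global_bitmap[w], local_bitmap[w]);
  }
}

// Launch with num_words * sizeof(unsigned) bytes of dynamic shared memory,
// mirroring how the add_if_* kernels receive sketch_bytes() as the shmem size.
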