Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ add_library(
src/sort/stable_sort.cu
src/sort/top_k.cu
src/stream_compaction/apply_boolean_mask.cu
src/stream_compaction/approx_distinct_count.cu
src/stream_compaction/distinct.cu
src/stream_compaction/distinct_count.cu
src/stream_compaction/distinct_helpers.cu
Expand Down
22 changes: 21 additions & 1 deletion cpp/include/cudf/detail/stream_compaction.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2019-2024, NVIDIA CORPORATION.
* SPDX-FileCopyrightText: Copyright (c) 2019-2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down Expand Up @@ -118,5 +118,25 @@ cudf::size_type distinct_count(table_view const& input,
null_equality nulls_equal,
rmm::cuda_stream_view stream);

/**
* @copydoc cudf::approx_distinct_count(column_view const&, int, null_policy, nan_policy,
* rmm::cuda_stream_view)
*/
cudf::size_type approx_distinct_count(column_view const& input,
int precision,
null_policy null_handling,
nan_policy nan_handling,
rmm::cuda_stream_view stream);

/**
* @copydoc cudf::approx_distinct_count(table_view const&, int, null_policy, nan_policy,
* rmm::cuda_stream_view)
*/
cudf::size_type approx_distinct_count(table_view const& input,
int precision,
null_policy null_handling,
nan_policy nan_handling,
rmm::cuda_stream_view stream);

} // namespace detail
} // namespace CUDF_EXPORT cudf
43 changes: 43 additions & 0 deletions cpp/include/cudf/stream_compaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,49 @@ cudf::size_type distinct_count(table_view const& input,
null_equality nulls_equal = null_equality::EQUAL,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Approximate count of distinct elements in the column_view using HyperLogLog.
*
* Uses the HyperLogLog++ algorithm to provide a fast approximation of the number of distinct
* elements in a column. All NaN values are treated as equal, and all null values are treated as
* equal.
*
* @param input The column_view whose distinct elements will be approximately counted
* @param precision The precision parameter for HyperLogLog (4-18). Higher precision gives
* better accuracy but uses more memory. Values outside this range are clamped to the nearest
* bound rather than rejected. Default is 12.
* @param null_handling `INCLUDE` or `EXCLUDE` null values (default: `EXCLUDE`)
* @param nan_handling `NAN_IS_VALID` or `NAN_IS_NULL` (default: `NAN_IS_NULL`)
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Approximate number of distinct elements in the column; 0 for an empty column
*/
cudf::size_type approx_distinct_count(column_view const& input,
int precision = 12,
null_policy null_handling = null_policy::EXCLUDE,
nan_policy nan_handling = nan_policy::NAN_IS_NULL,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Approximate count of distinct rows in a table using HyperLogLog.
*
* Uses the HyperLogLog++ algorithm to provide a fast approximation of the number of distinct
* rows in a table. All NaN values are treated as equal, and all null values are treated as equal.
*
* @param input Table whose distinct rows will be approximately counted
* @param precision The precision parameter for HyperLogLog (4-18). Higher precision gives
* better accuracy but uses more memory. Values outside this range are clamped to the nearest
* bound rather than rejected. Default is 12.
* @param null_handling `INCLUDE` or `EXCLUDE` rows with nulls (default: `EXCLUDE`)
* @param nan_handling `NAN_IS_VALID` or `NAN_IS_NULL` (default: `NAN_IS_NULL`)
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Approximate number of distinct rows in the table; 0 for an empty table
*/
cudf::size_type approx_distinct_count(table_view const& input,
int precision = 12,
null_policy null_handling = null_policy::EXCLUDE,
nan_policy nan_handling = nan_policy::NAN_IS_NULL,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Creates a new column by applying a filter function against every
* element of the input columns.
Expand Down
154 changes: 154 additions & 0 deletions cpp/src/stream_compaction/approx_distinct_count.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

#include "stream_compaction_common.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/row_operator/hashing.cuh>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/type_checks.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/polymorphic_allocator.hpp>

#include <cuco/hyperloglog.cuh>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <algorithm>

namespace cudf {
namespace detail {

// Shared implementation behind the column and table overloads of `approx_distinct_count`.
//
// Behavior, in order:
//  1. An empty table yields 0.
//  2. If `nan_handling` is `NAN_IS_NULL` or `null_handling` is `EXCLUDE`, and the table has fewer
//     than 10000 rows, the exact `distinct_count` is returned instead of an estimate.
//  3. Otherwise every row is hashed, rows whose bit is unset in the combined row bitmask are
//     dropped when `null_handling` is `EXCLUDE`, and the surviving hashes are added to a cuco
//     HyperLogLog sketch whose estimate is returned.
//
// @param input         Table whose distinct rows are counted
// @param precision     HyperLogLog precision; clamped to [4, 18]
// @param null_handling `EXCLUDE` filters rows through the combined row bitmask before sketching
// @param nan_handling  Only consulted by the small-input exact path (see review notes below)
// @param stream        CUDA stream on which all device work is ordered
// @return Exact or estimated distinct row count, never larger than the number of rows counted
cudf::size_type approx_distinct_count_impl(table_view const& input,
                                           int precision,
                                           null_policy null_handling,
                                           nan_policy nan_handling,
                                           rmm::cuda_stream_view stream)
{
  auto const num_rows = input.num_rows();
  if (num_rows == 0) { return 0; }

  // Exact fallback for small inputs. This is decided before any hashing or allocation so the
  // fallback does not pay for work it would throw away, and it goes through the detail APIs so the
  // caller's stream is honored (the public overloads would otherwise run on the default stream).
  // NOTE(review): the multi-column branch cannot express `null_handling`/`nan_handling`, so rows
  // containing nulls are still counted there even under `EXCLUDE` -- confirm this is intended.
  auto const wants_exact_fallback =
    nan_handling == nan_policy::NAN_IS_NULL || null_handling == null_policy::EXCLUDE;
  if (wants_exact_fallback && num_rows < 10000) {
    if (input.num_columns() == 1) {
      return cudf::detail::distinct_count(input.column(0), null_handling, nan_handling, stream);
    }
    return cudf::detail::distinct_count(input, cudf::null_equality::EQUAL, stream);
  }

  // Clamp precision to the valid range for HyperLogLog
  precision = std::clamp(precision, 4, 18);

  auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
  auto const preprocessed_input =
    cudf::detail::row::hash::preprocessed_table::create(input, stream);
  auto const row_hasher = cudf::detail::row::hash::row_hasher(preprocessed_input);
  auto const hash_key   = row_hasher.device_hasher(has_nulls);

  auto const cuda_stream = cuda::stream_ref{stream.value()};

  // The sketch is sized as 4 bytes for each of the 2^precision registers, expressed in KB.
  auto hll = cuco::hyperloglog<cudf::hash_value_type,
                               cuda::thread_scope_device,
                               cuco::xxhash_64<cudf::hash_value_type>,
                               rmm::mr::polymorphic_allocator<cuda::std::byte>>{
    cuco::sketch_size_kb{static_cast<double>(4 * (1ull << precision) / 1024.0)},
    cuco::xxhash_64<cudf::hash_value_type>{},
    rmm::mr::polymorphic_allocator<cuda::std::byte>{},
    cuda_stream};

  // The true distinct count can never exceed the number of rows fed into the sketch, so cap the
  // estimate there. This also guarantees the narrowing cast to `size_type` cannot overflow.
  auto const bounded_estimate = [&](std::size_t max_count) {
    auto const estimate = static_cast<std::size_t>(hll.estimate(cuda_stream));
    return static_cast<cudf::size_type>(std::min(estimate, max_count));
  };

  // One hash value per row; the sketch consumes these instead of the rows themselves.
  // NOTE(review): `nan_handling` is not applied on this path -- NaNs are hashed like any other
  // value; confirm whether `NAN_IS_NULL` should fold them into the null handling below.
  auto const row_indices = thrust::counting_iterator<cudf::size_type>(0);
  rmm::device_uvector<cudf::hash_value_type> hash_values(num_rows, stream);
  thrust::transform(rmm::exec_policy_nosync(stream),
                    row_indices,
                    row_indices + num_rows,
                    hash_values.begin(),
                    hash_key);

  if (null_handling == null_policy::EXCLUDE && has_nulls) {
    // NOTE(review): `bitmask_or` presumably marks a row valid when any column is valid; if rows
    // with *any* null must be excluded, an AND of the column masks may be what is wanted -- verify.
    auto const [row_bitmask, null_count] =
      cudf::detail::bitmask_or(input, stream, cudf::get_current_device_resource_ref());

    if (null_count > 0) {
      row_validity pred{static_cast<bitmask_type const*>(row_bitmask.data())};

      // Keep only the hashes of rows whose bit is set; the row index acts as the stencil.
      rmm::device_uvector<cudf::hash_value_type> filtered_hashes(num_rows - null_count, stream);
      auto const end_iter = thrust::copy_if(rmm::exec_policy(stream),
                                            hash_values.begin(),
                                            hash_values.end(),
                                            row_indices,
                                            filtered_hashes.begin(),
                                            pred);

      auto const actual_count = std::distance(filtered_hashes.begin(), end_iter);
      if (actual_count > 0) {
        hll.add(filtered_hashes.begin(), filtered_hashes.begin() + actual_count, cuda_stream);
      }
      return bounded_estimate(static_cast<std::size_t>(actual_count));
    }
  }

  hll.add(hash_values.begin(), hash_values.end(), cuda_stream);
  return bounded_estimate(static_cast<std::size_t>(num_rows));
}

// Table overload: hands every argument, unchanged, to the shared implementation.
cudf::size_type approx_distinct_count(table_view const& input,
                                      int precision,
                                      null_policy null_handling,
                                      nan_policy nan_handling,
                                      rmm::cuda_stream_view stream)
{
  auto const row_estimate =
    approx_distinct_count_impl(input, precision, null_handling, nan_handling, stream);
  return row_estimate;
}

// Column overload: views the column as a one-column table so both overloads share one code path.
cudf::size_type approx_distinct_count(column_view const& input,
                                      int precision,
                                      null_policy null_handling,
                                      nan_policy nan_handling,
                                      rmm::cuda_stream_view stream)
{
  auto const as_table = cudf::table_view({input});
  return approx_distinct_count_impl(as_table, precision, null_handling, nan_handling, stream);
}

} // namespace detail

// Public API implementations
// Public column entry point: opens the function's profiling range, then defers to the detail
// overload with the arguments untouched.
cudf::size_type approx_distinct_count(column_view const& input,
                                      int precision,
                                      null_policy null_handling,
                                      nan_policy nan_handling,
                                      rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();
  auto const element_estimate =
    detail::approx_distinct_count(input, precision, null_handling, nan_handling, stream);
  return element_estimate;
}

// Public table entry point: opens the function's profiling range, then defers to the detail
// overload with the arguments untouched.
cudf::size_type approx_distinct_count(table_view const& input,
                                      int precision,
                                      null_policy null_handling,
                                      nan_policy nan_handling,
                                      rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();
  auto const row_estimate =
    detail::approx_distinct_count(input, precision, null_handling, nan_handling, stream);
  return row_estimate;
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ ConfigureTest(
ConfigureTest(
STREAM_COMPACTION_TEST
stream_compaction/apply_boolean_mask_tests.cpp
stream_compaction/approx_distinct_count_tests.cpp
stream_compaction/distinct_count_tests.cpp
stream_compaction/distinct_tests.cpp
stream_compaction/drop_nans_tests.cpp
Expand Down
Loading