Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ compute_global_memory_aggs<global_set_t>(bitmask_type const* row_bitmask,
global_set_t const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

Expand Down
34 changes: 27 additions & 7 deletions cpp/src/groupby/hash/compute_global_memory_aggs.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_d
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
Expand All @@ -88,8 +89,12 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_d
}();

auto const d_values = table_device_view::create(values, stream);
auto agg_results = create_results_table(
static_cast<size_type>(unique_keys.size()), values, h_agg_kinds, stream, mr);
auto agg_results = create_results_table(static_cast<size_type>(unique_keys.size()),
values,
h_agg_kinds,
force_non_nullable,
stream,
mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

thrust::for_each_n(rmm::exec_policy_nosync(stream),
Expand All @@ -116,12 +121,14 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_s
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto const num_rows = values.num_rows();
auto const d_values = table_device_view::create(values, stream);
auto agg_results = create_results_table(num_rows, values, h_agg_kinds, stream, mr);
auto agg_results =
create_results_table(num_rows, values, h_agg_kinds, force_non_nullable, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

thrust::for_each_n(
Expand Down Expand Up @@ -153,14 +160,27 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_global
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
return h_agg_kinds.size() > GROUPBY_DENSE_OUTPUT_THRESHOLD
? compute_aggs_dense_output(
row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr)
: compute_aggs_sparse_output_gather(
row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr);
? compute_aggs_dense_output(row_bitmask,
values,
key_set,
h_agg_kinds,
d_agg_kinds,
force_non_nullable,
stream,
mr)
: compute_aggs_sparse_output_gather(row_bitmask,
values,
key_set,
h_agg_kinds,
d_agg_kinds,
force_non_nullable,
stream,
mr);
}

} // namespace cudf::groupby::detail::hash
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_global
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
} // namespace cudf::groupby::detail::hash
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs_null.cu
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ compute_global_memory_aggs<nullable_global_set_t>(bitmask_type const* row_bitmas
nullable_global_set_t const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/groupby/hash/compute_single_pass_aggs.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
{
// Collect the single-pass aggregations that can be processed in this function.
// The compound aggregations that require multiple passes will be handled separately later on.
auto const [values, agg_kinds, aggs, has_compound_aggs] =
auto const [values, agg_kinds, aggs, force_non_nullable, has_compound_aggs] =
extract_single_pass_aggs(requests, stream);
auto const d_agg_kinds = cudf::detail::make_device_uvector_async(
agg_kinds, stream, cudf::get_current_device_resource_ref());
Expand All @@ -50,7 +50,7 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
// present.
auto const run_aggs_by_global_mem_kernel = [&] {
auto [agg_results, unique_key_indices] = compute_global_memory_aggs(
row_bitmask, values, global_set, agg_kinds, d_agg_kinds, stream, mr);
row_bitmask, values, global_set, agg_kinds, d_agg_kinds, force_non_nullable, stream, mr);
finalize_output(values, aggs, agg_results, cache, stream);
return std::pair{std::move(unique_key_indices), has_compound_aggs};
};
Expand Down Expand Up @@ -210,8 +210,8 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
key_transform_map = rmm::device_uvector<size_type>{0, stream}; // done, free up memory early

auto const d_spass_values = table_device_view::create(values, stream);
auto agg_results =
create_results_table(static_cast<size_type>(unique_keys.size()), values, agg_kinds, stream, mr);
auto agg_results = create_results_table(
static_cast<size_type>(unique_keys.size()), values, agg_kinds, force_non_nullable, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

compute_shared_memory_aggs(grid_size,
Expand Down
45 changes: 28 additions & 17 deletions cpp/src/groupby/hash/extract_single_pass_aggs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,47 +92,54 @@ class groupby_simple_aggregations_collector final

return aggs;
}

/**
 * @brief Lists the simple aggregations that stand in for a correlation aggregation.
 *
 * Both parameters are unnamed and unused; the returned list is always the same.
 *
 * @return A vector holding a SUM aggregation followed by a COUNT_VALID aggregation.
 */
std::vector<std::unique_ptr<aggregation>> visit(
  data_type, cudf::detail::correlation_aggregation const&) override
{
  std::vector<std::unique_ptr<aggregation>> simple_aggs;
  simple_aggs.reserve(2);

  simple_aggs.emplace_back(make_sum_aggregation());
  simple_aggs.emplace_back(make_count_aggregation());  // COUNT_VALID
  return simple_aggs;
}
Comment on lines -96 to -105
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused code: the correlation aggregation is not yet supported in the hash-based pipeline.

};

std::tuple<table_view,
cudf::detail::host_vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<int>,
bool>
extract_single_pass_aggs(host_span<aggregation_request const> requests,
rmm::cuda_stream_view stream)
{
std::vector<column_view> columns;
std::vector<std::unique_ptr<aggregation>> aggs;
std::vector<int> force_non_nullable;
auto agg_kinds = cudf::detail::make_empty_host_vector<aggregation::Kind>(requests.size(), stream);

bool has_compound_aggs = false;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;
auto const& input_aggs = request.aggregations;

// The set of input aggregations.
std::unordered_set<aggregation::Kind> input_agg_kinds_set;
for (auto const& agg : input_aggs) {
input_agg_kinds_set.insert(agg->kind);
}

// The aggregations extracted from the input request, including the original single-pass
// aggregations and the intermediate single-pass aggregations replacing compound
// aggregations.
std::unordered_set<aggregation::Kind> agg_kinds_set;

auto insert_agg = [&](column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
// Check if the inserted aggregation is an input aggregation or a replacement aggregation.
// If it is not an input aggregation, we can force its output to be non-nullable
// (by storing `1` value in the `force_non_nullable` vector).
auto const is_input_agg = input_agg_kinds_set.contains(agg->kind);
force_non_nullable.push_back(is_input_agg ? 0 : 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
force_non_nullable.push_back(is_input_agg ? 0 : 1);
force_non_nullable.push_back(not is_input_agg);

Using int (4 bytes) for this vector seems like a waste. Perhaps use int8 instead. Unfortunately, using bool will not work with spans.
I also recommend using std::span instead of cudf::host_span if possible.
We can revisit the other host_span usages here in a later PR. Reference: #20539

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the code to use int8_t.
I also tried using cuda::std::span, but it failed to compile due to:

error: no suitable user-defined conversion from "std::tuple_element<1UL, const std::tuple<cudf::table_view, cudf::detail::host_vector<cudf::aggregation::Kind>, std::vector<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>, std::allocator<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>>>, std::vector<int8_t, std::allocator<int8_t>>, bool>>::type" 
(aka "const std::__tuple_element_t<1UL, std::tuple<cudf::table_view, cudf::detail::host_vector<cudf::aggregation::Kind>, std::vector<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>, std::allocator<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>>>, std::vector<signed char, std::allocator<signed char>>, bool>>") 
to "cuda::std::__4::span<const cudf::aggregation::Kind, 18446744073709551615UL>" exists

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is host-only, the std::span should be enough. No need for cuda::std::span.
I'm not sure if that will help this though. I can look at this more closely next week.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I found that the issue is due to implicitly converting cudf::detail::host_vector to (cuda::)std::span. That implicit conversion is not implemented yet, so we should add it soon.
When trying to use std::span in place of host_span, the modifications cascade into nearly a dozen unrelated files, so I would rather have the std::span adoption implemented in a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I tried std::span locally and only had to change 6 files in this PR.

Copy link
Contributor

@davidwendt davidwendt Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The host_vector should not come into play. Are you trying to change all of the host_span usages?
I would recommend to only change the host_span usage for this new int8 variable and we'll change any of the other host_span usages in a follow up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I tried to change host_span for this new variable and another one, since they are passed to the same function:

std::unique_ptr<table> create_results_table(size_type output_size,
                                            table_view const& values,
                                            host_span<aggregation::Kind const> agg_kinds,
                                            host_span<int8_t const> force_non_nullable,

so it makes more sense to change both together.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. But it would require too many changes.
Whereas changing just the one for this PR will be enough for it to pass.
Please just change the one variable. We can change the other one(s) in a later PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. std::span looks great now. Excited waiting for the full adoption 👍


agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
}
};

auto values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
: request.values.type();
for (auto const& agg : agg_v) {
auto const values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
: request.values.type();
for (auto const& agg : input_aggs) {
groupby_simple_aggregations_collector collector;
auto spass_aggs = agg->get_simple_aggregations(values_type, collector);
if (spass_aggs.size() > 1 || !spass_aggs.front()->is_equal(*agg)) {
Expand All @@ -145,7 +152,11 @@ extract_single_pass_aggs(host_span<aggregation_request const> requests,
}
}

return {table_view(columns), std::move(agg_kinds), std::move(aggs), has_compound_aggs};
return {table_view(columns),
std::move(agg_kinds),
std::move(aggs),
std::move(force_non_nullable),
has_compound_aggs};
}

std::vector<aggregation::Kind> get_simple_aggregations(groupby_aggregation const& agg,
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/groupby/hash/extract_single_pass_aggs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,32 @@
namespace cudf::groupby::detail::hash {

/**
* @brief Extract all compound aggregations into single pass aggs
* @brief Extract single pass aggregations from the given aggregation requests.
*
* For example, a MEAN aggregation will be flattened into a SUM and a COUNT_VALID aggregation.
* During extraction, compound aggregations will be replaced by the single-pass aggregations
* they depend on. For example, a MEAN aggregation will be replaced by a SUM and a COUNT_VALID
* aggregation.
*
* For some single-pass aggregations, we also try to reduce overhead by forcing their result
* columns to be non-nullable. For example, a SUM aggregation that is needed only as an
* intermediate result for an M2 aggregation does not need a null mask, which avoids the overhead
* of updating the null mask and computing the null count.
*
* @param requests The aggregation requests
* @param stream The CUDA stream
*
* @return A tuple containing:
* - A table_view containing the input values columns for the single-pass aggregations,
* - A vector of aggregation kinds corresponding to each of these values columns,
* - A vector of aggregation objects corresponding to each of these values columns, and
* - A vector of aggregation objects corresponding to each of these values columns,
* - A vector of binary values indicating if the corresponding result will be forced to be
* non-nullable, and
* - A boolean value indicating if there are any multi-pass (compound) aggregations.
*/
std::tuple<table_view,
cudf::detail::host_vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<int>,
bool>
extract_single_pass_aggs(host_span<aggregation_request const> requests,
rmm::cuda_stream_view stream);
Expand Down
57 changes: 40 additions & 17 deletions cpp/src/groupby/hash/hash_compound_agg_finalizer.cu
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/binaryop.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/valid_if.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/types.hpp>

Expand Down Expand Up @@ -81,31 +83,50 @@ void hash_compound_agg_finalizer::visit(cudf::detail::mean_aggregation const& ag
{
if (cache->has_result(col, agg)) { return; }

auto const sum_agg = make_sum_aggregation();
auto const count_agg = make_count_aggregation();
auto const sum_result = cache->get_result(col, *sum_agg);
auto const count_result = cache->get_result(col, *count_agg);

auto const sum_agg = make_sum_aggregation();
auto const count_agg = make_count_aggregation();
auto const sum_result = cache->get_result(col, *sum_agg);
auto const count_result = cache->get_result(col, *count_agg);
auto const null_removed_sum = [&] {
if (sum_result.null_count() == 0) { return sum_result; }
return column_view{
sum_result.type(), sum_result.size(), sum_result.head(), nullptr, 0, sum_result.offset()};
}();

// Perform division without any null masks, and generate the null mask for the result later.
// This is because the null mask (if exists) is just needed to be copied from the sum result,
// and copying is faster than running the `bitmask_and` kernel.
auto result =
cudf::detail::binary_operation(sum_result,
cudf::detail::binary_operation(null_removed_sum,
count_result,
binary_operator::DIV,
cudf::detail::target_type(input_type, aggregation::MEAN),
stream,
mr);
// SUM result only has nulls if it is an input aggregation, not intermediate-only aggregation.
if (sum_result.has_nulls()) {
result->set_null_mask(cudf::detail::copy_bitmask(sum_result, stream, mr),
sum_result.null_count());
} else if (col.has_nulls()) { // SUM aggregation is only intermediate result, thus it is forced
// to be non-nullable
auto [null_mask, null_count] = cudf::detail::valid_if(
count_result.begin<size_type>(),
count_result.end<size_type>(),
[] __device__(size_type const count) -> bool { return count > 0; },
stream,
mr);
if (null_count > 0) { result->set_null_mask(std::move(null_mask), null_count); }
}
cache->add_result(col, agg, std::move(result));
}

void hash_compound_agg_finalizer::visit(cudf::detail::m2_aggregation const& agg)
{
if (cache->has_result(col, agg)) { return; }

auto const sum_sqr_agg = make_sum_of_squares_aggregation();
auto const sum_agg = make_sum_aggregation();
auto const count_agg = make_count_aggregation();
this->visit(*sum_sqr_agg);
this->visit(*sum_agg);
this->visit(*count_agg);
auto const sum_sqr_agg = make_sum_of_squares_aggregation();
auto const sum_agg = make_sum_aggregation();
auto const count_agg = make_count_aggregation();
auto const sum_sqr_result = cache->get_result(col, *sum_sqr_agg);
auto const sum_result = cache->get_result(col, *sum_agg);
auto const count_result = cache->get_result(col, *count_agg);
Expand All @@ -118,10 +139,11 @@ void hash_compound_agg_finalizer::visit(cudf::detail::var_aggregation const& agg
{
if (cache->has_result(col, agg)) { return; }

auto const m2_agg = make_m2_aggregation();
auto const count_agg = make_count_aggregation();
auto const m2_agg = make_m2_aggregation();
// Since M2 is a compound aggregation, we need to "finalize" it using aggregation finalizer's
// "visit" method.
this->visit(*dynamic_cast<cudf::detail::m2_aggregation*>(m2_agg.get()));
this->visit(*count_agg);
auto const count_agg = make_count_aggregation();
auto const m2_result = cache->get_result(col, *m2_agg);
auto const count_result = cache->get_result(col, *count_agg);

Expand All @@ -134,9 +156,10 @@ void hash_compound_agg_finalizer::visit(cudf::detail::std_aggregation const& agg
if (cache->has_result(col, agg)) { return; }

auto const m2_agg = make_m2_aggregation();
auto const count_agg = make_count_aggregation();
// Since M2 is a compound aggregation, we need to "finalize" it using aggregation finalizer's
// "visit" method.
this->visit(*dynamic_cast<cudf::detail::m2_aggregation*>(m2_agg.get()));
this->visit(*count_agg);
auto const count_agg = make_count_aggregation();
auto const m2_result = cache->get_result(col, *m2_agg);
auto const count_result = cache->get_result(col, *count_agg);

Expand Down
26 changes: 18 additions & 8 deletions cpp/src/groupby/hash/output_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ struct result_column_creator {
{
}

std::unique_ptr<column> operator()(column_view const& col, aggregation::Kind const& agg) const
std::unique_ptr<column> operator()(column_view const& col,
aggregation::Kind const& agg,
bool force_non_nullable) const
{
auto const col_type =
is_dictionary(col.type()) ? dictionary_column_view(col).keys().type() : col.type();
auto const nullable =
agg != aggregation::COUNT_VALID && agg != aggregation::COUNT_ALL && col.has_nulls();
auto const nullable = force_non_nullable ? false
: agg != aggregation::COUNT_VALID &&
agg != aggregation::COUNT_ALL && col.has_nulls();
// TODO: Remove adjusted buffer size workaround once https://github.com/NVIDIA/cccl/issues/6430
// is fixed. Use adjusted buffer size for small data types to ensure atomic operation safety.
auto const make_uninitialized_column = [&](data_type d_type, size_type size, mask_state state) {
Expand Down Expand Up @@ -97,15 +100,22 @@ struct result_column_creator {
std::unique_ptr<table> create_results_table(size_type output_size,
table_view const& values,
host_span<aggregation::Kind const> agg_kinds,
host_span<int const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(values.num_columns() == static_cast<size_type>(agg_kinds.size()),
"The number of values columns and size of agg_kinds vector must be the same.");
CUDF_EXPECTS(
values.num_columns() == static_cast<size_type>(force_non_nullable.size()),
"The number of values columns and size of force_non_nullable vector must be the same.");

auto const column_creator = result_column_creator{output_size, stream, mr};
std::vector<std::unique_ptr<column>> output_cols;
std::transform(values.begin(),
values.end(),
agg_kinds.begin(),
std::back_inserter(output_cols),
result_column_creator{output_size, stream, mr});
for (size_t i = 0; i < agg_kinds.size(); i++) {
output_cols.emplace_back(
column_creator(values.column(i), agg_kinds[i], static_cast<bool>(force_non_nullable[i])));
}
auto result_table = std::make_unique<table>(std::move(output_cols));
cudf::detail::initialize_with_identity(result_table->mutable_view(), agg_kinds, stream);
return result_table;
Expand Down
Loading
Loading