Changes from 7 commits
58 changes: 32 additions & 26 deletions cpp/benchmarks/groupby/group_m2_var_std.cpp
@@ -15,6 +15,7 @@ namespace {
template <typename Type, cudf::aggregation::Kind Agg>
void run_benchmark(nvbench::state& state,
cudf::size_type num_rows,
cudf::size_type num_aggs,
cudf::size_type value_key_ratio,
double null_probability)
{
@@ -27,30 +28,34 @@ void run_benchmark(nvbench::state& state,
return create_random_column(cudf::type_to_id<int32_t>(), row_count{num_rows}, profile);
}();

auto const values = [&] {
auto builder = data_profile_builder().cardinality(0).distribution(
cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, num_rows);
if (null_probability > 0) {
builder.null_probability(null_probability);
auto values_builder = data_profile_builder().cardinality(0).distribution(
cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, num_rows);
if (null_probability > 0) {
values_builder.null_probability(null_probability);
} else {
values_builder.no_validity();
}

std::vector<std::unique_ptr<cudf::column>> values_cols;
std::vector<cudf::groupby::aggregation_request> requests;
values_cols.reserve(num_aggs);
requests.reserve(num_aggs);
for (cudf::size_type i = 0; i < num_aggs; i++) {
auto values = create_random_column(
cudf::type_to_id<Type>(), row_count{num_rows}, data_profile{values_builder});
auto request = cudf::groupby::aggregation_request{};
request.values = values->view();
if constexpr (Agg == cudf::aggregation::Kind::M2) {
request.aggregations.push_back(cudf::make_m2_aggregation<cudf::groupby_aggregation>());
} else if constexpr (Agg == cudf::aggregation::Kind::VARIANCE) {
request.aggregations.push_back(cudf::make_variance_aggregation<cudf::groupby_aggregation>());
} else if constexpr (Agg == cudf::aggregation::Kind::STD) {
request.aggregations.push_back(cudf::make_std_aggregation<cudf::groupby_aggregation>());
} else {
builder.no_validity();
throw std::runtime_error("Unsupported aggregation kind.");
}
return create_random_column(
cudf::type_to_id<Type>(), row_count{num_rows}, data_profile{builder});
}();

// Vector of 1 request
std::vector<cudf::groupby::aggregation_request> requests(1);
requests.back().values = values->view();
if constexpr (Agg == cudf::aggregation::Kind::M2) {
requests.back().aggregations.push_back(cudf::make_m2_aggregation<cudf::groupby_aggregation>());
} else if constexpr (Agg == cudf::aggregation::Kind::VARIANCE) {
requests.back().aggregations.push_back(
cudf::make_variance_aggregation<cudf::groupby_aggregation>());
} else if constexpr (Agg == cudf::aggregation::Kind::STD) {
requests.back().aggregations.push_back(cudf::make_std_aggregation<cudf::groupby_aggregation>());
} else {
throw std::runtime_error("Unsupported aggregation kind.");
values_cols.emplace_back(std::move(values));
requests.emplace_back(std::move(request));
}

auto const mem_stats_logger = cudf::memory_stats_logger();
@@ -75,8 +80,8 @@ void bench_groupby_m2_var_std(nvbench::state& state,
auto const value_key_ratio = static_cast<cudf::size_type>(state.get_int64("value_key_ratio"));
auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows"));
auto const null_probability = state.get_float64("null_probability");

run_benchmark<Type, Agg>(state, num_rows, value_key_ratio, null_probability);
auto const num_aggs = static_cast<cudf::size_type>(state.get_int64("num_aggs"));
run_benchmark<Type, Agg>(state, num_rows, num_aggs, value_key_ratio, null_probability);
}

using Types = nvbench::type_list<int32_t, double>;
@@ -87,5 +92,6 @@ using AggKinds = nvbench::enum_type_list<cudf::aggregation::Kind::M2,
NVBENCH_BENCH_TYPES(bench_groupby_m2_var_std, NVBENCH_TYPE_AXES(Types, AggKinds))
.set_name("groupby_m2_var_std")
.add_int64_axis("value_key_ratio", {20, 100})
.add_int64_axis("num_rows", {100'000, 10'000'000, 100'000'000})
.add_float64_axis("null_probability", {0, 0.5});
.add_int64_axis("num_rows", {100'000, 10'000'000})
.add_float64_axis("null_probability", {0, 0.5})
.add_int64_axis("num_aggs", {1, 10, 50, 100});
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs.cu
@@ -13,6 +13,7 @@ compute_global_memory_aggs<global_set_t>(bitmask_type const* row_bitmask,
global_set_t const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

38 changes: 29 additions & 9 deletions cpp/src/groupby/hash/compute_global_memory_aggs.cuh
@@ -72,6 +72,7 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_d
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
@@ -88,9 +89,13 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_d
}();

auto const d_values = table_device_view::create(values, stream);
auto agg_results = create_results_table(
static_cast<size_type>(unique_keys.size()), values, h_agg_kinds, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);
auto agg_results = create_results_table(static_cast<size_type>(unique_keys.size()),
values,
h_agg_kinds,
force_non_nullable,
stream,
mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

thrust::for_each_n(rmm::exec_policy_nosync(stream),
thrust::make_counting_iterator(int64_t{0}),
@@ -116,13 +121,15 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_aggs_s
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
auto const num_rows = values.num_rows();
auto const d_values = table_device_view::create(values, stream);
auto agg_results = create_results_table(num_rows, values, h_agg_kinds, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);
auto agg_results =
create_results_table(num_rows, values, h_agg_kinds, force_non_nullable, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

thrust::for_each_n(
rmm::exec_policy_nosync(stream),
@@ -153,14 +160,27 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_global
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
return h_agg_kinds.size() > GROUPBY_DENSE_OUTPUT_THRESHOLD
? compute_aggs_dense_output(
row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr)
: compute_aggs_sparse_output_gather(
row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr);
? compute_aggs_dense_output(row_bitmask,
values,
key_set,
h_agg_kinds,
d_agg_kinds,
force_non_nullable,
stream,
mr)
: compute_aggs_sparse_output_gather(row_bitmask,
values,
key_set,
h_agg_kinds,
d_agg_kinds,
force_non_nullable,
stream,
mr);
}

} // namespace cudf::groupby::detail::hash
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs.hpp
@@ -20,6 +20,7 @@ std::pair<std::unique_ptr<table>, rmm::device_uvector<size_type>> compute_global
SetType const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);
} // namespace cudf::groupby::detail::hash
1 change: 1 addition & 0 deletions cpp/src/groupby/hash/compute_global_memory_aggs_null.cu
@@ -13,6 +13,7 @@ compute_global_memory_aggs<nullable_global_set_t>(bitmask_type const* row_bitmas
nullable_global_set_t const& key_set,
host_span<aggregation::Kind const> h_agg_kinds,
device_span<aggregation::Kind const> d_agg_kinds,
host_span<int8_t const> force_non_nullable,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

8 changes: 4 additions & 4 deletions cpp/src/groupby/hash/compute_single_pass_aggs.cuh
@@ -38,7 +38,7 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
{
// Collect the single-pass aggregations that can be processed in this function.
// The compound aggregations that require multiple passes will be handled separately later on.
auto const [values, agg_kinds, aggs, has_compound_aggs] =
auto const [values, agg_kinds, aggs, force_non_nullable, has_compound_aggs] =
extract_single_pass_aggs(requests, stream);
auto const d_agg_kinds = cudf::detail::make_device_uvector_async(
agg_kinds, stream, cudf::get_current_device_resource_ref());
@@ -50,7 +50,7 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
// present.
auto const run_aggs_by_global_mem_kernel = [&] {
auto [agg_results, unique_key_indices] = compute_global_memory_aggs(
row_bitmask, values, global_set, agg_kinds, d_agg_kinds, stream, mr);
row_bitmask, values, global_set, agg_kinds, d_agg_kinds, force_non_nullable, stream, mr);
finalize_output(values, aggs, agg_results, cache, stream);
return std::pair{std::move(unique_key_indices), has_compound_aggs};
};
@@ -210,8 +210,8 @@ std::pair<rmm::device_uvector<size_type>, bool> compute_single_pass_aggs(
key_transform_map = rmm::device_uvector<size_type>{0, stream}; // done, free up memory early

auto const d_spass_values = table_device_view::create(values, stream);
auto agg_results =
create_results_table(static_cast<size_type>(unique_keys.size()), values, agg_kinds, stream, mr);
auto agg_results = create_results_table(
static_cast<size_type>(unique_keys.size()), values, agg_kinds, force_non_nullable, stream, mr);
auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream);

compute_shared_memory_aggs(grid_size,
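The effect of the new force_non_nullable argument is easiest to see where the results table is created. The create_results_table implementation is not part of this diff, so the following is only a hypothetical illustration of the idea (the reuse of each input column's type and the all-null initial mask state are assumptions; the real code derives each output type from its aggregation kind): intermediate-only results skip null-mask allocation entirely, which is what saves the null-mask update and null-count work.

#include <cudf/column/column_factories.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/resource_ref.hpp>

#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical sketch, not the actual create_results_table.
std::vector<std::unique_ptr<cudf::column>> sketch_result_columns(
  cudf::size_type output_size,
  cudf::table_view const& values,
  std::vector<int8_t> const& force_non_nullable,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  std::vector<std::unique_ptr<cudf::column>> cols;
  cols.reserve(values.num_columns());
  for (cudf::size_type i = 0; i < values.num_columns(); ++i) {
    // Forced non-nullable intermediates get no null mask at all; other results start
    // all-null and have their valid bits set as rows are aggregated into them.
    auto const mask =
      force_non_nullable[i] ? cudf::mask_state::UNALLOCATED : cudf::mask_state::ALL_NULL;
    cols.push_back(
      cudf::make_fixed_width_column(values.column(i).type(), output_size, mask, stream, mr));
  }
  return cols;
}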
45 changes: 28 additions & 17 deletions cpp/src/groupby/hash/extract_single_pass_aggs.cpp
@@ -92,47 +92,54 @@ class groupby_simple_aggregations_collector final

return aggs;
}

std::vector<std::unique_ptr<aggregation>> visit(
data_type, cudf::detail::correlation_aggregation const&) override
{
std::vector<std::unique_ptr<aggregation>> aggs;
aggs.push_back(make_sum_aggregation());
// COUNT_VALID
aggs.push_back(make_count_aggregation());

return aggs;
}
Comment on lines -96 to -105
Contributor Author: Unused code: correlation aggregation is not yet supported in the hash-based pipeline.

};

std::tuple<table_view,
cudf::detail::host_vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<int8_t>,
bool>
extract_single_pass_aggs(host_span<aggregation_request const> requests,
rmm::cuda_stream_view stream)
{
std::vector<column_view> columns;
std::vector<std::unique_ptr<aggregation>> aggs;
std::vector<int8_t> force_non_nullable;
auto agg_kinds = cudf::detail::make_empty_host_vector<aggregation::Kind>(requests.size(), stream);

bool has_compound_aggs = false;
for (auto const& request : requests) {
auto const& agg_v = request.aggregations;
auto const& input_aggs = request.aggregations;

// The set of input aggregations.
std::unordered_set<aggregation::Kind> input_agg_kinds_set;
for (auto const& agg : input_aggs) {
input_agg_kinds_set.insert(agg->kind);
}

// The aggregations extracted from the input request, including the original single-pass
// aggregations and the intermediate single-pass aggregations replacing compound
// aggregations.
std::unordered_set<aggregation::Kind> agg_kinds_set;

auto insert_agg = [&](column_view const& request_values, std::unique_ptr<aggregation>&& agg) {
if (agg_kinds_set.insert(agg->kind).second) {
// Check if the inserted aggregation is an input aggregation or a replacement aggregation.
// If it is not an input aggregation, we can force its output to be non-nullable
// (by storing `1` value in the `force_non_nullable` vector).
auto const is_input_agg = input_agg_kinds_set.contains(agg->kind);
force_non_nullable.push_back(is_input_agg ? 0 : 1);
Contributor:

Suggested change:
-        force_non_nullable.push_back(is_input_agg ? 0 : 1);
+        force_non_nullable.push_back(not is_input_agg);

Using int (4 bytes) for this vector seems like a waste; perhaps use int8 instead. Using bool will not work with spans, unfortunately.
I also recommend using std::span instead of cudf::host_span if possible.
We can revisit the other host_span usages here in a later PR. Reference: #20539

Contributor Author:

I've updated the code to use int8_t.
I also tried using cuda::std::span but couldn't compile due to:

error: no suitable user-defined conversion from "std::tuple_element<1UL, const std::tuple<cudf::table_view, cudf::detail::host_vector<cudf::aggregation::Kind>, std::vector<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>, std::allocator<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>>>, std::vector<int8_t, std::allocator<int8_t>>, bool>>::type"
(aka "const std::__tuple_element_t<1UL, std::tuple<cudf::table_view, cudf::detail::host_vector<cudf::aggregation::Kind>, std::vector<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>, std::allocator<std::unique_ptr<cudf::aggregation, std::default_delete<cudf::aggregation>>>>, std::vector<signed char, std::allocator<signed char>>, bool>>")
to "cuda::std::__4::span<const cudf::aggregation::Kind, 18446744073709551615UL>" exists

Contributor:

Since this is host-only, std::span should be enough; no need for cuda::std::span.
I'm not sure that will help here, though. I can look at this more closely next week.

Contributor Author:

Ah, I found that the issue is due to implicitly converting cudf::detail::host_vector to (cuda::)std::span. The implicit conversion is not implemented yet, so we should do that soon.
When trying to use std::span in place of host_span, the modification chains down to nearly a dozen unrelated files, so I would rather have std::span adoption implemented in a separate PR.

Contributor:

Ok, I tried std::span locally and only had to change 6 files in this PR.

Contributor (davidwendt, Nov 26, 2025):

The host_vector should not come into play. Are you trying to change all of the host_span usages?
I would recommend only changing the host_span usage for this new int8 variable; we'll change any of the other host_span usages in a follow-up PR.

Contributor Author:

Yes, I tried to change host_span for this new variable and another one, since they are passed to the same function:

std::unique_ptr<table> create_results_table(size_type output_size,
                                            table_view const& values,
                                            host_span<aggregation::Kind const> agg_kinds,
                                            host_span<int8_t const> force_non_nullable,

so it makes more sense to change both together.

Contributor:

I understand, but it would require too many changes, whereas changing just the one for this PR will be enough for it to pass.
Please just change the one variable; we can change the other one(s) in a later PR.

Contributor Author:

Done. std::span looks great now. Excited waiting for the full adoption 👍

(A short host-only std::span sketch follows this file's diff.)


agg_kinds.push_back(agg->kind);
aggs.push_back(std::move(agg));
columns.push_back(request_values);
}
};

auto values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
: request.values.type();
for (auto const& agg : agg_v) {
auto const values_type = cudf::is_dictionary(request.values.type())
? cudf::dictionary_column_view(request.values).keys().type()
: request.values.type();
for (auto const& agg : input_aggs) {
groupby_simple_aggregations_collector collector;
auto spass_aggs = agg->get_simple_aggregations(values_type, collector);
if (spass_aggs.size() > 1 || !spass_aggs.front()->is_equal(*agg)) {
@@ -145,7 +152,11 @@ extract_single_pass_aggs(host_span<aggregation_request const> requests,
}
}

return {table_view(columns), std::move(agg_kinds), std::move(aggs), has_compound_aggs};
return {table_view(columns),
std::move(agg_kinds),
std::move(aggs),
std::move(force_non_nullable),
has_compound_aggs};
}

std::vector<aggregation::Kind> get_simple_aggregations(groupby_aggregation const& agg,
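The std::span point raised in the review thread above comes down to standard C++20 behavior; here is a small host-only sketch (illustrative, not part of the PR) showing that a std::vector<int8_t> binds to a std::span<int8_t const> parameter with no cudf-specific conversion:

#include <cstdint>
#include <span>
#include <vector>

// std::span's range constructor accepts any contiguous host container, so the new
// force_non_nullable vector can be passed to a std::span parameter as-is.
bool any_forced_non_nullable(std::span<int8_t const> flags)
{
  for (auto const flag : flags) {
    if (flag != 0) { return true; }
  }
  return false;
}

int main()
{
  std::vector<int8_t> force_non_nullable{0, 1, 0};
  return any_forced_non_nullable(force_non_nullable) ? 0 : 1;  // implicit vector -> span
}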
16 changes: 13 additions & 3 deletions cpp/src/groupby/hash/extract_single_pass_aggs.hpp
@@ -15,22 +15,32 @@
namespace cudf::groupby::detail::hash {

/**
* @brief Extract all compound aggregations into single pass aggs
* @brief Extract single pass aggregations from the given aggregation requests.
*
* For example, a MEAN aggregation will be flattened into a SUM and a COUNT_VALID aggregation.
 * During extraction, compound aggregations are replaced by their corresponding single-pass
 * aggregation dependencies. For example, a MEAN aggregation will be replaced by a SUM and a
* COUNT_VALID aggregation.
*
 * For some single-pass aggregations, we also reduce overhead by forcing their result
 * columns to be non-nullable. For example, a SUM aggregation needed only as the intermediate
 * result for an M2 aggregation does not need a null mask, which avoids the extra null-mask
 * update and null-count computation overhead.
*
* @param requests The aggregation requests
* @param stream The CUDA stream
*
* @return A tuple containing:
* - A table_view containing the input values columns for the single-pass aggregations,
* - A vector of aggregation kinds corresponding to each of these values columns,
* - A vector of aggregation objects corresponding to each of these values columns, and
* - A vector of aggregation objects corresponding to each of these values columns,
 * - A vector of binary flags indicating whether the corresponding result column will be forced
 * to be non-nullable, and
* - A boolean value indicating if there are any multi-pass (compound) aggregations.
*/
std::tuple<table_view,
cudf::detail::host_vector<aggregation::Kind>,
std::vector<std::unique_ptr<aggregation>>,
std::vector<int8_t>,
bool>
extract_single_pass_aggs(host_span<aggregation_request const> requests,
rmm::cuda_stream_view stream);
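To make the documented contract concrete, here is a hedged usage sketch (assuming a prebuilt values column and the default stream; everything other than the declared API is illustrative): a single MEAN request comes back flattened, and both intermediates are flagged as forced non-nullable because neither SUM nor COUNT_VALID was requested directly.

// Sketch only; the expected outputs follow from the docstring above.
cudf::groupby::aggregation_request request;
request.values = values->view();  // `values` assumed to be a prebuilt column
request.aggregations.push_back(cudf::make_mean_aggregation<cudf::groupby_aggregation>());

std::vector<cudf::groupby::aggregation_request> requests;
requests.emplace_back(std::move(request));

auto const [spass_values, agg_kinds, aggs, force_non_nullable, has_compound_aggs] =
  cudf::groupby::detail::hash::extract_single_pass_aggs(requests, cudf::get_default_stream());
// Expected: agg_kinds holds {SUM, COUNT_VALID}; force_non_nullable holds {1, 1} since
// neither kind appeared in the original request; has_compound_aggs is true because MEAN
// still needs a finalize pass.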