diff --git a/cpp/benchmarks/groupby/group_m2_var_std.cpp b/cpp/benchmarks/groupby/group_m2_var_std.cpp index 9b153eab824..b9e995e1e9f 100644 --- a/cpp/benchmarks/groupby/group_m2_var_std.cpp +++ b/cpp/benchmarks/groupby/group_m2_var_std.cpp @@ -15,6 +15,7 @@ namespace { template void run_benchmark(nvbench::state& state, cudf::size_type num_rows, + cudf::size_type num_aggs, cudf::size_type value_key_ratio, double null_probability) { @@ -27,30 +28,34 @@ void run_benchmark(nvbench::state& state, return create_random_column(cudf::type_to_id(), row_count{num_rows}, profile); }(); - auto const values = [&] { - auto builder = data_profile_builder().cardinality(0).distribution( - cudf::type_to_id(), distribution_id::UNIFORM, 0, num_rows); - if (null_probability > 0) { - builder.null_probability(null_probability); + auto values_builder = data_profile_builder().cardinality(0).distribution( + cudf::type_to_id(), distribution_id::UNIFORM, 0, num_rows); + if (null_probability > 0) { + values_builder.null_probability(null_probability); + } else { + values_builder.no_validity(); + } + + std::vector> values_cols; + std::vector requests; + values_cols.reserve(num_aggs); + requests.reserve(num_aggs); + for (cudf::size_type i = 0; i < num_aggs; i++) { + auto values = create_random_column( + cudf::type_to_id(), row_count{num_rows}, data_profile{values_builder}); + auto request = cudf::groupby::aggregation_request{}; + request.values = values->view(); + if constexpr (Agg == cudf::aggregation::Kind::M2) { + request.aggregations.push_back(cudf::make_m2_aggregation()); + } else if constexpr (Agg == cudf::aggregation::Kind::VARIANCE) { + request.aggregations.push_back(cudf::make_variance_aggregation()); + } else if constexpr (Agg == cudf::aggregation::Kind::STD) { + request.aggregations.push_back(cudf::make_std_aggregation()); } else { - builder.no_validity(); + throw std::runtime_error("Unsupported aggregation kind."); } - return create_random_column( - cudf::type_to_id(), 
row_count{num_rows}, data_profile{builder}); - }(); - - // Vector of 1 request - std::vector requests(1); - requests.back().values = values->view(); - if constexpr (Agg == cudf::aggregation::Kind::M2) { - requests.back().aggregations.push_back(cudf::make_m2_aggregation()); - } else if constexpr (Agg == cudf::aggregation::Kind::VARIANCE) { - requests.back().aggregations.push_back( - cudf::make_variance_aggregation()); - } else if constexpr (Agg == cudf::aggregation::Kind::STD) { - requests.back().aggregations.push_back(cudf::make_std_aggregation()); - } else { - throw std::runtime_error("Unsupported aggregation kind."); + values_cols.emplace_back(std::move(values)); + requests.emplace_back(std::move(request)); } auto const mem_stats_logger = cudf::memory_stats_logger(); @@ -75,8 +80,8 @@ void bench_groupby_m2_var_std(nvbench::state& state, auto const value_key_ratio = static_cast(state.get_int64("value_key_ratio")); auto const num_rows = static_cast(state.get_int64("num_rows")); auto const null_probability = state.get_float64("null_probability"); - - run_benchmark(state, num_rows, value_key_ratio, null_probability); + auto const num_aggs = static_cast(state.get_int64("num_aggs")); + run_benchmark(state, num_rows, num_aggs, value_key_ratio, null_probability); } using Types = nvbench::type_list; @@ -87,5 +92,6 @@ using AggKinds = nvbench::enum_type_list(bitmask_type const* row_bitmask, global_set_t const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs.cuh b/cpp/src/groupby/hash/compute_global_memory_aggs.cuh index 8fab0d3c864..fa3a7981f75 100644 --- a/cpp/src/groupby/hash/compute_global_memory_aggs.cuh +++ b/cpp/src/groupby/hash/compute_global_memory_aggs.cuh @@ -72,6 +72,7 @@ std::pair, rmm::device_uvector> compute_aggs_d SetType const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + 
std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -88,9 +89,13 @@ std::pair, rmm::device_uvector> compute_aggs_d }(); auto const d_values = table_device_view::create(values, stream); - auto agg_results = create_results_table( - static_cast(unique_keys.size()), values, h_agg_kinds, stream, mr); - auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream); + auto agg_results = create_results_table(static_cast(unique_keys.size()), + values, + h_agg_kinds, + force_non_nullable, + stream, + mr); + auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream); thrust::for_each_n(rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(int64_t{0}), @@ -116,13 +121,15 @@ std::pair, rmm::device_uvector> compute_aggs_s SetType const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { auto const num_rows = values.num_rows(); auto const d_values = table_device_view::create(values, stream); - auto agg_results = create_results_table(num_rows, values, h_agg_kinds, stream, mr); - auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream); + auto agg_results = + create_results_table(num_rows, values, h_agg_kinds, force_non_nullable, stream, mr); + auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream); thrust::for_each_n( rmm::exec_policy_nosync(stream), @@ -153,14 +160,27 @@ std::pair, rmm::device_uvector> compute_global SetType const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { return h_agg_kinds.size() > GROUPBY_DENSE_OUTPUT_THRESHOLD - ? 
compute_aggs_dense_output( - row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr) - : compute_aggs_sparse_output_gather( - row_bitmask, values, key_set, h_agg_kinds, d_agg_kinds, stream, mr); + ? compute_aggs_dense_output(row_bitmask, + values, + key_set, + h_agg_kinds, + d_agg_kinds, + force_non_nullable, + stream, + mr) + : compute_aggs_sparse_output_gather(row_bitmask, + values, + key_set, + h_agg_kinds, + d_agg_kinds, + force_non_nullable, + stream, + mr); } } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs.hpp b/cpp/src/groupby/hash/compute_global_memory_aggs.hpp index 69aa14ab200..2f2c3ff7f51 100644 --- a/cpp/src/groupby/hash/compute_global_memory_aggs.hpp +++ b/cpp/src/groupby/hash/compute_global_memory_aggs.hpp @@ -20,6 +20,7 @@ std::pair, rmm::device_uvector> compute_global SetType const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu b/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu index 9d39870627c..168cfe0800a 100644 --- a/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu +++ b/cpp/src/groupby/hash/compute_global_memory_aggs_null.cu @@ -13,6 +13,7 @@ compute_global_memory_aggs(bitmask_type const* row_bitmas nullable_global_set_t const& key_set, host_span h_agg_kinds, device_span d_agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/src/groupby/hash/compute_single_pass_aggs.cuh b/cpp/src/groupby/hash/compute_single_pass_aggs.cuh index 510c656ea05..d09202eafa5 100644 --- a/cpp/src/groupby/hash/compute_single_pass_aggs.cuh +++ b/cpp/src/groupby/hash/compute_single_pass_aggs.cuh @@ -38,7 +38,7 @@ std::pair, bool> compute_single_pass_aggs( { // Collect the single-pass aggregations that 
can be processed in this function. // The compound aggregations that require multiple passes will be handled separately later on. - auto const [values, agg_kinds, aggs, has_compound_aggs] = + auto const [values, agg_kinds, aggs, force_non_nullable, has_compound_aggs] = extract_single_pass_aggs(requests, stream); auto const d_agg_kinds = cudf::detail::make_device_uvector_async( agg_kinds, stream, cudf::get_current_device_resource_ref()); @@ -50,7 +50,7 @@ std::pair, bool> compute_single_pass_aggs( // present. auto const run_aggs_by_global_mem_kernel = [&] { auto [agg_results, unique_key_indices] = compute_global_memory_aggs( - row_bitmask, values, global_set, agg_kinds, d_agg_kinds, stream, mr); + row_bitmask, values, global_set, agg_kinds, d_agg_kinds, force_non_nullable, stream, mr); finalize_output(values, aggs, agg_results, cache, stream); return std::pair{std::move(unique_key_indices), has_compound_aggs}; }; @@ -210,8 +210,8 @@ std::pair, bool> compute_single_pass_aggs( key_transform_map = rmm::device_uvector{0, stream}; // done, free up memory early auto const d_spass_values = table_device_view::create(values, stream); - auto agg_results = - create_results_table(static_cast(unique_keys.size()), values, agg_kinds, stream, mr); + auto agg_results = create_results_table( + static_cast(unique_keys.size()), values, agg_kinds, force_non_nullable, stream, mr); auto d_results_ptr = mutable_table_device_view::create(*agg_results, stream); compute_shared_memory_aggs(grid_size, diff --git a/cpp/src/groupby/hash/extract_single_pass_aggs.cpp b/cpp/src/groupby/hash/extract_single_pass_aggs.cpp index 242920d2c11..0aae34a039c 100644 --- a/cpp/src/groupby/hash/extract_single_pass_aggs.cpp +++ b/cpp/src/groupby/hash/extract_single_pass_aggs.cpp @@ -92,47 +92,54 @@ class groupby_simple_aggregations_collector final return aggs; } - - std::vector> visit( - data_type, cudf::detail::correlation_aggregation const&) override - { - std::vector> aggs; - 
aggs.push_back(make_sum_aggregation()); - // COUNT_VALID - aggs.push_back(make_count_aggregation()); - - return aggs; - } }; std::tuple, std::vector>, + std::vector, bool> extract_single_pass_aggs(host_span requests, rmm::cuda_stream_view stream) { std::vector columns; std::vector> aggs; + std::vector force_non_nullable; auto agg_kinds = cudf::detail::make_empty_host_vector(requests.size(), stream); bool has_compound_aggs = false; for (auto const& request : requests) { - auto const& agg_v = request.aggregations; + auto const& input_aggs = request.aggregations; + + // The set of input aggregations. + std::unordered_set input_agg_kinds_set; + for (auto const& agg : input_aggs) { + input_agg_kinds_set.insert(agg->kind); + } + // The aggregations extracted from the input request, including the original single-pass + // aggregations and the intermediate single-pass aggregations replacing compound + // aggregations. std::unordered_set agg_kinds_set; + auto insert_agg = [&](column_view const& request_values, std::unique_ptr&& agg) { if (agg_kinds_set.insert(agg->kind).second) { + // Check if the inserted aggregation is an input aggregation or a replacement aggregation. + // If it is not an input aggregation, we can force its output to be non-nullable + // (by storing `1` value in the `force_non_nullable` vector). + auto const is_input_agg = input_agg_kinds_set.contains(agg->kind); + force_non_nullable.push_back(is_input_agg ? 0 : 1); + agg_kinds.push_back(agg->kind); aggs.push_back(std::move(agg)); columns.push_back(request_values); } }; - auto values_type = cudf::is_dictionary(request.values.type()) - ? cudf::dictionary_column_view(request.values).keys().type() - : request.values.type(); - for (auto const& agg : agg_v) { + auto const values_type = cudf::is_dictionary(request.values.type()) + ? 
cudf::dictionary_column_view(request.values).keys().type() + : request.values.type(); + for (auto const& agg : input_aggs) { groupby_simple_aggregations_collector collector; auto spass_aggs = agg->get_simple_aggregations(values_type, collector); if (spass_aggs.size() > 1 || !spass_aggs.front()->is_equal(*agg)) { @@ -145,7 +152,11 @@ extract_single_pass_aggs(host_span requests, } } - return {table_view(columns), std::move(agg_kinds), std::move(aggs), has_compound_aggs}; + return {table_view(columns), + std::move(agg_kinds), + std::move(aggs), + std::move(force_non_nullable), + has_compound_aggs}; } std::vector get_simple_aggregations(groupby_aggregation const& agg, diff --git a/cpp/src/groupby/hash/extract_single_pass_aggs.hpp b/cpp/src/groupby/hash/extract_single_pass_aggs.hpp index 158e566f474..144aa719ce7 100644 --- a/cpp/src/groupby/hash/extract_single_pass_aggs.hpp +++ b/cpp/src/groupby/hash/extract_single_pass_aggs.hpp @@ -15,9 +15,16 @@ namespace cudf::groupby::detail::hash { /** - * @brief Extract all compound aggregations into single pass aggs + * @brief Extract single pass aggregations from the given aggregation requests. * - * For example, a MEAN aggregation will be flattened into a SUM and a COUNT_VALID aggregation. + * During extraction, compound aggregations will be replaced by their corresponding single pass + * aggregations dependencies. For example, a MEAN aggregation will be replaced by a SUM and a + * COUNT_VALID aggregation. + * + * For some single-pass aggregations, we also try to reduce overhead by forcing their results + * columns to be non-nullable. For example, a SUM aggregation needed only as the intermediate result + * for M2 aggregation will not need to have a nullmask to avoid the extra nullmask update and null + * count computation overhead. 
* * @param requests The aggregation requests * @param stream The CUDA stream @@ -25,12 +32,15 @@ namespace cudf::groupby::detail::hash { * @return A tuple containing: * - A table_view containing the input values columns for the single-pass aggregations, * - A vector of aggregation kinds corresponding to each of these values columns, - * - A vector of aggregation objects corresponding to each of these values columns, and + * - A vector of aggregation objects corresponding to each of these values columns, + * - A vector of binary values indicating if the corresponding result will be forced to be + * non-nullable, and * - A boolean value indicating if there are any multi-pass (compound) aggregations. */ std::tuple, std::vector>, + std::vector, bool> extract_single_pass_aggs(host_span requests, rmm::cuda_stream_view stream); diff --git a/cpp/src/groupby/hash/hash_compound_agg_finalizer.cu b/cpp/src/groupby/hash/hash_compound_agg_finalizer.cu index b2ce885c388..c2ac6027c4a 100644 --- a/cpp/src/groupby/hash/hash_compound_agg_finalizer.cu +++ b/cpp/src/groupby/hash/hash_compound_agg_finalizer.cu @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include #include @@ -46,14 +48,13 @@ auto hash_compound_agg_finalizer::gather_argminmax(aggregation const& agg) static_cast(arg_result.data()), nullptr, 0); - auto gather_argminmax = - cudf::detail::gather(table_view({col}), - null_removed_map, - arg_result.nullable() ? cudf::out_of_bounds_policy::NULLIFY - : cudf::out_of_bounds_policy::DONT_CHECK, - cudf::detail::negative_index_policy::NOT_ALLOWED, - stream, - mr); + auto gather_argminmax = cudf::detail::gather( + table_view{{col}}, + null_removed_map, + col.nullable() ? 
cudf::out_of_bounds_policy::NULLIFY : cudf::out_of_bounds_policy::DONT_CHECK, + cudf::detail::negative_index_policy::NOT_ALLOWED, + stream, + mr); return std::move(gather_argminmax->release()[0]); } @@ -81,18 +82,40 @@ void hash_compound_agg_finalizer::visit(cudf::detail::mean_aggregation const& ag { if (cache->has_result(col, agg)) { return; } - auto const sum_agg = make_sum_aggregation(); - auto const count_agg = make_count_aggregation(); - auto const sum_result = cache->get_result(col, *sum_agg); - auto const count_result = cache->get_result(col, *count_agg); - + auto const sum_agg = make_sum_aggregation(); + auto const count_agg = make_count_aggregation(); + auto const sum_result = cache->get_result(col, *sum_agg); + auto const count_result = cache->get_result(col, *count_agg); + auto const null_removed_sum = [&] { + if (sum_result.null_count() == 0) { return sum_result; } + return column_view{ + sum_result.type(), sum_result.size(), sum_result.head(), nullptr, 0, sum_result.offset()}; + }(); + + // Perform division without any null masks, and generate the null mask for the result later. + // This is because the null mask (if exists) is just needed to be copied from the sum result, + // and copying is faster than running the `bitmask_and` kernel. auto result = - cudf::detail::binary_operation(sum_result, + cudf::detail::binary_operation(null_removed_sum, count_result, binary_operator::DIV, cudf::detail::target_type(input_type, aggregation::MEAN), stream, mr); + // SUM result only has nulls if it is an input aggregation, not intermediate-only aggregation. 
+ if (sum_result.has_nulls()) { + result->set_null_mask(cudf::detail::copy_bitmask(sum_result, stream, mr), + sum_result.null_count()); + } else if (col.has_nulls()) { // SUM aggregation is only intermediate result, thus it is forced + // to be non-nullable + auto [null_mask, null_count] = cudf::detail::valid_if( + count_result.begin(), + count_result.end(), + [] __device__(size_type const count) -> bool { return count > 0; }, + stream, + mr); + if (null_count > 0) { result->set_null_mask(std::move(null_mask), null_count); } + } cache->add_result(col, agg, std::move(result)); } @@ -100,12 +123,9 @@ void hash_compound_agg_finalizer::visit(cudf::detail::m2_aggregation const& agg) { if (cache->has_result(col, agg)) { return; } - auto const sum_sqr_agg = make_sum_of_squares_aggregation(); - auto const sum_agg = make_sum_aggregation(); - auto const count_agg = make_count_aggregation(); - this->visit(*sum_sqr_agg); - this->visit(*sum_agg); - this->visit(*count_agg); + auto const sum_sqr_agg = make_sum_of_squares_aggregation(); + auto const sum_agg = make_sum_aggregation(); + auto const count_agg = make_count_aggregation(); auto const sum_sqr_result = cache->get_result(col, *sum_sqr_agg); auto const sum_result = cache->get_result(col, *sum_agg); auto const count_result = cache->get_result(col, *count_agg); @@ -118,10 +138,11 @@ void hash_compound_agg_finalizer::visit(cudf::detail::var_aggregation const& agg { if (cache->has_result(col, agg)) { return; } - auto const m2_agg = make_m2_aggregation(); - auto const count_agg = make_count_aggregation(); + auto const m2_agg = make_m2_aggregation(); + // Since M2 is a compound aggregation, we need to "finalize" it using aggregation finalizer's + // "visit" method. 
this->visit(*dynamic_cast(m2_agg.get())); - this->visit(*count_agg); + auto const count_agg = make_count_aggregation(); auto const m2_result = cache->get_result(col, *m2_agg); auto const count_result = cache->get_result(col, *count_agg); @@ -133,10 +154,11 @@ void hash_compound_agg_finalizer::visit(cudf::detail::std_aggregation const& agg { if (cache->has_result(col, agg)) { return; } - auto const m2_agg = make_m2_aggregation(); - auto const count_agg = make_count_aggregation(); + auto const m2_agg = make_m2_aggregation(); + // Since M2 is a compound aggregation, we need to "finalize" it using aggregation finalizer's + // "visit" method. this->visit(*dynamic_cast(m2_agg.get())); - this->visit(*count_agg); + auto const count_agg = make_count_aggregation(); auto const m2_result = cache->get_result(col, *m2_agg); auto const count_result = cache->get_result(col, *count_agg); diff --git a/cpp/src/groupby/hash/output_utils.cu b/cpp/src/groupby/hash/output_utils.cu index 208135c78d1..4d5f473149c 100644 --- a/cpp/src/groupby/hash/output_utils.cu +++ b/cpp/src/groupby/hash/output_utils.cu @@ -46,12 +46,14 @@ struct result_column_creator { { } - std::unique_ptr operator()(column_view const& col, aggregation::Kind const& agg) const + std::unique_ptr operator()(column_view const& col, + aggregation::Kind const& agg, + bool force_non_nullable) const { auto const col_type = is_dictionary(col.type()) ? dictionary_column_view(col).keys().type() : col.type(); - auto const nullable = - agg != aggregation::COUNT_VALID && agg != aggregation::COUNT_ALL && col.has_nulls(); + auto const nullable = !force_non_nullable && agg != aggregation::COUNT_VALID && + agg != aggregation::COUNT_ALL && col.has_nulls(); // TODO: Remove adjusted buffer size workaround once https://github.com/NVIDIA/cccl/issues/6430 // is fixed. Use adjusted buffer size for small data types to ensure atomic operation safety. 
auto const make_uninitialized_column = [&](data_type d_type, size_type size, mask_state state) { @@ -97,15 +99,22 @@ struct result_column_creator { std::unique_ptr create_results_table(size_type output_size, table_view const& values, host_span agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + CUDF_EXPECTS(values.num_columns() == static_cast(agg_kinds.size()), + "The number of values columns and size of agg_kinds vector must be the same."); + CUDF_EXPECTS( + values.num_columns() == static_cast(force_non_nullable.size()), + "The number of values columns and size of force_non_nullable vector must be the same."); + + auto const column_creator = result_column_creator{output_size, stream, mr}; std::vector> output_cols; - std::transform(values.begin(), - values.end(), - agg_kinds.begin(), - std::back_inserter(output_cols), - result_column_creator{output_size, stream, mr}); + for (size_t i = 0; i < agg_kinds.size(); i++) { + output_cols.emplace_back( + column_creator(values.column(i), agg_kinds[i], static_cast(force_non_nullable[i]))); + } auto result_table = std::make_unique
(std::move(output_cols)); cudf::detail::initialize_with_identity(result_table->mutable_view(), agg_kinds, stream); return result_table; } diff --git a/cpp/src/groupby/hash/output_utils.hpp b/cpp/src/groupby/hash/output_utils.hpp index cec7413ff40..1bb84c930cd 100644 --- a/cpp/src/groupby/hash/output_utils.hpp +++ b/cpp/src/groupby/hash/output_utils.hpp @@ -23,6 +23,8 @@ namespace cudf::groupby::detail::hash { * @param output_size Number of rows in the output table * @param values The values columns to be aggregated * @param agg_kinds The aggregation kinds corresponding to each input column + * @param force_non_nullable A vector of binary values indicating if the corresponding result + * will be forced to be non-nullable * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table's device memory * @return The table containing columns for storing aggregation results * @@ -30,6 +32,7 @@ namespace cudf::groupby::detail::hash { std::unique_ptr
create_results_table(size_type output_size, table_view const& values, host_span agg_kinds, + std::span force_non_nullable, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);