Skip to content

Commit ac4e12b

Browse files
committed
Correct sparse handling for Aggregation clauses
- Makes Aggregation clauses like `Mean` and `Count` respect input column sparsity
- Fixes `CopyToBufferTask` to respect sparsity for Arrow
1 parent 7acd347 commit ac4e12b

File tree

7 files changed: +129 additions, -81 deletions (lines changed)

cpp/arcticdb/arrow/array_from_block.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ inline std::optional<sparrow::validity_bitmap> create_validity_bitmap(
1919
) {
2020
if (column.has_extra_buffer(offset, ExtraBufferType::BITMAP)) {
2121
auto& bitmap_buffer = column.get_extra_buffer(offset, ExtraBufferType::BITMAP);
22+
util::check(
23+
bitmap_buffer.blocks().size() == 1,
24+
"Expected a single block bitmap extra buffer but got {} blocks",
25+
bitmap_buffer.blocks().size()
26+
);
2227
return sparrow::validity_bitmap{reinterpret_cast<uint8_t*>(bitmap_buffer.block(0)->release()), bitmap_size};
2328
} else {
2429
return std::nullopt;

cpp/arcticdb/processing/test/test_clause.cpp

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,7 @@ void check_column(arcticdb::SegmentInMemory segment, std::string_view column_nam
112112
ASSERT_EQ(dt, column.type().data_type());
113113
for (std::size_t idx = 0u; idx < ugv; ++idx) {
114114
if constexpr (std::is_floating_point_v<T>) {
115-
const T val = column.scalar_at<T>(idx).value();
116-
if (std::isnan(val)) {
117-
ASSERT_TRUE(std::isnan(f(idx)));
118-
} else {
119-
ASSERT_EQ(f(idx), val);
120-
}
115+
ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
121116
} else {
122117
ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
123118
}
@@ -192,17 +187,22 @@ TEST(Clause, AggregationSparseColumn) {
192187
return idx % 2 == 0 ? 450 + 10 * idx : 0;
193188
});
194189
check_column<int64_t>(*segments[0], "min_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
195-
return idx % 2 == 0 ? std::optional{static_cast<int64_t>(idx)} : std::nullopt;
190+
return idx % 2 == 0 ? std::make_optional<int64_t>(idx) : std::nullopt;
196191
});
197192
check_column<int64_t>(*segments[0], "max_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
198-
return idx % 2 == 0 ? std::optional{static_cast<int64_t>(90 + idx)} : std::nullopt;
199-
});
200-
check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> double {
201-
return idx % 2 == 0 ? 45 + idx : std::numeric_limits<double>::quiet_NaN();
193+
return idx % 2 == 0 ? std::make_optional<int64_t>(90 + idx) : std::nullopt;
202194
});
203-
check_column<uint64_t>(*segments[0], "count_int", unique_grouping_values, [](size_t idx) -> uint64_t {
204-
return idx % 2 == 0 ? 10 : 0;
195+
check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> std::optional<double> {
196+
return idx % 2 == 0 ? std::make_optional<double>(45 + idx) : std::nullopt;
205197
});
198+
check_column<uint64_t>(
199+
*segments[0],
200+
"count_int",
201+
unique_grouping_values,
202+
[](size_t idx) -> std::optional<uint64_t> {
203+
return idx % 2 == 0 ? std::make_optional<uint64_t>(10) : std::nullopt;
204+
}
205+
);
206206
}
207207

208208
TEST(Clause, AggregationSparseGroupby) {

cpp/arcticdb/processing/test/test_unsorted_aggregation.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class AggregationResult : public ::testing::TestWithParam<DataType> {
8787
if constexpr (is_bool_type(InputTypeTag::data_type())) {
8888
return std::array{2 / 3.0, 0.0, 1.0, 1 / 3.0};
8989
} else if constexpr (is_empty_type(InputTypeTag::data_type())) {
90-
return std::array{0.0, 0.0, 0.0};
90+
return std::array<double, 0>{};
9191
}
9292
}
9393

@@ -148,7 +148,11 @@ TEST_P(AggregationResult, Mean) {
148148
ASSERT_EQ(result.field(0).type(), make_scalar_type(OutputDataTypeTag::data_type()));
149149
ASSERT_EQ(result.field(0).name(), "output");
150150
const Column& aggregated_column = result.column(0);
151-
ASSERT_EQ(aggregated_column.row_count(), group_count);
151+
if constexpr (!is_empty_type(TypeTag::data_type)) {
152+
ASSERT_EQ(aggregated_column.row_count(), group_count);
153+
} else {
154+
ASSERT_EQ(aggregated_column.row_count(), 0);
155+
}
152156
constexpr static std::array expected = get_expected_result_mean<InputDataTypeTag>();
153157
Column::for_each_enumerated<OutputDataTypeTag>(aggregated_column, [&](const auto& row) {
154158
ASSERT_EQ(row.value(), expected[row.idx()]);

cpp/arcticdb/processing/unsorted_aggregation.cpp

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ void MeanAggregatorData::aggregate(
429429
const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values
430430
) {
431431
fractions_.resize(unique_values);
432+
sparse_map_.resize(unique_values);
432433
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this](auto col_tag) {
433434
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
434435
if constexpr (is_sequence_type(col_type_info::data_type)) {
@@ -444,10 +445,12 @@ void MeanAggregatorData::aggregate(
444445
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
445446
fraction.numerator_ += static_cast<double>(enumerating_it.value());
446447
++fraction.denominator_;
448+
sparse_map_.set(groups[enumerating_it.idx()]);
447449
}
448450
} else {
449451
fraction.numerator_ += static_cast<double>(enumerating_it.value());
450452
++fraction.denominator_;
453+
sparse_map_.set(groups[enumerating_it.idx()]);
451454
}
452455
}
453456
);
@@ -458,34 +461,25 @@ SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_nam
458461
SegmentInMemory res;
459462
if (!fractions_.empty()) {
460463
fractions_.resize(unique_values);
461-
auto col = std::make_shared<Column>(
462-
make_scalar_type(get_output_data_type()),
463-
fractions_.size(),
464-
AllocationType::PRESIZED,
465-
Sparsity::NOT_PERMITTED
466-
);
467-
auto column_data = col->data();
468-
// TODO: Empty type needs more though. Maybe we should emit a column of empty value and leave it to the
469-
// NullValueReducer to handle it. As of this PR (04.07.2025) the empty type is feature flagged and not used so
470-
// we don't worry too much about optimizing it.
464+
sparse_map_.resize(unique_values);
465+
auto col =
466+
create_output_column(make_scalar_type(get_output_data_type()), std::move(sparse_map_), unique_values);
467+
// TODO: Empty type needs more thought. Currently we emit a fully sparse column which will be populated by
468+
// `copy_frame_data_to_buffer` but this might not be the right approach. As of this PR (11.09.2025) the empty
469+
// type is feature flagged and not used so we don't worry too much about optimizing it.
471470
if (data_type_ && *data_type_ == DataType::EMPTYVAL) [[unlikely]] {
472-
std::fill_n(column_data.begin<ScalarTagType<DataTypeTag<DataType::FLOAT64>>>(), fractions_.size(), 0.f);
471+
auto empty_bitset = util::BitSet(unique_values);
472+
col->set_sparse_map(std::move(empty_bitset));
473473
} else {
474474
details::visit_type(col->type().data_type(), [&, this]<typename TypeTag>(TypeTag) {
475475
using OutputDataTypeTag =
476476
std::conditional_t<is_time_type(TypeTag::data_type), TypeTag, DataTypeTag<DataType::FLOAT64>>;
477477
using OutputTypeDescriptor = typename ScalarTypeInfo<OutputDataTypeTag>::TDT;
478-
std::transform(
479-
fractions_.cbegin(),
480-
fractions_.cend(),
481-
column_data.begin<OutputTypeDescriptor>(),
482-
[](const auto& fraction) {
483-
return static_cast<typename OutputDataTypeTag::raw_type>(fraction.to_double());
484-
}
485-
);
478+
Column::for_each_enumerated<OutputTypeDescriptor>(*col, [&](auto row) {
479+
row.value() = static_cast<typename OutputDataTypeTag::raw_type>(fractions_[row.idx()].to_double());
480+
});
486481
});
487482
}
488-
col->set_row_data(fractions_.size() - 1);
489483
res.add_column(scalar_field(get_output_data_type(), output_column_name.value), std::move(col));
490484
}
491485
return res;
@@ -505,6 +499,7 @@ void CountAggregatorData::aggregate(
505499
const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values
506500
) {
507501
aggregated_.resize(unique_values);
502+
sparse_map_.resize(unique_values);
508503
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this](auto col_tag) {
509504
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
510505
Column::for_each_enumerated<typename col_type_info::TDT>(
@@ -514,10 +509,12 @@ void CountAggregatorData::aggregate(
514509
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
515510
auto& val = aggregated_[groups[enumerating_it.idx()]];
516511
++val;
512+
sparse_map_.set(groups[enumerating_it.idx()]);
517513
}
518514
} else {
519515
auto& val = aggregated_[groups[enumerating_it.idx()]];
520516
++val;
517+
sparse_map_.set(groups[enumerating_it.idx()]);
521518
}
522519
}
523520
);
@@ -528,13 +525,20 @@ SegmentInMemory CountAggregatorData::finalize(const ColumnName& output_column_na
528525
SegmentInMemory res;
529526
if (!aggregated_.empty()) {
530527
aggregated_.resize(unique_values);
531-
auto pos = res.add_column(
532-
scalar_field(DataType::UINT64, output_column_name.value), unique_values, AllocationType::PRESIZED
533-
);
534-
auto& column = res.column(pos);
535-
auto ptr = reinterpret_cast<uint64_t*>(column.ptr());
536-
column.set_row_data(unique_values - 1);
537-
memcpy(ptr, aggregated_.data(), sizeof(uint64_t) * unique_values);
528+
sparse_map_.resize(unique_values);
529+
auto col =
530+
create_output_column(make_scalar_type(get_output_data_type()), std::move(sparse_map_), unique_values);
531+
if (!col->opt_sparse_map().has_value()) {
532+
// If all values are set we use memcpy for efficiency
533+
auto ptr = reinterpret_cast<uint64_t*>(col->ptr());
534+
memcpy(ptr, aggregated_.data(), sizeof(uint64_t) * unique_values);
535+
} else {
536+
using OutputTypeDescriptor = typename ScalarTypeInfo<DataTypeTag<DataType::UINT64>>::TDT;
537+
Column::for_each_enumerated<OutputTypeDescriptor>(*col, [&](auto row) {
538+
row.value() = aggregated_[row.idx()];
539+
});
540+
}
541+
res.add_column(scalar_field(get_output_data_type(), output_column_name.value), std::move(col));
538542
}
539543
return res;
540544
}
@@ -556,6 +560,7 @@ void FirstAggregatorData::aggregate(
556560
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
557561
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
558562
aggregated_.resize(sizeof(GlobalRawType) * unique_values);
563+
sparse_map_.resize(unique_values);
559564
auto col_data = input_column.column_->data();
560565
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
561566
details::visit_type(
@@ -575,11 +580,13 @@ void FirstAggregatorData::aggregate(
575580
if (is_first_group_el || std::isnan(static_cast<ColumnType>(val))) {
576581
groups_cache_.insert(groups[groups_pos]);
577582
val = GlobalRawType(*ptr);
583+
sparse_map_.set(groups[groups_pos]);
578584
}
579585
} else {
580586
if (is_first_group_el) {
581587
groups_cache_.insert(groups[groups_pos]);
582588
val = GlobalRawType(*ptr);
589+
sparse_map_.set(groups[groups_pos]);
583590
}
584591
}
585592
}
@@ -594,17 +601,23 @@ SegmentInMemory FirstAggregatorData::finalize(const ColumnName& output_column_na
594601
SegmentInMemory res;
595602
if (!aggregated_.empty()) {
596603
details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values](auto col_tag) {
597-
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
604+
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
605+
using RawType = typename col_type_info::RawType;
598606
aggregated_.resize(sizeof(RawType) * unique_values);
599-
auto col = std::make_shared<Column>(
600-
make_scalar_type(data_type_.value()),
601-
unique_values,
602-
AllocationType::PRESIZED,
603-
Sparsity::NOT_PERMITTED
604-
);
605-
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
607+
sparse_map_.resize(unique_values);
608+
auto col =
609+
create_output_column(make_scalar_type(data_type_.value()), std::move(sparse_map_), unique_values);
610+
if (!col->opt_sparse_map().has_value()) {
611+
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
612+
} else {
613+
const std::span<const RawType> group_values{
614+
reinterpret_cast<const RawType*>(aggregated_.data()), aggregated_.size() / sizeof(RawType)
615+
};
616+
Column::for_each_enumerated<typename col_type_info::TDT>(*col, [&](auto row) {
617+
row.value() = group_values[row.idx()];
618+
});
619+
}
606620
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
607-
col->set_row_data(unique_values - 1);
608621
});
609622
}
610623
return res;
@@ -627,6 +640,7 @@ void LastAggregatorData::aggregate(
627640
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
628641
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
629642
aggregated_.resize(sizeof(GlobalRawType) * unique_values);
643+
sparse_map_.resize(unique_values);
630644
auto col_data = input_column.column_->data();
631645
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
632646
details::visit_type(
@@ -648,9 +662,11 @@ void LastAggregatorData::aggregate(
648662
if (is_first_group_el || !std::isnan(static_cast<ColumnType>(curr))) {
649663
groups_cache_.insert(groups[groups_pos]);
650664
val = curr;
665+
sparse_map_.set(groups[groups_pos]);
651666
}
652667
} else {
653668
val = GlobalRawType(*ptr);
669+
sparse_map_.set(groups[groups_pos]);
654670
}
655671
}
656672
}
@@ -663,18 +679,24 @@ void LastAggregatorData::aggregate(
663679
SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
664680
SegmentInMemory res;
665681
if (!aggregated_.empty()) {
666-
details::visit_type(*data_type_, [that = this, &res, &output_column_name, unique_values](auto col_tag) {
667-
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
668-
that->aggregated_.resize(sizeof(RawType) * unique_values);
669-
auto col = std::make_shared<Column>(
670-
make_scalar_type(that->data_type_.value()),
671-
unique_values,
672-
AllocationType::PRESIZED,
673-
Sparsity::NOT_PERMITTED
674-
);
675-
memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
676-
res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
677-
col->set_row_data(unique_values - 1);
682+
details::visit_type(*data_type_, [&res, &output_column_name, unique_values, this](auto col_tag) {
683+
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
684+
using RawType = typename col_type_info::RawType;
685+
aggregated_.resize(sizeof(RawType) * unique_values);
686+
sparse_map_.resize(unique_values);
687+
auto col =
688+
create_output_column(make_scalar_type(data_type_.value()), std::move(sparse_map_), unique_values);
689+
if (!col->opt_sparse_map().has_value()) {
690+
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
691+
} else {
692+
const std::span<const RawType> group_values{
693+
reinterpret_cast<const RawType*>(aggregated_.data()), aggregated_.size() / sizeof(RawType)
694+
};
695+
Column::for_each_enumerated<typename col_type_info::TDT>(*col, [&](auto row) {
696+
row.value() = group_values[row.idx()];
697+
});
698+
}
699+
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
678700
});
679701
}
680702
return res;

cpp/arcticdb/processing/unsorted_aggregation.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class MeanAggregatorData : private AggregatorDataBase {
118118
};
119119
std::vector<Fraction> fractions_;
120120
std::optional<DataType> data_type_;
121+
util::BitMagic sparse_map_;
121122
};
122123

123124
class CountAggregatorData : private AggregatorDataBase {
@@ -131,6 +132,7 @@ class CountAggregatorData : private AggregatorDataBase {
131132

132133
private:
133134
std::vector<uint64_t> aggregated_;
135+
util::BitMagic sparse_map_;
134136
};
135137

136138
class FirstAggregatorData : private AggregatorDataBase {
@@ -146,6 +148,7 @@ class FirstAggregatorData : private AggregatorDataBase {
146148
std::optional<DataType> data_type_;
147149

148150
std::unordered_set<size_t> groups_cache_;
151+
util::BitMagic sparse_map_;
149152
};
150153

151154
class LastAggregatorData : private AggregatorDataBase {
@@ -161,6 +164,7 @@ class LastAggregatorData : private AggregatorDataBase {
161164
std::optional<DataType> data_type_;
162165

163166
std::unordered_set<size_t> groups_cache_;
167+
util::BitMagic sparse_map_;
164168
};
165169

166170
template<class AggregatorData>

0 commit comments

Comments
 (0)