Skip to content

Commit 10da688

Browse files
committed
Correct sparse handling for Aggregation clauses
- Makes Aggregation clauses like `Mean` and `Count` respect input column sparsity
- Fixes `CopyToBufferTask` to respect sparsity for Arrow
1 parent ffe58bc commit 10da688

File tree

7 files changed

+113
-61
lines changed

7 files changed

+113
-61
lines changed

cpp/arcticdb/arrow/array_from_block.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ namespace arcticdb {
1616
inline std::optional<sparrow::validity_bitmap> create_validity_bitmap(size_t offset, const Column& column, size_t bitmap_size) {
1717
if(column.has_extra_buffer(offset, ExtraBufferType::BITMAP)) {
1818
auto &bitmap_buffer = column.get_extra_buffer(offset, ExtraBufferType::BITMAP);
19+
util::check(
20+
bitmap_buffer.blocks().size() == 1,
21+
"Expected a single block bitmap extra buffer but got {} blocks",
22+
bitmap_buffer.blocks().size());
1923
return sparrow::validity_bitmap{reinterpret_cast<uint8_t *>(bitmap_buffer.block(0)->release()), bitmap_size};
2024
} else {
2125
return std::nullopt;

cpp/arcticdb/processing/test/test_clause.cpp

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,7 @@ namespace aggregation_test
103103
ASSERT_EQ(dt, column.type().data_type());
104104
for(std::size_t idx = 0u; idx < ugv; ++idx) {
105105
if constexpr (std::is_floating_point_v<T>) {
106-
const T val = column.scalar_at<T>(idx).value();
107-
if (std::isnan(val)) {
108-
ASSERT_TRUE(std::isnan(f(idx)));
109-
} else {
110-
ASSERT_EQ(f(idx), val);
111-
}
106+
ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
112107
} else {
113108
ASSERT_EQ(f(idx), column.scalar_at<T>(idx));
114109
}
@@ -175,16 +170,16 @@ TEST(Clause, AggregationSparseColumn)
175170
return idx % 2 == 0 ? 450 + 10 * idx : 0;
176171
});
177172
check_column<int64_t>(*segments[0], "min_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
178-
return idx % 2 == 0 ? std::optional{static_cast<int64_t>(idx)} : std::nullopt;
173+
return idx % 2 == 0 ? std::make_optional<int64_t>(idx) : std::nullopt;
179174
});
180175
check_column<int64_t>(*segments[0], "max_int", unique_grouping_values, [](size_t idx) -> std::optional<int64_t> {
181-
return idx % 2 == 0 ? std::optional{static_cast<int64_t>(90 + idx)} : std::nullopt;
176+
return idx % 2 == 0 ? std::make_optional<int64_t>(90 + idx) : std::nullopt;
182177
});
183-
check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> double {
184-
return idx % 2 == 0 ? 45 + idx : std::numeric_limits<double>::quiet_NaN();
178+
check_column<double>(*segments[0], "mean_int", unique_grouping_values, [](size_t idx) -> std::optional<double> {
179+
return idx % 2 == 0 ? std::make_optional<double>(45 + idx) : std::nullopt;
185180
});
186-
check_column<uint64_t>(*segments[0], "count_int", unique_grouping_values, [](size_t idx) -> uint64_t {
187-
return idx % 2 == 0 ? 10 : 0;
181+
check_column<uint64_t>(*segments[0], "count_int", unique_grouping_values, [](size_t idx) -> std::optional<uint64_t> {
182+
return idx % 2 == 0 ? std::make_optional<uint64_t>(10) : std::nullopt;
188183
});
189184
}
190185

cpp/arcticdb/processing/test/test_unsorted_aggregation.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class AggregationResult : public ::testing::TestWithParam<DataType> {
8383
} if constexpr (is_bool_type(InputTypeTag::data_type())) {
8484
return std::array{ 2 / 3.0, 0.0, 1.0, 1 / 3.0};
8585
} else if constexpr (is_empty_type(InputTypeTag::data_type())) {
86-
return std::array{0.0, 0.0, 0.0};
86+
return std::array<double, 0>{};
8787
}
8888
}
8989

@@ -139,7 +139,11 @@ TEST_P(AggregationResult, Mean) {
139139
ASSERT_EQ(result.field(0).type(), make_scalar_type(OutputDataTypeTag::data_type()));
140140
ASSERT_EQ(result.field(0).name(), "output");
141141
const Column& aggregated_column = result.column(0);
142-
ASSERT_EQ(aggregated_column.row_count(), group_count);
142+
if constexpr (!is_empty_type(TypeTag::data_type)) {
143+
ASSERT_EQ(aggregated_column.row_count(), group_count);
144+
} else {
145+
ASSERT_EQ(aggregated_column.row_count(), 0);
146+
}
143147
constexpr static std::array expected = get_expected_result_mean<InputDataTypeTag>();
144148
Column::for_each_enumerated<OutputDataTypeTag>(aggregated_column, [&](const auto& row) {
145149
ASSERT_EQ(row.value(), expected[row.idx()]);

cpp/arcticdb/processing/unsorted_aggregation.cpp

Lines changed: 65 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ DataType MeanAggregatorData::get_output_data_type() {
426426

427427
void MeanAggregatorData::aggregate(const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values) {
428428
fractions_.resize(unique_values);
429+
sparse_map_.resize(unique_values);
429430
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this] (auto col_tag) {
430431
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
431432
if constexpr (is_sequence_type(col_type_info::data_type)) {
@@ -439,10 +440,12 @@ void MeanAggregatorData::aggregate(const ColumnWithStrings& input_column, const
439440
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
440441
fraction.numerator_ += static_cast<double>(enumerating_it.value());
441442
++fraction.denominator_;
443+
sparse_map_.set(groups[enumerating_it.idx()]);
442444
}
443445
} else {
444446
fraction.numerator_ += static_cast<double>(enumerating_it.value());
445447
++fraction.denominator_;
448+
sparse_map_.set(groups[enumerating_it.idx()]);
446449
}
447450
});
448451
});
@@ -452,25 +455,23 @@ SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_nam
452455
SegmentInMemory res;
453456
if(!fractions_.empty()) {
454457
fractions_.resize(unique_values);
455-
auto col = std::make_shared<Column>(make_scalar_type(get_output_data_type()), fractions_.size(), AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
456-
auto column_data = col->data();
457-
// TODO: Empty type needs more though. Maybe we should emit a column of empty value and leave it to the
458-
// NullValueReducer to handle it. As of this PR (04.07.2025) the empty type is feature flagged and not used so
459-
// we don't worry too much about optimizing it.
458+
sparse_map_.resize(unique_values);
459+
auto col = create_output_column(make_scalar_type(get_output_data_type()), std::move(sparse_map_), unique_values);
460+
// TODO: Empty type needs more thought. Currently we emit a fully sparse column which will be populated by
461+
// `copy_frame_data_to_buffer` but this might not be the right approach. As of this PR (11.09.2025) the empty
462+
// type is feature flagged and not used so we don't worry too much about optimizing it.
460463
if (data_type_ && *data_type_ == DataType::EMPTYVAL) [[unlikely]] {
461-
std::fill_n(column_data.begin<ScalarTagType<DataTypeTag<DataType::FLOAT64>>>(), fractions_.size(), 0.f);
464+
auto empty_bitset = util::BitSet(unique_values);
465+
col->set_sparse_map(std::move(empty_bitset));
462466
} else {
463467
details::visit_type(col->type().data_type(), [&, this]<typename TypeTag>(TypeTag) {
464-
using OutputDataTypeTag = std::conditional_t<is_time_type(TypeTag::data_type), TypeTag, DataTypeTag<DataType::FLOAT64>>;
465-
using OutputTypeDescriptor = typename ScalarTypeInfo<OutputDataTypeTag>::TDT;
466-
std::transform(fractions_.cbegin(), fractions_.cend(),
467-
column_data.begin<OutputTypeDescriptor>(),
468-
[](const auto &fraction) {
469-
return static_cast<typename OutputDataTypeTag::raw_type>(fraction.to_double());
470-
});
471-
});
468+
using OutputDataTypeTag = std::conditional_t<is_time_type(TypeTag::data_type), TypeTag, DataTypeTag<DataType::FLOAT64>>;
469+
using OutputTypeDescriptor = typename ScalarTypeInfo<OutputDataTypeTag>::TDT;
470+
Column::for_each_enumerated<OutputTypeDescriptor>(*col, [&](auto row) {
471+
row.value() = static_cast<typename OutputDataTypeTag::raw_type>(fractions_[row.idx()].to_double());
472+
});
473+
});
472474
}
473-
col->set_row_data(fractions_.size() - 1);
474475
res.add_column(scalar_field(get_output_data_type(), output_column_name.value), std::move(col));
475476
}
476477
return res;
@@ -490,17 +491,20 @@ std::optional<Value> MeanAggregatorData::get_default_value() {
490491

491492
void CountAggregatorData::aggregate(const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values) {
492493
aggregated_.resize(unique_values);
494+
sparse_map_.resize(unique_values);
493495
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this] (auto col_tag) {
494496
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
495497
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column.column_, [&groups, this](auto enumerating_it) {
496498
if constexpr (is_floating_point_type(col_type_info::data_type)) {
497499
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
498500
auto& val = aggregated_[groups[enumerating_it.idx()]];
499501
++val;
502+
sparse_map_.set(groups[enumerating_it.idx()]);
500503
}
501504
} else {
502505
auto& val = aggregated_[groups[enumerating_it.idx()]];
503506
++val;
507+
sparse_map_.set(groups[enumerating_it.idx()]);
504508
}
505509
});
506510
});
@@ -510,11 +514,19 @@ SegmentInMemory CountAggregatorData::finalize(const ColumnName& output_column_na
510514
SegmentInMemory res;
511515
if(!aggregated_.empty()) {
512516
aggregated_.resize(unique_values);
513-
auto pos = res.add_column(scalar_field(DataType::UINT64, output_column_name.value), unique_values, AllocationType::PRESIZED);
514-
auto& column = res.column(pos);
515-
auto ptr = reinterpret_cast<uint64_t*>(column.ptr());
516-
column.set_row_data(unique_values - 1);
517-
memcpy(ptr, aggregated_.data(), sizeof(uint64_t)*unique_values);
517+
sparse_map_.resize(unique_values);
518+
auto col = create_output_column(make_scalar_type(get_output_data_type()), std::move(sparse_map_), unique_values);
519+
if (!col->opt_sparse_map().has_value()) {
520+
// If all values are set we use memcpy for efficiency
521+
auto ptr = reinterpret_cast<uint64_t*>(col->ptr());
522+
memcpy(ptr, aggregated_.data(), sizeof(uint64_t)*unique_values);
523+
} else {
524+
using OutputTypeDescriptor = typename ScalarTypeInfo<DataTypeTag<DataType::UINT64>>::TDT;
525+
Column::for_each_enumerated<OutputTypeDescriptor>(*col, [&](auto row) {
526+
row.value() = aggregated_[row.idx()];
527+
});
528+
}
529+
res.add_column(scalar_field(get_output_data_type(), output_column_name.value), std::move(col));
518530
}
519531
return res;
520532
}
@@ -538,6 +550,7 @@ void FirstAggregatorData::aggregate(const ColumnWithStrings& input_column, const
538550
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
539551
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
540552
aggregated_.resize(sizeof(GlobalRawType)* unique_values);
553+
sparse_map_.resize(unique_values);
541554
auto col_data = input_column.column_->data();
542555
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
543556
details::visit_type(input_column.column_->type().data_type(), [this, &groups, &out_ptr, &col_data] (auto col_tag) {
@@ -553,11 +566,13 @@ void FirstAggregatorData::aggregate(const ColumnWithStrings& input_column, const
553566
if (is_first_group_el || std::isnan(static_cast<ColumnType>(val))) {
554567
groups_cache_.insert(groups[groups_pos]);
555568
val = GlobalRawType(*ptr);
569+
sparse_map_.set(groups[groups_pos]);
556570
}
557571
} else {
558572
if (is_first_group_el) {
559573
groups_cache_.insert(groups[groups_pos]);
560574
val = GlobalRawType(*ptr);
575+
sparse_map_.set(groups[groups_pos]);
561576
}
562577
}
563578
}
@@ -571,12 +586,20 @@ SegmentInMemory FirstAggregatorData::finalize(const ColumnName& output_column_na
571586
SegmentInMemory res;
572587
if(!aggregated_.empty()) {
573588
details::visit_type(*data_type_, [this, &res, &output_column_name, unique_values] (auto col_tag) {
574-
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
589+
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
590+
using RawType = typename col_type_info::RawType;
575591
aggregated_.resize(sizeof(RawType)* unique_values);
576-
auto col = std::make_shared<Column>(make_scalar_type(data_type_.value()), unique_values, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
577-
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
592+
sparse_map_.resize(unique_values);
593+
auto col = create_output_column(make_scalar_type(data_type_.value()), std::move(sparse_map_), unique_values);
594+
if (!col->opt_sparse_map().has_value()) {
595+
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
596+
} else {
597+
const std::span<const RawType> group_values{reinterpret_cast<const RawType*>(aggregated_.data()), aggregated_.size() / sizeof(RawType)};
598+
Column::for_each_enumerated<typename col_type_info::TDT>(*col, [&](auto row) {
599+
row.value() = group_values[row.idx()];
600+
});
601+
}
578602
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
579-
col->set_row_data(unique_values - 1);
580603
});
581604
}
582605
return res;
@@ -601,6 +624,7 @@ void LastAggregatorData::aggregate(const ColumnWithStrings& input_column, const
601624
using GlobalTypeDescriptorTag = typename OutputType<GlobalInputType>::type;
602625
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
603626
aggregated_.resize(sizeof(GlobalRawType)* unique_values);
627+
sparse_map_.resize(unique_values);
604628
auto col_data = input_column.column_->data();
605629
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
606630
details::visit_type(input_column.column_->type().data_type(), [&groups, &out_ptr, &col_data, this] (auto col_tag) {
@@ -617,9 +641,11 @@ void LastAggregatorData::aggregate(const ColumnWithStrings& input_column, const
617641
if (is_first_group_el || !std::isnan(static_cast<ColumnType>(curr))) {
618642
groups_cache_.insert(groups[groups_pos]);
619643
val = curr;
644+
sparse_map_.set(groups[groups_pos]);
620645
}
621646
} else {
622647
val = GlobalRawType(*ptr);
648+
sparse_map_.set(groups[groups_pos]);
623649
}
624650
}
625651
}
@@ -631,13 +657,21 @@ void LastAggregatorData::aggregate(const ColumnWithStrings& input_column, const
631657
SegmentInMemory LastAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
632658
SegmentInMemory res;
633659
if(!aggregated_.empty()) {
634-
details::visit_type(*data_type_, [that=this, &res, &output_column_name, unique_values] (auto col_tag) {
635-
using RawType = typename decltype(col_tag)::DataTypeTag::raw_type;
636-
that->aggregated_.resize(sizeof(RawType)* unique_values);
637-
auto col = std::make_shared<Column>(make_scalar_type(that->data_type_.value()), unique_values, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED);
638-
memcpy(col->ptr(), that->aggregated_.data(), that->aggregated_.size());
639-
res.add_column(scalar_field(that->data_type_.value(), output_column_name.value), col);
640-
col->set_row_data(unique_values - 1);
660+
details::visit_type(*data_type_, [&res, &output_column_name, unique_values, this] (auto col_tag) {
661+
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
662+
using RawType = typename col_type_info::RawType;
663+
aggregated_.resize(sizeof(RawType)* unique_values);
664+
sparse_map_.resize(unique_values);
665+
auto col = create_output_column(make_scalar_type(data_type_.value()), std::move(sparse_map_), unique_values);
666+
if (!col->opt_sparse_map().has_value()) {
667+
memcpy(col->ptr(), aggregated_.data(), aggregated_.size());
668+
} else {
669+
const std::span<const RawType> group_values{reinterpret_cast<const RawType*>(aggregated_.data()), aggregated_.size() / sizeof(RawType)};
670+
Column::for_each_enumerated<typename col_type_info::TDT>(*col, [&](auto row) {
671+
row.value() = group_values[row.idx()];
672+
});
673+
}
674+
res.add_column(scalar_field(data_type_.value(), output_column_name.value), col);
641675
});
642676
}
643677
return res;

cpp/arcticdb/processing/unsorted_aggregation.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class MeanAggregatorData : private AggregatorDataBase
132132
};
133133
std::vector<Fraction> fractions_;
134134
std::optional<DataType> data_type_;
135+
util::BitMagic sparse_map_;
135136
};
136137

137138
class CountAggregatorData : private AggregatorDataBase
@@ -149,6 +150,7 @@ class CountAggregatorData : private AggregatorDataBase
149150
private:
150151

151152
std::vector<uint64_t> aggregated_;
153+
util::BitMagic sparse_map_;
152154
};
153155

154156
class FirstAggregatorData : private AggregatorDataBase
@@ -168,6 +170,7 @@ class FirstAggregatorData : private AggregatorDataBase
168170
std::optional<DataType> data_type_;
169171

170172
std::unordered_set<size_t> groups_cache_;
173+
util::BitMagic sparse_map_;
171174
};
172175

173176
class LastAggregatorData : private AggregatorDataBase
@@ -187,6 +190,7 @@ class LastAggregatorData : private AggregatorDataBase
187190
std::optional<DataType> data_type_;
188191

189192
std::unordered_set<size_t> groups_cache_;
193+
util::BitMagic sparse_map_;
190194
};
191195

192196
template <class AggregatorData>

0 commit comments

Comments
 (0)