Skip to content

Commit 6570743

Browse files
committed
Use bulk_insert_iterator for aggragation clauses
1 parent 0fffb89 commit 6570743

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

cpp/arcticdb/processing/unsorted_aggregation.cpp

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -427,28 +427,30 @@ DataType MeanAggregatorData::get_output_data_type() {
427427
void MeanAggregatorData::aggregate(const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values) {
428428
fractions_.resize(unique_values);
429429
sparse_map_.resize(unique_values);
430-
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this] (auto col_tag) {
430+
util::BitSet::bulk_insert_iterator inserter(sparse_map_);
431+
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, &inserter, this] (auto col_tag) {
431432
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
432433
if constexpr (is_sequence_type(col_type_info::data_type)) {
433434
util::raise_rte("String aggregations not currently supported");
434435
} else if constexpr(is_empty_type(col_type_info::data_type)) {
435436
return;
436437
}
437-
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column.column_, [&groups, this](auto enumerating_it) {
438+
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column.column_, [&groups, &inserter, this](auto enumerating_it) {
438439
auto& fraction = fractions_[groups[enumerating_it.idx()]];
439440
if constexpr ((is_floating_point_type(col_type_info ::data_type))) {
440441
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
441442
fraction.numerator_ += static_cast<double>(enumerating_it.value());
442443
++fraction.denominator_;
443-
sparse_map_.set(groups[enumerating_it.idx()]);
444+
inserter = groups[enumerating_it.idx()];
444445
}
445446
} else {
446447
fraction.numerator_ += static_cast<double>(enumerating_it.value());
447448
++fraction.denominator_;
448-
sparse_map_.set(groups[enumerating_it.idx()]);
449+
inserter = groups[enumerating_it.idx()];
449450
}
450451
});
451452
});
453+
inserter.flush();
452454
}
453455

454456
SegmentInMemory MeanAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
@@ -492,22 +494,24 @@ std::optional<Value> MeanAggregatorData::get_default_value() {
492494
void CountAggregatorData::aggregate(const ColumnWithStrings& input_column, const std::vector<size_t>& groups, size_t unique_values) {
493495
aggregated_.resize(unique_values);
494496
sparse_map_.resize(unique_values);
495-
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, this] (auto col_tag) {
497+
util::BitSet::bulk_insert_iterator inserter(sparse_map_);
498+
details::visit_type(input_column.column_->type().data_type(), [&input_column, &groups, &inserter, this] (auto col_tag) {
496499
using col_type_info = ScalarTypeInfo<decltype(col_tag)>;
497-
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column.column_, [&groups, this](auto enumerating_it) {
500+
Column::for_each_enumerated<typename col_type_info::TDT>(*input_column.column_, [&groups, &inserter, this](auto enumerating_it) {
498501
if constexpr (is_floating_point_type(col_type_info::data_type)) {
499502
if (ARCTICDB_LIKELY(!std::isnan(enumerating_it.value()))) {
500503
auto& val = aggregated_[groups[enumerating_it.idx()]];
501504
++val;
502-
sparse_map_.set(groups[enumerating_it.idx()]);
505+
inserter = groups[enumerating_it.idx()];
503506
}
504507
} else {
505508
auto& val = aggregated_[groups[enumerating_it.idx()]];
506509
++val;
507-
sparse_map_.set(groups[enumerating_it.idx()]);
510+
inserter = groups[enumerating_it.idx()];
508511
}
509512
});
510513
});
514+
inserter.flush();
511515
}
512516

513517
SegmentInMemory CountAggregatorData::finalize(const ColumnName& output_column_name, bool, size_t unique_values) {
@@ -551,9 +555,10 @@ void FirstAggregatorData::aggregate(const ColumnWithStrings& input_column, const
551555
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
552556
aggregated_.resize(sizeof(GlobalRawType)* unique_values);
553557
sparse_map_.resize(unique_values);
558+
util::BitSet::bulk_insert_iterator inserter(sparse_map_);
554559
auto col_data = input_column.column_->data();
555560
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
556-
details::visit_type(input_column.column_->type().data_type(), [this, &groups, &out_ptr, &col_data] (auto col_tag) {
561+
details::visit_type(input_column.column_->type().data_type(), [this, &groups, &out_ptr, &col_data, &inserter] (auto col_tag) {
557562
using ColumnTagType = std::decay_t<decltype(col_tag)>;
558563
using ColumnType = typename ColumnTagType::raw_type;
559564
auto groups_pos = 0;
@@ -566,18 +571,19 @@ void FirstAggregatorData::aggregate(const ColumnWithStrings& input_column, const
566571
if (is_first_group_el || std::isnan(static_cast<ColumnType>(val))) {
567572
groups_cache_.insert(groups[groups_pos]);
568573
val = GlobalRawType(*ptr);
569-
sparse_map_.set(groups[groups_pos]);
574+
inserter = groups[groups_pos];
570575
}
571576
} else {
572577
if (is_first_group_el) {
573578
groups_cache_.insert(groups[groups_pos]);
574579
val = GlobalRawType(*ptr);
575-
sparse_map_.set(groups[groups_pos]);
580+
inserter = groups[groups_pos];
576581
}
577582
}
578583
}
579584
}
580585
});
586+
inserter.flush();
581587
});
582588
}
583589
}
@@ -625,9 +631,10 @@ void LastAggregatorData::aggregate(const ColumnWithStrings& input_column, const
625631
using GlobalRawType = typename GlobalTypeDescriptorTag::DataTypeTag::raw_type;
626632
aggregated_.resize(sizeof(GlobalRawType)* unique_values);
627633
sparse_map_.resize(unique_values);
634+
util::BitSet::bulk_insert_iterator inserter(sparse_map_);
628635
auto col_data = input_column.column_->data();
629636
auto out_ptr = reinterpret_cast<GlobalRawType*>(aggregated_.data());
630-
details::visit_type(input_column.column_->type().data_type(), [&groups, &out_ptr, &col_data, this] (auto col_tag) {
637+
details::visit_type(input_column.column_->type().data_type(), [&groups, &out_ptr, &col_data, &inserter, this] (auto col_tag) {
631638
using ColumnTagType = std::decay_t<decltype(col_tag)>;
632639
using ColumnType = typename ColumnTagType::raw_type;
633640
auto groups_pos = 0;
@@ -641,15 +648,16 @@ void LastAggregatorData::aggregate(const ColumnWithStrings& input_column, const
641648
if (is_first_group_el || !std::isnan(static_cast<ColumnType>(curr))) {
642649
groups_cache_.insert(groups[groups_pos]);
643650
val = curr;
644-
sparse_map_.set(groups[groups_pos]);
651+
inserter = groups[groups_pos];
645652
}
646653
} else {
647654
val = GlobalRawType(*ptr);
648-
sparse_map_.set(groups[groups_pos]);
655+
inserter = groups[groups_pos];
649656
}
650657
}
651658
}
652659
});
660+
inserter.flush();
653661
});
654662
}
655663
}

0 commit comments

Comments
 (0)