Skip to content

Commit 8f81173

Browse files
authored
Extract common code (#29671)
1 parent 539848e commit 8f81173

File tree

1 file changed

+72
-101
lines changed

1 file changed

+72
-101
lines changed

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 72 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -607,150 +607,121 @@ void TPersQueueReadBalancer::UpdateConfigCounters() {
607607
}
608608

609609
void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
610-
if (!AggregatedStats.Stats.size())
610+
if (!AggregatedStats.Stats.size()) {
611611
return;
612+
}
612613

613-
if (!DynamicCounters)
614+
if (!DynamicCounters) {
614615
return;
616+
}
617+
618+
auto ensureCounters = [&](auto& counters, auto& config, const std::vector<std::pair<TString, TString>>& subgroups = {}, bool skipPrefix = true) {
619+
auto group = DynamicCounters;
620+
if (counters.empty()) {
621+
for (const auto& subgroup : subgroups) {
622+
group = group->GetSubgroup(subgroup.first, subgroup.second);
623+
}
624+
625+
for (size_t i = 0; i < config->GetCounters().Size(); ++i) {
626+
TString name = config->GetNames()[i];
627+
if (skipPrefix) {
628+
TStringBuf nameBuf = name;
629+
nameBuf.SkipPrefix("PQ/");
630+
name = nameBuf;
631+
}
632+
counters.push_back(name.empty() ? nullptr : group->GetExpiringNamedCounter("name", name, false));
633+
}
634+
}
635+
};
615636

616637
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
617-
THolder<TPartitionLabeledCounters> labeledCounters;
618-
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
619-
THolder<TConsumerLabeledCounters> labeledConsumerCounters;
620-
using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters<EPartitionKeyCompactionLabeledCounters_descriptor>;
621-
THolder<TPartitionKeyCompactionCounters> compactionCounters;
638+
auto labeledCounters = std::make_unique<TPartitionLabeledCounters>("topic", 0, DatabasePath);
639+
ensureCounters(AggregatedCounters, labeledCounters);
640+
622641
using TPartitionExtendedLabeledCounters = TProtobufTabletLabeledCounters<EPartitionExtendedLabeledCounters_descriptor>;
623-
THolder<TPartitionExtendedLabeledCounters> extendedLabeledCounters;
624-
labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
625-
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));
626-
compactionCounters.Reset(new TPartitionKeyCompactionCounters("topic", 0, DatabasePath));
627-
extendedLabeledCounters.Reset(new TPartitionExtendedLabeledCounters("topic", 0, DatabasePath));
628-
629-
if (AggregatedCounters.empty()) {
630-
for (ui32 i = 0; i < labeledCounters->GetCounters().Size(); ++i) {
631-
TString name = labeledCounters->GetNames()[i];
632-
TStringBuf nameBuf = name;
633-
nameBuf.SkipPrefix("PQ/");
634-
name = nameBuf;
635-
AggregatedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
636-
}
637-
}
638-
if (AggregatedExtendedCounters.empty()) {
639-
for (ui32 i = 0; i < extendedLabeledCounters->GetCounters().Size(); ++i) {
640-
TString name = extendedLabeledCounters->GetNames()[i];
641-
AggregatedExtendedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
642-
}
643-
}
642+
auto extendedLabeledCounters = std::make_unique<TPartitionExtendedLabeledCounters>("topic", 0, DatabasePath);
643+
ensureCounters(AggregatedExtendedCounters, extendedLabeledCounters, {}, false);
644644

645+
using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters<EPartitionKeyCompactionLabeledCounters_descriptor>;
646+
auto compactionCounters = std::make_unique<TPartitionKeyCompactionCounters>("topic", 0, DatabasePath);
645647
if (TabletConfig.GetEnableCompactification()) {
646-
if (AggregatedCompactionCounters.empty()) {
647-
for (ui32 i = 0; i < compactionCounters->GetCounters().Size(); ++i) {
648-
TStringBuf nameBuf = compactionCounters->GetNames()[i];
649-
nameBuf.SkipPrefix("PQ/");
650-
AggregatedCompactionCounters.push_back(nameBuf.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", TString(nameBuf), false));
651-
}
652-
}
648+
ensureCounters(AggregatedCompactionCounters, compactionCounters);
653649
} else {
654650
AggregatedCompactionCounters.clear();
655651
}
652+
653+
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
654+
auto labeledConsumerCounters = std::make_unique<TConsumerLabeledCounters>("topic|x|consumer", 0, DatabasePath);
656655
for (auto& [consumer, info]: Consumers) {
656+
ensureCounters(info.AggregatedCounters, labeledConsumerCounters, {{"consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx)}});
657657
info.Aggr.Reset(new TTabletLabeledCountersBase{});
658-
if (info.AggregatedCounters.empty()) {
659-
auto clientCounters = DynamicCounters->GetSubgroup("consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx));
660-
for (ui32 i = 0; i < labeledConsumerCounters->GetCounters().Size(); ++i) {
661-
TString name = labeledConsumerCounters->GetNames()[i];
662-
TStringBuf nameBuf = name;
663-
nameBuf.SkipPrefix("PQ/");
664-
name = nameBuf;
665-
info.AggregatedCounters.push_back(name.empty() ? nullptr : clientCounters->GetExpiringNamedCounter("name", name, false));
666-
}
667-
}
668658
}
669659

670660
/*** apply counters ****/
671661

672662
ui64 milliSeconds = TAppData::TimeProvider->Now().MilliSeconds();
673663

674-
THolder<TTabletLabeledCountersBase> aggr(new TTabletLabeledCountersBase);
675-
THolder<TTabletLabeledCountersBase> aggrExtended(new TTabletLabeledCountersBase);
676-
THolder<TTabletLabeledCountersBase> compactionAggr(new TTabletLabeledCountersBase);
664+
auto aggr = std::make_unique<TTabletLabeledCountersBase>();
665+
auto aggrExtended = std::make_unique<TTabletLabeledCountersBase>();
666+
auto compactionAggr = std::make_unique<TTabletLabeledCountersBase>();
667+
668+
auto setCounters = [](auto& counters, const auto& state) {
669+
for (size_t i = 0; i < counters->GetCounters().Size() && i < state.ValuesSize(); ++i) {
670+
counters->GetCounters()[i] = state.GetValues(i);
671+
}
672+
};
677673

678674
for (auto it = AggregatedStats.Stats.begin(); it != AggregatedStats.Stats.end(); ++it) {
679-
if (!it->second.HasCounters)
675+
auto& partitionStats = it->second;
676+
677+
if (!partitionStats.HasCounters) {
680678
continue;
681-
for (ui32 i = 0; i < it->second.Counters.ValuesSize() && i < labeledCounters->GetCounters().Size(); ++i) {
682-
labeledCounters->GetCounters()[i] = it->second.Counters.GetValues(i);
683679
}
680+
681+
setCounters(labeledCounters, partitionStats.Counters);
684682
aggr->AggregateWith(*labeledCounters);
685683

686-
for (ui32 i = 0; i < it->second.Counters.GetExtendedCounters().ValuesSize()
687-
&& i < extendedLabeledCounters->GetCounters().Size(); ++i
688-
) {
689-
extendedLabeledCounters->GetCounters()[i] = it->second.Counters.GetExtendedCounters().GetValues(i);
690-
}
684+
setCounters(extendedLabeledCounters, partitionStats.Counters.GetExtendedCounters());
691685
aggrExtended->AggregateWith(*extendedLabeledCounters);
692686

693687
if (TabletConfig.GetEnableCompactification()) {
694-
for (ui32 i = 0; i < it->second.Counters.GetCompactionCounters().ValuesSize() && i < compactionCounters->GetCounters().Size(); ++i) {
695-
compactionCounters->GetCounters()[i] = it->second.Counters.GetCompactionCounters().GetValues(i);
696-
}
688+
setCounters(compactionCounters, partitionStats.Counters.GetCompactionCounters());
697689
compactionAggr->AggregateWith(*compactionCounters);
698690
}
699691

700-
for (const auto& consumerStats : it->second.Counters.GetConsumerAggregatedCounters()) {
692+
for (const auto& consumerStats : partitionStats.Counters.GetConsumerAggregatedCounters()) {
701693
auto jt = Consumers.find(consumerStats.GetConsumer());
702-
if (jt == Consumers.end())
694+
if (jt == Consumers.end()) {
703695
continue;
704-
for (ui32 i = 0; i < consumerStats.ValuesSize() && i < labeledCounters->GetCounters().Size(); ++i) {
705-
labeledConsumerCounters->GetCounters()[i] = consumerStats.GetValues(i);
706696
}
707-
jt->second.Aggr->AggregateWith(*labeledConsumerCounters);
708-
}
697+
auto& consumerInfo = jt->second;
709698

710-
}
711-
712-
/*** show counters ***/
713-
for (ui32 i = 0; aggr->HasCounters() && i < aggr->GetCounters().Size(); ++i) {
714-
if (!AggregatedCounters[i])
715-
continue;
716-
const auto& type = aggr->GetCounterType(i);
717-
auto val = aggr->GetCounters()[i].Get();
718-
if (type == TLabeledCounterOptions::CT_TIMELAG) {
719-
val = val <= milliSeconds ? milliSeconds - val : 0;
720-
}
721-
AggregatedCounters[i]->Set(val);
722-
}
723-
for (ui32 i = 0; aggrExtended->HasCounters() && i < aggrExtended->GetCounters().Size(); ++i) {
724-
if (!AggregatedExtendedCounters[i])
725-
continue;
726-
const auto& type = aggrExtended->GetCounterType(i);
727-
auto val = aggrExtended->GetCounters()[i].Get();
728-
if (type == TLabeledCounterOptions::CT_TIMELAG) {
729-
val = val <= milliSeconds ? milliSeconds - val : 0;
699+
setCounters(labeledConsumerCounters, consumerStats);
700+
consumerInfo.Aggr->AggregateWith(*labeledConsumerCounters);
730701
}
731-
AggregatedExtendedCounters[i]->Set(val);
732702
}
733703

734-
735-
for (ui32 i = 0; i < compactionAggr->GetCounters().Size() && i < AggregatedCompactionCounters.size(); ++i) {
736-
if (!AggregatedCompactionCounters[i]) {
737-
continue;
738-
}
739-
auto val = compactionAggr->GetCounters()[i].Get();
740-
AggregatedCompactionCounters[i]->Set(val);
741-
}
742-
743-
for (auto& [consumer, info] : Consumers) {
744-
for (ui32 i = 0; info.Aggr->HasCounters() && i < info.Aggr->GetCounters().Size(); ++i) {
745-
if (!info.AggregatedCounters[i])
704+
auto processAggregators = [milliSeconds](auto& aggregator, auto& counters) {
705+
for (size_t i = 0; aggregator->HasCounters() && i < aggregator->GetCounters().Size(); ++i) {
706+
if (!counters[i]) {
746707
continue;
747-
const auto& type = info.Aggr->GetCounterType(i);
748-
auto val = info.Aggr->GetCounters()[i].Get();
708+
}
709+
const auto& type = aggregator->GetCounterType(i);
710+
auto val = aggregator->GetCounters()[i].Get();
749711
if (type == TLabeledCounterOptions::CT_TIMELAG) {
750712
val = val <= milliSeconds ? milliSeconds - val : 0;
751713
}
752-
info.AggregatedCounters[i]->Set(val);
714+
counters[i]->Set(val);
753715
}
716+
};
717+
718+
/*** show counters ***/
719+
processAggregators(aggr, AggregatedCounters);
720+
processAggregators(aggrExtended, AggregatedExtendedCounters);
721+
processAggregators(compactionAggr, AggregatedCompactionCounters);
722+
723+
for (auto& [consumer, info] : Consumers) {
724+
processAggregators(info.Aggr, info.AggregatedCounters);
754725
}
755726
}
756727

0 commit comments

Comments
 (0)