Skip to content

Commit a9a5714

Browse files
Topic key compaction counters (#24774)
1 parent ffa4be7 commit a9a5714

17 files changed

+515
-62
lines changed

ydb/core/http_proxy/ut/internal_counters.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@
777777
"database":"/Root",
778778
"database_id":"database4",
779779
"folder_id":"folder4",
780-
"name":"topic.compaction.unprocessed_count_max",
780+
"name":"topic.partition.blobs.uncompacted_count_max",
781781
"topic":"test-counters-stream"
782782
},
783783
"value":0
@@ -790,7 +790,7 @@
790790
"database":"/Root",
791791
"database_id":"database4",
792792
"folder_id":"folder4",
793-
"name":"topic.compaction.unprocessed_bytes_max",
793+
"name":"topic.partition.blobs.uncompacted_bytes_max",
794794
"topic":"test-counters-stream"
795795
},
796796
"value":0
@@ -803,7 +803,7 @@
803803
"database":"/Root",
804804
"database_id":"database4",
805805
"folder_id":"folder4",
806-
"name":"topic.compaction.lag_milliseconds_max",
806+
"name":"topic.partition.blobs.compaction_lag_milliseconds_max",
807807
"topic":"test-counters-stream"
808808
},
809809
"value":0

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -615,9 +615,11 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
615615
THolder<TPartitionLabeledCounters> labeledCounters;
616616
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
617617
THolder<TConsumerLabeledCounters> labeledConsumerCounters;
618-
618+
using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters<EPartitionKeyCompactionLabeledCounters_descriptor>;
619+
THolder<TPartitionKeyCompactionCounters> compactionCounters;
619620
labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
620621
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));
622+
compactionCounters.Reset(new TPartitionKeyCompactionCounters("topic", 0, DatabasePath));
621623

622624
if (AggregatedCounters.empty()) {
623625
for (ui32 i = 0; i < labeledCounters->GetCounters().Size(); ++i) {
@@ -628,7 +630,17 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
628630
AggregatedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
629631
}
630632
}
631-
633+
if (TabletConfig.GetEnableCompactification()) {
634+
if (AggregatedCompactionCounters.empty()) {
635+
for (ui32 i = 0; i < compactionCounters->GetCounters().Size(); ++i) {
636+
TStringBuf nameBuf = compactionCounters->GetNames()[i];
637+
nameBuf.SkipPrefix("PQ/");
638+
AggregatedCompactionCounters.push_back(nameBuf.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", TString(nameBuf), false));
639+
}
640+
}
641+
} else {
642+
AggregatedCompactionCounters.clear();
643+
}
632644
for (auto& [consumer, info]: Consumers) {
633645
info.Aggr.Reset(new TTabletLabeledCountersBase{});
634646
if (info.AggregatedCounters.empty()) {
@@ -648,6 +660,7 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
648660
ui64 milliSeconds = TAppData::TimeProvider->Now().MilliSeconds();
649661

650662
THolder<TTabletLabeledCountersBase> aggr(new TTabletLabeledCountersBase);
663+
THolder<TTabletLabeledCountersBase> compactionAggr(new TTabletLabeledCountersBase);
651664

652665
for (auto it = AggregatedStats.Stats.begin(); it != AggregatedStats.Stats.end(); ++it) {
653666
if (!it->second.HasCounters)
@@ -657,6 +670,13 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
657670
}
658671
aggr->AggregateWith(*labeledCounters);
659672

673+
if (TabletConfig.GetEnableCompactification()) {
674+
for (ui32 i = 0; i < it->second.Counters.GetCompactionCounters().ValuesSize() && i < compactionCounters->GetCounters().Size(); ++i) {
675+
compactionCounters->GetCounters()[i] = it->second.Counters.GetCompactionCounters().GetValues(i);
676+
}
677+
compactionAggr->AggregateWith(*compactionCounters);
678+
}
679+
660680
for (const auto& consumerStats : it->second.Counters.GetConsumerAggregatedCounters()) {
661681
auto jt = Consumers.find(consumerStats.GetConsumer());
662682
if (jt == Consumers.end())
@@ -681,6 +701,14 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
681701
AggregatedCounters[i]->Set(val);
682702
}
683703

704+
for (ui32 i = 0; i < compactionAggr->GetCounters().Size() && i < AggregatedCompactionCounters.size(); ++i) {
705+
if (!AggregatedCompactionCounters[i]) {
706+
continue;
707+
}
708+
auto val = compactionAggr->GetCounters()[i].Get();
709+
AggregatedCompactionCounters[i]->Set(val);
710+
}
711+
684712
for (auto& [consumer, info] : Consumers) {
685713
for (ui32 i = 0; info.Aggr->HasCounters() && i < info.Aggr->GetCounters().Size(); ++i) {
686714
if (!info.AggregatedCounters[i])

ydb/core/persqueue/pqrb/read_balancer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
205205
std::unordered_set<ui64> PipesRequested;
206206

207207
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;
208+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCompactionCounters;
208209

209210
NMonitoring::TDynamicCounterPtr DynamicCounters;
210211
NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter;

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,12 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
10061006
for (ui32 i = 0; i < PartitionCountersLabeled->GetCounters().Size(); ++i) {
10071007
ac->AddValues(PartitionCountersLabeled->GetCounters()[i].Get());
10081008
}
1009+
if (PartitionCompactionCounters) {
1010+
for (ui32 i = 0; i < PartitionCompactionCounters->GetCounters().Size(); ++i) {
1011+
ac->MutableCompactionCounters()->AddValues(PartitionCompactionCounters->GetCounters()[i].Get());
1012+
}
1013+
}
1014+
10091015
for (auto& userInfoPair : UsersInfoStorage->GetAll()) {
10101016
auto& userInfo = userInfoPair.second;
10111017
if (!userInfo.LabeledCounters)
@@ -1750,6 +1756,9 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
17501756
CompacterPartitionRequestInflight = false;
17511757
if (Compacter) {
17521758
Compacter->ProcessResponse(ev);
1759+
// auto compacterCounters = Compacter->GetCounters();
1760+
// KeyCompactionReadCyclesTotal.Set(compacterCounters.ReadCyclesCount);
1761+
// KeyCompactionWriteCyclesTotal.Set(compacterCounters.WriteCyclesCount);
17531762
}
17541763
}
17551764
ReadingTimestamp = false;
@@ -2070,6 +2079,46 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
20702079
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
20712080
}
20722081
}
2082+
if (PartitionCompactionCounters) {
2083+
Y_ENSURE(Compacter);
2084+
auto counters = Compacter->GetCounters();
2085+
if (counters.UncompactedSize != PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_SIZE_MAX].Get()) {
2086+
PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_SIZE_MAX].Set(counters.UncompactedSize);
2087+
haveChanges = true;
2088+
}
2089+
if (counters.UncompactedSize != PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_SIZE_SUM].Get()) {
2090+
PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_SIZE_SUM].Set(counters.UncompactedSize);
2091+
haveChanges = true;
2092+
}
2093+
if (counters.CompactedSize != PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_SIZE_MAX].Get()) {
2094+
PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_SIZE_MAX].Set(counters.CompactedSize);
2095+
haveChanges = true;
2096+
}
2097+
if (counters.CompactedSize != PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_SIZE_SUM].Get()) {
2098+
PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_SIZE_SUM].Set(counters.CompactedSize);
2099+
haveChanges = true;
2100+
}
2101+
if (counters.UncompactedCount != PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_COUNT].Get()) {
2102+
PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_COUNT].Set(counters.UncompactedCount);
2103+
haveChanges = true;
2104+
}
2105+
if (counters.CompactedCount != PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_COUNT].Get()) {
2106+
PartitionCompactionCounters->GetCounters()[METRIC_COMPACTED_COUNT].Set(counters.CompactedCount);
2107+
haveChanges = true;
2108+
}
2109+
// if (counters.GetUncompactedRatio() != PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_RATIO].Get()) {
2110+
// PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_RATIO].Set(counters.GetUncompactedRatio());
2111+
// haveChanges = true;
2112+
// }
2113+
if (counters.CurrReadCycleDuration.MilliSeconds() != PartitionCompactionCounters->GetCounters()[METRIC_CURR_CYCLE_DURATION].Get()) {
2114+
PartitionCompactionCounters->GetCounters()[METRIC_CURR_CYCLE_DURATION].Set(counters.CurrReadCycleDuration.MilliSeconds());
2115+
haveChanges = true;
2116+
}
2117+
if (counters.CurrentReadCycleKeys != PartitionCompactionCounters->GetCounters()[METRIC_CURR_READ_CYCLE_KEYS].Get()) {
2118+
PartitionCompactionCounters->GetCounters()[METRIC_CURR_READ_CYCLE_KEYS].Set(counters.CurrentReadCycleKeys);
2119+
haveChanges = true;
2120+
}
2121+
}
20732122
return haveChanges;
20742123
}
20752124

@@ -2103,6 +2152,9 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
21032152
Send(ReadQuotaTrackerActor, new TEvPQ::TEvReleaseExclusiveLock());
21042153
if (Compacter) {
21052154
Compacter->ProcessResponse(ev);
2155+
// auto compacterCounters = Compacter->GetCounters();
2156+
// KeyCompactionReadCyclesTotal.Set(compacterCounters.ReadCyclesCount);
2157+
// KeyCompactionWriteCyclesTotal.Set(compacterCounters.WriteCyclesCount);
21062158
}
21072159
return;
21082160
}
@@ -4318,6 +4370,10 @@ bool TPartition::IsSupportive() const
43184370
return Partition.IsSupportivePartition();
43194371
}
43204372

4373+
bool TPartition::IsKeyCompactionEnabled() const {
4374+
return Config.GetEnableCompactification() && AppData()->FeatureFlags.GetEnableTopicCompactificationByKey() && !IsSupportive();
4375+
}
4376+
43214377
void TPartition::AttachPersistRequestSpan(NWilson::TSpan& span)
43224378
{
43234379
if (span) {
@@ -4346,9 +4402,10 @@ void TPartition::Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr&) {
43464402

43474403
IActor* CreatePartitionActor(ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration,
43484404
const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
4349-
const NKikimrPQ::TPQTabletConfig& config, const std::shared_ptr<TTabletCountersBase>& counters, bool SubDomainOutOfSpace,
4350-
ui32 numChannels, const TActorId& writeQuoterActorId,
4351-
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> samplingControl, bool newPartition) {
4405+
const NKikimrPQ::TPQTabletConfig& config, const std::shared_ptr<TTabletCountersBase>& counters, bool SubDomainOutOfSpace,
4406+
ui32 numChannels, const TActorId& writeQuoterActorId,
4407+
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> samplingControl, bool newPartition
4408+
) {
43524409

43534410
return new TPartition(tabletId, partition, tablet, tabletGeneration, blobCache, topicConverter, dcId, isServerless,
43544411
config, counters, SubDomainOutOfSpace, numChannels, writeQuoterActorId, samplingControl,

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ class TPartition : public TBaseActor<TPartition> {
498498
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
499499

500500
TConsumerSnapshot CreateSnapshot(TUserInfo& userInfo) const;
501+
bool IsKeyCompactionEnabled() const;
501502
void CreateCompacter();
502503
void SendCompacterWriteRequest(THolder<TEvKeyValue::TEvRequest>&& request);
503504

@@ -918,13 +919,16 @@ class TPartition : public TBaseActor<TPartition> {
918919
TTabletCountersBase TabletCounters;
919920
THolder<TPartitionLabeledCounters> PartitionCountersLabeled;
920921

922+
THolder<TPartitionKeyCompactionCounters> PartitionCompactionCounters;
923+
921924
// Per partition counters
922925
NMonitoring::TDynamicCounters::TCounterPtr WriteTimeLagMsByLastWritePerPartition;
923926
NMonitoring::TDynamicCounters::TCounterPtr SourceIdCountPerPartition;
924927
NMonitoring::TDynamicCounters::TCounterPtr TimeSinceLastWriteMsPerPartition;
925928
NMonitoring::TDynamicCounters::TCounterPtr BytesWrittenPerPartition;
926929
NMonitoring::TDynamicCounters::TCounterPtr MessagesWrittenPerPartition;
927930

931+
928932
TInstant LastCountersUpdate;
929933

930934
TSubscriber Subscriber;
@@ -1004,6 +1008,9 @@ class TPartition : public TBaseActor<TPartition> {
10041008
TMultiCounter CompactionUnprocessedBytes;
10051009
TMultiCounter CompactionTimeLag;
10061010

1011+
// NKikimr::NPQ::TMultiCounter KeyCompactionReadCyclesTotal;
1012+
// NKikimr::NPQ::TMultiCounter KeyCompactionWriteCyclesTotal;
1013+
10071014
// Writing blob with topic quota variables
10081015
ui64 TopicQuotaRequestCookie = 0;
10091016
ui64 NextTopicWriteQuotaRequestCookie = 1;

0 commit comments

Comments
 (0)