@@ -1006,6 +1006,12 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
1006
1006
for (ui32 i = 0 ; i < PartitionCountersLabeled->GetCounters ().Size (); ++i) {
1007
1007
ac->AddValues (PartitionCountersLabeled->GetCounters ()[i].Get ());
1008
1008
}
1009
+ if (PartitionCompactionCounters) {
1010
+ for (ui32 i = 0 ; i < PartitionCompactionCounters->GetCounters ().Size (); ++i) {
1011
+ ac->MutableCompactionCounters ()->AddValues (PartitionCompactionCounters->GetCounters ()[i].Get ());
1012
+ }
1013
+ }
1014
+
1009
1015
for (auto & userInfoPair : UsersInfoStorage->GetAll ()) {
1010
1016
auto & userInfo = userInfoPair.second ;
1011
1017
if (!userInfo.LabeledCounters )
@@ -1750,6 +1756,9 @@ void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
1750
1756
CompacterPartitionRequestInflight = false ;
1751
1757
if (Compacter) {
1752
1758
Compacter->ProcessResponse (ev);
1759
+ // auto compacterCounters = Compacter->GetCounters();
1760
+ // KeyCompactionReadCyclesTotal.Set(compacterCounters.ReadCyclesCount);
1761
+ // KeyCompactionWriteCyclesTotal.Set(compacterCounters.WriteCyclesCount);
1753
1762
}
1754
1763
}
1755
1764
ReadingTimestamp = false ;
@@ -2070,6 +2079,46 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
2070
2079
PartitionCountersLabeled->GetCounters ()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set (quotaUsage);
2071
2080
}
2072
2081
}
2082
+ if (PartitionCompactionCounters) {
2083
+ Y_ENSURE (Compacter);
2084
+ auto counters = Compacter->GetCounters ();
2085
+ if (counters.UncompactedSize != PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_SIZE_MAX].Get ()) {
2086
+ PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_SIZE_MAX].Set (counters.UncompactedSize );
2087
+ haveChanges = true ;
2088
+ }
2089
+ if (counters.UncompactedSize != PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_SIZE_SUM].Get ()) {
2090
+ PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_SIZE_SUM].Set (counters.UncompactedSize );
2091
+ haveChanges = true ;
2092
+ }
2093
+ if (counters.CompactedSize != PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_SIZE_MAX].Get ()) {
2094
+ PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_SIZE_MAX].Set (counters.CompactedSize );
2095
+ haveChanges = true ;
2096
+ }
2097
+ if (counters.CompactedSize != PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_SIZE_SUM].Get ()) {
2098
+ PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_SIZE_SUM].Set (counters.CompactedSize );
2099
+ haveChanges = true ;
2100
+ }
2101
+ if (counters.UncompactedCount != PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_COUNT].Get ()) {
2102
+ PartitionCompactionCounters->GetCounters ()[METRIC_UNCOMPACTED_COUNT].Set (counters.UncompactedCount );
2103
+ haveChanges = true ;
2104
+ }
2105
+ if (counters.CompactedCount != PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_COUNT].Get ()) {
2106
+ PartitionCompactionCounters->GetCounters ()[METRIC_COMPACTED_COUNT].Set (counters.CompactedCount );
2107
+ haveChanges = true ;
2108
+ }
2109
+ // if (counters.GetUncompactedRatio() != PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_RATIO].Get()) {
2110
+ // PartitionCompactionCounters->GetCounters()[METRIC_UNCOMPACTED_RATIO].Set(counters.GetUncompactedRatio());
2111
+ // haveChanges = true;
2112
+ // }
2113
+ if (counters.CurrReadCycleDuration .MilliSeconds () != PartitionCompactionCounters->GetCounters ()[METRIC_CURR_CYCLE_DURATION].Get ()) {
2114
+ PartitionCompactionCounters->GetCounters ()[METRIC_CURR_CYCLE_DURATION].Set (counters.CurrReadCycleDuration .MilliSeconds ());
2115
+ haveChanges = true ;
2116
+ }
2117
+ if (counters.CurrentReadCycleKeys != PartitionCompactionCounters->GetCounters ()[METRIC_CURR_READ_CYCLE_KEYS].Get ()) {
2118
+ PartitionCompactionCounters->GetCounters ()[METRIC_CURR_READ_CYCLE_KEYS].Set (counters.CurrentReadCycleKeys );
2119
+ haveChanges = true ;
2120
+ }
2121
+ }
2073
2122
return haveChanges;
2074
2123
}
2075
2124
@@ -2103,6 +2152,9 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
2103
2152
Send (ReadQuotaTrackerActor, new TEvPQ::TEvReleaseExclusiveLock ());
2104
2153
if (Compacter) {
2105
2154
Compacter->ProcessResponse (ev);
2155
+ // auto compacterCounters = Compacter->GetCounters();
2156
+ // KeyCompactionReadCyclesTotal.Set(compacterCounters.ReadCyclesCount);
2157
+ // KeyCompactionWriteCyclesTotal.Set(compacterCounters.WriteCyclesCount);
2106
2158
}
2107
2159
return ;
2108
2160
}
@@ -4318,6 +4370,10 @@ bool TPartition::IsSupportive() const
4318
4370
return Partition.IsSupportivePartition ();
4319
4371
}
4320
4372
4373
+ bool TPartition::IsKeyCompactionEnabled () const {
4374
+ return Config.GetEnableCompactification () && AppData ()->FeatureFlags .GetEnableTopicCompactificationByKey () && !IsSupportive ();
4375
+ }
4376
+
4321
4377
void TPartition::AttachPersistRequestSpan (NWilson::TSpan& span)
4322
4378
{
4323
4379
if (span) {
@@ -4346,9 +4402,10 @@ void TPartition::Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr&) {
4346
4402
4347
4403
IActor* CreatePartitionActor (ui64 tabletId, const TPartitionId& partition, const TActorId& tablet, ui32 tabletGeneration,
4348
4404
const TActorId& blobCache, const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
4349
- const NKikimrPQ::TPQTabletConfig& config, const std::shared_ptr<TTabletCountersBase>& counters, bool SubDomainOutOfSpace,
4350
- ui32 numChannels, const TActorId& writeQuoterActorId,
4351
- TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> samplingControl, bool newPartition) {
4405
+ const NKikimrPQ::TPQTabletConfig& config, const std::shared_ptr<TTabletCountersBase>& counters, bool SubDomainOutOfSpace,
4406
+ ui32 numChannels, const TActorId& writeQuoterActorId,
4407
+ TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> samplingControl, bool newPartition
4408
+ ) {
4352
4409
4353
4410
return new TPartition (tabletId, partition, tablet, tabletGeneration, blobCache, topicConverter, dcId, isServerless,
4354
4411
config, counters, SubDomainOutOfSpace, numChannels, writeQuoterActorId, samplingControl,
0 commit comments