From 8879b1fba9c554dce468339f6eba5830ff9eb481 Mon Sep 17 00:00:00 2001 From: Vlad Kuznetsov Date: Fri, 3 Oct 2025 11:03:32 +0200 Subject: [PATCH] EXT-1551. Remove obsolete UsageByActivity sensor from ActorSystem (#26188) --- ydb/library/actors/core/actor.cpp | 56 ---- ydb/library/actors/core/actor.h | 25 -- ydb/library/actors/core/defs.h | 2 - .../actors/core/executor_pool_base.cpp | 17 - ydb/library/actors/core/executor_pool_base.h | 1 - ydb/library/actors/core/executor_thread.cpp | 6 - ydb/library/actors/core/mailbox.cpp | 23 -- ydb/library/actors/core/mailbox.h | 20 -- ydb/library/actors/core/mon_stats.cpp | 12 +- ydb/library/actors/core/mon_stats.h | 1 - .../actors/helpers/collector_counters.cpp | 306 ++++++++++++++++++ .../actors/helpers/collector_counters.h | 124 +++++++ 12 files changed, 431 insertions(+), 162 deletions(-) create mode 100644 ydb/library/actors/helpers/collector_counters.cpp create mode 100644 ydb/library/actors/helpers/collector_counters.h diff --git a/ydb/library/actors/core/actor.cpp b/ydb/library/actors/core/actor.cpp index 355e80c0df02..1a2866b21fda 100644 --- a/ydb/library/actors/core/actor.cpp +++ b/ydb/library/actors/core/actor.cpp @@ -35,62 +35,6 @@ namespace NActors { return *this; } - template - static void UpdateQueueSizeAndTimestamp(TActorUsageImpl& impl, ui64 time) { - ui64 usedTimeIncrement = 0; - using T = TActorUsageImpl; - - for (;;) { - uint64_t value = impl.QueueSizeAndTimestamp.load(); - ui64 count = value >> T::TimestampBits; - - count += Increment; - Y_ABORT_UNLESS((count & ~T::CountMask) == 0); - - ui64 timestamp = value; - if (Increment == 1 && count == 1) { - timestamp = time; - } else if (Increment == -1 && count == 0) { - usedTimeIncrement = (static_cast(time) - timestamp) & T::TimestampMask; - timestamp = 0; // reset timestamp to some zero value - } - - const ui64 updated = (timestamp & T::TimestampMask) | (count << T::TimestampBits); - if (impl.QueueSizeAndTimestamp.compare_exchange_weak(value, updated)) { - break; - } - } - - if (usedTimeIncrement && impl.LastUsageTimestamp <= time) { - impl.UsedTime += usedTimeIncrement; - } - } - - void TActorUsageImpl::OnEnqueueEvent(ui64 time) { - UpdateQueueSizeAndTimestamp<+1>(*this, time); - } - - void TActorUsageImpl::OnDequeueEvent() { - UpdateQueueSizeAndTimestamp<-1>(*this, GetCycleCountFast()); - } - - double TActorUsageImpl::GetUsage(ui64 time) { - ui64 used = UsedTime.exchange(0); - if (const ui64 value = QueueSizeAndTimestamp.load(); value >> TimestampBits) { - used += (static_cast(time) - value) & TimestampMask; - } - - Y_ABORT_UNLESS(LastUsageTimestamp <= time); - ui64 passed = time - LastUsageTimestamp; - LastUsageTimestamp = time; - - if (!passed) { - return 0; - } - - return (double)Min(passed, used) / passed; - } - void IActor::Describe(IOutputStream &out) const noexcept { SelfActorId.Out(out); } diff --git a/ydb/library/actors/core/actor.h b/ydb/library/actors/core/actor.h index 504335fa6a3d..9d30221563ff 100644 --- a/ydb/library/actors/core/actor.h +++ b/ydb/library/actors/core/actor.h @@ -320,34 +320,9 @@ namespace NActors { }; - template - struct TActorUsageImpl { - void OnEnqueueEvent(ui64 /*time*/) {} // called asynchronously when event is put in the mailbox - void OnDequeueEvent() {} // called when processed by Executor - double GetUsage(ui64 /*time*/) { return 0; } // called from collector thread - void DoActorInit() {} - }; - - template<> - struct TActorUsageImpl { - static constexpr int TimestampBits = 40; - static constexpr int CountBits = 24; - static constexpr ui64 TimestampMask = ((ui64)1 << TimestampBits) - 1; - static constexpr ui64 CountMask = ((ui64)1 << CountBits) - 1; - - std::atomic_uint64_t QueueSizeAndTimestamp = 0; - std::atomic_uint64_t UsedTime = 0; // how much time did we consume since last GetUsage() call - ui64 LastUsageTimestamp = 0; // when GetUsage() was called the last time - - void OnEnqueueEvent(ui64 time); - void OnDequeueEvent(); - double GetUsage(ui64 time); - void DoActorInit() { LastUsageTimestamp = GetCycleCountFast(); } - }; class IActor : protected IActorOps - , public TActorUsageImpl { private: TActorIdentity SelfActorId; diff --git a/ydb/library/actors/core/defs.h b/ydb/library/actors/core/defs.h index 9629225b889e..0c0247c8f1df 100644 --- a/ydb/library/actors/core/defs.h +++ b/ydb/library/actors/core/defs.h @@ -12,8 +12,6 @@ // event processing time histograms #define ACTORSLIB_COLLECT_EXEC_STATS -static constexpr bool ActorLibCollectUsageStats = false; - namespace NActors { using TPoolId = ui8; using TPoolsMask = ui64; diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index 0c538f37b9e8..fc512789f7f3 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -12,7 +12,6 @@ namespace NActors { void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) { actor->SelfActorId = self; - actor->DoActorInit(); actor->Registered(sys, owner); } @@ -34,17 +33,6 @@ namespace NActors { const TMonotonic now = ActorSystem->Monotonic(); - for (auto& u : stats.UsageByActivity) { - u.fill(0); - } - - auto accountUsage = [&](ui32 activityType, double usage) { - Y_ABORT_UNLESS(0 <= usage); - Y_ABORT_UNLESS(usage <= 1); - int bin = Min(9, usage * 10); - ++stats.UsageByActivity[activityType][bin]; - }; - std::fill(stats.StuckActorsByActivity.begin(), stats.StuckActorsByActivity.end(), 0); with_lock (StuckObserverMutex) { @@ -55,12 +43,7 @@ namespace NActors { if (delta > TDuration::Seconds(30)) { ++stats.StuckActorsByActivity[actor->GetActivityType()]; } - accountUsage(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast())); - } - for (const auto& [activityType, usage] : DeadActorsUsage) { - accountUsage(activityType, usage); } - DeadActorsUsage.clear(); } } #endif diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h index 323b06595805..fa54b85ecfcb 100644 --- a/ydb/library/actors/core/executor_pool_base.h +++ b/ydb/library/actors/core/executor_pool_base.h @@ -26,7 +26,6 @@ namespace NActors { // Stuck actor monitoring TMutex StuckObserverMutex; std::vector Actors; - mutable std::vector> DeadActorsUsage; friend class TGenericExecutorThread; friend class TSharedExecutorThread; void RecalculateStuckActors(TExecutorThreadStats& stats) const; diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index acc82a046489..bc9e02cfa647 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -122,7 +122,6 @@ namespace NActors { actorPtr = pool->Actors.back(); actorPtr->StuckIndex = i; pool->Actors.pop_back(); - pool->DeadActorsUsage.emplace_back(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast())); } } } @@ -207,7 +206,6 @@ namespace NActors { for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) { if (TAutoPtr evExt = mailbox->Pop()) { - mailbox->ProcessEvents(mailbox); recipient = evExt->GetRecipientRewrite(); TActorContext ctx(*mailbox, *this, eventStart, recipient); TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system @@ -253,14 +251,10 @@ namespace NActors { hpnow = GetCycleCountFast(); hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); - mailbox->ProcessEvents(mailbox); - actor->OnDequeueEvent(); - size_t dyingActorsCnt = DyingActors.size(); Ctx.UpdateActorsStats(dyingActorsCnt); if (dyingActorsCnt) { DropUnregistered(); - mailbox->ProcessEvents(mailbox); actor = nullptr; } diff --git a/ydb/library/actors/core/mailbox.cpp b/ydb/library/actors/core/mailbox.cpp index 6b7ba86431a1..2978a46a0c33 100644 --- a/ydb/library/actors/core/mailbox.cpp +++ b/ydb/library/actors/core/mailbox.cpp @@ -180,7 +180,6 @@ namespace NActors { switch (x->MailboxType) { case TMailboxType::Simple: { TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x); - mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType); #endif @@ -205,7 +204,6 @@ namespace NActors { return false; TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x); - mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType); #endif @@ -218,7 +216,6 @@ namespace NActors { return true; case TMailboxType::HTSwap: { THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x); - mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType); #endif @@ -234,7 +231,6 @@ namespace NActors { return false; TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x); - mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType); #endif @@ -250,7 +246,6 @@ namespace NActors { return false; TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x); - mailbox->Push(recipient.LocalId()); #if (!defined(_tsan_enabled_)) Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType); #endif @@ -435,24 +430,6 @@ namespace NActors { } } - TMailboxUsageImpl::~TMailboxUsageImpl() { - while (auto *e = PendingEventQueue.Pop()) { - delete e; - } - } - - void TMailboxUsageImpl::Push(ui64 localId) { - PendingEventQueue.Push(new TPendingEvent{localId, GetCycleCountFast()}); - } - - void TMailboxUsageImpl::ProcessEvents(TMailboxHeader *mailbox) { - while (std::unique_ptr e{PendingEventQueue.Pop()}) { - if (IActor *actor = mailbox->FindActor(e->LocalId)) { - actor->OnEnqueueEvent(e->Timestamp); - } - } - } - TMailboxTable::TSimpleMailbox::TSimpleMailbox() : TMailboxHeader(TMailboxType::Simple) , ScheduleMoment(0) diff --git a/ydb/library/actors/core/mailbox.h b/ydb/library/actors/core/mailbox.h index 2ae1ff997e2d..13e7678996f1 100644 --- a/ydb/library/actors/core/mailbox.h +++ b/ydb/library/actors/core/mailbox.h @@ -32,27 +32,7 @@ namespace NActors { ui64 ElapsedCycles = 0; }; - template - struct TMailboxUsageImpl { - void Push(ui64 /*localId*/) {} - void ProcessEvents(TMailboxHeader* /*mailbox*/) {} - }; - - template<> - struct TMailboxUsageImpl { - struct TPendingEvent { - ui64 LocalId; - ui64 Timestamp; - }; - NThreading::TReadAsFilledQueue PendingEventQueue; - - ~TMailboxUsageImpl(); - void Push(ui64 localId); - void ProcessEvents(TMailboxHeader *mailbox); - }; - struct TMailboxHeader - : TMailboxUsageImpl { struct TMailboxActorPack { enum EType { diff --git a/ydb/library/actors/core/mon_stats.cpp b/ydb/library/actors/core/mon_stats.cpp index 6524e1c5ea75..e3c02a9919bc 100644 --- a/ydb/library/actors/core/mon_stats.cpp +++ b/ydb/library/actors/core/mon_stats.cpp @@ -38,7 +38,6 @@ namespace NActors { , ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) , ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) , StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) {} namespace { @@ -95,15 +94,6 @@ namespace NActors { AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end()); } - if (UsageByActivity.size() < other.UsageByActivity.size()) { - UsageByActivity.resize(other.UsageByActivity.size()); - } - for (size_t i = 0; i < UsageByActivity.size(); ++i) { - for (size_t j = 0; j < 10; ++j) { - UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]); - } - } - RelaxedStore( &PoolActorRegistrations, std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations))); @@ -119,4 +109,4 @@ namespace NActors { return ActorsAliveByActivity.size(); } -} \ No newline at end of file +} diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index 4bceffa8f373..5a900f9bcf28 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -113,7 +113,6 @@ namespace NActors { TVector ScheduledEventsByActivity; TVector StuckActorsByActivity; TVector AggregatedCurrentActivationTime; - TVector> UsageByActivity; ui64 PoolActorRegistrations = 0; ui64 PoolDestroyedActors = 0; ui64 PoolAllocatedMailboxes = 0; diff --git a/ydb/library/actors/helpers/collector_counters.cpp b/ydb/library/actors/helpers/collector_counters.cpp new file mode 100644 index 000000000000..5640fd1d742c --- /dev/null +++ b/ydb/library/actors/helpers/collector_counters.cpp @@ -0,0 +1,306 @@ +#include "collector_counters.h" + +namespace NActors { + +// THistogramCounters + +void THistogramCounters::Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { + for (size_t i = 0; (1ull<GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true)); + } + Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); +} + +void THistogramCounters::Set(const TLogHistogram& data) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last; +} + +void THistogramCounters::Set(const TLogHistogram& data, double factor) { + ui32 i = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) + *Buckets[i] = data.Buckets[i]*factor; + ui64 last = 0; + for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) + last += data.Buckets[i]; + *Buckets.back() = last*factor; +} + +// TActivityStats + +void TActivityStats::Init(NMonitoring::TDynamicCounterPtr group) { + Group = group; + + CurrentActivationTimeByActivity.resize(GetActivityTypeCount()); + ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount()); + ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount()); + ActorsAliveByActivityBuckets.resize(GetActivityTypeCount()); + ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount()); + StuckActorsByActivityBuckets.resize(GetActivityTypeCount()); +} + +void TActivityStats::Set(const TExecutorThreadStats& stats) { + for (ui32 i : xrange(stats.MaxActivityType())) { + Y_ABORT_UNLESS(i < GetActivityTypeCount()); + ui64 ticks = stats.ElapsedTicksByActivity[i]; + ui64 events = stats.ReceivedEventsByActivity[i]; + ui64 actors = stats.ActorsAliveByActivity[i]; + ui64 scheduled = stats.ScheduledEventsByActivity[i]; + ui64 stuck = stats.StuckActorsByActivity[i]; + + if (!ActorsAliveByActivityBuckets[i]) { + if (ticks || events || actors || scheduled) { + InitCountersForActivity(i); + } else { + continue; + } + } + + *CurrentActivationTimeByActivity[i] = 0; + *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; + *ReceivedEventsByActivityBuckets[i] = events; + *ActorsAliveByActivityBuckets[i] = actors; + *ScheduledEventsByActivityBuckets[i] = scheduled; + *StuckActorsByActivityBuckets[i] = stuck; + } + + auto setActivationTime = [&](TActivationTime activation) { + if (!ActorsAliveByActivityBuckets[activation.LastActivity]) { + InitCountersForActivity(activation.LastActivity); + } + *CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs; + }; + if (stats.CurrentActivationTime.TimeUs) { + setActivationTime(stats.CurrentActivationTime); + } + std::vector activationTimes = stats.AggregatedCurrentActivationTime; + Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) { + return left.LastActivity < right.LastActivity || + left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs; + }); + ui32 prevActivity = Max(); + for (auto &activationTime : activationTimes) { + if (activationTime.LastActivity == prevActivity) { + continue; + } + setActivationTime(activationTime); + prevActivity = activationTime.LastActivity; + } +} + +void TActivityStats::InitCountersForActivity(ui32 activityType) { + Y_ABORT_UNLESS(activityType < GetActivityTypeCount()); + + auto bucketName = TString(GetActivityTypeName(activityType)); + + CurrentActivationTimeByActivity[activityType] = + Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false); + ElapsedMicrosecByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true); + ReceivedEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true); + ActorsAliveByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); + ScheduledEventsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true); + StuckActorsByActivityBuckets[activityType] = + Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false); +} + +// TExecutorPoolCounters + +void TExecutorPoolCounters::Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { + LastElapsedSeconds = 0; + Usage = 0; + UsageTimer.Reset(); + Name = poolName; + Threads = threads; + LimitThreads = threads; + DefaultThreads = threads; + + PoolGroup = group->GetSubgroup("execpool", poolName); + + SentEvents = PoolGroup->GetCounter("SentEvents", true); + ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true); + PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true); + NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true); + DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true); + CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true); + ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true); + ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true); + EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true); + ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true); + ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false); + AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false); + MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true); + MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true); + MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); + WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true); + CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false); + PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false); + DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false); + MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false); + + CurrentThreadCountPercent = PoolGroup->GetCounter("CurrentThreadCountPercent", false); + PotentialMaxThreadCountPercent = PoolGroup->GetCounter("PotentialMaxThreadCountPercent", false); + PossibleMaxThreadCountPercent = PoolGroup->GetCounter("PossibleMaxThreadCountPercent", false); + DefaultThreadCountPercent = PoolGroup->GetCounter("DefaultThreadCountPercent", false); + MaxThreadCountPercent = PoolGroup->GetCounter("MaxThreadCountPercent", false); + + IsNeedy = PoolGroup->GetCounter("IsNeedy", false); + IsStarved = PoolGroup->GetCounter("IsStarved", false); + IsHoggish = PoolGroup->GetCounter("IsHoggish", false); + HasFullOwnSharedThread = PoolGroup->GetCounter("HasFullOwnSharedThread", false); + HasHalfOfOwnSharedThread = PoolGroup->GetCounter("HasHalfOfOwnSharedThread", false); + HasHalfOfOtherSharedThread = PoolGroup->GetCounter("HasHalfOfOtherSharedThread", false); + IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true); + IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true); + DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true); + DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true); + DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true); + NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true); + SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true); + SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false); + + + LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); + ActivationTimeHistogram = PoolGroup->GetHistogram( + "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000); + EventDeliveryTimeHistogram = PoolGroup->GetHistogram( + "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); + EventProcessingCountHistogram = PoolGroup->GetHistogram( + "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000); + EventProcessingTimeHistogram = PoolGroup->GetHistogram( + "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1)); + + ActivityStats.Init(PoolGroup.Get()); + + MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true); +} + +void TExecutorPoolCounters::Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + double elapsedSeconds = ::NHPTimer::GetSeconds(stats.ElapsedTicks); + *SentEvents = stats.SentEvents; + *ReceivedEvents = stats.ReceivedEvents; + *PreemptedEvents = stats.PreemptedEvents; + *NonDeliveredEvents = stats.NonDeliveredEvents; + *DestroyedActors = stats.PoolDestroyedActors; + *EmptyMailboxActivation = stats.EmptyMailboxActivation; + *CpuMicrosec = stats.CpuUs; + *ElapsedMicrosec = elapsedSeconds*1000000; + *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; + *ActorRegistrations = stats.PoolActorRegistrations; + *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors; + *AllocatedMailboxes = stats.PoolAllocatedMailboxes; + *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption; + *MailboxPushedOutByTime = stats.MailboxPushedOutByTime; + *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; + *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount; + *CurrentThreadCount = poolStats.CurrentThreadCount; + *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount; + *DefaultThreadCount = poolStats.DefaultThreadCount; + *MaxThreadCount = poolStats.MaxThreadCount; + + *CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100; + *PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100; + *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100; + *DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100; + *MaxThreadCountPercent = poolStats.MaxThreadCount * 100; + + *IsNeedy = poolStats.IsNeedy; + *IsStarved = poolStats.IsStarved; + *IsHoggish = poolStats.IsHoggish; + + *HasFullOwnSharedThread = poolStats.HasFullOwnSharedThread; + *HasHalfOfOwnSharedThread = poolStats.HasHalfOfOwnSharedThread; + *HasHalfOfOtherSharedThread = poolStats.HasHalfOfOtherSharedThread; + *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState; + *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange; + *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState; + *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState; + *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange; + *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions; + + *SpinningTimeUs = poolStats.SpinningTimeUs; + *SpinThresholdUs = poolStats.SpinThresholdUs; + + LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram); + ActivationTimeHistogram->Reset(); + ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram); + + LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram); + EventDeliveryTimeHistogram->Reset(); + EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram); + + LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram); + EventProcessingCountHistogram->Reset(); + EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram); + + double toMicrosec = 1000000 / NHPTimer::GetClockRate(); + LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec); + EventProcessingTimeHistogram->Reset(); + for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { + EventProcessingTimeHistogram->Collect( + stats.EventProcessingTimeHistogram.UpperBound(i), + stats.EventProcessingTimeHistogram.Value(i) * toMicrosec); + } + + ActivityStats.Set(stats); + + *MaxUtilizationTime = poolStats.MaxUtilizationTime; + + double seconds = UsageTimer.PassedReset(); + + // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916 + Threads = poolStats.CurrentThreadCount; + LimitThreads = poolStats.PotentialMaxThreadCount; + const double currentUsage = LimitThreads > 0 ? ((elapsedSeconds - LastElapsedSeconds) / seconds / LimitThreads) : 0; + + // update usage factor according to smoothness + const double smoothness = 0.5; + Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); + LastElapsedSeconds = elapsedSeconds; +#else + Y_UNUSED(stats); + Y_UNUSED(poolStats); +#endif +} + +// TActorSystemCounters + +void TActorSystemCounters::Init(NMonitoring::TDynamicCounters* group) { + Group = group; + + MaxUsedCpuPercent = Group->GetCounter("MaxUsedCpuPercent", false); + MinUsedCpuPercent = Group->GetCounter("MinUsedCpuPercent", false); + MaxElapsedCpuPercent = Group->GetCounter("MaxElapsedCpuPercent", false); + MinElapsedCpuPercent = Group->GetCounter("MinElapsedCpuPercent", false); + AvgAwakeningTimeNs = Group->GetCounter("AvgAwakeningTimeNs", false); + AvgWakingUpTimeNs = Group->GetCounter("AvgWakingUpTimeNs", false); +} + +void TActorSystemCounters::Set(const THarmonizerStats& harmonizerStats) { +#ifdef ACTORSLIB_COLLECT_EXEC_STATS + *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu; + *MinUsedCpuPercent = harmonizerStats.MinUsedCpu; + *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu; + *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu; + + *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * 1000; + *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * 1000; +#else + Y_UNUSED(harmonizerStats); +#endif +} + +} // NActors diff --git a/ydb/library/actors/helpers/collector_counters.h b/ydb/library/actors/helpers/collector_counters.h new file mode 100644 index 000000000000..0a475bcddf8b --- /dev/null +++ b/ydb/library/actors/helpers/collector_counters.h @@ -0,0 +1,124 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace NActors { + +struct THistogramCounters { + void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal); + void Set(const TLogHistogram& data); + void Set(const TLogHistogram& data, double factor); + +private: + TVector Buckets; +}; + +struct TActivityStats { + void Init(NMonitoring::TDynamicCounterPtr group); + void Set(const TExecutorThreadStats& stats); + +private: + void InitCountersForActivity(ui32 activityType); + +private: + NMonitoring::TDynamicCounterPtr Group; + + TVector CurrentActivationTimeByActivity; + TVector ElapsedMicrosecByActivityBuckets; + TVector ReceivedEventsByActivityBuckets; + TVector ActorsAliveByActivityBuckets; + TVector ScheduledEventsByActivityBuckets; + TVector StuckActorsByActivityBuckets; +}; + +struct TExecutorPoolCounters { + TIntrusivePtr PoolGroup; + + NMonitoring::TDynamicCounters::TCounterPtr SentEvents; + NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents; + NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents; + NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents; + NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors; + NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation; + NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec; + NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations; + NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive; + NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime; + NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; + NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr PossibleMaxThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCountPercent; + NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; + NMonitoring::TDynamicCounters::TCounterPtr IsStarved; + NMonitoring::TDynamicCounters::TCounterPtr IsHoggish; + NMonitoring::TDynamicCounters::TCounterPtr HasFullOwnSharedThread; + NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOwnSharedThread; + NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOtherSharedThread; + NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState; + NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState; + NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange; + NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions; + NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs; + NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs; + + + THistogramCounters LegacyActivationTimeHistogram; + NMonitoring::THistogramPtr ActivationTimeHistogram; + THistogramCounters LegacyEventDeliveryTimeHistogram; + NMonitoring::THistogramPtr EventDeliveryTimeHistogram; + THistogramCounters LegacyEventProcessingCountHistogram; + NMonitoring::THistogramPtr EventProcessingCountHistogram; + THistogramCounters LegacyEventProcessingTimeHistogram; + NMonitoring::THistogramPtr EventProcessingTimeHistogram; + + TActivityStats ActivityStats; + NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime; + + double Usage = 0; + double LastElapsedSeconds = 0; + THPTimer UsageTimer; + TString Name; + double Threads; + double LimitThreads; + double DefaultThreads; + + void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads); + void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats); +}; + +struct TActorSystemCounters { + TIntrusivePtr Group; + + NMonitoring::TDynamicCounters::TCounterPtr MaxUsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MinUsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpuPercent; + NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpuPercent; + + NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeNs; + NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeNs; + + + void Init(NMonitoring::TDynamicCounters* group); + void Set(const THarmonizerStats& harmonizerStats); +}; + +} // NActors