Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 0 additions & 56 deletions ydb/library/actors/core/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,62 +35,6 @@ namespace NActors {
return *this;
}

template<i64 Increment>
static void UpdateQueueSizeAndTimestamp(TActorUsageImpl<true>& impl, ui64 time) {
ui64 usedTimeIncrement = 0;
using T = TActorUsageImpl<true>;

for (;;) {
uint64_t value = impl.QueueSizeAndTimestamp.load();
ui64 count = value >> T::TimestampBits;

count += Increment;
Y_ABORT_UNLESS((count & ~T::CountMask) == 0);

ui64 timestamp = value;
if (Increment == 1 && count == 1) {
timestamp = time;
} else if (Increment == -1 && count == 0) {
usedTimeIncrement = (static_cast<ui64>(time) - timestamp) & T::TimestampMask;
timestamp = 0; // reset timestamp to some zero value
}

const ui64 updated = (timestamp & T::TimestampMask) | (count << T::TimestampBits);
if (impl.QueueSizeAndTimestamp.compare_exchange_weak(value, updated)) {
break;
}
}

if (usedTimeIncrement && impl.LastUsageTimestamp <= time) {
impl.UsedTime += usedTimeIncrement;
}
}

void TActorUsageImpl<true>::OnEnqueueEvent(ui64 time) {
UpdateQueueSizeAndTimestamp<+1>(*this, time);
}

void TActorUsageImpl<true>::OnDequeueEvent() {
UpdateQueueSizeAndTimestamp<-1>(*this, GetCycleCountFast());
}

double TActorUsageImpl<true>::GetUsage(ui64 time) {
ui64 used = UsedTime.exchange(0);
if (const ui64 value = QueueSizeAndTimestamp.load(); value >> TimestampBits) {
used += (static_cast<ui64>(time) - value) & TimestampMask;
}

Y_ABORT_UNLESS(LastUsageTimestamp <= time);
ui64 passed = time - LastUsageTimestamp;
LastUsageTimestamp = time;

if (!passed) {
return 0;
}

return (double)Min(passed, used) / passed;
}

void IActor::Describe(IOutputStream &out) const noexcept {
SelfActorId.Out(out);
}
Expand Down
25 changes: 0 additions & 25 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,34 +320,9 @@ namespace NActors {

};

template<bool>
struct TActorUsageImpl {
void OnEnqueueEvent(ui64 /*time*/) {} // called asynchronously when event is put in the mailbox
void OnDequeueEvent() {} // called when processed by Executor
double GetUsage(ui64 /*time*/) { return 0; } // called from collector thread
void DoActorInit() {}
};

template<>
struct TActorUsageImpl<true> {
static constexpr int TimestampBits = 40;
static constexpr int CountBits = 24;
static constexpr ui64 TimestampMask = ((ui64)1 << TimestampBits) - 1;
static constexpr ui64 CountMask = ((ui64)1 << CountBits) - 1;

std::atomic_uint64_t QueueSizeAndTimestamp = 0;
std::atomic_uint64_t UsedTime = 0; // how much time did we consume since last GetUsage() call
ui64 LastUsageTimestamp = 0; // when GetUsage() was called the last time

void OnEnqueueEvent(ui64 time);
void OnDequeueEvent();
double GetUsage(ui64 time);
void DoActorInit() { LastUsageTimestamp = GetCycleCountFast(); }
};

class IActor
: protected IActorOps
, public TActorUsageImpl<ActorLibCollectUsageStats>
{
private:
TActorIdentity SelfActorId;
Expand Down
2 changes: 0 additions & 2 deletions ydb/library/actors/core/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// event processing time histograms
#define ACTORSLIB_COLLECT_EXEC_STATS

static constexpr bool ActorLibCollectUsageStats = false;

namespace NActors {
using TPoolId = ui8;
using TPoolsMask = ui64;
Expand Down
17 changes: 0 additions & 17 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace NActors {

void DoActorInit(TActorSystem* sys, IActor* actor, const TActorId& self, const TActorId& owner) {
actor->SelfActorId = self;
actor->DoActorInit();
actor->Registered(sys, owner);
}

Expand All @@ -34,17 +33,6 @@ namespace NActors {

const TMonotonic now = ActorSystem->Monotonic();

for (auto& u : stats.UsageByActivity) {
u.fill(0);
}

auto accountUsage = [&](ui32 activityType, double usage) {
Y_ABORT_UNLESS(0 <= usage);
Y_ABORT_UNLESS(usage <= 1);
int bin = Min<int>(9, usage * 10);
++stats.UsageByActivity[activityType][bin];
};

std::fill(stats.StuckActorsByActivity.begin(), stats.StuckActorsByActivity.end(), 0);

with_lock (StuckObserverMutex) {
Expand All @@ -55,12 +43,7 @@ namespace NActors {
if (delta > TDuration::Seconds(30)) {
++stats.StuckActorsByActivity[actor->GetActivityType()];
}
accountUsage(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast()));
}
for (const auto& [activityType, usage] : DeadActorsUsage) {
accountUsage(activityType, usage);
}
DeadActorsUsage.clear();
}
}
#endif
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace NActors {
// Stuck actor monitoring
TMutex StuckObserverMutex;
std::vector<IActor*> Actors;
mutable std::vector<std::tuple<ui32, double>> DeadActorsUsage;
friend class TGenericExecutorThread;
friend class TSharedExecutorThread;
void RecalculateStuckActors(TExecutorThreadStats& stats) const;
Expand Down
6 changes: 0 additions & 6 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ namespace NActors {
actorPtr = pool->Actors.back();
actorPtr->StuckIndex = i;
pool->Actors.pop_back();
pool->DeadActorsUsage.emplace_back(actor->GetActivityType(), actor->GetUsage(GetCycleCountFast()));
}
}
}
Expand Down Expand Up @@ -207,7 +206,6 @@ namespace NActors {

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
mailbox->ProcessEvents(mailbox);
recipient = evExt->GetRecipientRewrite();
TActorContext ctx(*mailbox, *this, eventStart, recipient);
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
Expand Down Expand Up @@ -253,14 +251,10 @@ namespace NActors {
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();

size_t dyingActorsCnt = DyingActors.size();
Ctx.UpdateActorsStats(dyingActorsCnt);
if (dyingActorsCnt) {
DropUnregistered();
mailbox->ProcessEvents(mailbox);
actor = nullptr;
}

Expand Down
23 changes: 0 additions & 23 deletions ydb/library/actors/core/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ namespace NActors {
switch (x->MailboxType) {
case TMailboxType::Simple: {
TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x);
mailbox->Push(recipient.LocalId());
#if (!defined(_tsan_enabled_))
Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType);
#endif
Expand All @@ -205,7 +204,6 @@ namespace NActors {
return false;

TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x);
mailbox->Push(recipient.LocalId());
#if (!defined(_tsan_enabled_))
Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType);
#endif
Expand All @@ -218,7 +216,6 @@ namespace NActors {
return true;
case TMailboxType::HTSwap: {
THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x);
mailbox->Push(recipient.LocalId());
#if (!defined(_tsan_enabled_))
Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType);
#endif
Expand All @@ -234,7 +231,6 @@ namespace NActors {
return false;

TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x);
mailbox->Push(recipient.LocalId());
#if (!defined(_tsan_enabled_))
Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType);
#endif
Expand All @@ -250,7 +246,6 @@ namespace NActors {
return false;

TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x);
mailbox->Push(recipient.LocalId());
#if (!defined(_tsan_enabled_))
Y_DEBUG_ABORT_UNLESS(mailbox->Type == (ui32)x->MailboxType);
#endif
Expand Down Expand Up @@ -435,24 +430,6 @@ namespace NActors {
}
}

TMailboxUsageImpl<true>::~TMailboxUsageImpl() {
while (auto *e = PendingEventQueue.Pop()) {
delete e;
}
}

void TMailboxUsageImpl<true>::Push(ui64 localId) {
PendingEventQueue.Push(new TPendingEvent{localId, GetCycleCountFast()});
}

void TMailboxUsageImpl<true>::ProcessEvents(TMailboxHeader *mailbox) {
while (std::unique_ptr<TPendingEvent> e{PendingEventQueue.Pop()}) {
if (IActor *actor = mailbox->FindActor(e->LocalId)) {
actor->OnEnqueueEvent(e->Timestamp);
}
}
}

TMailboxTable::TSimpleMailbox::TSimpleMailbox()
: TMailboxHeader(TMailboxType::Simple)
, ScheduleMoment(0)
Expand Down
20 changes: 0 additions & 20 deletions ydb/library/actors/core/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,7 @@ namespace NActors {
ui64 ElapsedCycles = 0;
};

template<bool>
struct TMailboxUsageImpl {
void Push(ui64 /*localId*/) {}
void ProcessEvents(TMailboxHeader* /*mailbox*/) {}
};

template<>
struct TMailboxUsageImpl<true> {
struct TPendingEvent {
ui64 LocalId;
ui64 Timestamp;
};
NThreading::TReadAsFilledQueue<TPendingEvent> PendingEventQueue;

~TMailboxUsageImpl();
void Push(ui64 localId);
void ProcessEvents(TMailboxHeader *mailbox);
};

struct TMailboxHeader
: TMailboxUsageImpl<ActorLibCollectUsageStats>
{
struct TMailboxActorPack {
enum EType {
Expand Down
12 changes: 1 addition & 11 deletions ydb/library/actors/core/mon_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ namespace NActors {
, ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
{}

namespace {
Expand Down Expand Up @@ -95,15 +94,6 @@ namespace NActors {
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
}

if (UsageByActivity.size() < other.UsageByActivity.size()) {
UsageByActivity.resize(other.UsageByActivity.size());
}
for (size_t i = 0; i < UsageByActivity.size(); ++i) {
for (size_t j = 0; j < 10; ++j) {
UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]);
}
}

RelaxedStore(
&PoolActorRegistrations,
std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations)));
Expand All @@ -119,4 +109,4 @@ namespace NActors {
return ActorsAliveByActivity.size();
}

}
}
1 change: 0 additions & 1 deletion ydb/library/actors/core/mon_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ namespace NActors {
TVector<ui64> ScheduledEventsByActivity;
TVector<ui64> StuckActorsByActivity;
TVector<TActivationTime> AggregatedCurrentActivationTime;
TVector<std::array<ui64, 10>> UsageByActivity;
ui64 PoolActorRegistrations = 0;
ui64 PoolDestroyedActors = 0;
ui64 PoolAllocatedMailboxes = 0;
Expand Down
Loading
Loading