Skip to content
17 changes: 10 additions & 7 deletions ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche

void Handle(TEvAddDatabase::TPtr& ev) {
NHdrf::TStaticAttributes const attrs {
.Weight = std::max(ev->Get()->Weight, 0.0), // TODO: weight shouldn't be negative!
.Weight = ev->Get()->Weight, // TODO: weight shouldn't be negative!
};
Scheduler->AddOrUpdateDatabase(ev->Get()->Id, attrs);
}
Expand All @@ -62,7 +62,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
const auto& poolId = ev->Get()->PoolId;
const auto resourceWeight = std::max(ev->Get()->Params.ResourceWeight, 0.0); // TODO: resource weight shouldn't be negative!
NHdrf::TStaticAttributes attrs = {
.Weight = std::max(ev->Get()->Weight, 0.0), // TODO: weight shouldn't be negative!
.Weight = ev->Get()->Weight, // TODO: weight shouldn't be negative!
};

if (ev->Get()->Params.TotalCpuLimitPercentPerNode >= 0) {
Expand Down Expand Up @@ -127,7 +127,7 @@ class TComputeSchedulerService : public NActors::TActorBootstrapped<TComputeSche
const auto& poolId = ev->Get()->PoolId;
const auto& queryId = ev->Get()->QueryId;
NHdrf::TStaticAttributes const attrs {
.Weight = std::max(ev->Get()->Weight, 0.0), // TODO: weight shouldn't be negative!
.Weight = ev->Get()->Weight, // TODO: weight shouldn't be negative!
};

auto query = Scheduler->AddOrUpdateQuery(databaseId, poolId.empty() ? NKikimr::NResourcePool::DEFAULT_POOL_ID : poolId, queryId, attrs);
Expand Down Expand Up @@ -205,6 +205,8 @@ ui64 TComputeScheduler::GetTotalCpuLimit() const {
void TComputeScheduler::AddOrUpdateDatabase(const TString& databaseId, const NHdrf::TStaticAttributes& attrs) {
TWriteGuard lock(Mutex);

Y_ENSURE(attrs.GetWeight() > 0.0, "Weight should be positive");

if (auto database = Root->GetDatabase(databaseId)) {
database->Update(attrs);
} else {
Expand All @@ -219,6 +221,8 @@ void TComputeScheduler::AddOrUpdatePool(const TString& databaseId, const TString
auto database = Root->GetDatabase(databaseId);
Y_ENSURE(database, "Database not found: " << databaseId);

Y_ENSURE(attrs.GetWeight() > 0.0, "Weight should be positive");

if (auto pool = database->GetPool(poolId)) {
pool->Update(attrs);
} else {
Expand All @@ -235,6 +239,7 @@ TQueryPtr TComputeScheduler::AddOrUpdateQuery(const TString& databaseId, const T
auto pool = database->GetPool(poolId);
Y_ENSURE(pool, "Pool not found: " << poolId);

Y_ENSURE(attrs.GetWeight() > 0.0, "Weight should be positive");
TQueryPtr query;

if (query = std::static_pointer_cast<TQuery>(pool->GetQuery(queryId))) {
Expand All @@ -258,7 +263,7 @@ void TComputeScheduler::RemoveQuery(const TQueryPtr& query) {
query->GetParent()->RemoveQuery(queryId);
}

void TComputeScheduler::UpdateFairShare() {
void TComputeScheduler::UpdateFairShare(bool allowFairShareOverlimit) {
auto startTime = TMonotonic::Now();

NHdrf::NSnapshot::TRootPtr snapshot;
Expand All @@ -268,9 +273,7 @@ void TComputeScheduler::UpdateFairShare() {
}

snapshot->UpdateBottomUp(Root->TotalLimit);
// We want to allow FairShare to be over Limit.
// If you need to change this behaviour change variable's default value
snapshot->UpdateTopDown(true);
snapshot->UpdateTopDown(allowFairShareOverlimit);

{
TWriteGuard lock(Mutex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class TComputeScheduler : public std::enable_shared_from_this<TComputeScheduler>
NHdrf::NDynamic::TQueryPtr AddOrUpdateQuery(const TString& databaseId, const TString& poolId, const NHdrf::TQueryId& queryId, const NHdrf::TStaticAttributes& attrs);
void RemoveQuery(const NHdrf::NDynamic::TQueryPtr& query);

void UpdateFairShare();
// We want to allow FairShare to be over Limit.
// If you need to change this behaviour change variable's default value
void UpdateFairShare(bool allowFairShareOverlimit = true);

private:
TRWMutex Mutex;
Expand Down
Loading