Skip to content

Commit 1fa5a26

Browse files
authored
stable-25-3-1: EXT-1430 Fix race in resource manager (#25738)
2 parents c9ffb77 + c127b28 commit 1fa5a26

File tree

3 files changed

+30
-16
lines changed

3 files changed

+30
-16
lines changed

ydb/core/kqp/rm_service/kqp_resource_info_exchanger.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,19 +316,18 @@ class TKqpResourceInfoExchangerActor : public TActorBootstrapped<TKqpResourceInf
316316
Schedule(at - now, new TEvPrivate::TEvUpdateSnapshotState);
317317
return;
318318
}
319-
TVector<NKikimrKqp::TKqpNodeResources> resources;
320-
resources.reserve(NodesState.size());
319+
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> resources = std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>();
320+
resources->reserve(NodesState.size());
321321

322322
for (const auto& [nodeId, state] : NodesState) {
323323
const auto& currentResources = state.NodeData.GetResources();
324324
if (currentResources.HasNodeId()) {
325-
resources.push_back(std::move(currentResources));
325+
resources->push_back(std::move(currentResources));
326326
}
327327
}
328328

329329
with_lock (ResourceSnapshotState->Lock) {
330-
ResourceSnapshotState->Snapshot =
331-
std::make_shared<TVector<NKikimrKqp::TKqpNodeResources>>(std::move(resources));
330+
ResourceSnapshotState->Snapshot = std::move(resources);
332331
}
333332

334333
ResourceSnapshotRetryState.LastRetryAt = now;

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,12 @@ class TKqpResourceManager : public IKqpResourceManager {
171171
SetConfigValues(config);
172172
}
173173

174-
void Bootstrap(NKikimrConfig::TTableServiceConfig::TResourceManager& config, TActorSystem* actorSystem, TActorId selfId) {
175-
if (!Counters) {
176-
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters);
177-
}
174+
void Registered(NKikimrConfig::TTableServiceConfig::TResourceManager& config, TActorSystem* actorSystem, TActorId selfId) {
178175
ActorSystem = actorSystem;
179176
SelfId = selfId;
177+
if (!Counters) {
178+
Counters = MakeIntrusive<TKqpCounters>(AppData(ActorSystem)->Counters);
179+
}
180180
UpdatePatternCache(config.GetKqpPatternCacheCapacityBytes(),
181181
config.GetKqpPatternCacheCompiledCapacityBytes(),
182182
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
@@ -582,20 +582,30 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
582582
TKqpResourceManagerActor(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
583583
TIntrusivePtr<TKqpCounters> counters, const TActorId& resourceBrokerId,
584584
std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources, ui32 nodeId)
585-
: Config(config)
585+
: NodeId(nodeId)
586+
, Config(config)
586587
, ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
587588
, KqpProxySharedResources(std::move(kqpProxySharedResources))
588589
{
589590
ResourceManager = std::make_shared<TKqpResourceManager>(config, counters);
591+
}
592+
593+
// Is called right after service registration
594+
// and before any usual actor can try to get ResourceManager
595+
void Registered(TActorSystem* sys, const TActorId& owner) override {
596+
TActorBootstrapped::Registered(sys, owner);
597+
598+
ResourceManager->Registered(Config, sys, SelfId());
599+
590600
with_lock (ResourceManagers.Lock) {
591-
ResourceManagers.ByNodeId[nodeId] = ResourceManager;
592-
ResourceManagers.Default = ResourceManager;
601+
if (ResourceManagers.Default.expired()) { // There can be several managers in tests
602+
ResourceManagers.Default = ResourceManager;
603+
}
604+
ResourceManagers.ByNodeId[NodeId] = ResourceManager;
593605
}
594606
}
595607

596608
void Bootstrap() {
597-
ResourceManager->Bootstrap(Config, TlsActivationContext->ActorSystem(), SelfId());
598-
599609
LOG_D("Start KqpResourceManagerActor at " << SelfId() << " with ResourceBroker at " << ResourceBrokerId);
600610

601611
// Subscribe for tenant changes
@@ -935,6 +945,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
935945
}
936946

937947
private:
948+
const ui32 NodeId;
938949
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
939950

940951
const TActorId ResourceBrokerId;
@@ -984,7 +995,7 @@ std::shared_ptr<NRm::IKqpResourceManager> GetKqpResourceManager(TMaybe<ui32> _no
984995

985996
std::shared_ptr<NRm::IKqpResourceManager> TryGetKqpResourceManager(TMaybe<ui32> _nodeId) {
986997
ui32 nodeId = _nodeId ? *_nodeId : TActivationContext::ActorSystem()->NodeId;
987-
auto rm = NRm::ResourceManagers.Default.lock();
998+
std::shared_ptr<NRm::TKqpResourceManager> rm = NRm::ResourceManagers.Default.lock();
988999
if (Y_LIKELY(rm && rm->GetNodeId() == nodeId)) {
9891000
return rm;
9901001
}

ydb/core/kqp/rm_service/kqp_rm_ut.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,14 @@ class KqpRm : public TTestBase {
150150
const NKikimrConfig::TTableServiceConfig::TResourceManager& config, ui32 nodeInd = 0) {
151151
auto kqpCounters = MakeIntrusive<TKqpCounters>(Counters);
152152
auto resman = CreateKqpResourceManagerActor(config, kqpCounters, ResourceBrokers[nodeInd], nullptr, Runtime->GetNodeId(nodeInd));
153+
// RM creates children during its registration, we need to enable schedule for them
154+
auto prevObserver = Runtime->SetRegistrationObserverFunc([](TTestActorRuntimeBase& runtime, const TActorId& /*parentId*/, const TActorId& actorId) {
155+
runtime.EnableScheduleForActor(actorId, true);
156+
});
153157
ResourceManagers.push_back(Runtime->Register(resman, nodeInd));
154158
Runtime->RegisterService(MakeKqpResourceManagerServiceID(
155159
Runtime->GetNodeId(nodeInd)), ResourceManagers.back(), nodeInd);
156-
Runtime->EnableScheduleForActor(ResourceManagers.back(), true);
160+
Runtime->SetRegistrationObserverFunc(prevObserver);
157161
}
158162

159163
void StartRms(const TVector<NKikimrConfig::TTableServiceConfig::TResourceManager>& configs = {}) {

0 commit comments

Comments
 (0)