From ccd3e1ee6e964ee48fdefa7ac0f7df37b7ec79e1 Mon Sep 17 00:00:00 2001
From: kruall
Date: Tue, 30 Sep 2025 15:33:54 +0300
Subject: [PATCH 1/2] Fix duplicate messages after stopping ydbd (#26014)

---
 ydb/core/driver_lib/run/run.cpp | 86 ++++++++++++++++++++++-----------
 ydb/core/driver_lib/run/run.h   | 12 ++++-
 2 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 7d4218fbe54c..08b0ecbbe762 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -173,10 +173,29 @@
 namespace NKikimr {
 
+namespace {
+
+    void StopGRpcServers(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper) {
+        auto wrapper = grpcServersWrapper.lock();
+        if (!wrapper) {
+            return;
+        }
+        TGuard<TMutex> guard = wrapper->Guard();
+        for (auto& [name, server] : wrapper->Servers) {
+            if (!server) {
+                continue;
+            }
+            server->Stop();
+        }
+        wrapper->Servers.clear();
+    }
+
+}
+
 class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
-    TGRpcServersFactory GRpcServersFactory;
-    TGRpcServers GRpcServers;
+    std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
     TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
+    bool Started = false;
 
 public:
     enum {
@@ -192,9 +211,9 @@
     };
 
 public:
-    TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
+    TGRpcServersManager(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper,
                         TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
-        : GRpcServersFactory(std::move(grpcServersFactory))
+        : GRpcServersWrapper(std::move(grpcServersWrapper))
         , ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
     {}
 
@@ -215,11 +234,17 @@
     }
 
     void Start() {
-        if (GRpcServers) {
+        if (Started) {
+            return;
+        }
+        Started = true;
+        auto wrapper = GRpcServersWrapper.lock();
+        if (!wrapper) {
             return;
         }
-        GRpcServers = GRpcServersFactory();
-        for (auto& [name, server] : GRpcServers) {
+        TGuard<TMutex> guard = wrapper->Guard();
+        wrapper->Servers = wrapper->GrpcServersFactory();
+        for (auto& [name, server] : wrapper->Servers) {
             if (!server) {
                 continue;
             }
@@ -251,12 +276,11 @@
     }
 
     void Stop() {
-        for (auto& [name, server] : GRpcServers) {
-            if (server) {
-                server->Stop();
-            }
+        if (!Started) {
+            return;
         }
-        GRpcServers.clear();
+        Started = false;
+        StopGRpcServers(GRpcServersWrapper);
     }
 
     void HandleStop(TEvStop::TPtr ev) {
@@ -645,7 +669,10 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
 }
 
 void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
-    GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
+    if (!GRpcServersWrapper) {
+        GRpcServersWrapper = std::make_shared<TGRpcServersWrapper>();
+    }
+    GRpcServersWrapper->GrpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
 }
 
 TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
@@ -1990,8 +2017,9 @@ void TKikimrRunner::KikimrStart() {
         Monitoring->Start(ActorSystem.Get());
     }
 
-    if (GRpcServersFactory) {
-        GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));
+    if (GRpcServersWrapper) {
+        GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory();
+        GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider));
     }
 
     if (SqsHttp) {
@@ -2096,23 +2124,18 @@ void TKikimrRunner::KikimrStop(bool graceful) {
         SqsHttp.Destroy();
     }
 
-    // stop processing grpc requests/response - we must stop feeding ActorSystem
-    if (GRpcServersManager) {
-        TManualEvent event;
-        ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
-        event.WaitI();
-    }
-
     if (ActorSystem) {
         ActorSystem->Stop();
     }
 
-    if (YqSharedResources) {
-        YqSharedResources->Stop();
+    // stop processing grpc requests/response - we must stop feeding ActorSystem
+    if (GRpcServersManager) {
+        StopGRpcServers(GRpcServersWrapper);
+        GRpcServersWrapper->Servers.clear();
     }
 
-    if (ActorSystem) {
-        ActorSystem->Cleanup();
+    if (YqSharedResources) {
+        YqSharedResources->Stop();
     }
 
     if (ModuleFactories) {
@@ -2121,12 +2144,17 @@ void TKikimrRunner::KikimrStop(bool graceful) {
         }
     }
 
-    if (YdbDriver) {
-        YdbDriver->Stop(true);
-    }
     for (auto plugin: Plugins) {
         plugin->Stop();
     }
+
+    if (ActorSystem) {
+        ActorSystem->Cleanup();
+    }
+
+    if (YdbDriver) {
+        YdbDriver->Stop(true);
+    }
 }
 
 void TKikimrRunner::BusyLoop() {
diff --git a/ydb/core/driver_lib/run/run.h b/ydb/core/driver_lib/run/run.h
index 8368cfd3eab9..197e59b5bb9f 100644
--- a/ydb/core/driver_lib/run/run.h
+++ b/ydb/core/driver_lib/run/run.h
@@ -29,6 +29,16 @@ namespace NKikimr {
 
 using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
 using TGRpcServersFactory = std::function<TGRpcServers()>;
 
+struct TGRpcServersWrapper {
+    TGRpcServers Servers;
+    TGRpcServersFactory GrpcServersFactory;
+    TMutex Mutex;
+
+    TGuard<TMutex> Guard() {
+        return TGuard<TMutex>(Mutex);
+    }
+};
+
 class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
 protected:
     static TProgramShouldContinue KikimrShouldContinue;
@@ -69,7 +79,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
 
     TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});
 
-    TGRpcServersFactory GRpcServersFactory;
+    std::shared_ptr<TGRpcServersWrapper> GRpcServersWrapper;
     TActorId GRpcServersManager;
 
     virtual ~TKikimrRunner();
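[Review note on PATCH 1/2] The duplicate-message fix boils down to moving ownership of the gRPC server list out of the TGRpcServersManager actor into a shared, mutex-guarded TGRpcServersWrapper: both the manager actor and TKikimrRunner::KikimrStop can now stop the servers directly, in any order, and the stop is idempotent. A minimal standalone sketch of that stop-once pattern, with hypothetical TServer/TWrapper types and std::mutex standing in for TMutex (not the YDB classes themselves):

    // stop_once_sketch.cpp - the wrapper + lock + clear() idiom from the patch.
    #include <memory>
    #include <mutex>
    #include <string>
    #include <utility>
    #include <vector>

    struct TServer {
        void Stop() { /* tear down listeners, drain in-flight requests */ }
    };

    struct TWrapper {
        std::vector<std::pair<std::string, std::unique_ptr<TServer>>> Servers;
        std::mutex Mutex;
    };

    // Called from both the manager actor and process shutdown. The mutex
    // serializes concurrent callers, and clear() makes the second call a
    // no-op, so each server is stopped exactly once.
    void StopServers(const std::weak_ptr<TWrapper>& weak) {
        auto wrapper = weak.lock();
        if (!wrapper) {
            return; // owner already destroyed, nothing left to stop
        }
        std::lock_guard<std::mutex> guard(wrapper->Mutex);
        for (auto& [name, server] : wrapper->Servers) {
            if (server) {
                server->Stop();
            }
        }
        wrapper->Servers.clear();
    }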
"service_initializer.h" #include "kikimr_services_initializers.h" @@ -196,6 +197,8 @@ class TGRpcServersManager : public TActorBootstrapped { std::weak_ptr GRpcServersWrapper; TIntrusivePtr ProcessMemoryInfoProvider; bool Started = false; + bool StopScheduled = false; + bool WaitingForDisconnectRequest = false; public: enum { @@ -227,17 +230,36 @@ class TGRpcServersManager : public TActorBootstrapped { if (const auto& bridgeInfo = ev->Get()->BridgeInfo) { if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) { Start(); - } else { - Stop(); + } else if (!StopScheduled) { + StopScheduled = true; + CheckAndExecuteStop(); } } } + void HandleDisconnectRequestStarted() { + WaitingForDisconnectRequest = true; + } + + void HandleDisconnectRequestFinished() { + WaitingForDisconnectRequest = false; + CheckAndExecuteStop(); + } + + void CheckAndExecuteStop() { + if (StopScheduled && !WaitingForDisconnectRequest) { + StopScheduled = false; + Stop(); + } + } + void Start() { if (Started) { return; } Started = true; + StopScheduled = false; + WaitingForDisconnectRequest = false; auto wrapper = GRpcServersWrapper.lock(); if (!wrapper) { return; @@ -292,6 +314,8 @@ class TGRpcServersManager : public TActorBootstrapped { STRICT_STFUNC(StateFunc, hFunc(TEvNodeWardenStorageConfig, Handle) hFunc(TEvStop, HandleStop) + cFunc(TEvGRpcServersManager::EvDisconnectRequestStarted, HandleDisconnectRequestStarted) + cFunc(TEvGRpcServersManager::EvDisconnectRequestFinished, HandleDisconnectRequestFinished) ) }; @@ -2020,6 +2044,7 @@ void TKikimrRunner::KikimrStart() { if (GRpcServersWrapper) { GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory(); GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider)); + ActorSystem->RegisterLocalService(NKikimr::MakeGRpcServersManagerId(ActorSystem->NodeId), GRpcServersManager); } if (SqsHttp) { diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 4bf76f194cd4..4f3fec07ec53 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -14,6 +14,7 @@ SRCS( driver.h factories.h factories.cpp + grpc_servers_manager.h kikimr_services_initializers.cpp kikimr_services_initializers.h main.h diff --git a/ydb/core/grpc_services/rpc_bridge.cpp b/ydb/core/grpc_services/rpc_bridge.cpp index 751e2ff05537..ee491820cba6 100644 --- a/ydb/core/grpc_services/rpc_bridge.cpp +++ b/ydb/core/grpc_services/rpc_bridge.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace NKikimr::NGRpcService { @@ -59,6 +60,8 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc; using TRpcBase = TRpcOperationRequestActor; + bool IsDisconnectingRequest = false; + public: using TBase::TBase; @@ -248,6 +251,19 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpcCopyToProto(&newClusterState, &NKikimrBridge::TClusterState::SetPromotedPile); newClusterState.SetGeneration(currentClusterState.GetGeneration() + 1); + // check if current node is transitioning to DISCONNECTED state + // if so, notify GRpcServersManager to wait for this request before shutting down + const auto selfNodePileId = bridgeInfo->SelfNodePile->BridgePileId; + if (selfNodePileId.GetPileIndex() < newClusterState.PerPileStateSize()) { + const auto newSelfState = newClusterState.GetPerPileState(selfNodePileId.GetPileIndex()); + if (newSelfState == NKikimrBridge::TClusterState::DISCONNECTED || + newSelfState == NKikimrBridge::TClusterState::SUSPENDED) { + 
+                IsDisconnectingRequest = true;
+                self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
+                    new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestStarted());
+            }
+        }
+
         auto request = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
         auto *cmd = request->Record.MutableSwitchBridgeClusterState();
         cmd->MutableNewClusterState()->CopyFrom(newClusterState);
@@ -280,13 +296,23 @@
         const auto& response = ev->Get()->Record;
         auto* self = Self();
 
-        if (response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
-            Ydb::Bridge::UpdateClusterStateResult result;
-            self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
+        const bool success = response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
+        auto notifyDisconnectFinished = [&]() {
+            if (IsDisconnectingRequest) {
+                self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
+                    new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestFinished());
+            }
+        };
+
+        if (!success) {
+            notifyDisconnectFinished();
+            self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
+                NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
             return;
         }
-        self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
-            NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
+
+        Ydb::Bridge::UpdateClusterStateResult result;
+        self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
+        notifyDisconnectFinished();
     }
 };
 
diff --git a/ydb/tests/functional/bridge/test_bridge.py b/ydb/tests/functional/bridge/test_bridge.py
index 3648c3a5ae2d..9dfe2d36d2c1 100644
--- a/ydb/tests/functional/bridge/test_bridge.py
+++ b/ydb/tests/functional/bridge/test_bridge.py
@@ -30,12 +30,6 @@ def test_failover(self):
         self.update_cluster_state(self.bridge_client, updates)
         self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.DISCONNECTED, "r2": PileState.PRIMARY})
 
-        updates = [
-            PileState(pile_name="r1", state=PileState.NOT_SYNCHRONIZED),
-        ]
-        self.update_cluster_state(self.secondary_bridge_client, updates)
-        self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.SYNCHRONIZED, "r2": PileState.PRIMARY}, timeout_seconds=50)
-
     def test_takedown(self):
         initial_result = self.get_cluster_state(self.bridge_client)
         self.check_states(initial_result, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED})
@@ -44,11 +38,6 @@ def test_takedown(self):
         ]
         self.update_cluster_state(self.bridge_client, updates)
         self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.DISCONNECTED})
-        updates = [
-            PileState(pile_name="r2", state=PileState.NOT_SYNCHRONIZED),
-        ]
-        self.update_cluster_state(self.bridge_client, updates)
-        self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED}, timeout_seconds=50)
 
     def test_rejoin(self):
         initial_result = self.get_cluster_state(self.bridge_client)
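[Review note on PATCH 2/2] The second patch turns the manager's stop into a gated two-flag state machine: leaving the config-quorum states only schedules a stop (StopScheduled), and the stop executes once no disconnecting UpdateClusterState RPC is in flight (WaitingForDisconnectRequest), so the client receives its reply before the gRPC servers go down. The handshake is TEvDisconnectRequestStarted / TEvDisconnectRequestFinished sent to the well-known MakeGRpcServersManagerId service. A compilable sketch of just that gate, with a plain struct standing in for the actor and printed output in place of the real Stop():

    // stop_gate_sketch.cpp - the deferred-stop handshake, outside the actor system.
    #include <iostream>

    struct TStopGate {
        bool StopScheduled = false;
        bool WaitingForDisconnectRequest = false;

        // TEvNodeWardenStorageConfig reports a pile state without config quorum
        void ScheduleStop() {
            StopScheduled = true;
            CheckAndExecuteStop();
        }
        // TEvDisconnectRequestStarted from the UpdateClusterState RPC actor
        void DisconnectStarted() {
            WaitingForDisconnectRequest = true;
        }
        // TEvDisconnectRequestFinished once the RPC has replied (success or error)
        void DisconnectFinished() {
            WaitingForDisconnectRequest = false;
            CheckAndExecuteStop();
        }

        void CheckAndExecuteStop() {
            if (StopScheduled && !WaitingForDisconnectRequest) {
                StopScheduled = false;
                std::cout << "stopping gRPC servers\n"; // Stop() in the real actor
            }
        }
    };

    int main() {
        TStopGate gate;
        gate.DisconnectStarted();  // RPC marked itself as disconnecting
        gate.ScheduleStop();       // new cluster state arrives: stop is deferred
        gate.DisconnectFinished(); // reply has been sent: stop executes now
    }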