Skip to content

Commit 1338380

Browse files
authored
Fix in bridge grpc server (#26014)(#26493) (#26539)
2 parents 86a76a3 + 7d7e988 commit 1338380

File tree

6 files changed

+155
-48
lines changed

6 files changed

+155
-48
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include <ydb/core/base/defs.h>
4+
#include <ydb/core/base/appdata.h>
5+
#include <ydb/library/actors/core/event_local.h>
6+
7+
namespace NKikimr {
8+
9+
struct TEvGRpcServersManager {
10+
enum EEv {
11+
EvDisconnectRequestStarted = EventSpaceBegin(TEvents::ES_PRIVATE) + 10000,
12+
EvDisconnectRequestFinished,
13+
};
14+
15+
struct TEvDisconnectRequestStarted : public TEventLocal<TEvDisconnectRequestStarted, EvDisconnectRequestStarted> {
16+
};
17+
18+
struct TEvDisconnectRequestFinished : public TEventLocal<TEvDisconnectRequestFinished, EvDisconnectRequestFinished> {
19+
};
20+
};
21+
22+
inline TActorId MakeGRpcServersManagerId(ui32 nodeId) {
23+
char x[12] = {'g','r','p','c','s','r','v','r','m','n','g','r'};
24+
return TActorId(nodeId, TStringBuf(x, 12));
25+
}
26+
27+
} // namespace NKikimr
28+

ydb/core/driver_lib/run/run.cpp

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "auto_config_initializer.h"
22
#include "run.h"
3+
#include "grpc_servers_manager.h"
34
#include "service_initializer.h"
45
#include "kikimr_services_initializers.h"
56

@@ -173,10 +174,31 @@
173174

174175
namespace NKikimr {
175176

177+
namespace {
178+
179+
void StopGRpcServers(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper) {
180+
auto wrapper = grpcServersWrapper.lock();
181+
if (!wrapper) {
182+
return;
183+
}
184+
TGuard<TMutex> guard = wrapper->Guard();
185+
for (auto& [name, server] : wrapper->Servers) {
186+
if (!server) {
187+
continue;
188+
}
189+
server->Stop();
190+
}
191+
wrapper->Servers.clear();
192+
}
193+
194+
}
195+
176196
class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
177-
TGRpcServersFactory GRpcServersFactory;
178-
TGRpcServers GRpcServers;
197+
std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
179198
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
199+
bool Started = false;
200+
bool StopScheduled = false;
201+
bool WaitingForDisconnectRequest = false;
180202

181203
public:
182204
enum {
@@ -192,9 +214,9 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
192214
};
193215

194216
public:
195-
TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
217+
TGRpcServersManager(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper,
196218
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
197-
: GRpcServersFactory(std::move(grpcServersFactory))
219+
: GRpcServersWrapper(std::move(grpcServersWrapper))
198220
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
199221
{}
200222

@@ -208,18 +230,43 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
208230
if (const auto& bridgeInfo = ev->Get()->BridgeInfo) {
209231
if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) {
210232
Start();
211-
} else {
212-
Stop();
233+
} else if (!StopScheduled) {
234+
StopScheduled = true;
235+
CheckAndExecuteStop();
213236
}
214237
}
215238
}
216239

240+
void HandleDisconnectRequestStarted() {
241+
WaitingForDisconnectRequest = true;
242+
}
243+
244+
void HandleDisconnectRequestFinished() {
245+
WaitingForDisconnectRequest = false;
246+
CheckAndExecuteStop();
247+
}
248+
249+
void CheckAndExecuteStop() {
250+
if (StopScheduled && !WaitingForDisconnectRequest) {
251+
StopScheduled = false;
252+
Stop();
253+
}
254+
}
255+
217256
void Start() {
218-
if (GRpcServers) {
257+
if (Started) {
258+
return;
259+
}
260+
Started = true;
261+
StopScheduled = false;
262+
WaitingForDisconnectRequest = false;
263+
auto wrapper = GRpcServersWrapper.lock();
264+
if (!wrapper) {
219265
return;
220266
}
221-
GRpcServers = GRpcServersFactory();
222-
for (auto& [name, server] : GRpcServers) {
267+
TGuard<TMutex> guard = wrapper->Guard();
268+
wrapper->Servers = wrapper->GrpcServersFactory();
269+
for (auto& [name, server] : wrapper->Servers) {
223270
if (!server) {
224271
continue;
225272
}
@@ -251,12 +298,11 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
251298
}
252299

253300
void Stop() {
254-
for (auto& [name, server] : GRpcServers) {
255-
if (server) {
256-
server->Stop();
257-
}
301+
if (!Started) {
302+
return;
258303
}
259-
GRpcServers.clear();
304+
Started = false;
305+
StopGRpcServers(GRpcServersWrapper);
260306
}
261307

262308
void HandleStop(TEvStop::TPtr ev) {
@@ -268,6 +314,8 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
268314
STRICT_STFUNC(StateFunc,
269315
hFunc(TEvNodeWardenStorageConfig, Handle)
270316
hFunc(TEvStop, HandleStop)
317+
cFunc(TEvGRpcServersManager::EvDisconnectRequestStarted, HandleDisconnectRequestStarted)
318+
cFunc(TEvGRpcServersManager::EvDisconnectRequestFinished, HandleDisconnectRequestFinished)
271319
)
272320
};
273321

@@ -645,7 +693,10 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
645693
}
646694

647695
void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
648-
GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
696+
if (!GRpcServersWrapper) {
697+
GRpcServersWrapper = std::make_shared<TGRpcServersWrapper>();
698+
}
699+
GRpcServersWrapper->GrpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
649700
}
650701

651702
TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
@@ -1990,8 +2041,10 @@ void TKikimrRunner::KikimrStart() {
19902041
Monitoring->Start(ActorSystem.Get());
19912042
}
19922043

1993-
if (GRpcServersFactory) {
1994-
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));
2044+
if (GRpcServersWrapper) {
2045+
GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory();
2046+
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider));
2047+
ActorSystem->RegisterLocalService(NKikimr::MakeGRpcServersManagerId(ActorSystem->NodeId), GRpcServersManager);
19952048
}
19962049

19972050
if (SqsHttp) {
@@ -2096,23 +2149,18 @@ void TKikimrRunner::KikimrStop(bool graceful) {
20962149
SqsHttp.Destroy();
20972150
}
20982151

2099-
// stop processing grpc requests/response - we must stop feeding ActorSystem
2100-
if (GRpcServersManager) {
2101-
TManualEvent event;
2102-
ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
2103-
event.WaitI();
2104-
}
2105-
21062152
if (ActorSystem) {
21072153
ActorSystem->Stop();
21082154
}
21092155

2110-
if (YqSharedResources) {
2111-
YqSharedResources->Stop();
2156+
// stop processing grpc requests/response - we must stop feeding ActorSystem
2157+
if (GRpcServersManager) {
2158+
StopGRpcServers(GRpcServersWrapper);
2159+
GRpcServersWrapper->Servers.clear();
21122160
}
21132161

2114-
if (ActorSystem) {
2115-
ActorSystem->Cleanup();
2162+
if (YqSharedResources) {
2163+
YqSharedResources->Stop();
21162164
}
21172165

21182166
if (ModuleFactories) {
@@ -2121,12 +2169,17 @@ void TKikimrRunner::KikimrStop(bool graceful) {
21212169
}
21222170
}
21232171

2124-
if (YdbDriver) {
2125-
YdbDriver->Stop(true);
2126-
}
21272172
for (auto plugin: Plugins) {
21282173
plugin->Stop();
21292174
}
2175+
2176+
if (ActorSystem) {
2177+
ActorSystem->Cleanup();
2178+
}
2179+
2180+
if (YdbDriver) {
2181+
YdbDriver->Stop(true);
2182+
}
21302183
}
21312184

21322185
void TKikimrRunner::BusyLoop() {

ydb/core/driver_lib/run/run.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ namespace NKikimr {
2929
using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
3030
using TGRpcServersFactory = std::function<TGRpcServers()>;
3131

32+
struct TGRpcServersWrapper {
33+
TGRpcServers Servers;
34+
TGRpcServersFactory GrpcServersFactory;
35+
TMutex Mutex;
36+
37+
TGuard<TMutex> Guard() {
38+
return TGuard<TMutex>(Mutex);
39+
}
40+
};
41+
3242
class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
3343
protected:
3444
static TProgramShouldContinue KikimrShouldContinue;
@@ -69,7 +79,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
6979

7080
TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});
7181

72-
TGRpcServersFactory GRpcServersFactory;
82+
std::shared_ptr<TGRpcServersWrapper> GRpcServersWrapper;
7383
TActorId GRpcServersManager;
7484

7585
virtual ~TKikimrRunner();

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ SRCS(
1414
driver.h
1515
factories.h
1616
factories.cpp
17+
grpc_servers_manager.h
1718
kikimr_services_initializers.cpp
1819
kikimr_services_initializers.h
1920
main.h

ydb/core/grpc_services/rpc_bridge.cpp

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/base/auth.h>
77
#include <ydb/core/base/bridge.h>
88
#include <ydb/core/base/blobstorage_common.h>
9+
#include <ydb/core/driver_lib/run/grpc_servers_manager.h>
910

1011
namespace NKikimr::NGRpcService {
1112

@@ -59,6 +60,8 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
5960
using TBase = TBridgeRequestGrpc<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest, Ydb::Bridge::UpdateClusterStateResult>;
6061
using TRpcBase = TRpcOperationRequestActor<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest>;
6162

63+
bool IsDisconnectingRequest = false;
64+
6265
public:
6366
using TBase::TBase;
6467

@@ -248,6 +251,19 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
248251
finalPromoted->CopyToProto(&newClusterState, &NKikimrBridge::TClusterState::SetPromotedPile);
249252
newClusterState.SetGeneration(currentClusterState.GetGeneration() + 1);
250253

254+
// check if current node is transitioning to DISCONNECTED state
255+
// if so, notify GRpcServersManager to wait for this request before shutting down
256+
const auto selfNodePileId = bridgeInfo->SelfNodePile->BridgePileId;
257+
if (selfNodePileId.GetPileIndex() < newClusterState.PerPileStateSize()) {
258+
const auto newSelfState = newClusterState.GetPerPileState(selfNodePileId.GetPileIndex());
259+
if (newSelfState == NKikimrBridge::TClusterState::DISCONNECTED ||
260+
newSelfState == NKikimrBridge::TClusterState::SUSPENDED) {
261+
IsDisconnectingRequest = true;
262+
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
263+
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestStarted());
264+
}
265+
}
266+
251267
auto request = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
252268
auto *cmd = request->Record.MutableSwitchBridgeClusterState();
253269
cmd->MutableNewClusterState()->CopyFrom(newClusterState);
@@ -280,13 +296,23 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
280296
void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev) {
281297
const auto& response = ev->Get()->Record;
282298
auto* self = Self();
283-
if (response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
284-
Ydb::Bridge::UpdateClusterStateResult result;
285-
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
299+
const bool success = response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
300+
auto notifyDisconnectFinished = [&]() {
301+
if (IsDisconnectingRequest) {
302+
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
303+
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestFinished());
304+
}
305+
};
306+
307+
if (!success) {
308+
notifyDisconnectFinished();
309+
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
310+
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
286311
return;
287312
}
288-
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
289-
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
313+
Ydb::Bridge::UpdateClusterStateResult result;
314+
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
315+
notifyDisconnectFinished();
290316
}
291317
};
292318

ydb/tests/functional/bridge/test_bridge.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@ def test_failover(self):
3030
self.update_cluster_state(self.bridge_client, updates)
3131
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.DISCONNECTED, "r2": PileState.PRIMARY})
3232

33-
updates = [
34-
PileState(pile_name="r1", state=PileState.NOT_SYNCHRONIZED),
35-
]
36-
self.update_cluster_state(self.secondary_bridge_client, updates)
37-
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.SYNCHRONIZED, "r2": PileState.PRIMARY}, timeout_seconds=50)
38-
3933
def test_takedown(self):
4034
initial_result = self.get_cluster_state(self.bridge_client)
4135
self.check_states(initial_result, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED})
@@ -44,11 +38,6 @@ def test_takedown(self):
4438
]
4539
self.update_cluster_state(self.bridge_client, updates)
4640
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.DISCONNECTED})
47-
updates = [
48-
PileState(pile_name="r2", state=PileState.NOT_SYNCHRONIZED),
49-
]
50-
self.update_cluster_state(self.bridge_client, updates)
51-
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED}, timeout_seconds=50)
5241

5342
def test_rejoin(self):
5443
initial_result = self.get_cluster_state(self.bridge_client)

0 commit comments

Comments
 (0)