Skip to content

Commit b657967

Browse files
committed
Some fixes in Bridge GRPC disconnect (#26493)
1 parent 4111d38 commit b657967

File tree

5 files changed

+87
-18
lines changed

5 files changed

+87
-18
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#pragma once
2+
3+
#include <ydb/core/base/defs.h>
4+
#include <ydb/core/base/appdata.h>
5+
#include <ydb/library/actors/core/event_local.h>
6+
7+
namespace NKikimr {
8+
9+
struct TEvGRpcServersManager {
10+
enum EEv {
11+
EvDisconnectRequestStarted = EventSpaceBegin(TEvents::ES_PRIVATE) + 10000,
12+
EvDisconnectRequestFinished,
13+
};
14+
15+
struct TEvDisconnectRequestStarted : public TEventLocal<TEvDisconnectRequestStarted, EvDisconnectRequestStarted> {
16+
};
17+
18+
struct TEvDisconnectRequestFinished : public TEventLocal<TEvDisconnectRequestFinished, EvDisconnectRequestFinished> {
19+
};
20+
};
21+
22+
inline TActorId MakeGRpcServersManagerId(ui32 nodeId) {
23+
char x[12] = {'g','r','p','c','s','r','v','r','m','n','g','r'};
24+
return TActorId(nodeId, TStringBuf(x, 12));
25+
}
26+
27+
} // namespace NKikimr
28+

ydb/core/driver_lib/run/run.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "auto_config_initializer.h"
22
#include "run.h"
3+
#include "grpc_servers_manager.h"
34
#include "service_initializer.h"
45
#include "kikimr_services_initializers.h"
56

@@ -196,6 +197,8 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
196197
std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
197198
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
198199
bool Started = false;
200+
bool StopScheduled = false;
201+
bool WaitingForDisconnectRequest = false;
199202

200203
public:
201204
enum {
@@ -227,17 +230,36 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
227230
if (const auto& bridgeInfo = ev->Get()->BridgeInfo) {
228231
if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) {
229232
Start();
230-
} else {
231-
Stop();
233+
} else if (!StopScheduled) {
234+
StopScheduled = true;
235+
CheckAndExecuteStop();
232236
}
233237
}
234238
}
235239

240+
void HandleDisconnectRequestStarted() {
241+
WaitingForDisconnectRequest = true;
242+
}
243+
244+
void HandleDisconnectRequestFinished() {
245+
WaitingForDisconnectRequest = false;
246+
CheckAndExecuteStop();
247+
}
248+
249+
void CheckAndExecuteStop() {
250+
if (StopScheduled && !WaitingForDisconnectRequest) {
251+
StopScheduled = false;
252+
Stop();
253+
}
254+
}
255+
236256
void Start() {
237257
if (Started) {
238258
return;
239259
}
240260
Started = true;
261+
StopScheduled = false;
262+
WaitingForDisconnectRequest = false;
241263
auto wrapper = GRpcServersWrapper.lock();
242264
if (!wrapper) {
243265
return;
@@ -292,6 +314,8 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
292314
STRICT_STFUNC(StateFunc,
293315
hFunc(TEvNodeWardenStorageConfig, Handle)
294316
hFunc(TEvStop, HandleStop)
317+
cFunc(TEvGRpcServersManager::EvDisconnectRequestStarted, HandleDisconnectRequestStarted)
318+
cFunc(TEvGRpcServersManager::EvDisconnectRequestFinished, HandleDisconnectRequestFinished)
295319
)
296320
};
297321

@@ -2020,6 +2044,7 @@ void TKikimrRunner::KikimrStart() {
20202044
if (GRpcServersWrapper) {
20212045
GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory();
20222046
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider));
2047+
ActorSystem->RegisterLocalService(NKikimr::MakeGRpcServersManagerId(ActorSystem->NodeId), GRpcServersManager);
20232048
}
20242049

20252050
if (SqsHttp) {

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ SRCS(
1414
driver.h
1515
factories.h
1616
factories.cpp
17+
grpc_servers_manager.h
1718
kikimr_services_initializers.cpp
1819
kikimr_services_initializers.h
1920
main.h

ydb/core/grpc_services/rpc_bridge.cpp

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/base/auth.h>
77
#include <ydb/core/base/bridge.h>
88
#include <ydb/core/base/blobstorage_common.h>
9+
#include <ydb/core/driver_lib/run/grpc_servers_manager.h>
910

1011
namespace NKikimr::NGRpcService {
1112

@@ -59,6 +60,8 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
5960
using TBase = TBridgeRequestGrpc<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest, Ydb::Bridge::UpdateClusterStateResult>;
6061
using TRpcBase = TRpcOperationRequestActor<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest>;
6162

63+
bool IsDisconnectingRequest = false;
64+
6265
public:
6366
using TBase::TBase;
6467

@@ -248,6 +251,19 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
248251
finalPromoted->CopyToProto(&newClusterState, &NKikimrBridge::TClusterState::SetPromotedPile);
249252
newClusterState.SetGeneration(currentClusterState.GetGeneration() + 1);
250253

254+
// check if current node is transitioning to DISCONNECTED state
255+
// if so, notify GRpcServersManager to wait for this request before shutting down
256+
const auto selfNodePileId = bridgeInfo->SelfNodePile->BridgePileId;
257+
if (selfNodePileId.GetPileIndex() < newClusterState.PerPileStateSize()) {
258+
const auto newSelfState = newClusterState.GetPerPileState(selfNodePileId.GetPileIndex());
259+
if (newSelfState == NKikimrBridge::TClusterState::DISCONNECTED ||
260+
newSelfState == NKikimrBridge::TClusterState::SUSPENDED) {
261+
IsDisconnectingRequest = true;
262+
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
263+
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestStarted());
264+
}
265+
}
266+
251267
auto request = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
252268
auto *cmd = request->Record.MutableSwitchBridgeClusterState();
253269
cmd->MutableNewClusterState()->CopyFrom(newClusterState);
@@ -280,13 +296,23 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
280296
void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev) {
281297
const auto& response = ev->Get()->Record;
282298
auto* self = Self();
283-
if (response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
284-
Ydb::Bridge::UpdateClusterStateResult result;
285-
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
299+
const bool success = response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
300+
auto notifyDisconnectFinished = [&]() {
301+
if (IsDisconnectingRequest) {
302+
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
303+
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestFinished());
304+
}
305+
};
306+
307+
if (!success) {
308+
notifyDisconnectFinished();
309+
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
310+
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
286311
return;
287312
}
288-
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
289-
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
313+
Ydb::Bridge::UpdateClusterStateResult result;
314+
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
315+
notifyDisconnectFinished();
290316
}
291317
};
292318

ydb/tests/functional/bridge/test_bridge.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@ def test_failover(self):
3030
self.update_cluster_state(self.bridge_client, updates)
3131
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.DISCONNECTED, "r2": PileState.PRIMARY})
3232

33-
updates = [
34-
PileState(pile_name="r1", state=PileState.NOT_SYNCHRONIZED),
35-
]
36-
self.update_cluster_state(self.secondary_bridge_client, updates)
37-
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.SYNCHRONIZED, "r2": PileState.PRIMARY}, timeout_seconds=50)
38-
3933
def test_takedown(self):
4034
initial_result = self.get_cluster_state(self.bridge_client)
4135
self.check_states(initial_result, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED})
@@ -44,11 +38,6 @@ def test_takedown(self):
4438
]
4539
self.update_cluster_state(self.bridge_client, updates)
4640
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.DISCONNECTED})
47-
updates = [
48-
PileState(pile_name="r2", state=PileState.NOT_SYNCHRONIZED),
49-
]
50-
self.update_cluster_state(self.bridge_client, updates)
51-
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED}, timeout_seconds=50)
5241

5342
def test_rejoin(self):
5443
initial_result = self.get_cluster_state(self.bridge_client)

0 commit comments

Comments
 (0)