28 changes: 28 additions & 0 deletions ydb/core/driver_lib/run/grpc_servers_manager.h
@@ -0,0 +1,28 @@
#pragma once

#include <ydb/core/base/defs.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/actors/core/event_local.h>

namespace NKikimr {

struct TEvGRpcServersManager {
enum EEv {
EvDisconnectRequestStarted = EventSpaceBegin(TEvents::ES_PRIVATE) + 10000,
EvDisconnectRequestFinished,
};

struct TEvDisconnectRequestStarted : public TEventLocal<TEvDisconnectRequestStarted, EvDisconnectRequestStarted> {
};

struct TEvDisconnectRequestFinished : public TEventLocal<TEvDisconnectRequestFinished, EvDisconnectRequestFinished> {
};
};

inline TActorId MakeGRpcServersManagerId(ui32 nodeId) {
char x[12] = {'g','r','p','c','s','r','v','r','m','n','g','r'};
return TActorId(nodeId, TStringBuf(x, 12));
}

} // namespace NKikimr

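The header above only declares the event protocol and the well-known service actor id. A minimal usage sketch (not part of this PR; it assumes a standard NActors::TActorContext is available, in the same way the rpc_bridge.cpp changes below use one):

    #include <ydb/core/driver_lib/run/grpc_servers_manager.h>
    #include <ydb/library/actors/core/actor.h>

    // Tell the local TGRpcServersManager that a disconnecting request is in
    // flight, so a scheduled gRPC shutdown is deferred until it is answered.
    void NotifyDisconnectStarted(const NActors::TActorContext& ctx) {
        ctx.Send(NKikimr::MakeGRpcServersManagerId(ctx.SelfID.NodeId()),
                 new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestStarted());
    }

    // Release the manager once the client has been answered.
    void NotifyDisconnectFinished(const NActors::TActorContext& ctx) {
        ctx.Send(NKikimr::MakeGRpcServersManagerId(ctx.SelfID.NodeId()),
                 new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestFinished());
    }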
115 changes: 84 additions & 31 deletions ydb/core/driver_lib/run/run.cpp
@@ -1,5 +1,6 @@
#include "auto_config_initializer.h"
#include "run.h"
#include "grpc_servers_manager.h"
#include "service_initializer.h"
#include "kikimr_services_initializers.h"

@@ -173,10 +174,31 @@

namespace NKikimr {

namespace {

void StopGRpcServers(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper) {
auto wrapper = grpcServersWrapper.lock();
if (!wrapper) {
return;
}
TGuard<TMutex> guard = wrapper->Guard();
for (auto& [name, server] : wrapper->Servers) {
if (!server) {
continue;
}
server->Stop();
}
wrapper->Servers.clear();
}

} // anonymous namespace

class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
TGRpcServersFactory GRpcServersFactory;
TGRpcServers GRpcServers;
std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
bool Started = false;
bool StopScheduled = false;
bool WaitingForDisconnectRequest = false;

public:
enum {
@@ -192,9 +214,9 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
};

public:
TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
TGRpcServersManager(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper,
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
: GRpcServersFactory(std::move(grpcServersFactory))
: GRpcServersWrapper(std::move(grpcServersWrapper))
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
{}

@@ -208,18 +230,43 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
if (const auto& bridgeInfo = ev->Get()->BridgeInfo) {
if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) {
Start();
} else {
Stop();
} else if (!StopScheduled) {
StopScheduled = true;
CheckAndExecuteStop();
}
}
}

void HandleDisconnectRequestStarted() {
WaitingForDisconnectRequest = true;
}

void HandleDisconnectRequestFinished() {
WaitingForDisconnectRequest = false;
CheckAndExecuteStop();
}

void CheckAndExecuteStop() {
if (StopScheduled && !WaitingForDisconnectRequest) {
StopScheduled = false;
Stop();
}
}

void Start() {
if (GRpcServers) {
if (Started) {
return;
}
Started = true;
StopScheduled = false;
WaitingForDisconnectRequest = false;
auto wrapper = GRpcServersWrapper.lock();
if (!wrapper) {
return;
}
GRpcServers = GRpcServersFactory();
for (auto& [name, server] : GRpcServers) {
TGuard<TMutex> guard = wrapper->Guard();
wrapper->Servers = wrapper->GrpcServersFactory();
for (auto& [name, server] : wrapper->Servers) {
if (!server) {
continue;
}
@@ -251,12 +298,11 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
}

void Stop() {
for (auto& [name, server] : GRpcServers) {
if (server) {
server->Stop();
}
if (!Started) {
return;
}
GRpcServers.clear();
Started = false;
StopGRpcServers(GRpcServersWrapper);
}

void HandleStop(TEvStop::TPtr ev) {
@@ -268,6 +314,8 @@
STRICT_STFUNC(StateFunc,
hFunc(TEvNodeWardenStorageConfig, Handle)
hFunc(TEvStop, HandleStop)
cFunc(TEvGRpcServersManager::EvDisconnectRequestStarted, HandleDisconnectRequestStarted)
cFunc(TEvGRpcServersManager::EvDisconnectRequestFinished, HandleDisconnectRequestFinished)
)
};

@@ -645,7 +693,10 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
}

void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
if (!GRpcServersWrapper) {
GRpcServersWrapper = std::make_shared<TGRpcServersWrapper>();
}
GRpcServersWrapper->GrpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
}

TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
@@ -1990,8 +2041,10 @@ void TKikimrRunner::KikimrStart() {
Monitoring->Start(ActorSystem.Get());
}

if (GRpcServersFactory) {
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));
if (GRpcServersWrapper) {
GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory();
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider));
ActorSystem->RegisterLocalService(NKikimr::MakeGRpcServersManagerId(ActorSystem->NodeId), GRpcServersManager);
}

if (SqsHttp) {
@@ -2096,23 +2149,18 @@ void TKikimrRunner::KikimrStop(bool graceful) {
SqsHttp.Destroy();
}

// stop processing grpc requests/response - we must stop feeding ActorSystem
if (GRpcServersManager) {
TManualEvent event;
ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
event.WaitI();
}

if (ActorSystem) {
ActorSystem->Stop();
}

if (YqSharedResources) {
YqSharedResources->Stop();
// stop processing grpc requests/response - we must stop feeding ActorSystem
if (GRpcServersManager) {
StopGRpcServers(GRpcServersWrapper);
GRpcServersWrapper->Servers.clear();
}

if (ActorSystem) {
ActorSystem->Cleanup();
if (YqSharedResources) {
YqSharedResources->Stop();
}

if (ModuleFactories) {
@@ -2121,12 +2169,17 @@
}
}

if (YdbDriver) {
YdbDriver->Stop(true);
}
for (auto plugin: Plugins) {
plugin->Stop();
}

if (ActorSystem) {
ActorSystem->Cleanup();
}

if (YdbDriver) {
YdbDriver->Stop(true);
}
}

void TKikimrRunner::BusyLoop() {
12 changes: 11 additions & 1 deletion ydb/core/driver_lib/run/run.h
@@ -29,6 +29,16 @@ namespace NKikimr {
using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
using TGRpcServersFactory = std::function<TGRpcServers()>;

struct TGRpcServersWrapper {
TGRpcServers Servers;
TGRpcServersFactory GrpcServersFactory;
TMutex Mutex;

TGuard<TMutex> Guard() {
return TGuard<TMutex>(Mutex);
}
};

class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
protected:
static TProgramShouldContinue KikimrShouldContinue;
@@ -69,7 +79,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {

TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});

TGRpcServersFactory GRpcServersFactory;
std::shared_ptr<TGRpcServersWrapper> GRpcServersWrapper;
TActorId GRpcServersManager;

virtual ~TKikimrRunner();
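For orientation, the ownership scheme introduced here can be condensed into the following sketch (assumptions: only the declarations from run.h above are in scope; the factory body is a placeholder, the real one is installed in InitializeGRpc()):

    // TKikimrRunner keeps the wrapper alive via shared_ptr; the manager actor
    // and the stop helper hold only a weak_ptr, so they never prolong its life.
    auto wrapper = std::make_shared<NKikimr::TGRpcServersWrapper>();
    wrapper->GrpcServersFactory = [] { return NKikimr::TGRpcServers{}; };  // placeholder factory

    std::weak_ptr<NKikimr::TGRpcServersWrapper> weak = wrapper;
    if (auto locked = weak.lock()) {
        // Every access to Servers happens under the wrapper's mutex, so the
        // actor's Start()/Stop() and KikimrStop() can race safely.
        TGuard<TMutex> guard = locked->Guard();
        locked->Servers = locked->GrpcServersFactory();
    }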
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
@@ -14,6 +14,7 @@ SRCS(
driver.h
factories.h
factories.cpp
grpc_servers_manager.h
kikimr_services_initializers.cpp
kikimr_services_initializers.h
main.h
36 changes: 31 additions & 5 deletions ydb/core/grpc_services/rpc_bridge.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/base/auth.h>
#include <ydb/core/base/bridge.h>
#include <ydb/core/base/blobstorage_common.h>
#include <ydb/core/driver_lib/run/grpc_servers_manager.h>

namespace NKikimr::NGRpcService {

@@ -59,6 +60,8 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
using TBase = TBridgeRequestGrpc<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest, Ydb::Bridge::UpdateClusterStateResult>;
using TRpcBase = TRpcOperationRequestActor<TUpdateClusterStateRequest, TEvUpdateClusterStateRequest>;

bool IsDisconnectingRequest = false;

public:
using TBase::TBase;

@@ -248,6 +251,19 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
finalPromoted->CopyToProto(&newClusterState, &NKikimrBridge::TClusterState::SetPromotedPile);
newClusterState.SetGeneration(currentClusterState.GetGeneration() + 1);

// check if current node is transitioning to DISCONNECTED state
// if so, notify GRpcServersManager to wait for this request before shutting down
const auto selfNodePileId = bridgeInfo->SelfNodePile->BridgePileId;
if (selfNodePileId.GetPileIndex() < newClusterState.PerPileStateSize()) {
const auto newSelfState = newClusterState.GetPerPileState(selfNodePileId.GetPileIndex());
if (newSelfState == NKikimrBridge::TClusterState::DISCONNECTED ||
newSelfState == NKikimrBridge::TClusterState::SUSPENDED) {
IsDisconnectingRequest = true;
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestStarted());
}
}

auto request = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
auto *cmd = request->Record.MutableSwitchBridgeClusterState();
cmd->MutableNewClusterState()->CopyFrom(newClusterState);
@@ -280,13 +296,23 @@ class TUpdateClusterStateRequest : public TBridgeRequestGrpc<TUpdateClusterState
void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr& ev) {
const auto& response = ev->Get()->Record;
auto* self = Self();
if (response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK) {
Ydb::Bridge::UpdateClusterStateResult result;
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
const bool success = response.GetStatus() == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
auto notifyDisconnectFinished = [&]() {
if (IsDisconnectingRequest) {
self->ActorContext().Send(NKikimr::MakeGRpcServersManagerId(self->SelfId().NodeId()),
new NKikimr::TEvGRpcServersManager::TEvDisconnectRequestFinished());
}
};

if (!success) {
notifyDisconnectFinished();
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
return;
}
self->Reply(Ydb::StatusIds::INTERNAL_ERROR, response.GetErrorReason(),
NKikimrIssues::TIssuesIds::DEFAULT_ERROR, self->ActorContext());
Ydb::Bridge::UpdateClusterStateResult result;
self->ReplyWithResult(Ydb::StatusIds::SUCCESS, result, self->ActorContext());
notifyDisconnectFinished();
}
};

11 changes: 0 additions & 11 deletions ydb/tests/functional/bridge/test_bridge.py
@@ -30,12 +30,6 @@ def test_failover(self):
self.update_cluster_state(self.bridge_client, updates)
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.DISCONNECTED, "r2": PileState.PRIMARY})

updates = [
PileState(pile_name="r1", state=PileState.NOT_SYNCHRONIZED),
]
self.update_cluster_state(self.secondary_bridge_client, updates)
self.wait_for_cluster_state(self.secondary_bridge_client, {"r1": PileState.SYNCHRONIZED, "r2": PileState.PRIMARY}, timeout_seconds=50)

def test_takedown(self):
initial_result = self.get_cluster_state(self.bridge_client)
self.check_states(initial_result, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED})
@@ -44,11 +38,6 @@ def test_takedown(self):
]
self.update_cluster_state(self.bridge_client, updates)
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.DISCONNECTED})
updates = [
PileState(pile_name="r2", state=PileState.NOT_SYNCHRONIZED),
]
self.update_cluster_state(self.bridge_client, updates)
self.wait_for_cluster_state(self.bridge_client, {"r1": PileState.PRIMARY, "r2": PileState.SYNCHRONIZED}, timeout_seconds=50)

def test_rejoin(self):
initial_result = self.get_cluster_state(self.bridge_client)