Skip to content

Commit 9b32775

Browse files
authored
Stop gRPC server when in DISCONNECTED state (ydb-platform#24939)
1 parent 26e567f commit 9b32775

File tree

2 files changed

+128
-52
lines changed

2 files changed

+128
-52
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 121 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,108 @@
169169

170170
#include <ydb/core/tracing/tablet_info.h>
171171

172+
#include <ydb/core/blobstorage/base/blobstorage_events.h>
173+
172174
namespace NKikimr {
173175

176+
// Actor that owns the node's gRPC servers and starts or stops them in response
// to storage-config notifications from the node warden.
//
// Servers are not passed in ready-made: they are produced by the factory given
// to the constructor each time Start() finds the server list empty, so a
// Stop()/Start() cycle yields freshly created server instances.
class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
    // Builds the full set of (name, server) pairs on demand.
    TGRpcServersFactory GRpcServersFactory;
    // Currently running servers; empty while stopped.
    TGRpcServers GRpcServers;
    // Optional source of the memory limit reported to the whiteboard.
    TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;

public:
    enum {
        EvStop = EventSpaceBegin(TEvents::ES_PRIVATE),
    };

    // Asks the manager to stop all servers and terminate.
    // Event is signalled once the servers have been stopped, so the pointee
    // has to stay alive until the sender has observed the signal.
    struct TEvStop : TEventLocal<TEvStop, EvStop> {
        TManualEvent *Event;

        explicit TEvStop(TManualEvent *event)
            : Event(event)
        {}
    };

public:
    TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
            TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
        : GRpcServersFactory(std::move(grpcServersFactory))
        , ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
    {}

    // Subscribes to storage-config updates and brings the servers up right away,
    // without waiting for the first config notification.
    void Bootstrap() {
        Become(&TThis::StateFunc);
        Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
        Start();
    }

    // Reacts to a storage-config update. Only configs that carry bridge info
    // affect the servers: they run while the state of this node's pile
    // requires config quorum and are stopped otherwise.
    void Handle(TEvNodeWardenStorageConfig::TPtr ev) {
        if (const auto& bridgeInfo = ev->Get()->BridgeInfo) {
            if (NBridge::PileStateTraits(bridgeInfo->SelfNodePile->State).RequiresConfigQuorum) {
                Start();
            } else {
                Stop();
            }
        }
    }

    // Creates and starts the servers unless they are already running, and
    // publishes every started endpoint to the node whiteboard.
    void Start() {
        if (GRpcServers) {
            return; // already running
        }
        if (!GRpcServersFactory) {
            // No factory was supplied (e.g. gRPC was never initialized); calling
            // an empty std::function would throw std::bad_function_call.
            return;
        }

        GRpcServers = GRpcServersFactory();
        for (auto& [name, server] : GRpcServers) {
            if (!server) {
                continue;
            }

            server->Start();

            // The wildcard host is omitted, leaving just ":port".
            TString endpoint;
            if (server->GetHost() != "[::]") {
                endpoint = server->GetHost();
            }

            endpoint += Sprintf(":%d", server->GetPort());

            Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()),
                new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(name, endpoint));

            // NOTE(review): this update does not depend on the server and is
            // re-sent for each one; kept as is to preserve the message pattern.
            if (ProcessMemoryInfoProvider) {
                auto memInfo = ProcessMemoryInfoProvider->Get();
                NKikimrWhiteboard::TSystemStateInfo systemStateInfo;
                if (memInfo.CGroupLimit) {
                    systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit);
                } else if (memInfo.MemTotal) {
                    systemStateInfo.SetMemoryLimit(*memInfo.MemTotal);
                }
                Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()),
                    new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo));
            }
        }
    }

    // Stops every running server and drops them, so that a later Start()
    // goes through the factory again.
    void Stop() {
        for (auto& [name, server] : GRpcServers) {
            if (server) {
                server->Stop();
            }
        }
        GRpcServers.clear();
    }

    // Final shutdown: stop the servers, wake up the waiting sender, then die.
    void HandleStop(TEvStop::TPtr ev) {
        Stop();
        ev->Get()->Event->Signal();
        PassAway();
    }

    STRICT_STFUNC(StateFunc,
        hFunc(TEvNodeWardenStorageConfig, Handle)
        hFunc(TEvStop, HandleStop)
    )
};
273+
174274
class TDomainsInitializer : public IAppDataInitializer {
175275
const NKikimrConfig::TAppConfig& Config;
176276

@@ -432,15 +532,8 @@ TKikimrRunner::TKikimrRunner(std::shared_ptr<TModuleFactories> factories)
432532
}
433533

434534
TKikimrRunner::~TKikimrRunner() {
435-
if (!!ActorSystem) {
436-
// Stop ActorSystem first, so no one actor can call any grpc stuff.
535+
if (ActorSystem) {
437536
ActorSystem->Stop();
438-
// After that stop sending any requests to actors
439-
// by destroing grpc subsystem.
440-
for (auto& serv : GRpcServers) {
441-
serv.second.Destroy();
442-
}
443-
444537
ActorSystem.Destroy();
445538
}
446539
}
@@ -552,8 +645,14 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
552645
}
553646

554647
void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
648+
GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
649+
}
650+
651+
TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
555652
const auto& appConfig = runConfig.AppConfig;
556653

654+
TGRpcServers grpcServers;
655+
557656
auto fillFn = [&](const NKikimrConfig::TGRpcConfig& grpcConfig, NYdbGrpc::TGRpcServer& server, NYdbGrpc::TServerOptions& opts) {
558657
const auto& services = grpcConfig.GetServices();
559658
const auto& rlServicesEnabled = grpcConfig.GetRatelimiterServicesEnabled();
@@ -1046,15 +1145,15 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
10461145
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
10471146
sslOpts.SetSslData(sslData);
10481147

1049-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
1148+
grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });
10501149

1051-
fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
1150+
fillFn(grpcConfig, *grpcServers.back().second, sslOpts);
10521151
}
10531152

10541153
if (grpcConfig.GetPort()) {
1055-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
1154+
grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });
10561155

1057-
fillFn(grpcConfig, *GRpcServers.back().second, opts);
1156+
fillFn(grpcConfig, *grpcServers.back().second, opts);
10581157
}
10591158

10601159
for (auto &ex : grpcConfig.GetExtEndpoints()) {
@@ -1069,8 +1168,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
10691168
xopts.SetEndpointId(ex.GetEndpointId());
10701169
}
10711170

1072-
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
1073-
fillFn(ex, *GRpcServers.back().second, xopts);
1171+
grpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
1172+
fillFn(ex, *grpcServers.back().second, xopts);
10741173
}
10751174

10761175
if (ex.HasSslPort() && ex.GetSslPort()) {
@@ -1108,11 +1207,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
11081207
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
11091208
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");
11101209

1111-
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
1112-
fillFn(ex, *GRpcServers.back().second, xopts);
1210+
grpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
1211+
fillFn(ex, *grpcServers.back().second, xopts);
11131212
}
11141213
}
11151214
}
1215+
1216+
return grpcServers;
11161217
}
11171218

11181219
void TKikimrRunner::InitializeAllocator(const TKikimrRunConfig& runConfig) {
@@ -1889,30 +1990,7 @@ void TKikimrRunner::KikimrStart() {
18891990
Monitoring->Start(ActorSystem.Get());
18901991
}
18911992

1892-
for (auto& server : GRpcServers) {
1893-
if (server.second) {
1894-
server.second->Start();
1895-
1896-
TString endpoint;
1897-
if (server.second->GetHost() != "[::]") {
1898-
endpoint = server.second->GetHost();
1899-
}
1900-
endpoint += Sprintf(":%d", server.second->GetPort());
1901-
ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId),
1902-
new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateAddEndpoint(server.first, endpoint));
1903-
if (ProcessMemoryInfoProvider) {
1904-
auto memInfo = ProcessMemoryInfoProvider->Get();
1905-
NKikimrWhiteboard::TSystemStateInfo systemStateInfo;
1906-
if (memInfo.CGroupLimit) {
1907-
systemStateInfo.SetMemoryLimit(*memInfo.CGroupLimit);
1908-
} else if (memInfo.MemTotal) {
1909-
systemStateInfo.SetMemoryLimit(*memInfo.MemTotal);
1910-
}
1911-
ActorSystem->Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ActorSystem->NodeId),
1912-
new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(systemStateInfo));
1913-
}
1914-
}
1915-
}
1993+
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));
19161994

19171995
if (SqsHttp) {
19181996
SqsHttp->Start();
@@ -2017,20 +2095,14 @@ void TKikimrRunner::KikimrStop(bool graceful) {
20172095
}
20182096

20192097
// stop processing grpc requests/response - we must stop feeding ActorSystem
2020-
for (auto& server : GRpcServers) {
2021-
if (server.second) {
2022-
server.second->Stop();
2023-
}
2024-
}
2098+
TManualEvent event;
2099+
ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
2100+
event.WaitI();
20252101

20262102
if (ActorSystem) {
20272103
ActorSystem->Stop();
20282104
}
20292105

2030-
for (auto& server : GRpcServers) {
2031-
server.second.Destroy();
2032-
}
2033-
20342106
if (YqSharedResources) {
20352107
YqSharedResources->Stop();
20362108
}
@@ -2068,7 +2140,6 @@ void TKikimrRunner::OnTerminate(int) {
20682140
KikimrShouldContinue.ShouldStop(0);
20692141
}
20702142

2071-
20722143
void TKikimrRunner::SetSignalHandlers() {
20732144
#ifdef _unix_
20742145
signal(SIGPIPE, SIG_IGN);

ydb/core/driver_lib/run/run.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
namespace NKikimr {
2828

29+
using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
30+
using TGRpcServersFactory = std::function<TGRpcServers()>;
31+
2932
class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
3033
protected:
3134
static TProgramShouldContinue KikimrShouldContinue;
@@ -56,8 +59,6 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
5659
TIntrusivePtr<NInterconnect::TPollerThreads> PollerThreads;
5760
TAutoPtr<TAppData> AppData;
5861

59-
TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>> GRpcServers;
60-
6162
TIntrusivePtr<NActors::NLog::TSettings> LogSettings;
6263
std::shared_ptr<TLogBackend> LogBackend;
6364
TAutoPtr<TActorSystem> ActorSystem;
@@ -68,6 +69,9 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
6869

6970
TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});
7071

72+
TGRpcServersFactory GRpcServersFactory;
73+
TActorId GRpcServersManager;
74+
7175
virtual ~TKikimrRunner();
7276

7377
virtual void InitializeRegistries(const TKikimrRunConfig& runConfig);
@@ -85,6 +89,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
8589
void InitializeMonitoringLogin(const TKikimrRunConfig& runConfig);
8690

8791
void InitializeGRpc(const TKikimrRunConfig& runConfig);
92+
TGRpcServers CreateGRpcServers(const TKikimrRunConfig& runConfig);
8893

8994
void InitializeKqpController(const TKikimrRunConfig& runConfig);
9095

0 commit comments

Comments
 (0)