Skip to content

Commit ccd3e1e

Browse files
kruallmregrock
authored and committed
Fix duplicate messages after stopping ydbd (#26014)
1 parent 67a0e68 commit ccd3e1e

File tree

2 files changed

+68
-30
lines changed

2 files changed

+68
-30
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,29 @@
173173

174174
namespace NKikimr {
175175

176+
namespace {
177+
178+
void StopGRpcServers(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper) {
179+
auto wrapper = grpcServersWrapper.lock();
180+
if (!wrapper) {
181+
return;
182+
}
183+
TGuard<TMutex> guard = wrapper->Guard();
184+
for (auto& [name, server] : wrapper->Servers) {
185+
if (!server) {
186+
continue;
187+
}
188+
server->Stop();
189+
}
190+
wrapper->Servers.clear();
191+
}
192+
193+
}
194+
176195
class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
177-
TGRpcServersFactory GRpcServersFactory;
178-
TGRpcServers GRpcServers;
196+
std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
179197
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
198+
bool Started = false;
180199

181200
public:
182201
enum {
@@ -192,9 +211,9 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
192211
};
193212

194213
public:
195-
TGRpcServersManager(TGRpcServersFactory grpcServersFactory,
214+
TGRpcServersManager(std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper,
196215
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
197-
: GRpcServersFactory(std::move(grpcServersFactory))
216+
: GRpcServersWrapper(std::move(grpcServersWrapper))
198217
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
199218
{}
200219

@@ -215,11 +234,17 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
215234
}
216235

217236
void Start() {
218-
if (GRpcServers) {
237+
if (Started) {
238+
return;
239+
}
240+
Started = true;
241+
auto wrapper = GRpcServersWrapper.lock();
242+
if (!wrapper) {
219243
return;
220244
}
221-
GRpcServers = GRpcServersFactory();
222-
for (auto& [name, server] : GRpcServers) {
245+
TGuard<TMutex> guard = wrapper->Guard();
246+
wrapper->Servers = wrapper->GrpcServersFactory();
247+
for (auto& [name, server] : wrapper->Servers) {
223248
if (!server) {
224249
continue;
225250
}
@@ -251,12 +276,11 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
251276
}
252277

253278
void Stop() {
254-
for (auto& [name, server] : GRpcServers) {
255-
if (server) {
256-
server->Stop();
257-
}
279+
if (!Started) {
280+
return;
258281
}
259-
GRpcServers.clear();
282+
Started = false;
283+
StopGRpcServers(GRpcServersWrapper);
260284
}
261285

262286
void HandleStop(TEvStop::TPtr ev) {
@@ -645,7 +669,10 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
645669
}
646670

647671
void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
648-
GRpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
672+
if (!GRpcServersWrapper) {
673+
GRpcServersWrapper = std::make_shared<TGRpcServersWrapper>();
674+
}
675+
GRpcServersWrapper->GrpcServersFactory = [runConfig, this] { return CreateGRpcServers(runConfig); };
649676
}
650677

651678
TGRpcServers TKikimrRunner::CreateGRpcServers(const TKikimrRunConfig& runConfig) {
@@ -1990,8 +2017,9 @@ void TKikimrRunner::KikimrStart() {
19902017
Monitoring->Start(ActorSystem.Get());
19912018
}
19922019

1993-
if (GRpcServersFactory) {
1994-
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(std::move(GRpcServersFactory), ProcessMemoryInfoProvider));
2020+
if (GRpcServersWrapper) {
2021+
GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory();
2022+
GRpcServersManager = ActorSystem->Register(new TGRpcServersManager(GRpcServersWrapper, ProcessMemoryInfoProvider));
19952023
}
19962024

19972025
if (SqsHttp) {
@@ -2096,23 +2124,18 @@ void TKikimrRunner::KikimrStop(bool graceful) {
20962124
SqsHttp.Destroy();
20972125
}
20982126

2099-
// stop processing grpc requests/response - we must stop feeding ActorSystem
2100-
if (GRpcServersManager) {
2101-
TManualEvent event;
2102-
ActorSystem->Send(new IEventHandle(GRpcServersManager, {}, new TGRpcServersManager::TEvStop(&event)));
2103-
event.WaitI();
2104-
}
2105-
21062127
if (ActorSystem) {
21072128
ActorSystem->Stop();
21082129
}
21092130

2110-
if (YqSharedResources) {
2111-
YqSharedResources->Stop();
2131+
// stop processing grpc requests/response - we must stop feeding ActorSystem
2132+
if (GRpcServersManager) {
2133+
StopGRpcServers(GRpcServersWrapper);
2134+
GRpcServersWrapper->Servers.clear();
21122135
}
21132136

2114-
if (ActorSystem) {
2115-
ActorSystem->Cleanup();
2137+
if (YqSharedResources) {
2138+
YqSharedResources->Stop();
21162139
}
21172140

21182141
if (ModuleFactories) {
@@ -2121,12 +2144,17 @@ void TKikimrRunner::KikimrStop(bool graceful) {
21212144
}
21222145
}
21232146

2124-
if (YdbDriver) {
2125-
YdbDriver->Stop(true);
2126-
}
21272147
for (auto plugin: Plugins) {
21282148
plugin->Stop();
21292149
}
2150+
2151+
if (ActorSystem) {
2152+
ActorSystem->Cleanup();
2153+
}
2154+
2155+
if (YdbDriver) {
2156+
YdbDriver->Stop(true);
2157+
}
21302158
}
21312159

21322160
void TKikimrRunner::BusyLoop() {

ydb/core/driver_lib/run/run.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ namespace NKikimr {
2929
using TGRpcServers = TVector<std::pair<TString, TAutoPtr<NYdbGrpc::TGRpcServer>>>;
3030
using TGRpcServersFactory = std::function<TGRpcServers()>;
3131

32+
struct TGRpcServersWrapper {
33+
TGRpcServers Servers;
34+
TGRpcServersFactory GrpcServersFactory;
35+
TMutex Mutex;
36+
37+
TGuard<TMutex> Guard() {
38+
return TGuard<TMutex>(Mutex);
39+
}
40+
};
41+
3242
class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
3343
protected:
3444
static TProgramShouldContinue KikimrShouldContinue;
@@ -69,7 +79,7 @@ class TKikimrRunner : public virtual TThrRefBase, private IGlobalObjectStorage {
6979

7080
TKikimrRunner(std::shared_ptr<TModuleFactories> factories = {});
7181

72-
TGRpcServersFactory GRpcServersFactory;
82+
std::shared_ptr<TGRpcServersWrapper> GRpcServersWrapper;
7383
TActorId GRpcServersManager;
7484

7585
virtual ~TKikimrRunner();

0 commit comments

Comments
 (0)