173
173
174
174
namespace NKikimr {
175
175
176
+ namespace {
177
+
178
+ void StopGRpcServers (std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper) {
179
+ auto wrapper = grpcServersWrapper.lock ();
180
+ if (!wrapper) {
181
+ return ;
182
+ }
183
+ TGuard<TMutex> guard = wrapper->Guard ();
184
+ for (auto & [name, server] : wrapper->Servers ) {
185
+ if (!server) {
186
+ continue ;
187
+ }
188
+ server->Stop ();
189
+ }
190
+ wrapper->Servers .clear ();
191
+ }
192
+
193
+ }
194
+
176
195
class TGRpcServersManager : public TActorBootstrapped <TGRpcServersManager> {
177
- TGRpcServersFactory GRpcServersFactory;
178
- TGRpcServers GRpcServers;
196
+ std::weak_ptr<TGRpcServersWrapper> GRpcServersWrapper;
179
197
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
198
+ bool Started = false ;
180
199
181
200
public:
182
201
enum {
@@ -192,9 +211,9 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
192
211
};
193
212
194
213
public:
195
- TGRpcServersManager (TGRpcServersFactory grpcServersFactory ,
214
+ TGRpcServersManager (std::weak_ptr<TGRpcServersWrapper> grpcServersWrapper ,
196
215
TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider)
197
- : GRpcServersFactory (std::move(grpcServersFactory ))
216
+ : GRpcServersWrapper (std::move(grpcServersWrapper ))
198
217
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
199
218
{}
200
219
@@ -215,11 +234,17 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
215
234
}
216
235
217
236
void Start () {
218
- if (GRpcServers) {
237
+ if (Started) {
238
+ return ;
239
+ }
240
+ Started = true ;
241
+ auto wrapper = GRpcServersWrapper.lock ();
242
+ if (!wrapper) {
219
243
return ;
220
244
}
221
- GRpcServers = GRpcServersFactory ();
222
- for (auto & [name, server] : GRpcServers) {
245
+ TGuard<TMutex> guard = wrapper->Guard ();
246
+ wrapper->Servers = wrapper->GrpcServersFactory ();
247
+ for (auto & [name, server] : wrapper->Servers ) {
223
248
if (!server) {
224
249
continue ;
225
250
}
@@ -251,12 +276,11 @@ class TGRpcServersManager : public TActorBootstrapped<TGRpcServersManager> {
251
276
}
252
277
253
278
void Stop () {
254
- for (auto & [name, server] : GRpcServers) {
255
- if (server) {
256
- server->Stop ();
257
- }
279
+ if (!Started) {
280
+ return ;
258
281
}
259
- GRpcServers.clear ();
282
+ Started = false ;
283
+ StopGRpcServers (GRpcServersWrapper);
260
284
}
261
285
262
286
void HandleStop (TEvStop::TPtr ev) {
@@ -645,7 +669,10 @@ void TKikimrRunner::InitializeKqpController(const TKikimrRunConfig& runConfig) {
645
669
}
646
670
647
671
void TKikimrRunner::InitializeGRpc (const TKikimrRunConfig& runConfig) {
648
- GRpcServersFactory = [runConfig, this ] { return CreateGRpcServers (runConfig); };
672
+ if (!GRpcServersWrapper) {
673
+ GRpcServersWrapper = std::make_shared<TGRpcServersWrapper>();
674
+ }
675
+ GRpcServersWrapper->GrpcServersFactory = [runConfig, this ] { return CreateGRpcServers (runConfig); };
649
676
}
650
677
651
678
TGRpcServers TKikimrRunner::CreateGRpcServers (const TKikimrRunConfig& runConfig) {
@@ -1990,8 +2017,9 @@ void TKikimrRunner::KikimrStart() {
1990
2017
Monitoring->Start (ActorSystem.Get ());
1991
2018
}
1992
2019
1993
- if (GRpcServersFactory) {
1994
- GRpcServersManager = ActorSystem->Register (new TGRpcServersManager (std::move (GRpcServersFactory), ProcessMemoryInfoProvider));
2020
+ if (GRpcServersWrapper) {
2021
+ GRpcServersWrapper->Servers = GRpcServersWrapper->GrpcServersFactory ();
2022
+ GRpcServersManager = ActorSystem->Register (new TGRpcServersManager (GRpcServersWrapper, ProcessMemoryInfoProvider));
1995
2023
}
1996
2024
1997
2025
if (SqsHttp) {
@@ -2096,23 +2124,18 @@ void TKikimrRunner::KikimrStop(bool graceful) {
2096
2124
SqsHttp.Destroy ();
2097
2125
}
2098
2126
2099
- // stop processing grpc requests/response - we must stop feeding ActorSystem
2100
- if (GRpcServersManager) {
2101
- TManualEvent event;
2102
- ActorSystem->Send (new IEventHandle (GRpcServersManager, {}, new TGRpcServersManager::TEvStop (&event)));
2103
- event.WaitI ();
2104
- }
2105
-
2106
2127
if (ActorSystem) {
2107
2128
ActorSystem->Stop ();
2108
2129
}
2109
2130
2110
- if (YqSharedResources) {
2111
- YqSharedResources->Stop ();
2131
+ // stop processing grpc requests/response - we must stop feeding ActorSystem
2132
+ if (GRpcServersManager) {
2133
+ StopGRpcServers (GRpcServersWrapper);
2134
+ GRpcServersWrapper->Servers .clear ();
2112
2135
}
2113
2136
2114
- if (ActorSystem ) {
2115
- ActorSystem-> Cleanup ();
2137
+ if (YqSharedResources ) {
2138
+ YqSharedResources-> Stop ();
2116
2139
}
2117
2140
2118
2141
if (ModuleFactories) {
@@ -2121,12 +2144,17 @@ void TKikimrRunner::KikimrStop(bool graceful) {
2121
2144
}
2122
2145
}
2123
2146
2124
- if (YdbDriver) {
2125
- YdbDriver->Stop (true );
2126
- }
2127
2147
for (auto plugin: Plugins) {
2128
2148
plugin->Stop ();
2129
2149
}
2150
+
2151
+ if (ActorSystem) {
2152
+ ActorSystem->Cleanup ();
2153
+ }
2154
+
2155
+ if (YdbDriver) {
2156
+ YdbDriver->Stop (true );
2157
+ }
2130
2158
}
2131
2159
2132
2160
void TKikimrRunner::BusyLoop () {
0 commit comments