9
9
#include < ydb/library/actors/core/actor_bootstrapped.h>
10
10
#include < ydb/library/actors/core/hfunc.h>
11
11
#include < ydb/library/actors/protos/actors.pb.h>
12
+ #include < ydb/library/logger/actor.h>
12
13
13
14
#include < ydb/core/base/path.h>
14
15
#include < ydb/core/protos/config.pb.h>
15
16
17
+ #include < memory>
18
+
16
19
namespace NFq {
17
20
18
21
using namespace NActors ;
@@ -83,7 +86,11 @@ struct TLeaderElectionMetrics {
83
86
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged;
84
87
};
85
88
86
- class TLeaderElection : public TActorBootstrapped <TLeaderElection> {
89
+ struct TActorSystemPtrMixin {
90
+ NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr );
91
+ };
92
+
93
+ class TLeaderElection : public TActorBootstrapped <TLeaderElection>, public TActorSystemPtrMixin {
87
94
88
95
enum class EState {
89
96
Init,
@@ -93,8 +100,8 @@ class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
93
100
Started
94
101
};
95
102
NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig Config;
96
- const NKikimr::TYdbCredentialsProviderFactory& CredentialsProviderFactory;
97
- NYdb::TDriver Driver;
103
+ NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
104
+ std::unique_ptr< NYdb::TDriver> Driver;
98
105
TYdbConnectionPtr YdbConnection;
99
106
TString TablePathPrefix;
100
107
const TString TenantId;
@@ -165,20 +172,19 @@ class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
165
172
void ProcessState ();
166
173
void ResetState ();
167
174
void SetTimeout ();
175
+ NYdb::TDriverConfig GetYdbDriverConfig () const ;
168
176
};
169
177
170
178
TLeaderElection::TLeaderElection (
171
179
NActors::TActorId parentId,
172
180
NActors::TActorId coordinatorId,
173
181
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
174
182
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
175
- NYdb::TDriver driver,
183
+ NYdb::TDriver /* driver*/ ,
176
184
const TString& tenant,
177
185
const ::NMonitoring::TDynamicCounterPtr& counters)
178
186
: Config(config)
179
187
, CredentialsProviderFactory(credentialsProviderFactory)
180
- , Driver(driver)
181
- , YdbConnection(config.GetLocalMode() ? nullptr : NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, Driver))
182
188
, TablePathPrefix(JoinPath(config.GetDatabase().GetDatabase(), config.GetCoordinationNodePath()))
183
189
, TenantId(JoinSeq(" :" , NKikimr::SplitPath(tenant)))
184
190
, CoordinationNodePath(JoinPath(TablePathPrefix, TenantId))
@@ -218,13 +224,19 @@ TYdbSdkRetryPolicy::TPtr MakeSchemaRetryPolicy() {
218
224
219
225
void TLeaderElection::Bootstrap () {
220
226
Become (&TLeaderElection::StateFunc);
227
+ Y_ABORT_UNLESS (!ActorSystemPtr->load (std::memory_order_relaxed), " Double ActorSystemPtr init" );
228
+ ActorSystemPtr->store (TActivationContext::ActorSystem (), std::memory_order_relaxed);
229
+
221
230
LogPrefix = " TLeaderElection " + SelfId ().ToString () + " " ;
222
231
LOG_ROW_DISPATCHER_DEBUG (" Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString ()
223
232
<< " , tenant id " << TenantId << " , local mode " << Config.GetLocalMode () << " , coordination node path " << CoordinationNodePath);
224
233
if (Config.GetLocalMode ()) {
225
234
TActivationContext::ActorSystem ()->Send (ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged (CoordinatorId, 0 ));
226
235
return ;
227
236
}
237
+
238
+ Driver = std::make_unique<NYdb::TDriver>(GetYdbDriverConfig ());
239
+ YdbConnection = NewYdbConnection (Config.GetDatabase (), CredentialsProviderFactory, *Driver);
228
240
ProcessState ();
229
241
}
230
242
@@ -469,6 +481,13 @@ void TLeaderElection::HandleException(const std::exception& e) {
469
481
ResetState ();
470
482
}
471
483
484
+ NYdb::TDriverConfig TLeaderElection::GetYdbDriverConfig () const {
485
+ NYdb::TDriverConfig cfg;
486
+ cfg.SetDiscoveryMode (NYdb::EDiscoveryMode::Async);
487
+ cfg.SetLog (std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
488
+ return cfg;
489
+ }
490
+
472
491
} // anonymous namespace
473
492
474
493
// //////////////////////////////////////////////////////////////////////////////
0 commit comments