1- #include " event_util.h"
2- #include " logging.h"
3- #include " stream_consumer_remover.h"
41#include " target_table.h"
5- #include " util.h"
62
73#include < ydb/core/base/path.h>
8- #include < ydb/core/scheme/scheme_pathid.h>
9- #include < ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
10- #include < ydb/library/actors/core/actor_bootstrapped.h>
11- #include < ydb/library/actors/core/hfunc.h>
124
135namespace NKikimr ::NReplication::NController {
146
15- class TTableWorkerRegistar : public TActorBootstrapped <TTableWorkerRegistar> {
16- void Handle (TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) {
17- LOG_T (" Handle " << ev->Get ()->ToString ());
18-
19- const auto & result = ev->Get ()->Result ;
20- if (!result.IsSuccess ()) {
21- if (IsRetryableError (result)) {
22- LOG_W (" Error of resolving topic '" << SrcStreamPath << " ': " << ev->Get ()->ToString () << " . Retry." );
23- return Retry ();
24- }
25-
26- LOG_E (" Error of resolving topic '" << SrcStreamPath << " ': " << ev->Get ()->ToString () << " . Stop." );
27- return ; // TODO: hard error
28- }
29-
30- for (const auto & partition : result.GetTopicDescription ().GetPartitions ()) {
31- if (!partition.GetParentPartitionIds ().empty ()) {
32- continue ;
33- }
34-
35- auto ev = MakeRunWorkerEv (
36- ReplicationId, TargetId, Config, partition.GetPartitionId (),
37- ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId);
38- Send (Parent, std::move (ev));
39- }
40-
41- PassAway ();
42- }
43-
44- void Retry () {
45- LOG_D (" Retry" );
46- Schedule (TDuration::Seconds (10 ), new TEvents::TEvWakeup ());
47- }
48-
49- public:
50- static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
51- return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR;
52- }
53-
54- explicit TTableWorkerRegistar (
55- const TActorId& parent,
56- const TActorId& proxy,
57- const NKikimrReplication::TConnectionParams& connectionParams,
58- const NKikimrReplication::TConsistencySettings& consistencySettings,
59- ui64 rid,
60- ui64 tid,
61- const TString& srcStreamPath,
62- const TString& srcStreamConsumerName,
63- const TPathId& dstPathId,
64- const TReplication::ITarget::IConfig::TPtr& config)
65- : Parent(parent)
66- , YdbProxy(proxy)
67- , ConnectionParams(connectionParams)
68- , ConsistencySettings(consistencySettings)
69- , ReplicationId(rid)
70- , TargetId(tid)
71- , SrcStreamPath(srcStreamPath)
72- , SrcStreamConsumerName(srcStreamConsumerName)
73- , DstPathId(dstPathId)
74- , LogPrefix(" TableWorkerRegistar" , ReplicationId, TargetId)
75- , Config(config)
76- {
77- }
78-
79- void Bootstrap () {
80- Become (&TThis::StateWork);
81- Send (YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest (SrcStreamPath, {}));
82- }
83-
84- STATEFN (StateWork) {
85- switch (ev->GetTypeRewrite ()) {
86- hFunc (TEvYdbProxy::TEvDescribeTopicResponse, Handle);
87- sFunc (TEvents::TEvWakeup, Bootstrap);
88- sFunc (TEvents::TEvPoison, PassAway);
89- }
90- }
91-
92- private:
93- const TActorId Parent;
94- const TActorId YdbProxy;
95- const NKikimrReplication::TConnectionParams ConnectionParams;
96- const NKikimrReplication::TConsistencySettings ConsistencySettings;
97- const ui64 ReplicationId;
98- const ui64 TargetId;
99- const TString SrcStreamPath;
100- const TString SrcStreamConsumerName;
101- const TPathId DstPathId;
102- const TActorLogPrefix LogPrefix;
103- const TReplication::ITarget::IConfig::TPtr Config;
104-
105- }; // TTableWorkerRegistar
1067
1078TTargetTableBase::TTargetTableBase (TReplication* replication, ETargetKind finalKind,
1089 ui64 id, const IConfig::TPtr& config)
10910 : TTargetWithStream(replication, finalKind, id, config)
11011{
11112}
11213
113- IActor* TTargetTableBase::CreateWorkerRegistar (const TActorContext& ctx) const {
114- auto replication = GetReplication ();
115- const auto & config = replication->GetConfig ();
116- return new TTableWorkerRegistar (ctx.SelfID , replication->GetYdbProxy (),
117- config.GetSrcConnectionParams (), config.GetConsistencySettings (),
118- replication->GetId (), GetId (), BuildStreamPath (), GetStreamConsumerName (), GetDstPathId (), GetConfig ());
119- }
120-
12114TTargetTable::TTargetTable (TReplication* replication, ui64 id, const IConfig::TPtr& config)
12215 : TTargetTableBase(replication, ETargetKind::Table, id, config)
12316{
@@ -140,62 +33,4 @@ TString TTargetIndexTable::BuildStreamPath() const {
14033 return CanonizePath (ChildPath (SplitPath (GetSrcPath ()), {" indexImplTable" , GetStreamName ()}));
14134}
14235
143- TTargetTransfer::TTargetTransfer (TReplication* replication, ui64 id, const IConfig::TPtr& config)
144- : TTargetTableBase(replication, ETargetKind::Transfer, id, config)
145- {
146- }
147-
148- void TTargetTransfer::UpdateConfig (const NKikimrReplication::TReplicationConfig& cfg) {
149- auto & t = cfg.GetTransferSpecific ().GetTargets (0 );
150- Config = std::make_shared<TTargetTransfer::TTransferConfig>(
151- GetConfig ()->GetSrcPath (),
152- GetConfig ()->GetDstPath (),
153- t.GetTransformLambda ());
154- }
155-
156- void TTargetTransfer::Progress (const TActorContext& ctx) {
157- auto replication = GetReplication ();
158-
159- switch (GetStreamState ()) {
160- case EStreamState::Removing:
161- if (GetWorkers ()) {
162- RemoveWorkers (ctx);
163- } else if (!StreamConsumerRemover) {
164- StreamConsumerRemover = ctx.Register (CreateStreamConsumerRemover (replication, GetId (), ctx));
165- }
166- return ;
167- case EStreamState::Creating:
168- case EStreamState::Ready:
169- case EStreamState::Removed:
170- case EStreamState::Error:
171- break ;
172- }
173-
174- TTargetWithStream::Progress (ctx);
175- }
176-
177- void TTargetTransfer::Shutdown (const TActorContext& ctx) {
178- for (auto * x : TVector<TActorId*>{&StreamConsumerRemover}) {
179- if (auto actorId = std::exchange (*x, {})) {
180- ctx.Send (actorId, new TEvents::TEvPoison ());
181- }
182- }
183-
184- TTargetWithStream::Shutdown (ctx);
185- }
186-
187- TString TTargetTransfer::BuildStreamPath () const {
188- return CanonizePath (GetSrcPath ());
189- }
190-
191- TTargetTransfer::TTransferConfig::TTransferConfig (const TString& srcPath, const TString& dstPath, const TString& transformLambda)
192- : TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
193- , TransformLambda(transformLambda)
194- {
195- }
196-
197- const TString& TTargetTransfer::TTransferConfig::GetTransformLambda () const {
198- return TransformLambda;
199- }
200-
20136}
0 commit comments