@@ -26,23 +26,48 @@ class TIncrementalRestoreScan
2626 using TBuffer = NStreamScan::TBuffer;
2727 using TChange = IDataShardChangeCollector::TChange;
2828
29+ TStringBuf GetLogPrefix () const {
30+ if (!LogPrefix) {
31+ LogPrefix = TStringBuilder ()
32+ << " [TIncrementalRestoreScan]"
33+ << " [" << TxId << " ]"
34+ << " [" << SourcePathId << " ]"
35+ << " [" << SourcePathId << " ]"
36+ << SelfId () /* contains brackets */ << " " ;
37+ }
38+
39+ return LogPrefix.GetRef ();
40+ }
2941public:
3042 explicit TIncrementalRestoreScan (
31- TActorId parent,
32- std::function<IActor*()> changeSenderFactory,
33- ui64 txId,
34- const TPathId& tablePathId,
35- const TPathId& targetPathId)
43+ TActorId parent,
44+ std::function<IActor*()> changeSenderFactory,
45+ ui64 txId,
46+ const TPathId& sourcePathId,
47+ TUserTable::TCPtr table,
48+ const TPathId& targetPathId)
3649 : IActorCallback(static_cast <TReceiveFunc>(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR)
3750 , Parent(parent)
3851 , ChangeSenderFactory(changeSenderFactory)
3952 , TxId(txId)
40- , TablePathId(tablePathId )
53+ , SourcePathId(sourcePathId )
4154 , TargetPathId(targetPathId)
55+ , ValueTags(InitValueTags(table))
4256 , Limits()
43- // , ValueTags(InitValueTags(self, tablePathId))
4457 {}
4558
59+ static TVector<TTag> InitValueTags (TUserTable::TCPtr table) {
60+ TVector<TTag> valueTags;
61+ valueTags.reserve (table->Columns .size () - 1 );
62+ for (const auto & [tag, column] : table->Columns ) {
63+ if (!column.IsKey ) {
64+ valueTags.push_back (tag);
65+ }
66+ }
67+
68+ return valueTags;
69+ }
70+
4671 static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
4772 return NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR;
4873 }
@@ -57,12 +82,14 @@ class TIncrementalRestoreScan
5782 IActorCallback::PassAway ();
5883 }
5984
60- void Start (TEvents::TEvWakeup::TPtr&) {
85+ void Start (TEvents::TEvWakeup::TPtr& ev) {
86+ LOG_D (" Handle TEvents::TEvWakeup " << ev->Get ()->ToString ());
87+
6188 Driver->Touch (EScan::Feed);
6289 }
6390
6491 void Handle (NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) {
65- // LOG_D("Handltypename e " << ev->Get()->ToString());
92+ LOG_D (" Handle TEvChangeExchange::TEvRequestRecords " << ev->Get ()->ToString ());
6693
6794 TVector<TChangeRecord::TPtr> records (::Reserve (ev->Get ()->Records .size ()));
6895
@@ -77,14 +104,16 @@ class TIncrementalRestoreScan
77104 }
78105
79106 void Handle (NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) {
80- // LOG_D("Handltypename e " << ev->Get()->ToString());
107+ LOG_D (" Handle TEvChangeExchange::TEvRemoveRecords " << ev->Get ()->ToString ());
81108
82109 for (auto recordId : ev->Get ()->Records ) {
83110 PendingRecords.erase (recordId);
84111 }
85112 }
86113
87- void Handle (TEvIncrementalRestoreScan::TEvFinished::TPtr&) {
114+ void Handle (TEvIncrementalRestoreScan::TEvFinished::TPtr& ev) {
115+ LOG_D (" Handle TEvIncrementalRestoreScan::TEvFinished " << ev->Get ()->ToString ());
116+
88117 Driver->Touch (EScan::Final);
89118 }
90119
@@ -106,6 +135,8 @@ class TIncrementalRestoreScan
106135 }
107136
108137 EScan Seek (TLead& lead, ui64) noexcept override {
138+ LOG_D (" Seek" );
139+
109140 if (LastKey) {
110141 lead.To (ValueTags, LastKey->GetCells (), ESeek::Upper);
111142 } else {
@@ -116,6 +147,7 @@ class TIncrementalRestoreScan
116147 }
117148
118149 EScan Feed (TArrayRef<const TCell> key, const TRow& row) noexcept override {
150+
119151 Buffer.AddRow (key, *row);
120152 if (Buffer.Bytes () < Limits.BatchMaxBytes ) {
121153 if (Buffer.Rows () < Limits.BatchMaxRows ) {
@@ -132,22 +164,25 @@ class TIncrementalRestoreScan
132164 }
133165
134166 EScan Exhausted () noexcept override {
167+ LOG_D (" Exhausted" );
168+
135169 NoMoreData = true ;
136170
137171 if (!Buffer) {
172+ Send (ChangeSender, new TEvIncrementalRestoreScan::TEvNoMoreData ());
138173 return EScan::Sleep;
139174 }
140175
141176 return Progress ();
142177 }
143178
144179 TAutoPtr<IDestructable> Finish (EAbort abort) noexcept override {
145- Send (Parent, new TEvIncrementalRestoreScan::TEvFinished{} );
180+ LOG_D ( " Finish " << abort );
146181
147182 if (abort != EAbort::None) {
148- // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED );
183+ Send (Parent, new TEvIncrementalRestoreScan::TEvFinished{} );
149184 } else {
150- // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE );
185+ Send (Parent, new TEvIncrementalRestoreScan::TEvFinished{} );
151186 }
152187
153188 PassAway ();
@@ -157,27 +192,17 @@ class TIncrementalRestoreScan
157192 void Describe (IOutputStream& o) const noexcept override {
158193 o << " IncrRestoreScan {"
159194 << " TxId: " << TxId
160- << " TablePathId : " << TablePathId
195+ << " SourcePathId : " << SourcePathId
161196 << " TargetPathId: " << TargetPathId
162197 << " }" ;
163198 }
164199
165200 EScan Progress () {
166- // Stats.RowsProcessed += Buffer.Rows();
167- // Stats.BytesProcessed += Buffer.Bytes();
168-
169- // auto& ctx = TlsActivationContext->AsActorContext();
170- // auto TabletID = [&]() { return DataShard.TabletId; };
171- // LOG_D("IncrRestore@Progress()"
172- // << ": Buffer.Rows()# " << Buffer.Rows());
173-
174201 auto rows = Buffer.Flush ();
175202 TVector<TChange> changeRecords;
176203 TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records;
177204
178205 for (auto & [k, v] : rows) {
179- // LOG_D("IncrRestore@Progress()#iter"
180- // << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size());
181206 const auto key = NStreamScan::MakeKey (k.GetCells (), KeyColumnTypes);
182207 const auto & keyTags = KeyColumnIds;
183208 NKikimrChangeExchange::TDataChange body;
@@ -186,11 +211,11 @@ class TIncrementalRestoreScan
186211 } else {
187212 Serialize (body, ERowOp::Erase, key, keyTags, {});
188213 }
189- auto recordPtr = TChangeRecordBuilder (TChangeRecord::EKind::AsyncIndex )
214+ auto recordPtr = TChangeRecordBuilder (TChangeRecord::EKind::IncrementalRestore )
190215 .WithOrder (++Order)
191216 .WithGroup (0 )
192217 .WithPathId (TargetPathId)
193- .WithTableId (TablePathId )
218+ .WithTableId (SourcePathId )
194219 // .WithSchemaVersion(ReadVersion) // TODO(use SchemaVersion)
195220 .WithBody (body.SerializeAsString ())
196221 .WithSource (TChangeRecord::ESource::InitialScan)
@@ -209,19 +234,22 @@ class TIncrementalRestoreScan
209234 return EScan::Sleep;
210235 }
211236
237+ // TODO also limit on PendingRecords contents to keep memory usage in reasonable limits
212238 return EScan::Feed;
213239 }
240+
214241private:
215- TActorId Parent;
216- std::function<IActor*()> ChangeSenderFactory;
242+ const TActorId Parent;
243+ const std::function<IActor*()> ChangeSenderFactory;
217244 const ui64 TxId;
218- const TPathId TablePathId ;
245+ const TPathId SourcePathId ;
219246 const TPathId TargetPathId;
220247 const TVector<TTag> ValueTags;
221248 const TMaybe<TSerializedCellVec> LastKey;
222249 const TLimits Limits;
250+ mutable TMaybe<TString> LogPrefix;
223251 IDriver* Driver;
224- bool NoMoreData;
252+ bool NoMoreData = false ;
225253 TBuffer Buffer;
226254 ui64 Order = 0 ;
227255 TActorId ChangeSender;
@@ -235,15 +263,17 @@ class TIncrementalRestoreScan
235263THolder<NTable::IScan> CreateIncrementalRestoreScan (
236264 NActors::TActorId parent,
237265 std::function<NActors::IActor*()> changeSenderFactory,
238- TPathId tablePathId,
266+ TPathId sourcePathId,
267+ TUserTable::TCPtr table,
239268 const TPathId& targetPathId,
240269 ui64 txId)
241270{
242271 return MakeHolder<TIncrementalRestoreScan>(
243272 parent,
244273 changeSenderFactory,
245274 txId,
246- tablePathId,
275+ sourcePathId,
276+ table,
247277 targetPathId);
248278}
249279
0 commit comments