Skip to content

Commit bc9c4ee

Browse files
committed
done
1 parent 6ce33f8 commit bc9c4ee

File tree

3 files changed

+38
-23
lines changed

3 files changed

+38
-23
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,26 +117,31 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) {
117117
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
118118

119119
TUserTable::TPtr table = new TUserTable;
120-
121-
table->Columns.emplace(0, TUserTable::TUserColumn{});
122-
123120
NTable::TScheme::TTableSchema tableSchema;
124-
tableSchema.Columns[0] = NTable::TColumn("test", 0, {}, "");
121+
table->Columns.emplace(0, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32), "", "Key", true));
122+
tableSchema.Columns[0] = NTable::TColumn("key", 0, {}, "");
125123
tableSchema.Columns[0].KeyOrder = 0;
124+
125+
table->Columns.emplace(1, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Bool), "", "__ydb_incrBackupImpl_deleted", false));
126+
tableSchema.Columns[1] = NTable::TColumn("__ydb_incrBackupImpl_deleted", 1, {}, "");
127+
tableSchema.Columns[1].KeyOrder = 1;
128+
126129
auto scheme = NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond());
127130

128-
TPathId targetPathId{};
129-
ui64 txId = 0;
131+
TPathId sourcePathId{1, 2};
132+
TPathId targetPathId{3, 4};
133+
ui64 txId = 1337;
130134

131135
auto* scan = CreateIncrementalRestoreScan(
132136
sender,
133137
[&](const TActorContext&) {
134138
return sender2;
135139
},
136-
TPathId{} /*sourcePathId*/,
140+
sourcePathId,
137141
table,
138142
targetPathId,
139-
txId).Release();
143+
txId,
144+
{}).Release();
140145

141146
TDriverMock driver;
142147

ydb/core/tx/datashard/incr_restore_scan.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@
22
#include "change_exchange_impl.h"
33
#include "datashard_impl.h"
44

5-
#include <ydb/library/actors/core/actor.h>
6-
#include <ydb/core/tx/datashard/change_record_body_serializer.h>
7-
#include <ydb/core/tx/datashard/datashard_user_table.h>
8-
#include <ydb/core/tx/datashard/change_record.h>
95
#include <ydb/core/change_exchange/change_exchange.h>
106
#include <ydb/core/tx/datashard/change_collector.h>
11-
#include <ydb/library/services/services.pb.h>
12-
#include <ydb/core/tx/datashard/stream_scan_common.h>
7+
#include <ydb/core/tx/datashard/change_record.h>
8+
#include <ydb/core/tx/datashard/change_record_body_serializer.h>
9+
#include <ydb/core/tx/datashard/datashard_user_table.h>
1310
#include <ydb/core/tx/datashard/incr_restore_helpers.h>
11+
#include <ydb/library/actors/core/actor.h>
12+
#include <ydb/library/services/services.pb.h>
1413

1514
namespace NKikimr::NDataShard {
1615

@@ -45,27 +44,34 @@ class TIncrementalRestoreScan
4544
ui64 txId,
4645
const TPathId& sourcePathId,
4746
TUserTable::TCPtr table,
48-
const TPathId& targetPathId)
49-
: IActorCallback(static_cast<TReceiveFunc>(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR)
47+
const TPathId& targetPathId,
48+
NStreamScan::TLimits limits)
49+
: IActorCallback(static_cast<TReceiveFunc>(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR)
5050
, Parent(parent)
5151
, ChangeSenderFactory(changeSenderFactory)
5252
, TxId(txId)
5353
, SourcePathId(sourcePathId)
5454
, TargetPathId(targetPathId)
5555
, ValueTags(InitValueTags(table))
56-
, Limits()
56+
, Limits(limits)
5757
{}
5858

5959
static TVector<TTag> InitValueTags(TUserTable::TCPtr table) {
60-
Y_VERIFY(table->Columns.size() >= 1);
60+
Y_VERIFY(table->Columns.size() >= 2);
6161
TVector<TTag> valueTags;
6262
valueTags.reserve(table->Columns.size() - 1);
63+
bool deletedMarkerColumnFound = false;
6364
for (const auto& [tag, column] : table->Columns) {
6465
if (!column.IsKey) {
6566
valueTags.push_back(tag);
67+
if (column.Name == "__ydb_incrBackupImpl_deleted") {
68+
deletedMarkerColumnFound = true;
69+
}
6670
}
6771
}
6872

73+
Y_VERIFY(deletedMarkerColumnFound);
74+
6975
return valueTags;
7076
}
7177

@@ -267,15 +273,17 @@ THolder<NTable::IScan> CreateIncrementalRestoreScan(
267273
TPathId sourcePathId,
268274
TUserTable::TCPtr table,
269275
const TPathId& targetPathId,
270-
ui64 txId)
276+
ui64 txId,
277+
NStreamScan::TLimits limits)
271278
{
272279
return MakeHolder<TIncrementalRestoreScan>(
273280
parent,
274281
changeSenderFactory,
275282
txId,
276283
sourcePathId,
277284
table,
278-
targetPathId);
285+
targetPathId,
286+
limits);
279287
}
280288

281289
} // namespace NKikimr::NDataShard

ydb/core/tx/datashard/incr_restore_scan.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#pragma once
22

3+
#include <ydb/core/base/events.h>
34
#include <ydb/core/scheme/scheme_pathid.h>
45
#include <ydb/core/tablet_flat/flat_scan_iface.h>
5-
#include <ydb/library/actors/core/actor.h>
6-
#include <ydb/core/base/events.h>
76
#include <ydb/core/tx/datashard/datashard_user_table.h>
7+
#include <ydb/core/tx/datashard/stream_scan_common.h>
8+
#include <ydb/library/actors/core/actor.h>
89

910
#include <functional>
1011

@@ -32,6 +33,7 @@ THolder<NTable::IScan> CreateIncrementalRestoreScan(
3233
TPathId tablePathId,
3334
TUserTable::TCPtr table,
3435
const TPathId& targetPathId,
35-
ui64 txId);
36+
ui64 txId,
37+
NStreamScan::TLimits limits);
3638

3739
} // namespace NKikimr::NDataShard

0 commit comments

Comments
 (0)