Skip to content

Commit dbc1d70

Browse files
committed
WIP
1 parent 6583b1b commit dbc1d70

File tree

7 files changed

+383
-1
lines changed

7 files changed

+383
-1
lines changed

ydb/core/tx/datashard/change_exchange_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "defs.h"
44
#include "change_exchange_helpers.h"
55

6+
#include <ydb/core/scheme/scheme_tabledefs.h>
7+
68
namespace NKikimr {
79
namespace NDataShard {
810

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
168168
return proto;
169169
}
170170

171-
172171
Y_UNIT_TEST(SimpleBackup) {
173172
TPortManager portManager;
174173
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#include "incr_restore_scan.h"
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
namespace NKikimr {
6+
7+
Y_UNIT_TEST_SUITE(IncrementalRestoreScan) {
8+
Y_UNIT_TEST(Simple) {
9+
10+
}
11+
}
12+
13+
} // namespace NKikimr
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
#include "incr_restore_scan.h"
2+
#include "change_exchange_impl.h"
3+
4+
#include <ydb/library/actors/core/actor.h>
5+
#include <ydb/core/tx/datashard/change_record_body_serializer.h>
6+
#include <ydb/core/tx/datashard/datashard_user_table.h>
7+
#include <ydb/core/tx/datashard/change_record.h>
8+
#include <ydb/core/change_exchange/change_exchange.h>
9+
10+
namespace NKikimr::NDataShard {
11+
12+
using namespace NActors;
13+
using namespace NTable;
14+
15+
class TIncrementalRestoreScan
16+
: public IActorCallback
17+
, public NTable::IScan
18+
, protected TChangeRecordBodySerializer
19+
{
20+
struct TLimits {
21+
ui32 BatchMaxBytes;
22+
ui32 BatchMinRows;
23+
ui32 BatchMaxRows;
24+
25+
// TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto)
26+
// : BatchMaxBytes(proto.GetBatchMaxBytes())
27+
// , BatchMinRows(proto.GetBatchMinRows())
28+
// , BatchMaxRows(proto.GetBatchMaxRows())
29+
// {
30+
// }
31+
};
32+
33+
class TBuffer {
34+
public:
35+
void AddRow(TArrayRef<const TCell> key, TArrayRef<const TCell> value) {
36+
const auto& [k, v] = Data.emplace_back(
37+
TSerializedCellVec(key),
38+
TSerializedCellVec(value)
39+
);
40+
ByteSize += k.GetBuffer().size() + v.GetBuffer().size();
41+
}
42+
43+
auto&& Flush() {
44+
ByteSize = 0;
45+
return std::move(Data);
46+
}
47+
48+
ui64 Bytes() const {
49+
return ByteSize;
50+
}
51+
52+
ui64 Rows() const {
53+
return Data.size();
54+
}
55+
56+
explicit operator bool() const {
57+
return !Data.empty();
58+
}
59+
60+
private:
61+
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Data; // key & value (if any)
62+
ui64 ByteSize = 0;
63+
};
64+
65+
struct TChange {
66+
ui64 Order;
67+
ui64 Group;
68+
ui64 Step;
69+
ui64 TxId;
70+
TPathId PathId;
71+
ui64 BodySize;
72+
TPathId TableId;
73+
ui64 SchemaVersion;
74+
ui64 LockId = 0;
75+
ui64 LockOffset = 0;
76+
77+
TInstant CreatedAt() const {
78+
return Group
79+
? TInstant::MicroSeconds(Group)
80+
: TInstant::MilliSeconds(Step);
81+
}
82+
};
83+
84+
static TVector<TRawTypeValue> MakeKey(TArrayRef<const TCell> cells, TUserTable::TCPtr table) {
85+
TVector<TRawTypeValue> key(Reserve(cells.size()));
86+
87+
Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size());
88+
for (TPos pos = 0; pos < cells.size(); ++pos) {
89+
key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos));
90+
}
91+
92+
return key;
93+
}
94+
95+
static std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
96+
Y_ABORT_UNLESS(cells.size() >= 1);
97+
TVector<TUpdateOp> updates(::Reserve(cells.size() - 1));
98+
99+
bool foundSpecialColumn = false;
100+
Y_ABORT_UNLESS(cells.size() == tags.size());
101+
for (TPos pos = 0; pos < cells.size(); ++pos) {
102+
const auto tag = tags.at(pos);
103+
auto it = table->Columns.find(tag);
104+
Y_ABORT_UNLESS(it != table->Columns.end());
105+
if (it->second.Name == "__ydb_incrBackupImpl_deleted") {
106+
if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue<bool>()) {
107+
return std::nullopt;
108+
}
109+
foundSpecialColumn = true;
110+
continue;
111+
}
112+
updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type));
113+
}
114+
Y_ABORT_UNLESS(foundSpecialColumn);
115+
116+
return updates;
117+
}
118+
119+
public:
120+
explicit TIncrementalRestoreScan(
121+
std::function<IActor*()> changeSenderFactory,
122+
ui64 txId,
123+
const TPathId& tablePathId,
124+
const TPathId& targetPathId)
125+
: IActorCallback(static_cast<TReceiveFunc>(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR)
126+
, ChangeSenderFactory(changeSenderFactory)
127+
// , DataShard{self->SelfId(), self->TabletID()}
128+
, TxId(txId)
129+
, TablePathId(tablePathId)
130+
, TargetPathId(targetPathId)
131+
, ReadVersion({})
132+
, Limits({})
133+
// , ValueTags(InitValueTags(self, tablePathId))
134+
{}
135+
136+
void Registered(TActorSystem*, const TActorId&) override {
137+
ChangeSender = RegisterWithSameMailbox(ChangeSenderFactory());
138+
}
139+
140+
void PassAway() override {
141+
Send(ChangeSender, new TEvents::TEvPoisonPill());
142+
143+
IActorCallback::PassAway();
144+
}
145+
146+
STATEFN(StateWork) {
147+
switch (ev->GetTypeRewrite()) {
148+
// hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
149+
// hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle);
150+
// hFunc(TEvents::TEvWakeup, Start);
151+
// hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle);
152+
// IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords);
153+
// hFunc(TEvChangeExchange::TEvAllSent, Handle);
154+
// IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease);
155+
default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite());
156+
}
157+
}
158+
159+
IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr<TScheme> scheme) noexcept override {
160+
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this);
161+
Driver = driver;
162+
Y_ABORT_UNLESS(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size());
163+
164+
return {EScan::Sleep, {}};
165+
}
166+
167+
EScan Seek(TLead& lead, ui64) noexcept override {
168+
if (LastKey) {
169+
lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper);
170+
} else {
171+
lead.To(ValueTags, {}, ESeek::Lower);
172+
}
173+
174+
return EScan::Feed;
175+
}
176+
177+
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept override {
178+
Buffer.AddRow(key, *row);
179+
if (Buffer.Bytes() < Limits.BatchMaxBytes) {
180+
if (Buffer.Rows() < Limits.BatchMaxRows) {
181+
return EScan::Feed;
182+
}
183+
} else {
184+
if (Buffer.Rows() < Limits.BatchMinRows) {
185+
return EScan::Feed;
186+
}
187+
}
188+
189+
Progress();
190+
return EScan::Sleep;
191+
}
192+
193+
EScan Exhausted() noexcept override {
194+
NoMoreData = true;
195+
196+
if (!Buffer) {
197+
return EScan::Sleep;
198+
}
199+
200+
return Progress();
201+
}
202+
203+
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override {
204+
// Send(DataShard.ActorId, new TEvDataShard::TEvRestoreFinished{TxId});
205+
206+
if (abort != EAbort::None) {
207+
// Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED);
208+
} else {
209+
// Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE);
210+
}
211+
212+
PassAway();
213+
return nullptr;
214+
}
215+
216+
void Describe(IOutputStream& o) const noexcept override {
217+
o << "IncrRestoreScan {"
218+
<< " TxId: " << TxId
219+
<< " TablePathId: " << TablePathId
220+
<< " TargetPathId: " << TargetPathId
221+
<< " }";
222+
}
223+
224+
EScan Progress() {
225+
// Stats.RowsProcessed += Buffer.Rows();
226+
// Stats.BytesProcessed += Buffer.Bytes();
227+
228+
// auto& ctx = TlsActivationContext->AsActorContext();
229+
// auto TabletID = [&]() { return DataShard.TabletId; };
230+
// LOG_D("IncrRestore@Progress()"
231+
// << ": Buffer.Rows()# " << Buffer.Rows());
232+
233+
// auto reservationCookie = Self->ReserveChangeQueueCapacity(Buffer.Rows());
234+
auto rows = Buffer.Flush();
235+
TVector<TChange> changeRecords;
236+
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> records;
237+
238+
// auto table = Self->GetUserTables().at(TablePathId.LocalPathId);
239+
NDataShard::TUserTable::TCPtr table;
240+
for (auto& [k, v] : rows) {
241+
// LOG_D("IncrRestore@Progress()#iter"
242+
// << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size());
243+
const auto key = MakeKey(k.GetCells(), table);
244+
const auto& keyTags = table->KeyColumnIds;
245+
NKikimrChangeExchange::TDataChange body;
246+
if (auto updates = MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) {
247+
Serialize(body, ERowOp::Upsert, key, keyTags, *updates);
248+
} else {
249+
Serialize(body, ERowOp::Erase, key, keyTags, {});
250+
}
251+
auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex)
252+
.WithOrder(++Order)
253+
.WithGroup(0)
254+
.WithStep(ReadVersion.Step)
255+
.WithTxId(ReadVersion.TxId)
256+
.WithPathId(TargetPathId)
257+
.WithTableId(TablePathId)
258+
.WithSchemaVersion(table->GetTableSchemaVersion())
259+
.WithBody(body.SerializeAsString())
260+
.WithSource(TChangeRecord::ESource::InitialScan)
261+
.Build();
262+
263+
const auto& record = *recordPtr;
264+
265+
records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size());
266+
PendingRecords.emplace(record.GetOrder(), recordPtr);
267+
}
268+
269+
Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records));
270+
271+
if (NoMoreData) {
272+
// Send(ChangeSender, new TEvChangeExchange::TEvNoMoreData());
273+
}
274+
275+
return NoMoreData ? EScan::Sleep : EScan::Feed;
276+
}
277+
private:
278+
// const TDataShardId DataShard;
279+
std::function<IActor*()> ChangeSenderFactory;
280+
TActorId ReplyTo;
281+
const ui64 TxId;
282+
const TPathId TablePathId;
283+
const TPathId TargetPathId;
284+
const TRowVersion ReadVersion;
285+
const TVector<TTag> ValueTags;
286+
const TMaybe<TSerializedCellVec> LastKey;
287+
const TLimits Limits;
288+
IDriver* Driver;
289+
bool NoMoreData;
290+
TBuffer Buffer;
291+
// TStats Stats;
292+
// TDataShard* Self;
293+
ui64 Order = 0;
294+
TActorId ChangeSender;
295+
TMap<ui64, TChangeRecord::TPtr> PendingRecords;
296+
};
297+
298+
THolder<NTable::IScan> CreateIncrementalRestoreScan(
299+
std::function<IActor*()> changeSenderFactory,
300+
TPathId tablePathId,
301+
const TPathId& targetPathId,
302+
ui64 txId)
303+
{
304+
return MakeHolder<TIncrementalRestoreScan>(
305+
changeSenderFactory,
306+
txId,
307+
tablePathId,
308+
targetPathId);
309+
}
310+
311+
} // namespace NKikimr::NDataShard
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_pathid.h>
4+
#include <ydb/core/tablet_flat/flat_scan_iface.h>
5+
6+
namespace NKikimr::NDataShard {
7+
8+
THolder<NTable::IScan> CreateIncrementalRestoreScan(
9+
TPathId tablePathId,
10+
const TPathId& targetPathId,
11+
ui64 txId);
12+
13+
} // namespace NKikimr::NDataShard
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
UNITTEST_FOR(ydb/core/tx/datashard)
2+
3+
FORK_SUBTESTS()
4+
5+
SPLIT_FACTOR(4)
6+
7+
IF (SANITIZER_TYPE)
8+
REQUIREMENTS(ram:32)
9+
ENDIF()
10+
11+
IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
12+
TIMEOUT(3600)
13+
SIZE(LARGE)
14+
TAG(ya:fat)
15+
ELSE()
16+
TIMEOUT(600)
17+
SIZE(MEDIUM)
18+
ENDIF()
19+
20+
PEERDIR(
21+
ydb/core/tx/datashard/ut_common
22+
library/cpp/getopt
23+
library/cpp/regex/pcre
24+
library/cpp/svnversion
25+
ydb/core/kqp/ut/common
26+
ydb/core/testlib/default
27+
ydb/core/tx
28+
ydb/library/yql/public/udf/service/exception_policy
29+
ydb/public/lib/yson_value
30+
ydb/public/sdk/cpp/client/ydb_datastreams
31+
ydb/public/sdk/cpp/client/ydb_topic
32+
ydb/public/sdk/cpp/client/ydb_persqueue_public
33+
ydb/public/sdk/cpp/client/ydb_result
34+
)
35+
36+
YQL_LAST_ABI_VERSION()
37+
38+
SRCS(
39+
datashard_ut_incremental_restore_scan.cpp
40+
)
41+
42+
END()

0 commit comments

Comments
 (0)