From 0b70abfb8e4773a79e5c7b66d618b389f5ae5c2e Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 12 Sep 2024 23:29:44 +0000 Subject: [PATCH 01/11] WIP --- ydb/core/tx/datashard/change_exchange_impl.h | 2 + .../datashard_ut_incremental_backup.cpp | 1 - .../datashard_ut_incremental_restore_scan.cpp | 13 + ydb/core/tx/datashard/incr_restore_scan.cpp | 311 ++++++++++++++++++ ydb/core/tx/datashard/incr_restore_scan.h | 13 + .../ut_incremental_restore_scan/ya.make | 42 +++ ydb/core/tx/datashard/ya.make | 2 + 7 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp create mode 100644 ydb/core/tx/datashard/incr_restore_scan.cpp create mode 100644 ydb/core/tx/datashard/incr_restore_scan.h create mode 100644 ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make diff --git a/ydb/core/tx/datashard/change_exchange_impl.h b/ydb/core/tx/datashard/change_exchange_impl.h index b6af2aea263e..591748a5796b 100644 --- a/ydb/core/tx/datashard/change_exchange_impl.h +++ b/ydb/core/tx/datashard/change_exchange_impl.h @@ -3,6 +3,8 @@ #include "defs.h" #include "change_exchange_helpers.h" +#include + namespace NKikimr { namespace NDataShard { diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 9924346ee6ca..9744a9c0ac5e 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -168,7 +168,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } - Y_UNIT_TEST(SimpleBackup) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp new file mode 100644 index 000000000000..490baf227fd5 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -0,0 +1,13 @@ +#include "incr_restore_scan.h" + +#include + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { + Y_UNIT_TEST(Simple) { + + } +} + +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp new file mode 100644 index 000000000000..f9ea3ef9a01b --- /dev/null +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -0,0 +1,311 @@ +#include "incr_restore_scan.h" +#include "change_exchange_impl.h" + +#include +#include +#include +#include +#include + +namespace NKikimr::NDataShard { + +using namespace NActors; +using namespace NTable; + +class TIncrementalRestoreScan + : public IActorCallback + , public NTable::IScan + , protected TChangeRecordBodySerializer +{ + struct TLimits { + ui32 BatchMaxBytes; + ui32 BatchMinRows; + ui32 BatchMaxRows; + + // TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto) + // : BatchMaxBytes(proto.GetBatchMaxBytes()) + // , BatchMinRows(proto.GetBatchMinRows()) + // , BatchMaxRows(proto.GetBatchMaxRows()) + // { + // } + }; + + class TBuffer { + public: + void AddRow(TArrayRef key, TArrayRef value) { + const auto& [k, v] = Data.emplace_back( + TSerializedCellVec(key), + TSerializedCellVec(value) + ); + ByteSize += k.GetBuffer().size() + v.GetBuffer().size(); + } + + auto&& Flush() { + ByteSize = 0; + return std::move(Data); + } + + ui64 Bytes() const { + return ByteSize; + } + + ui64 Rows() const { + return Data.size(); + } + + explicit operator bool() const { + return !Data.empty(); + } + + private: + TVector> Data; // key & value (if any) + ui64 ByteSize = 0; + }; + + struct TChange { + ui64 Order; + ui64 Group; + ui64 Step; + ui64 TxId; + TPathId PathId; + ui64 BodySize; + TPathId TableId; + ui64 SchemaVersion; + ui64 LockId = 0; + ui64 LockOffset = 0; + + TInstant CreatedAt() const { + return Group + ? TInstant::MicroSeconds(Group) + : TInstant::MilliSeconds(Step); + } + }; + + static TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { + TVector key(Reserve(cells.size())); + + Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + } + + return key; + } + + static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector updates(::Reserve(cells.size() - 1)); + + bool foundSpecialColumn = false; + Y_ABORT_UNLESS(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__ydb_incrBackupImpl_deleted") { + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } + foundSpecialColumn = true; + continue; + } + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + Y_ABORT_UNLESS(foundSpecialColumn); + + return updates; + } + +public: + explicit TIncrementalRestoreScan( + std::function changeSenderFactory, + ui64 txId, + const TPathId& tablePathId, + const TPathId& targetPathId) + : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + , ChangeSenderFactory(changeSenderFactory) + // , DataShard{self->SelfId(), self->TabletID()} + , TxId(txId) + , TablePathId(tablePathId) + , TargetPathId(targetPathId) + , ReadVersion({}) + , Limits({}) + // , ValueTags(InitValueTags(self, tablePathId)) + {} + + void Registered(TActorSystem*, const TActorId&) override { + ChangeSender = RegisterWithSameMailbox(ChangeSenderFactory()); + } + + void PassAway() override { + Send(ChangeSender, new TEvents::TEvPoisonPill()); + + IActorCallback::PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + // hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); + // hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle); + // hFunc(TEvents::TEvWakeup, Start); + // hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + // IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords); + // hFunc(TEvChangeExchange::TEvAllSent, Handle); + // IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease); + default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); + } + } + + IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr scheme) noexcept override { + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); + Driver = driver; + Y_ABORT_UNLESS(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size()); + + return {EScan::Sleep, {}}; + } + + EScan Seek(TLead& lead, ui64) noexcept override { + if (LastKey) { + lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper); + } else { + lead.To(ValueTags, {}, ESeek::Lower); + } + + return EScan::Feed; + } + + EScan Feed(TArrayRef key, const TRow& row) noexcept override { + Buffer.AddRow(key, *row); + if (Buffer.Bytes() < Limits.BatchMaxBytes) { + if (Buffer.Rows() < Limits.BatchMaxRows) { + return EScan::Feed; + } + } else { + if (Buffer.Rows() < Limits.BatchMinRows) { + return EScan::Feed; + } + } + + Progress(); + return EScan::Sleep; + } + + EScan Exhausted() noexcept override { + NoMoreData = true; + + if (!Buffer) { + return EScan::Sleep; + } + + return Progress(); + } + + TAutoPtr Finish(EAbort abort) noexcept override { + // Send(DataShard.ActorId, new TEvDataShard::TEvRestoreFinished{TxId}); + + if (abort != EAbort::None) { + // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); + } else { + // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE); + } + + PassAway(); + return nullptr; + } + + void Describe(IOutputStream& o) const noexcept override { + o << "IncrRestoreScan {" + << " TxId: " << TxId + << " TablePathId: " << TablePathId + << " TargetPathId: " << TargetPathId + << " }"; + } + + EScan Progress() { + // Stats.RowsProcessed += Buffer.Rows(); + // Stats.BytesProcessed += Buffer.Bytes(); + + // auto& ctx = TlsActivationContext->AsActorContext(); + // auto TabletID = [&]() { return DataShard.TabletId; }; + // LOG_D("IncrRestore@Progress()" + // << ": Buffer.Rows()# " << Buffer.Rows()); + + // auto reservationCookie = Self->ReserveChangeQueueCapacity(Buffer.Rows()); + auto rows = Buffer.Flush(); + TVector changeRecords; + TVector records; + + // auto table = Self->GetUserTables().at(TablePathId.LocalPathId); + NDataShard::TUserTable::TCPtr table; + for (auto& [k, v] : rows) { + // LOG_D("IncrRestore@Progress()#iter" + // << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); + const auto key = MakeKey(k.GetCells(), table); + const auto& keyTags = table->KeyColumnIds; + NKikimrChangeExchange::TDataChange body; + if (auto updates = MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex) + .WithOrder(++Order) + .WithGroup(0) + .WithStep(ReadVersion.Step) + .WithTxId(ReadVersion.TxId) + .WithPathId(TargetPathId) + .WithTableId(TablePathId) + .WithSchemaVersion(table->GetTableSchemaVersion()) + .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) + .Build(); + + const auto& record = *recordPtr; + + records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + PendingRecords.emplace(record.GetOrder(), recordPtr); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records)); + + if (NoMoreData) { + // Send(ChangeSender, new TEvChangeExchange::TEvNoMoreData()); + } + + return NoMoreData ? EScan::Sleep : EScan::Feed; + } +private: + // const TDataShardId DataShard; + std::function ChangeSenderFactory; + TActorId ReplyTo; + const ui64 TxId; + const TPathId TablePathId; + const TPathId TargetPathId; + const TRowVersion ReadVersion; + const TVector ValueTags; + const TMaybe LastKey; + const TLimits Limits; + IDriver* Driver; + bool NoMoreData; + TBuffer Buffer; + // TStats Stats; + // TDataShard* Self; + ui64 Order = 0; + TActorId ChangeSender; + TMap PendingRecords; +}; + +THolder CreateIncrementalRestoreScan( + std::function changeSenderFactory, + TPathId tablePathId, + const TPathId& targetPathId, + ui64 txId) +{ + return MakeHolder( + changeSenderFactory, + txId, + tablePathId, + targetPathId); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h new file mode 100644 index 000000000000..0953365e367e --- /dev/null +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +namespace NKikimr::NDataShard { + +THolder CreateIncrementalRestoreScan( + TPathId tablePathId, + const TPathId& targetPathId, + ui64 txId); + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make b/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make new file mode 100644 index 000000000000..0067f0623b5e --- /dev/null +++ b/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make @@ -0,0 +1,42 @@ +UNITTEST_FOR(ydb/core/tx/datashard) + +FORK_SUBTESTS() + +SPLIT_FACTOR(4) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:32) +ENDIF() + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/tx/datashard/ut_common + library/cpp/getopt + library/cpp/regex/pcre + library/cpp/svnversion + ydb/core/kqp/ut/common + ydb/core/testlib/default + ydb/core/tx + ydb/library/yql/public/udf/service/exception_policy + ydb/public/lib/yson_value + ydb/public/sdk/cpp/client/ydb_datastreams + ydb/public/sdk/cpp/client/ydb_topic + ydb/public/sdk/cpp/client/ydb_persqueue_public + ydb/public/sdk/cpp/client/ydb_result +) + +YQL_LAST_ABI_VERSION() + +SRCS( + datashard_ut_incremental_restore_scan.cpp +) + +END() diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 70fd4a70b335..e267147f198b 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -160,6 +160,7 @@ SRCS( finish_propose_write_unit.cpp follower_edge.cpp incr_restore_helpers.cpp + incr_restore_scan.cpp initiate_build_index_unit.cpp key_conflicts.cpp key_conflicts.h @@ -304,6 +305,7 @@ RECURSE_FOR_TESTS( ut_erase_rows ut_followers ut_incremental_backup + ut_incremental_restore_scan ut_init ut_keys ut_kqp From b7fb79720b299eafac5bcb9328c602dd8cdbb411 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 13 Sep 2024 16:25:06 +0000 Subject: [PATCH 02/11] WIP --- ydb/core/tx/datashard/incr_restore_scan.cpp | 109 ++------------------ ydb/core/tx/datashard/stream_scan_common.h | 1 + 2 files changed, 10 insertions(+), 100 deletions(-) diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index f9ea3ef9a01b..6b6a55548233 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -6,6 +6,10 @@ #include #include #include +#include +#include +#include +#include namespace NKikimr::NDataShard { @@ -17,104 +21,9 @@ class TIncrementalRestoreScan , public NTable::IScan , protected TChangeRecordBodySerializer { - struct TLimits { - ui32 BatchMaxBytes; - ui32 BatchMinRows; - ui32 BatchMaxRows; - - // TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest::TLimits& proto) - // : BatchMaxBytes(proto.GetBatchMaxBytes()) - // , BatchMinRows(proto.GetBatchMinRows()) - // , BatchMaxRows(proto.GetBatchMaxRows()) - // { - // } - }; - - class TBuffer { - public: - void AddRow(TArrayRef key, TArrayRef value) { - const auto& [k, v] = Data.emplace_back( - TSerializedCellVec(key), - TSerializedCellVec(value) - ); - ByteSize += k.GetBuffer().size() + v.GetBuffer().size(); - } - - auto&& Flush() { - ByteSize = 0; - return std::move(Data); - } - - ui64 Bytes() const { - return ByteSize; - } - - ui64 Rows() const { - return Data.size(); - } - - explicit operator bool() const { - return !Data.empty(); - } - - private: - TVector> Data; // key & value (if any) - ui64 ByteSize = 0; - }; - - struct TChange { - ui64 Order; - ui64 Group; - ui64 Step; - ui64 TxId; - TPathId PathId; - ui64 BodySize; - TPathId TableId; - ui64 SchemaVersion; - ui64 LockId = 0; - ui64 LockOffset = 0; - - TInstant CreatedAt() const { - return Group - ? TInstant::MicroSeconds(Group) - : TInstant::MilliSeconds(Step); - } - }; - - static TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { - TVector key(Reserve(cells.size())); - - Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); - for (TPos pos = 0; pos < cells.size(); ++pos) { - key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); - } - - return key; - } - - static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { - Y_ABORT_UNLESS(cells.size() >= 1); - TVector updates(::Reserve(cells.size() - 1)); - - bool foundSpecialColumn = false; - Y_ABORT_UNLESS(cells.size() == tags.size()); - for (TPos pos = 0; pos < cells.size(); ++pos) { - const auto tag = tags.at(pos); - auto it = table->Columns.find(tag); - Y_ABORT_UNLESS(it != table->Columns.end()); - if (it->second.Name == "__ydb_incrBackupImpl_deleted") { - if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { - return std::nullopt; - } - foundSpecialColumn = true; - continue; - } - updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); - } - Y_ABORT_UNLESS(foundSpecialColumn); - - return updates; - } + using TLimits = NStreamScan::TLimits; + using TBuffer = NStreamScan::TBuffer; + using TChange = IDataShardChangeCollector::TChange; public: explicit TIncrementalRestoreScan( @@ -240,10 +149,10 @@ class TIncrementalRestoreScan for (auto& [k, v] : rows) { // LOG_D("IncrRestore@Progress()#iter" // << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); - const auto key = MakeKey(k.GetCells(), table); + const auto key = NStreamScan::MakeKey(k.GetCells(), table); const auto& keyTags = table->KeyColumnIds; NKikimrChangeExchange::TDataChange body; - if (auto updates = MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { + if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { Serialize(body, ERowOp::Upsert, key, keyTags, *updates); } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); diff --git a/ydb/core/tx/datashard/stream_scan_common.h b/ydb/core/tx/datashard/stream_scan_common.h index cc67a17e91d6..2ea325f91457 100644 --- a/ydb/core/tx/datashard/stream_scan_common.h +++ b/ydb/core/tx/datashard/stream_scan_common.h @@ -21,6 +21,7 @@ struct TLimits { ui32 BatchMaxRows; TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& proto); + TLimits() = default; }; class TBuffer { From 2f5345256edd200194322d7a9657b70b99088512 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 13 Sep 2024 20:18:53 +0000 Subject: [PATCH 03/11] WIP --- ydb/core/base/events.h | 1 + ydb/core/tx/datashard/cdc_stream_scan.cpp | 4 +- .../tx/datashard/incr_restore_helpers.cpp | 6 +- ydb/core/tx/datashard/incr_restore_helpers.h | 2 +- ydb/core/tx/datashard/incr_restore_scan.cpp | 90 ++++++++++++------- ydb/core/tx/datashard/incr_restore_scan.h | 22 +++++ ydb/core/tx/datashard/stream_scan_common.cpp | 6 +- ydb/core/tx/datashard/stream_scan_common.h | 2 +- ydb/library/services/services.proto | 1 + 9 files changed, 94 insertions(+), 40 deletions(-) diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 8a83f92c6708..dbfa09a39f24 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -181,6 +181,7 @@ struct TKikimrEvents : TEvents { ES_LIMITER = 4258, ES_MEMORY = 4259, ES_GROUPED_ALLOCATIONS_MANAGER = 4260, + ES_INCREMENTAL_RESTORE_SCAN = 4261, }; }; diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index aa53a055ac8d..b9eef0f1c642 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -276,7 +276,7 @@ class TDataShard::TTxCdcStreamScanProgress bool pageFault = false; for (const auto& [k, v] : ev.Rows) { - const auto key = NStreamScan::MakeKey(k.GetCells(), table); + const auto key = NStreamScan::MakeKey(k.GetCells(), table->KeyColumnTypes); const auto& keyTags = table->KeyColumnIds; TRowState row(0); @@ -299,7 +299,7 @@ class TDataShard::TTxCdcStreamScanProgress Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); break; case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup: - if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) { + if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table->Columns); updates) { Serialize(body, ERowOp::Upsert, key, keyTags, *updates); } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); diff --git a/ydb/core/tx/datashard/incr_restore_helpers.cpp b/ydb/core/tx/datashard/incr_restore_helpers.cpp index 12bc6e46a1c8..88aa55c1c099 100644 --- a/ydb/core/tx/datashard/incr_restore_helpers.cpp +++ b/ydb/core/tx/datashard/incr_restore_helpers.cpp @@ -2,7 +2,7 @@ namespace NKikimr::NDataShard::NIncrRestoreHelpers { -std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { +std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, const TMap& columns) { Y_ABORT_UNLESS(cells.size() >= 1); TVector updates(::Reserve(cells.size() - 1)); @@ -10,8 +10,8 @@ std::optional> MakeRestoreUpdates(TArrayRef cell Y_ABORT_UNLESS(cells.size() == tags.size()); for (TPos pos = 0; pos < cells.size(); ++pos) { const auto tag = tags.at(pos); - auto it = table->Columns.find(tag); - Y_ABORT_UNLESS(it != table->Columns.end()); + auto it = columns.find(tag); + Y_ABORT_UNLESS(it != columns.end()); if (it->second.Name == "__ydb_incrBackupImpl_deleted") { if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { return std::nullopt; diff --git a/ydb/core/tx/datashard/incr_restore_helpers.h b/ydb/core/tx/datashard/incr_restore_helpers.h index a011283d0ec0..a33031257069 100644 --- a/ydb/core/tx/datashard/incr_restore_helpers.h +++ b/ydb/core/tx/datashard/incr_restore_helpers.h @@ -11,6 +11,6 @@ namespace NKikimr::NDataShard::NIncrRestoreHelpers { using namespace NTable; -std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table); +std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, const TMap& columns); } // namespace NKikimr::NBackup::NImpl diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index 6b6a55548233..b9efb9574888 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -1,5 +1,6 @@ #include "incr_restore_scan.h" #include "change_exchange_impl.h" +#include "datashard_impl.h" #include #include @@ -27,21 +28,25 @@ class TIncrementalRestoreScan public: explicit TIncrementalRestoreScan( + TActorId parent, std::function changeSenderFactory, ui64 txId, const TPathId& tablePathId, const TPathId& targetPathId) : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + , Parent(parent) , ChangeSenderFactory(changeSenderFactory) - // , DataShard{self->SelfId(), self->TabletID()} , TxId(txId) , TablePathId(tablePathId) , TargetPathId(targetPathId) - , ReadVersion({}) - , Limits({}) + , Limits() // , ValueTags(InitValueTags(self, tablePathId)) {} + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR; + } + void Registered(TActorSystem*, const TActorId&) override { ChangeSender = RegisterWithSameMailbox(ChangeSenderFactory()); } @@ -52,16 +57,43 @@ class TIncrementalRestoreScan IActorCallback::PassAway(); } + void Start(TEvents::TEvWakeup::TPtr&) { + Driver->Touch(EScan::Feed); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { + // LOG_D("Handltypename e " << ev->Get()->ToString()); + + TVector records(::Reserve(ev->Get()->Records.size())); + + for (const auto& record : ev->Get()->Records) { + auto it = PendingRecords.find(record.Order); + Y_ABORT_UNLESS(it != PendingRecords.end()); + records.emplace_back(it->second); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords( + std::make_shared>(std::move(records)))); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) { + // LOG_D("Handltypename e " << ev->Get()->ToString()); + + for (auto recordId : ev->Get()->Records) { + PendingRecords.erase(recordId); + } + } + + void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr&) { + Driver->Touch(EScan::Final); + } + STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - // hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); - // hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle); - // hFunc(TEvents::TEvWakeup, Start); - // hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); - // IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords); - // hFunc(TEvChangeExchange::TEvAllSent, Handle); - // IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease); - default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); + hFunc(TEvents::TEvWakeup, Start); + hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle); + hFunc(TEvIncrementalRestoreScan::TEvFinished, Handle); } } @@ -110,7 +142,7 @@ class TIncrementalRestoreScan } TAutoPtr Finish(EAbort abort) noexcept override { - // Send(DataShard.ActorId, new TEvDataShard::TEvRestoreFinished{TxId}); + Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); if (abort != EAbort::None) { // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); @@ -139,20 +171,17 @@ class TIncrementalRestoreScan // LOG_D("IncrRestore@Progress()" // << ": Buffer.Rows()# " << Buffer.Rows()); - // auto reservationCookie = Self->ReserveChangeQueueCapacity(Buffer.Rows()); auto rows = Buffer.Flush(); TVector changeRecords; TVector records; - // auto table = Self->GetUserTables().at(TablePathId.LocalPathId); - NDataShard::TUserTable::TCPtr table; for (auto& [k, v] : rows) { // LOG_D("IncrRestore@Progress()#iter" // << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); - const auto key = NStreamScan::MakeKey(k.GetCells(), table); - const auto& keyTags = table->KeyColumnIds; + const auto key = NStreamScan::MakeKey(k.GetCells(), KeyColumnTypes); + const auto& keyTags = KeyColumnIds; NKikimrChangeExchange::TDataChange body; - if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { + if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), ValueTags, Columns); updates) { Serialize(body, ERowOp::Upsert, key, keyTags, *updates); } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); @@ -160,11 +189,9 @@ class TIncrementalRestoreScan auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex) .WithOrder(++Order) .WithGroup(0) - .WithStep(ReadVersion.Step) - .WithTxId(ReadVersion.TxId) .WithPathId(TargetPathId) .WithTableId(TablePathId) - .WithSchemaVersion(table->GetTableSchemaVersion()) + // .WithSchemaVersion(ReadVersion) // TODO(use SchemaVersion) .WithBody(body.SerializeAsString()) .WithSource(TChangeRecord::ESource::InitialScan) .Build(); @@ -178,39 +205,42 @@ class TIncrementalRestoreScan Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records)); if (NoMoreData) { - // Send(ChangeSender, new TEvChangeExchange::TEvNoMoreData()); + Send(ChangeSender, new TEvIncrementalRestoreScan::TEvNoMoreData()); + return EScan::Sleep; } - return NoMoreData ? EScan::Sleep : EScan::Feed; + return EScan::Feed; } private: - // const TDataShardId DataShard; + TActorId Parent; std::function ChangeSenderFactory; - TActorId ReplyTo; const ui64 TxId; const TPathId TablePathId; const TPathId TargetPathId; - const TRowVersion ReadVersion; const TVector ValueTags; const TMaybe LastKey; const TLimits Limits; IDriver* Driver; bool NoMoreData; TBuffer Buffer; - // TStats Stats; - // TDataShard* Self; ui64 Order = 0; TActorId ChangeSender; TMap PendingRecords; + + TMap Columns; + TVector KeyColumnTypes; + TVector KeyColumnIds; }; THolder CreateIncrementalRestoreScan( - std::function changeSenderFactory, + NActors::TActorId parent, + std::function changeSenderFactory, TPathId tablePathId, const TPathId& targetPathId, ui64 txId) { - return MakeHolder( + return MakeHolder( + parent, changeSenderFactory, txId, tablePathId, diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index 0953365e367e..b549630cba32 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -2,10 +2,32 @@ #include #include +#include +#include + +#include namespace NKikimr::NDataShard { +using namespace NActors; + +struct TEvIncrementalRestoreScan { + enum EEv { + EvNoMoreData = EventSpaceBegin(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN), + EvFinished, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN)); + + struct TEvNoMoreData: public TEventLocal {}; + struct TEvFinished: public TEventLocal {}; +}; + THolder CreateIncrementalRestoreScan( + NActors::TActorId parent, + std::function changeSenderFactory, TPathId tablePathId, const TPathId& targetPathId, ui64 txId); diff --git a/ydb/core/tx/datashard/stream_scan_common.cpp b/ydb/core/tx/datashard/stream_scan_common.cpp index 8c2f1315745d..519c605151a6 100644 --- a/ydb/core/tx/datashard/stream_scan_common.cpp +++ b/ydb/core/tx/datashard/stream_scan_common.cpp @@ -13,12 +13,12 @@ TLimits::TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& prot { } -TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { +TVector MakeKey(TArrayRef cells, const TVector& keyColumnTypes) { TVector key(Reserve(cells.size())); - Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); + Y_ABORT_UNLESS(cells.size() == keyColumnTypes.size()); for (TPos pos = 0; pos < cells.size(); ++pos) { - key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + key.emplace_back(cells.at(pos).AsRef(), keyColumnTypes.at(pos)); } return key; diff --git a/ydb/core/tx/datashard/stream_scan_common.h b/ydb/core/tx/datashard/stream_scan_common.h index 2ea325f91457..8b0728db9360 100644 --- a/ydb/core/tx/datashard/stream_scan_common.h +++ b/ydb/core/tx/datashard/stream_scan_common.h @@ -13,7 +13,7 @@ class TEvCdcStreamScanRequest_TLimits; namespace NKikimr::NDataShard::NStreamScan { -TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table); +TVector MakeKey(TArrayRef cells, const TVector& keyColumnTypes); struct TLimits { ui32 BatchMaxBytes; diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 834323831e11..075cae1c12ec 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1054,5 +1054,6 @@ message TActivity { BUILD_COLUMNS_SCAN_ACTOR = 645; SAMPLE_K_UPLOAD_ACTOR = 646; LOCAL_KMEANS_SCAN_ACTOR = 647; + INCREMENTAL_RESTORE_SCAN_ACTOR = 648; }; }; From 98c6197500acb3d98341b5358eefdf4e30dd0a89 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 13 Sep 2024 21:57:55 +0000 Subject: [PATCH 04/11] WIP --- ydb/core/tx/datashard/incr_restore_scan.cpp | 96 ++++++++++++++------- ydb/core/tx/datashard/incr_restore_scan.h | 2 + 2 files changed, 65 insertions(+), 33 deletions(-) diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index b9efb9574888..00d19e6502ea 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -26,23 +26,48 @@ class TIncrementalRestoreScan using TBuffer = NStreamScan::TBuffer; using TChange = IDataShardChangeCollector::TChange; + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[TIncrementalRestoreScan]" + << "[" << TxId << "]" + << "[" << SourcePathId << "]" + << "[" << SourcePathId << "]" + << SelfId() /* contains brackets */ << " "; + } + + return LogPrefix.GetRef(); + } public: explicit TIncrementalRestoreScan( - TActorId parent, - std::function changeSenderFactory, - ui64 txId, - const TPathId& tablePathId, - const TPathId& targetPathId) + TActorId parent, + std::function changeSenderFactory, + ui64 txId, + const TPathId& sourcePathId, + TUserTable::TCPtr table, + const TPathId& targetPathId) : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) , Parent(parent) , ChangeSenderFactory(changeSenderFactory) , TxId(txId) - , TablePathId(tablePathId) + , SourcePathId(sourcePathId) , TargetPathId(targetPathId) + , ValueTags(InitValueTags(table)) , Limits() - // , ValueTags(InitValueTags(self, tablePathId)) {} + static TVector InitValueTags(TUserTable::TCPtr table) { + TVector valueTags; + valueTags.reserve(table->Columns.size() - 1); + for (const auto& [tag, column] : table->Columns) { + if (!column.IsKey) { + valueTags.push_back(tag); + } + } + + return valueTags; + } + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR; } @@ -57,12 +82,14 @@ class TIncrementalRestoreScan IActorCallback::PassAway(); } - void Start(TEvents::TEvWakeup::TPtr&) { + void Start(TEvents::TEvWakeup::TPtr& ev) { + LOG_D("Handle TEvents::TEvWakeup " << ev->Get()->ToString()); + Driver->Touch(EScan::Feed); } void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { - // LOG_D("Handltypename e " << ev->Get()->ToString()); + LOG_D("Handle TEvChangeExchange::TEvRequestRecords " << ev->Get()->ToString()); TVector records(::Reserve(ev->Get()->Records.size())); @@ -77,14 +104,16 @@ class TIncrementalRestoreScan } void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) { - // LOG_D("Handltypename e " << ev->Get()->ToString()); + LOG_D("Handle TEvChangeExchange::TEvRemoveRecords " << ev->Get()->ToString()); for (auto recordId : ev->Get()->Records) { PendingRecords.erase(recordId); } } - void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr&) { + void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr& ev) { + LOG_D("Handle TEvIncrementalRestoreScan::TEvFinished " << ev->Get()->ToString()); + Driver->Touch(EScan::Final); } @@ -106,6 +135,8 @@ class TIncrementalRestoreScan } EScan Seek(TLead& lead, ui64) noexcept override { + LOG_D("Seek"); + if (LastKey) { lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper); } else { @@ -116,6 +147,7 @@ class TIncrementalRestoreScan } EScan Feed(TArrayRef key, const TRow& row) noexcept override { + Buffer.AddRow(key, *row); if (Buffer.Bytes() < Limits.BatchMaxBytes) { if (Buffer.Rows() < Limits.BatchMaxRows) { @@ -132,9 +164,12 @@ class TIncrementalRestoreScan } EScan Exhausted() noexcept override { + LOG_D("Exhausted"); + NoMoreData = true; if (!Buffer) { + Send(ChangeSender, new TEvIncrementalRestoreScan::TEvNoMoreData()); return EScan::Sleep; } @@ -142,12 +177,12 @@ class TIncrementalRestoreScan } TAutoPtr Finish(EAbort abort) noexcept override { - Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); + LOG_D("Finish " << abort); if (abort != EAbort::None) { - // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); + Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); } else { - // Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE); + Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); } PassAway(); @@ -157,27 +192,17 @@ class TIncrementalRestoreScan void Describe(IOutputStream& o) const noexcept override { o << "IncrRestoreScan {" << " TxId: " << TxId - << " TablePathId: " << TablePathId + << " SourcePathId: " << SourcePathId << " TargetPathId: " << TargetPathId << " }"; } EScan Progress() { - // Stats.RowsProcessed += Buffer.Rows(); - // Stats.BytesProcessed += Buffer.Bytes(); - - // auto& ctx = TlsActivationContext->AsActorContext(); - // auto TabletID = [&]() { return DataShard.TabletId; }; - // LOG_D("IncrRestore@Progress()" - // << ": Buffer.Rows()# " << Buffer.Rows()); - auto rows = Buffer.Flush(); TVector changeRecords; TVector records; for (auto& [k, v] : rows) { - // LOG_D("IncrRestore@Progress()#iter" - // << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); const auto key = NStreamScan::MakeKey(k.GetCells(), KeyColumnTypes); const auto& keyTags = KeyColumnIds; NKikimrChangeExchange::TDataChange body; @@ -186,11 +211,11 @@ class TIncrementalRestoreScan } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); } - auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex) + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::IncrementalRestore) .WithOrder(++Order) .WithGroup(0) .WithPathId(TargetPathId) - .WithTableId(TablePathId) + .WithTableId(SourcePathId) // .WithSchemaVersion(ReadVersion) // TODO(use SchemaVersion) .WithBody(body.SerializeAsString()) .WithSource(TChangeRecord::ESource::InitialScan) @@ -209,19 +234,22 @@ class TIncrementalRestoreScan return EScan::Sleep; } + // TODO also limit on PendingRecords contents to keep memory usage in reasonable limits return EScan::Feed; } + private: - TActorId Parent; - std::function ChangeSenderFactory; + const TActorId Parent; + const std::function ChangeSenderFactory; const ui64 TxId; - const TPathId TablePathId; + const TPathId SourcePathId; const TPathId TargetPathId; const TVector ValueTags; const TMaybe LastKey; const TLimits Limits; + mutable TMaybe LogPrefix; IDriver* Driver; - bool NoMoreData; + bool NoMoreData = false; TBuffer Buffer; ui64 Order = 0; TActorId ChangeSender; @@ -235,7 +263,8 @@ class TIncrementalRestoreScan THolder CreateIncrementalRestoreScan( NActors::TActorId parent, std::function changeSenderFactory, - TPathId tablePathId, + TPathId sourcePathId, + TUserTable::TCPtr table, const TPathId& targetPathId, ui64 txId) { @@ -243,7 +272,8 @@ THolder CreateIncrementalRestoreScan( parent, changeSenderFactory, txId, - tablePathId, + sourcePathId, + table, targetPathId); } diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index b549630cba32..e94c28aa9bd1 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -29,6 +30,7 @@ THolder CreateIncrementalRestoreScan( NActors::TActorId parent, std::function changeSenderFactory, TPathId tablePathId, + TUserTable::TCPtr table, const TPathId& targetPathId, ui64 txId); From dad88d6b985315fb3f15c72835d2e38c395ea31b Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 13 Sep 2024 22:58:02 +0000 Subject: [PATCH 05/11] WIP --- .../datashard_ut_incremental_restore_scan.cpp | 31 +++++++++++++++++-- ydb/core/tx/datashard/incr_restore_scan.cpp | 10 +++--- ydb/core/tx/datashard/incr_restore_scan.h | 2 +- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp index 490baf227fd5..c4e89a976650 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -1,13 +1,40 @@ #include "incr_restore_scan.h" #include +#include +#include -namespace NKikimr { +namespace NKikimr::NDataShard { Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { Y_UNIT_TEST(Simple) { + TPortManager pm; + Tests::TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + auto sender2 = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + + TUserTable::TCPtr table; + TPathId targetPathId{}; + ui64 txId = 0; + + auto scan = CreateIncrementalRestoreScan( + sender, + [&](const TActorContext&) { + return sender2; + }, + TPathId{} /*sourcePathId*/, + table, + targetPathId, + txId); } } -} // namespace NKikimr +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index 00d19e6502ea..a342a527f701 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -41,7 +41,7 @@ class TIncrementalRestoreScan public: explicit TIncrementalRestoreScan( TActorId parent, - std::function changeSenderFactory, + std::function changeSenderFactory, ui64 txId, const TPathId& sourcePathId, TUserTable::TCPtr table, @@ -73,7 +73,7 @@ class TIncrementalRestoreScan } void Registered(TActorSystem*, const TActorId&) override { - ChangeSender = RegisterWithSameMailbox(ChangeSenderFactory()); + ChangeSender = ChangeSenderFactory(TlsActivationContext->AsActorContext()); } void PassAway() override { @@ -177,7 +177,7 @@ class TIncrementalRestoreScan } TAutoPtr Finish(EAbort abort) noexcept override { - LOG_D("Finish " << abort); + LOG_D("Finish " << static_cast(abort)); if (abort != EAbort::None) { Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); @@ -240,7 +240,7 @@ class TIncrementalRestoreScan private: const TActorId Parent; - const std::function ChangeSenderFactory; + const std::function ChangeSenderFactory; const ui64 TxId; const TPathId SourcePathId; const TPathId TargetPathId; @@ -262,7 +262,7 @@ class TIncrementalRestoreScan THolder CreateIncrementalRestoreScan( NActors::TActorId parent, - std::function changeSenderFactory, + std::function changeSenderFactory, TPathId sourcePathId, TUserTable::TCPtr table, const TPathId& targetPathId, diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index e94c28aa9bd1..4a51be9883fc 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -28,7 +28,7 @@ struct TEvIncrementalRestoreScan { THolder CreateIncrementalRestoreScan( NActors::TActorId parent, - std::function changeSenderFactory, + std::function changeSenderFactory, TPathId tablePathId, TUserTable::TCPtr table, const TPathId& targetPathId, From 19e432feb995c33485a1764940cd994cb7d30348 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sat, 14 Sep 2024 00:54:36 +0000 Subject: [PATCH 06/11] WIP --- .../datashard_ut_incremental_restore_scan.cpp | 39 ++++++++++++++++++- ydb/core/tx/datashard/incr_restore_scan.cpp | 1 + 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp index c4e89a976650..572ca8676ccf 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -6,6 +6,24 @@ namespace NKikimr::NDataShard { +class TDriverMock + : public NTable::IDriver +{ +public: + void Touch(NTable::EScan) noexcept { + + } +}; + +class TCbExecutorActor : public TActorBootstrapped { +public: + std::function Cb; + + void Bootstrap() { + Cb(); + } +}; + Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { Y_UNIT_TEST(Simple) { TPortManager pm; @@ -21,7 +39,15 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); - TUserTable::TCPtr table; + TUserTable::TPtr table = new TUserTable; + + table->Columns.emplace(0, TUserTable::TUserColumn{}); + + NTable::TScheme::TTableSchema tableSchema; + tableSchema.Columns[0] = NTable::TColumn("test", 0, {}, ""); + tableSchema.Columns[0].KeyOrder = 0; + auto scheme = NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond()); + TPathId targetPathId{}; ui64 txId = 0; @@ -34,6 +60,17 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { table, targetPathId, txId); + + TDriverMock driver; + auto* executor = new TCbExecutorActor; + executor->Cb = [&]() { + scan->Prepare(&driver, scheme); + }; + auto executorActor = runtime.Register(executor); + runtime.EnableScheduleForActor(executorActor); + + auto resp = runtime.GrabEdgeEventRethrow(sender); + Y_UNUSED(resp); } } diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index a342a527f701..733f517a5737 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -57,6 +57,7 @@ class TIncrementalRestoreScan {} static TVector InitValueTags(TUserTable::TCPtr table) { + Y_VERIFY(table->Columns.size() >= 1); TVector valueTags; valueTags.reserve(table->Columns.size() - 1); for (const auto& [tag, column] : table->Columns) { From 6ce33f860904ee043f07ea37572b166c0aeeee79 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sun, 15 Sep 2024 00:31:43 +0000 Subject: [PATCH 07/11] crafted --- .../datashard_ut_incremental_restore_scan.cpp | 114 ++++++++++++++++-- 1 file changed, 101 insertions(+), 13 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp index 572ca8676ccf..a6722fd1e23f 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -10,22 +10,99 @@ class TDriverMock : public NTable::IDriver { public: - void Touch(NTable::EScan) noexcept { + std::optional LastScan; + void Touch(NTable::EScan scan) noexcept { + LastScan = scan; } }; class TCbExecutorActor : public TActorBootstrapped { public: - std::function Cb; + enum EEv { + EvExec = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvBoot, + EvExecuted, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)"); + + struct TEvExec : public TEventLocal { + std::function OnHandle; + bool Async; + + TEvExec(std::function onHandle, bool async = true) + : OnHandle(onHandle) + , Async(async) + {} + }; + + struct TEvBoot : public TEventLocal {}; + struct TEvExecuted : public TEventLocal {}; + + std::function OnBootstrap; + TActorId ReplyTo; + TActorId ForwardTo; void Bootstrap() { - Cb(); + if (OnBootstrap) { + OnBootstrap(); + } + + Become(&TThis::Serve); + Send(ReplyTo, new TCbExecutorActor::TEvBoot()); + } + + void Handle(TEvExec::TPtr& ev) { + ev->Get()->OnHandle(); + if (!ev->Get()->Async) { + Send(ReplyTo, new TCbExecutorActor::TEvExecuted()); + } + } + + STATEFN(Serve) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExec, Handle); + default: Y_ABORT("unexpected"); + } } }; +class TRuntimeCbExecutor { +public: + TRuntimeCbExecutor(TTestActorRuntime& runtime, std::function onBootstrap = {}, TActorId forwardTo = {}) + : Runtime(runtime) + , Sender(runtime.AllocateEdgeActor()) + { + auto* executor = new TCbExecutorActor; + executor->OnBootstrap = onBootstrap; + executor->ForwardTo = forwardTo; + executor->ReplyTo = Sender; + Impl = runtime.Register(executor); + Runtime.EnableScheduleForActor(Impl); + Runtime.GrabEdgeEventRethrow(Sender); + } + + void AsyncExecute(std::function cb) { + Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb), 0, 0), 0); + } + + void Execute(std::function cb) { + Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb, false), 0, 0), 0); + Runtime.GrabEdgeEventRethrow(Sender); + } + +private: + TTestActorRuntime& Runtime; + TActorId Sender; + TActorId Impl; +}; + Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { - Y_UNIT_TEST(Simple) { + Y_UNIT_TEST(Empty) { TPortManager pm; Tests::TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") @@ -51,7 +128,7 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { TPathId targetPathId{}; ui64 txId = 0; - auto scan = CreateIncrementalRestoreScan( + auto* scan = CreateIncrementalRestoreScan( sender, [&](const TActorContext&) { return sender2; @@ -59,18 +136,29 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { TPathId{} /*sourcePathId*/, table, targetPathId, - txId); + txId).Release(); TDriverMock driver; - auto* executor = new TCbExecutorActor; - executor->Cb = [&]() { + + // later we can use driver, scan and scheme ONLY with additional sync, e.g. from actorExec to avoid races + TRuntimeCbExecutor actorExec(runtime, [&]() { scan->Prepare(&driver, scheme); - }; - auto executorActor = runtime.Register(executor); - runtime.EnableScheduleForActor(executorActor); + }); + + actorExec.Execute([&]() { + UNIT_ASSERT_EQUAL(scan->Exhausted(), NTable::EScan::Sleep); + }); + + auto resp = runtime.GrabEdgeEventRethrow(sender2); + + runtime.Send(new IEventHandle(resp->Sender, sender2, new TEvIncrementalRestoreScan::TEvFinished(), 0, 0), 0); + + actorExec.Execute([&]() { + UNIT_ASSERT(driver.LastScan && *driver.LastScan == NTable::EScan::Final); + scan->Finish(NTable::EAbort::None); + }); - auto resp = runtime.GrabEdgeEventRethrow(sender); - Y_UNUSED(resp); + runtime.GrabEdgeEventRethrow(sender); } } From bc9c4ee95371ea9af25a1e36f3f4c6a4b46ae2e0 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sun, 15 Sep 2024 15:58:08 +0000 Subject: [PATCH 08/11] done --- .../datashard_ut_incremental_restore_scan.cpp | 21 +++++++----- ydb/core/tx/datashard/incr_restore_scan.cpp | 32 ++++++++++++------- ydb/core/tx/datashard/incr_restore_scan.h | 8 +++-- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp index a6722fd1e23f..405f422ed21e 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -117,26 +117,31 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); TUserTable::TPtr table = new TUserTable; - - table->Columns.emplace(0, TUserTable::TUserColumn{}); - NTable::TScheme::TTableSchema tableSchema; - tableSchema.Columns[0] = NTable::TColumn("test", 0, {}, ""); + table->Columns.emplace(0, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32), "", "Key", true)); + tableSchema.Columns[0] = NTable::TColumn("key", 0, {}, ""); tableSchema.Columns[0].KeyOrder = 0; + + table->Columns.emplace(1, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Bool), "", "__ydb_incrBackupImpl_deleted", false)); + tableSchema.Columns[1] = NTable::TColumn("__ydb_incrBackupImpl_deleted", 1, {}, ""); + tableSchema.Columns[1].KeyOrder = 1; + auto scheme = NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond()); - TPathId targetPathId{}; - ui64 txId = 0; + TPathId sourcePathId{1, 2}; + TPathId targetPathId{3, 4}; + ui64 txId = 1337; auto* scan = CreateIncrementalRestoreScan( sender, [&](const TActorContext&) { return sender2; }, - TPathId{} /*sourcePathId*/, + sourcePathId, table, targetPathId, - txId).Release(); + txId, + {}).Release(); TDriverMock driver; diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index 733f517a5737..a0193b7e26fb 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -2,15 +2,14 @@ #include "change_exchange_impl.h" #include "datashard_impl.h" -#include -#include -#include -#include #include #include -#include -#include +#include +#include +#include #include +#include +#include namespace NKikimr::NDataShard { @@ -45,27 +44,34 @@ class TIncrementalRestoreScan ui64 txId, const TPathId& sourcePathId, TUserTable::TCPtr table, - const TPathId& targetPathId) - : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + const TPathId& targetPathId, + NStreamScan::TLimits limits) + : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR) , Parent(parent) , ChangeSenderFactory(changeSenderFactory) , TxId(txId) , SourcePathId(sourcePathId) , TargetPathId(targetPathId) , ValueTags(InitValueTags(table)) - , Limits() + , Limits(limits) {} static TVector InitValueTags(TUserTable::TCPtr table) { - Y_VERIFY(table->Columns.size() >= 1); + Y_VERIFY(table->Columns.size() >= 2); TVector valueTags; valueTags.reserve(table->Columns.size() - 1); + bool deletedMarkerColumnFound = false; for (const auto& [tag, column] : table->Columns) { if (!column.IsKey) { valueTags.push_back(tag); + if (column.Name == "__ydb_incrBackupImpl_deleted") { + deletedMarkerColumnFound = true; + } } } + Y_VERIFY(deletedMarkerColumnFound); + return valueTags; } @@ -267,7 +273,8 @@ THolder CreateIncrementalRestoreScan( TPathId sourcePathId, TUserTable::TCPtr table, const TPathId& targetPathId, - ui64 txId) + ui64 txId, + NStreamScan::TLimits limits) { return MakeHolder( parent, @@ -275,7 +282,8 @@ THolder CreateIncrementalRestoreScan( txId, sourcePathId, table, - targetPathId); + targetPathId, + limits); } } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index 4a51be9883fc..5e451c8ed78f 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -1,10 +1,11 @@ #pragma once +#include #include #include -#include -#include #include +#include +#include #include @@ -32,6 +33,7 @@ THolder CreateIncrementalRestoreScan( TPathId tablePathId, TUserTable::TCPtr table, const TPathId& targetPathId, - ui64 txId); + ui64 txId, + NStreamScan::TLimits limits); } // namespace NKikimr::NDataShard From bfc217394f7c84e99b5d33997aa8755281f78840 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sun, 15 Sep 2024 17:49:42 +0000 Subject: [PATCH 09/11] fix --- ydb/core/tx/datashard/change_exchange_impl.h | 2 -- ydb/core/tx/datashard/incr_restore_scan.cpp | 12 +++++++----- ydb/core/tx/datashard/incr_restore_scan.h | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/datashard/change_exchange_impl.h b/ydb/core/tx/datashard/change_exchange_impl.h index 591748a5796b..b6af2aea263e 100644 --- a/ydb/core/tx/datashard/change_exchange_impl.h +++ b/ydb/core/tx/datashard/change_exchange_impl.h @@ -3,8 +3,6 @@ #include "defs.h" #include "change_exchange_helpers.h" -#include - namespace NKikimr { namespace NDataShard { diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index a0193b7e26fb..7bbb6fa5f784 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -31,7 +31,7 @@ class TIncrementalRestoreScan << "[TIncrementalRestoreScan]" << "[" << TxId << "]" << "[" << SourcePathId << "]" - << "[" << SourcePathId << "]" + << "[" << TargetPathId << "]" << SelfId() /* contains brackets */ << " "; } @@ -41,10 +41,10 @@ class TIncrementalRestoreScan explicit TIncrementalRestoreScan( TActorId parent, std::function changeSenderFactory, - ui64 txId, const TPathId& sourcePathId, TUserTable::TCPtr table, const TPathId& targetPathId, + ui64 txId, NStreamScan::TLimits limits) : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR) , Parent(parent) @@ -54,6 +54,9 @@ class TIncrementalRestoreScan , TargetPathId(targetPathId) , ValueTags(InitValueTags(table)) , Limits(limits) + , Columns(table->Columns) + , KeyColumnTypes(table->KeyColumnTypes) + , KeyColumnIds(table->KeyColumnIds) {} static TVector InitValueTags(TUserTable::TCPtr table) { @@ -154,7 +157,6 @@ class TIncrementalRestoreScan } EScan Feed(TArrayRef key, const TRow& row) noexcept override { - Buffer.AddRow(key, *row); if (Buffer.Bytes() < Limits.BatchMaxBytes) { if (Buffer.Rows() < Limits.BatchMaxRows) { @@ -270,7 +272,7 @@ class TIncrementalRestoreScan THolder CreateIncrementalRestoreScan( NActors::TActorId parent, std::function changeSenderFactory, - TPathId sourcePathId, + const TPathId& sourcePathId, TUserTable::TCPtr table, const TPathId& targetPathId, ui64 txId, @@ -279,10 +281,10 @@ THolder CreateIncrementalRestoreScan( return MakeHolder( parent, changeSenderFactory, - txId, sourcePathId, table, targetPathId, + txId, limits); } diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index 5e451c8ed78f..a53c39bce987 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -30,7 +30,7 @@ struct TEvIncrementalRestoreScan { THolder CreateIncrementalRestoreScan( NActors::TActorId parent, std::function changeSenderFactory, - TPathId tablePathId, + const TPathId& sourcePathId, TUserTable::TCPtr table, const TPathId& targetPathId, ui64 txId, From 8e6f347e52856c5da6087d3e259a95604c734a5d Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sun, 15 Sep 2024 18:55:47 +0000 Subject: [PATCH 10/11] fix --- ydb/core/tx/datashard/incr_restore_scan.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index 7bbb6fa5f784..d425e700b15b 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -99,7 +99,7 @@ class TIncrementalRestoreScan } void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { - LOG_D("Handle TEvChangeExchange::TEvRequestRecords " << ev->Get()->ToString()); + LOG_D("Handle " << ev->Get()->ToString()); TVector records(::Reserve(ev->Get()->Records.size())); @@ -114,7 +114,7 @@ class TIncrementalRestoreScan } void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) { - LOG_D("Handle TEvChangeExchange::TEvRemoveRecords " << ev->Get()->ToString()); + LOG_D("Handle " << ev->Get()->ToString()); for (auto recordId : ev->Get()->Records) { PendingRecords.erase(recordId); From 86e59070611c154238b8f4dd3a0b04e5b884aca2 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sun, 15 Sep 2024 19:37:28 +0000 Subject: [PATCH 11/11] fix --- ydb/core/tx/datashard/incr_restore_scan.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index d425e700b15b..0fbe2f156b8c 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -109,8 +109,7 @@ class TIncrementalRestoreScan records.emplace_back(it->second); } - Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords( - std::make_shared>(std::move(records)))); + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records))); } void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) {