diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 8a83f92c6708..dbfa09a39f24 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -181,6 +181,7 @@ struct TKikimrEvents : TEvents { ES_LIMITER = 4258, ES_MEMORY = 4259, ES_GROUPED_ALLOCATIONS_MANAGER = 4260, + ES_INCREMENTAL_RESTORE_SCAN = 4261, }; }; diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index aa53a055ac8d..b9eef0f1c642 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -276,7 +276,7 @@ class TDataShard::TTxCdcStreamScanProgress bool pageFault = false; for (const auto& [k, v] : ev.Rows) { - const auto key = NStreamScan::MakeKey(k.GetCells(), table); + const auto key = NStreamScan::MakeKey(k.GetCells(), table->KeyColumnTypes); const auto& keyTags = table->KeyColumnIds; TRowState row(0); @@ -299,7 +299,7 @@ class TDataShard::TTxCdcStreamScanProgress Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); break; case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup: - if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) { + if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), valueTags, table->Columns); updates) { Serialize(body, ERowOp::Upsert, key, keyTags, *updates); } else { Serialize(body, ERowOp::Erase, key, keyTags, {}); diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 9924346ee6ca..9744a9c0ac5e 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -168,7 +168,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } - Y_UNIT_TEST(SimpleBackup) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp new file mode 100644 index 000000000000..405f422ed21e --- /dev/null +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -0,0 +1,170 @@ +#include "incr_restore_scan.h" + +#include +#include +#include + +namespace NKikimr::NDataShard { + +class TDriverMock + : public NTable::IDriver +{ +public: + std::optional LastScan; + + void Touch(NTable::EScan scan) noexcept { + LastScan = scan; + } +}; + +class TCbExecutorActor : public TActorBootstrapped { +public: + enum EEv { + EvExec = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvBoot, + EvExecuted, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE)"); + + struct TEvExec : public TEventLocal { + std::function OnHandle; + bool Async; + + TEvExec(std::function onHandle, bool async = true) + : OnHandle(onHandle) + , Async(async) + {} + }; + + struct TEvBoot : public TEventLocal {}; + struct TEvExecuted : public TEventLocal {}; + + std::function OnBootstrap; + TActorId ReplyTo; + TActorId ForwardTo; + + void Bootstrap() { + if (OnBootstrap) { + OnBootstrap(); + } + + Become(&TThis::Serve); + Send(ReplyTo, new TCbExecutorActor::TEvBoot()); + } + + void Handle(TEvExec::TPtr& ev) { + ev->Get()->OnHandle(); + if (!ev->Get()->Async) { + Send(ReplyTo, new TCbExecutorActor::TEvExecuted()); + } + } + + STATEFN(Serve) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExec, Handle); + default: Y_ABORT("unexpected"); + } + } +}; + +class TRuntimeCbExecutor { +public: + TRuntimeCbExecutor(TTestActorRuntime& runtime, std::function onBootstrap = {}, TActorId forwardTo = {}) + : Runtime(runtime) + , Sender(runtime.AllocateEdgeActor()) + { + auto* executor = new TCbExecutorActor; + executor->OnBootstrap = onBootstrap; + executor->ForwardTo = forwardTo; + executor->ReplyTo = Sender; + Impl = runtime.Register(executor); + Runtime.EnableScheduleForActor(Impl); + Runtime.GrabEdgeEventRethrow(Sender); + } + + void AsyncExecute(std::function cb) { + Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb), 0, 0), 0); + } + + void Execute(std::function cb) { + Runtime.Send(new IEventHandle(Impl, Sender, new TCbExecutorActor::TEvExec(cb, false), 0, 0), 0); + Runtime.GrabEdgeEventRethrow(Sender); + } + +private: + TTestActorRuntime& Runtime; + TActorId Sender; + TActorId Impl; +}; + +Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { + Y_UNIT_TEST(Empty) { + TPortManager pm; + Tests::TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + auto sender2 = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + + TUserTable::TPtr table = new TUserTable; + NTable::TScheme::TTableSchema tableSchema; + table->Columns.emplace(0, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Uint32), "", "Key", true)); + tableSchema.Columns[0] = NTable::TColumn("key", 0, {}, ""); + tableSchema.Columns[0].KeyOrder = 0; + + table->Columns.emplace(1, TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::Bool), "", "__ydb_incrBackupImpl_deleted", false)); + tableSchema.Columns[1] = NTable::TColumn("__ydb_incrBackupImpl_deleted", 1, {}, ""); + tableSchema.Columns[1].KeyOrder = 1; + + auto scheme = NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond()); + + TPathId sourcePathId{1, 2}; + TPathId targetPathId{3, 4}; + ui64 txId = 1337; + + auto* scan = CreateIncrementalRestoreScan( + sender, + [&](const TActorContext&) { + return sender2; + }, + sourcePathId, + table, + targetPathId, + txId, + {}).Release(); + + TDriverMock driver; + + // later we can use driver, scan and scheme ONLY with additional sync, e.g. from actorExec to avoid races + TRuntimeCbExecutor actorExec(runtime, [&]() { + scan->Prepare(&driver, scheme); + }); + + actorExec.Execute([&]() { + UNIT_ASSERT_EQUAL(scan->Exhausted(), NTable::EScan::Sleep); + }); + + auto resp = runtime.GrabEdgeEventRethrow(sender2); + + runtime.Send(new IEventHandle(resp->Sender, sender2, new TEvIncrementalRestoreScan::TEvFinished(), 0, 0), 0); + + actorExec.Execute([&]() { + UNIT_ASSERT(driver.LastScan && *driver.LastScan == NTable::EScan::Final); + scan->Finish(NTable::EAbort::None); + }); + + runtime.GrabEdgeEventRethrow(sender); + } +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_helpers.cpp b/ydb/core/tx/datashard/incr_restore_helpers.cpp index 12bc6e46a1c8..88aa55c1c099 100644 --- a/ydb/core/tx/datashard/incr_restore_helpers.cpp +++ b/ydb/core/tx/datashard/incr_restore_helpers.cpp @@ -2,7 +2,7 @@ namespace NKikimr::NDataShard::NIncrRestoreHelpers { -std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { +std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, const TMap& columns) { Y_ABORT_UNLESS(cells.size() >= 1); TVector updates(::Reserve(cells.size() - 1)); @@ -10,8 +10,8 @@ std::optional> MakeRestoreUpdates(TArrayRef cell Y_ABORT_UNLESS(cells.size() == tags.size()); for (TPos pos = 0; pos < cells.size(); ++pos) { const auto tag = tags.at(pos); - auto it = table->Columns.find(tag); - Y_ABORT_UNLESS(it != table->Columns.end()); + auto it = columns.find(tag); + Y_ABORT_UNLESS(it != columns.end()); if (it->second.Name == "__ydb_incrBackupImpl_deleted") { if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { return std::nullopt; diff --git a/ydb/core/tx/datashard/incr_restore_helpers.h b/ydb/core/tx/datashard/incr_restore_helpers.h index a011283d0ec0..a33031257069 100644 --- a/ydb/core/tx/datashard/incr_restore_helpers.h +++ b/ydb/core/tx/datashard/incr_restore_helpers.h @@ -11,6 +11,6 @@ namespace NKikimr::NDataShard::NIncrRestoreHelpers { using namespace NTable; -std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table); +std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, const TMap& columns); } // namespace NKikimr::NBackup::NImpl diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp new file mode 100644 index 000000000000..0fbe2f156b8c --- /dev/null +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -0,0 +1,290 @@ +#include "incr_restore_scan.h" +#include "change_exchange_impl.h" +#include "datashard_impl.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NDataShard { + +using namespace NActors; +using namespace NTable; + +class TIncrementalRestoreScan + : public IActorCallback + , public NTable::IScan + , protected TChangeRecordBodySerializer +{ + using TLimits = NStreamScan::TLimits; + using TBuffer = NStreamScan::TBuffer; + using TChange = IDataShardChangeCollector::TChange; + + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[TIncrementalRestoreScan]" + << "[" << TxId << "]" + << "[" << SourcePathId << "]" + << "[" << TargetPathId << "]" + << SelfId() /* contains brackets */ << " "; + } + + return LogPrefix.GetRef(); + } +public: + explicit TIncrementalRestoreScan( + TActorId parent, + std::function changeSenderFactory, + const TPathId& sourcePathId, + TUserTable::TCPtr table, + const TPathId& targetPathId, + ui64 txId, + NStreamScan::TLimits limits) + : IActorCallback(static_cast(&TIncrementalRestoreScan::StateWork), NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR) + , Parent(parent) + , ChangeSenderFactory(changeSenderFactory) + , TxId(txId) + , SourcePathId(sourcePathId) + , TargetPathId(targetPathId) + , ValueTags(InitValueTags(table)) + , Limits(limits) + , Columns(table->Columns) + , KeyColumnTypes(table->KeyColumnTypes) + , KeyColumnIds(table->KeyColumnIds) + {} + + static TVector InitValueTags(TUserTable::TCPtr table) { + Y_VERIFY(table->Columns.size() >= 2); + TVector valueTags; + valueTags.reserve(table->Columns.size() - 1); + bool deletedMarkerColumnFound = false; + for (const auto& [tag, column] : table->Columns) { + if (!column.IsKey) { + valueTags.push_back(tag); + if (column.Name == "__ydb_incrBackupImpl_deleted") { + deletedMarkerColumnFound = true; + } + } + } + + Y_VERIFY(deletedMarkerColumnFound); + + return valueTags; + } + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::INCREMENTAL_RESTORE_SCAN_ACTOR; + } + + void Registered(TActorSystem*, const TActorId&) override { + ChangeSender = ChangeSenderFactory(TlsActivationContext->AsActorContext()); + } + + void PassAway() override { + Send(ChangeSender, new TEvents::TEvPoisonPill()); + + IActorCallback::PassAway(); + } + + void Start(TEvents::TEvWakeup::TPtr& ev) { + LOG_D("Handle TEvents::TEvWakeup " << ev->Get()->ToString()); + + Driver->Touch(EScan::Feed); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + TVector records(::Reserve(ev->Get()->Records.size())); + + for (const auto& record : ev->Get()->Records) { + auto it = PendingRecords.find(record.Order); + Y_ABORT_UNLESS(it != PendingRecords.end()); + records.emplace_back(it->second); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::move(records))); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRemoveRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + for (auto recordId : ev->Get()->Records) { + PendingRecords.erase(recordId); + } + } + + void Handle(TEvIncrementalRestoreScan::TEvFinished::TPtr& ev) { + LOG_D("Handle TEvIncrementalRestoreScan::TEvFinished " << ev->Get()->ToString()); + + Driver->Touch(EScan::Final); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvWakeup, Start); + hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords, Handle); + hFunc(TEvIncrementalRestoreScan::TEvFinished, Handle); + } + } + + IScan::TInitialState Prepare(IDriver* driver, TIntrusiveConstPtr scheme) noexcept override { + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); + Driver = driver; + Y_ABORT_UNLESS(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size()); + + return {EScan::Sleep, {}}; + } + + EScan Seek(TLead& lead, ui64) noexcept override { + LOG_D("Seek"); + + if (LastKey) { + lead.To(ValueTags, LastKey->GetCells(), ESeek::Upper); + } else { + lead.To(ValueTags, {}, ESeek::Lower); + } + + return EScan::Feed; + } + + EScan Feed(TArrayRef key, const TRow& row) noexcept override { + Buffer.AddRow(key, *row); + if (Buffer.Bytes() < Limits.BatchMaxBytes) { + if (Buffer.Rows() < Limits.BatchMaxRows) { + return EScan::Feed; + } + } else { + if (Buffer.Rows() < Limits.BatchMinRows) { + return EScan::Feed; + } + } + + Progress(); + return EScan::Sleep; + } + + EScan Exhausted() noexcept override { + LOG_D("Exhausted"); + + NoMoreData = true; + + if (!Buffer) { + Send(ChangeSender, new TEvIncrementalRestoreScan::TEvNoMoreData()); + return EScan::Sleep; + } + + return Progress(); + } + + TAutoPtr Finish(EAbort abort) noexcept override { + LOG_D("Finish " << static_cast(abort)); + + if (abort != EAbort::None) { + Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); + } else { + Send(Parent, new TEvIncrementalRestoreScan::TEvFinished{}); + } + + PassAway(); + return nullptr; + } + + void Describe(IOutputStream& o) const noexcept override { + o << "IncrRestoreScan {" + << " TxId: " << TxId + << " SourcePathId: " << SourcePathId + << " TargetPathId: " << TargetPathId + << " }"; + } + + EScan Progress() { + auto rows = Buffer.Flush(); + TVector changeRecords; + TVector records; + + for (auto& [k, v] : rows) { + const auto key = NStreamScan::MakeKey(k.GetCells(), KeyColumnTypes); + const auto& keyTags = KeyColumnIds; + NKikimrChangeExchange::TDataChange body; + if (auto updates = NIncrRestoreHelpers::MakeRestoreUpdates(v.GetCells(), ValueTags, Columns); updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::IncrementalRestore) + .WithOrder(++Order) + .WithGroup(0) + .WithPathId(TargetPathId) + .WithTableId(SourcePathId) + // .WithSchemaVersion(ReadVersion) // TODO(use SchemaVersion) + .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) + .Build(); + + const auto& record = *recordPtr; + + records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + PendingRecords.emplace(record.GetOrder(), recordPtr); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records)); + + if (NoMoreData) { + Send(ChangeSender, new TEvIncrementalRestoreScan::TEvNoMoreData()); + return EScan::Sleep; + } + + // TODO also limit on PendingRecords contents to keep memory usage in reasonable limits + return EScan::Feed; + } + +private: + const TActorId Parent; + const std::function ChangeSenderFactory; + const ui64 TxId; + const TPathId SourcePathId; + const TPathId TargetPathId; + const TVector ValueTags; + const TMaybe LastKey; + const TLimits Limits; + mutable TMaybe LogPrefix; + IDriver* Driver; + bool NoMoreData = false; + TBuffer Buffer; + ui64 Order = 0; + TActorId ChangeSender; + TMap PendingRecords; + + TMap Columns; + TVector KeyColumnTypes; + TVector KeyColumnIds; +}; + +THolder CreateIncrementalRestoreScan( + NActors::TActorId parent, + std::function changeSenderFactory, + const TPathId& sourcePathId, + TUserTable::TCPtr table, + const TPathId& targetPathId, + ui64 txId, + NStreamScan::TLimits limits) +{ + return MakeHolder( + parent, + changeSenderFactory, + sourcePathId, + table, + targetPathId, + txId, + limits); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h new file mode 100644 index 000000000000..a53c39bce987 --- /dev/null +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NDataShard { + +using namespace NActors; + +struct TEvIncrementalRestoreScan { + enum EEv { + EvNoMoreData = EventSpaceBegin(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN), + EvFinished, + + EvEnd, + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN)); + + struct TEvNoMoreData: public TEventLocal {}; + struct TEvFinished: public TEventLocal {}; +}; + +THolder CreateIncrementalRestoreScan( + NActors::TActorId parent, + std::function changeSenderFactory, + const TPathId& sourcePathId, + TUserTable::TCPtr table, + const TPathId& targetPathId, + ui64 txId, + NStreamScan::TLimits limits); + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/stream_scan_common.cpp b/ydb/core/tx/datashard/stream_scan_common.cpp index 8c2f1315745d..519c605151a6 100644 --- a/ydb/core/tx/datashard/stream_scan_common.cpp +++ b/ydb/core/tx/datashard/stream_scan_common.cpp @@ -13,12 +13,12 @@ TLimits::TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& prot { } -TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { +TVector MakeKey(TArrayRef cells, const TVector& keyColumnTypes) { TVector key(Reserve(cells.size())); - Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); + Y_ABORT_UNLESS(cells.size() == keyColumnTypes.size()); for (TPos pos = 0; pos < cells.size(); ++pos) { - key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + key.emplace_back(cells.at(pos).AsRef(), keyColumnTypes.at(pos)); } return key; diff --git a/ydb/core/tx/datashard/stream_scan_common.h b/ydb/core/tx/datashard/stream_scan_common.h index cc67a17e91d6..8b0728db9360 100644 --- a/ydb/core/tx/datashard/stream_scan_common.h +++ b/ydb/core/tx/datashard/stream_scan_common.h @@ -13,7 +13,7 @@ class TEvCdcStreamScanRequest_TLimits; namespace NKikimr::NDataShard::NStreamScan { -TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table); +TVector MakeKey(TArrayRef cells, const TVector& keyColumnTypes); struct TLimits { ui32 BatchMaxBytes; @@ -21,6 +21,7 @@ struct TLimits { ui32 BatchMaxRows; TLimits(const NKikimrTxDataShard::TEvCdcStreamScanRequest_TLimits& proto); + TLimits() = default; }; class TBuffer { diff --git a/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make b/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make new file mode 100644 index 000000000000..0067f0623b5e --- /dev/null +++ b/ydb/core/tx/datashard/ut_incremental_restore_scan/ya.make @@ -0,0 +1,42 @@ +UNITTEST_FOR(ydb/core/tx/datashard) + +FORK_SUBTESTS() + +SPLIT_FACTOR(4) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:32) +ENDIF() + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/tx/datashard/ut_common + library/cpp/getopt + library/cpp/regex/pcre + library/cpp/svnversion + ydb/core/kqp/ut/common + ydb/core/testlib/default + ydb/core/tx + ydb/library/yql/public/udf/service/exception_policy + ydb/public/lib/yson_value + ydb/public/sdk/cpp/client/ydb_datastreams + ydb/public/sdk/cpp/client/ydb_topic + ydb/public/sdk/cpp/client/ydb_persqueue_public + ydb/public/sdk/cpp/client/ydb_result +) + +YQL_LAST_ABI_VERSION() + +SRCS( + datashard_ut_incremental_restore_scan.cpp +) + +END() diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 70fd4a70b335..e267147f198b 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -160,6 +160,7 @@ SRCS( finish_propose_write_unit.cpp follower_edge.cpp incr_restore_helpers.cpp + incr_restore_scan.cpp initiate_build_index_unit.cpp key_conflicts.cpp key_conflicts.h @@ -304,6 +305,7 @@ RECURSE_FOR_TESTS( ut_erase_rows ut_followers ut_incremental_backup + ut_incremental_restore_scan ut_init ut_keys ut_kqp diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 834323831e11..075cae1c12ec 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1054,5 +1054,6 @@ message TActivity { BUILD_COLUMNS_SCAN_ACTOR = 645; SAMPLE_K_UPLOAD_ACTOR = 646; LOCAL_KMEANS_SCAN_ACTOR = 647; + INCREMENTAL_RESTORE_SCAN_ACTOR = 648; }; };