diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index b15d1cfcf880..5c4a675a5673 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -116,6 +116,7 @@ namespace NMiniKQL { namespace NDataShard { class IExportFactory; + class IRestoreIncrementalBackupFactory; } namespace NSQS { @@ -153,6 +154,7 @@ struct TAppData { const NScheme::TTypeRegistry* TypeRegistry = nullptr; const NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDataShard::IExportFactory *DataShardExportFactory = nullptr; + const NDataShard::IRestoreIncrementalBackupFactory *DataShardRestoreIncrementalBackupFactory = nullptr; const TFormatFactory* FormatFactory = nullptr; const NSQS::IEventsWriterFactory* SqsEventsWriterFactory = nullptr; diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index c90cb766cfde..d482bd5585da 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -137,6 +137,9 @@ class TBaseChangeSender { auto it = Senders.find(partitionId); if (it != Senders.end()) { senders.emplace(partitionId, std::move(it->second)); + if (it->second.Ready) { + --ReadySenders; + } Senders.erase(it); } else { LazyCreateSender(senders, partitionId); @@ -208,6 +211,8 @@ class TBaseChangeSender { THashSet registrations; bool needToResolve = false; + // int tmp = PendingSent.size(); + while (it != PendingSent.end()) { if (Enqueued && Enqueued.begin()->Order <= it->first) { break; @@ -258,6 +263,8 @@ class TBaseChangeSender { it = PendingSent.erase(it); } + // Y_ABORT_S("something strange: " << sendTo.size() << " " << tmp); + for (const auto partitionId : registrations) { RegisterSender(partitionId); } @@ -279,6 +286,7 @@ class TBaseChangeSender { Y_ABORT_UNLESS(sender.Ready); sender.Ready = false; + ReadySenders--; sender.Pending.reserve(sender.Prepared.size()); for (const auto& record : sender.Prepared) { @@ -527,6 +535,7 @@ class TBaseChangeSender { auto& sender = it->second; sender.Ready = true; + ReadySenders++; if (sender.Pending) { RemoveRecords(std::exchange(sender.Pending, {})); @@ -551,6 +560,9 @@ class TBaseChangeSender { } ReEnqueueRecords(it->second); + if (it->second.Ready) { + --ReadySenders; + } Senders.erase(it); GonePartitions.push_back(partitionId); @@ -576,6 +588,10 @@ class TBaseChangeSender { , MemUsage(0) {} + bool AllReady() { + return ReadySenders == Senders.size(); + } + void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) { const auto& cgi = ev->Get()->Cgi(); if (const auto& str = cgi.Get("partitionId")) { @@ -776,6 +792,7 @@ class TBaseChangeSender { ui64 MemUsage; THashMap Senders; // ui64 is partition id + ui64 ReadySenders = 0; TSet Enqueued; TSet PendingBody; TMap PendingSent; // ui64 is order diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 6a0d8bff5b10..cae26ee33129 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1015,7 +1015,10 @@ message TDropContinuousBackup { message TRestoreIncrementalBackup { optional string SrcTableName = 1; + optional NKikimrProto.TPathID SrcPathId = 3; + optional string DstTableName = 2; + optional NKikimrProto.TPathID DstPathId = 4; } enum EIndexType { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 5822b02c1fb0..b529b22b4e5c 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -480,6 +480,8 @@ message TFlatSchemeTransaction { optional TAlterCdcStreamNotice AlterCdcStreamNotice = 19; optional TDropCdcStreamNotice DropCdcStreamNotice = 20; optional TMoveIndex MoveIndex = 21; + + optional NKikimrSchemeOp.TRestoreIncrementalBackup RestoreIncrementalBackupSrc = 22; } message TDistributedEraseTransaction { diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index ed214a994363..aad223f943f9 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -1,6 +1,7 @@ #include "cdc_stream_scan.h" #include "change_record_body_serializer.h" #include "datashard_impl.h" +#include "change_exchange_impl.h" #include #include @@ -8,6 +9,10 @@ #include #include +#undef LOG_D +#undef LOG_I +#undef LOG_W + #define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) #define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) #define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream) @@ -417,7 +422,7 @@ class TDataShard::TTxCdcStreamScanProgress }; // TTxCdcStreamScanProgress -class TCdcStreamScan: public IActorCallback, public IScan { +class TCdcStreamScan: public IActorCallback, public IScan, protected TChangeRecordBodySerializer { using TStats = TCdcStreamScanManager::TStats; struct TDataShardId { @@ -474,21 +479,139 @@ class TCdcStreamScan: public IActorCallback, public IScan { switch (ev->GetTypeRewrite()) { hFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle); hFunc(TDataShard::TEvPrivate::TEvCdcStreamScanContinue, Handle); + hFunc(TEvents::TEvWakeup, Start); + hFunc(NChangeExchange::TEvChangeExchange::TEvRequestRecords, Handle); + IgnoreFunc(NChangeExchange::TEvChangeExchange::TEvRemoveRecords); + hFunc(TEvChangeExchange::TEvAllSent, Handle); + // IgnoreFunc(TDataShard::TEvPrivate::TEvConfirmReadonlyLease); + default: Y_ABORT("unexpected event Type# 0x%08" PRIx32, ev->GetTypeRewrite()); } } + void Start(TEvents::TEvWakeup::TPtr&) { + Driver->Touch(EScan::Feed); + } + + void Handle(TEvChangeExchange::TEvAllSent::TPtr&) { + Driver->Touch(EScan::Final); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRequestRecords::TPtr& ev) { + // LOG_D("Handltypename e " << ev->Get()->ToString()); + + TVector records(::Reserve(ev->Get()->Records.size())); + + for (const auto& record : ev->Get()->Records) { + auto it = PendingRecords.find(record.Order); + Y_ABORT_UNLESS(it != PendingRecords.end()); + records.emplace_back(it->second); + } + + Send(ev->Sender, new NChangeExchange::TEvChangeExchange::TEvRecords(std::make_shared>(std::move(records)))); + } + void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { ReplyTo = ev->Sender; Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::IN_PROGRESS); } - void Progress() { + static TVector MakeKey(TArrayRef cells, TUserTable::TCPtr table) { + TVector key(Reserve(cells.size())); + + Y_ABORT_UNLESS(cells.size() == table->KeyColumnTypes.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + key.emplace_back(cells.at(pos).AsRef(), table->KeyColumnTypes.at(pos)); + } + + return key; + } + + static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector updates(::Reserve(cells.size() - 1)); + + bool foundSpecialColumn = false; + Y_ABORT_UNLESS(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__ydb_incrBackupImpl_deleted") { + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } + foundSpecialColumn = true; + continue; + } + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + Y_ABORT_UNLESS(foundSpecialColumn); + + return updates; + } + + EScan Progress() { Stats.RowsProcessed += Buffer.Rows(); Stats.BytesProcessed += Buffer.Bytes(); + if (IncrRestore) { + auto& ctx = TlsActivationContext->AsActorContext(); + auto TabletID = [&]() { return DataShard.TabletId; }; + LOG_D("IncrRestore@Progress()" + << ": Buffer.Rows()# " << Buffer.Rows()); + + // auto reservationCookie = Self->ReserveChangeQueueCapacity(Buffer.Rows()); + auto rows = Buffer.Flush(); + TVector changeRecords; + TVector records; + + auto table = Self->GetUserTables().at(TablePathId.LocalPathId); + for (auto& [k, v] : rows) { + LOG_D("IncrRestore@Progress()#iter" + << ": k.GetCells().size()# " << k.GetCells().size() << ", v.GetCells().size()# " << v.GetCells().size()); + const auto key = MakeKey(k.GetCells(), table); + const auto& keyTags = table->KeyColumnIds; + NKikimrChangeExchange::TDataChange body; + if (auto updates = MakeRestoreUpdates(v.GetCells(), ValueTags, table); updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::AsyncIndex) + .WithOrder(++Order) + .WithGroup(0) + .WithStep(ReadVersion.Step) + .WithTxId(ReadVersion.TxId) + .WithPathId(StreamPathId) + .WithTableId(TablePathId) + .WithSchemaVersion(table->GetTableSchemaVersion()) + .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) + .Build(); + + const auto& record = *recordPtr; + + records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + // Self->ChangesQueue.emplace(record.GetOrder(), record); + PendingRecords.emplace(record.GetOrder(), recordPtr); + } + + Send(ChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(records)); + // Self->MaybeActivateChangeSender(TlsActivationContext->AsActorContext()); + // Self->EnqueueChangeRecords(std::move(changeRecords), reservationCookie); + + if (NoMoreData) { + Send(ChangeSender, new TEvChangeExchange::TEvNoMoreData()); + } + + return NoMoreData ? EScan::Sleep : EScan::Feed; + } + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanProgress( TablePathId, StreamPathId, ReadVersion, ValueTags, std::move(Buffer.Flush()), Stats )); + + return EScan::Sleep; } void Handle(TDataShard::TEvPrivate::TEvCdcStreamScanContinue::TPtr&) { @@ -526,9 +649,41 @@ class TCdcStreamScan: public IActorCallback, public IScan { , Driver(nullptr) , NoMoreData(false) , Stats(stats) + , IncrRestore(false) { } + static TVector InitValueTags(TDataShard* self, const TPathId& tablePathId) { + auto table = self->GetUserTables().at(tablePathId.LocalPathId); + TVector valueTags; + valueTags.reserve(table->Columns.size() - 1); + for (const auto& [tag, column] : table->Columns) { + if (!column.IsKey) { + valueTags.push_back(tag); + } + } + + return valueTags; + } + + explicit TCdcStreamScan( + TDataShard* self, + ui64 txId, + const TPathId& tablePathId, + const TPathId& streamPathId) + : IActorCallback(static_cast(&TCdcStreamScan::StateWork), NKikimrServices::TActivity::CDC_STREAM_SCAN_ACTOR) + , DataShard{self->SelfId(), self->TabletID()} + , TxId(txId) + , TablePathId(tablePathId) + , StreamPathId(streamPathId) + , ReadVersion({}) + , ValueTags(InitValueTags(self, tablePathId)) + , Limits({}) + , Stats({}) + , IncrRestore(true) + , Self(self) + {} + void Describe(IOutputStream& o) const noexcept override { o << "CdcStreamScan {" << " TxId: " << TxId @@ -541,11 +696,21 @@ class TCdcStreamScan: public IActorCallback, public IScan { TlsActivationContext->AsActorContext().RegisterWithSameMailbox(this); Driver = driver; Y_ABORT_UNLESS(!LastKey || LastKey->GetCells().size() == scheme->Tags(true).size()); + + if (IncrRestore) { + return {EScan::Sleep, {}}; + } + return {EScan::Feed, {}}; } void Registered(TActorSystem* sys, const TActorId&) override { - sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId())); + if (IncrRestore) { + auto ds = NKikimr::NDataShard::TDataShardId(Self->TabletID(), Self->Generation(), SelfId()); + ChangeSender = RegisterWithSameMailbox(CreateIncrRestoreChangeSender(ds, TablePathId, StreamPathId)); + } else { + sys->Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvCdcStreamScanRegistered(TxId, SelfId())); + } } EScan Seek(TLead& lead, ui64) noexcept override { @@ -577,15 +742,22 @@ class TCdcStreamScan: public IActorCallback, public IScan { EScan Exhausted() noexcept override { NoMoreData = true; + if (!Buffer && IncrRestore) { + return EScan::Sleep; + } + if (!Buffer) { return EScan::Final; } - Progress(); - return EScan::Sleep; + return Progress(); } TAutoPtr Finish(EAbort abort) noexcept override { + if (IncrRestore) { + Send(DataShard.ActorId, new TEvDataShard::TEvRestoreFinished{TxId}); + } + if (abort != EAbort::None) { Reply(NKikimrTxDataShard::TEvCdcStreamScanResponse::ABORTED); } else { @@ -611,7 +783,11 @@ class TCdcStreamScan: public IActorCallback, public IScan { bool NoMoreData; TBuffer Buffer; TStats Stats; - + bool IncrRestore; + TDataShard* Self; + ui64 Order = 0; + TActorId ChangeSender; + TMap PendingRecords; }; // TCdcStreamScan class TDataShard::TTxCdcStreamScanRun: public TTransactionBase { @@ -783,4 +959,25 @@ void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TA Execute(new TTxCdcStreamScanProgress(this, ev), ctx); } +void TDataShard::Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, const TActorContext& ctx) { + RestoreFinished = true; + + TOperation::TPtr op = Pipeline.FindOp(ev->Get()->TxId); + if (op) { + ForwardEventToOperation(ev, op, ctx); + } +} + +THolder TDataShard::CreateVolatileStreamScan( + TPathId tablePathId, + const TPathId& streamPathId, + ui64 txId) +{ + return MakeHolder( + this, + txId, // why not tie breaker? + tablePathId, + streamPathId); +} + } diff --git a/ydb/core/tx/datashard/cdc_stream_scan.h b/ydb/core/tx/datashard/cdc_stream_scan.h index eaa2460e149e..ed6f928259ac 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.h +++ b/ydb/core/tx/datashard/cdc_stream_scan.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include diff --git a/ydb/core/tx/datashard/change_exchange.h b/ydb/core/tx/datashard/change_exchange.h index 548fe14e3163..7e0ffc1b0d76 100644 --- a/ydb/core/tx/datashard/change_exchange.h +++ b/ydb/core/tx/datashard/change_exchange.h @@ -33,6 +33,9 @@ struct TEvChangeExchange { // Split/merge EvSplitAck, + EvNoMoreData, + EvAllSent, + EvEnd, }; @@ -44,11 +47,14 @@ struct TEvChangeExchange { struct TEvStatus: public TEventPB {}; struct TEvActivateSender: public TEventPB {}; struct TEvActivateSenderAck: public TEventPB {}; + struct TEvNoMoreData: public TEventLocal {}; + struct TEvAllSent: public TEventLocal {}; /// Local events enum class ESenderType { AsyncIndex, CdcStream, + IncrRestore, }; struct TEvAddSender: public TEventLocal { diff --git a/ydb/core/tx/datashard/change_exchange_impl.h b/ydb/core/tx/datashard/change_exchange_impl.h index b6af2aea263e..1945d651d54b 100644 --- a/ydb/core/tx/datashard/change_exchange_impl.h +++ b/ydb/core/tx/datashard/change_exchange_impl.h @@ -6,6 +6,7 @@ namespace NKikimr { namespace NDataShard { +IActor* CreateIncrRestoreChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId); IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId); IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId& streamPathId); diff --git a/ydb/core/tx/datashard/change_exchange_split.cpp b/ydb/core/tx/datashard/change_exchange_split.cpp index 93e6d68a3b8d..b0954c5e154c 100644 --- a/ydb/core/tx/datashard/change_exchange_split.cpp +++ b/ydb/core/tx/datashard/change_exchange_split.cpp @@ -523,6 +523,8 @@ class TChangeExchageSplit: public TActorBootstrapped { return Register(new TCdcWorker(SelfId(), pathId, DataShard.TabletId, DstDataShards)); case EWorkerType::AsyncIndex: Y_ABORT("unreachable"); + case EWorkerType::IncrRestore: + Y_ABORT("unreachable"); } } diff --git a/ydb/core/tx/datashard/change_sender.cpp b/ydb/core/tx/datashard/change_sender.cpp index d1c5e8c40331..2d40cb328d03 100644 --- a/ydb/core/tx/datashard/change_sender.cpp +++ b/ydb/core/tx/datashard/change_sender.cpp @@ -59,6 +59,8 @@ class TChangeSender: public TActor { return Register(CreateAsyncIndexChangeSender(DataShard, userTableId, pathId)); case ESenderType::CdcStream: return Register(CreateCdcStreamChangeSender(DataShard, pathId)); + case ESenderType::IncrRestore: + return Register(CreateIncrRestoreChangeSender(DataShard, userTableId, pathId)); } } @@ -107,6 +109,7 @@ class TChangeSender: public TActor { auto it = Senders.find(msg.PathId); if (it != Senders.end()) { + Y_ABORT("Trying to create multiple senders"); Y_ABORT_UNLESS(it->second.UserTableId == msg.UserTableId); Y_ABORT_UNLESS(it->second.Type == msg.Type); LOG_W("Trying to add duplicate sender" diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1a361379f343..d56a721ee7f0 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -62,6 +62,14 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped(); + handshake->Record.SetOrigin(DataShard.TabletId); + handshake->Record.SetGeneration(DataShard.Generation); + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true)); + Become(&TThis::StateHandshake); + return; + } Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie); Become(&TThis::StateHandshake); } @@ -154,8 +162,9 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrapped& tagMap) + const TPathId& indexTablePathId, const TMap& tagMap, bool noLease = false) : Parent(parent) , DataShard(dataShard) , ShardId(shardId) @@ -294,6 +303,7 @@ class TAsyncIndexChangeSenderShard: public TActorBootstrappedResultSet.emplace_back(MakeNavigateEntry(PathId, TNavigate::OpList)); Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); - Become(&TThis::StateResolveIndex); + if (!IncrRestore) { + Become(&TThis::StateResolveIndex); + } else { + Become(&TThis::StateResolveIndexTable); + } } STATEFN(StateResolveIndex) { @@ -599,7 +614,9 @@ class TAsyncIndexChangeSenderMain const auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, IndexTablePathId)) { + if (IncrRestore && !CheckTableId(entry, PathId)) { + return; + } else if (!IncrRestore && !CheckTableId(entry, IndexTablePathId)) { return; } @@ -611,6 +628,7 @@ class TAsyncIndexChangeSenderMain return; } + // FIXME(+active) TagMap.clear(); TVector keyColumnTypes; @@ -680,7 +698,9 @@ class TAsyncIndexChangeSenderMain auto& entry = result->ResultSet.at(0); - if (!CheckTableId(entry, IndexTablePathId)) { + if (IncrRestore && !CheckTableId(entry, PathId)) { + return; + } else if (!IncrRestore && !CheckTableId(entry, IndexTablePathId)) { return; } @@ -700,13 +720,30 @@ class TAsyncIndexChangeSenderMain KeyDesc = std::move(entry.KeyDescription); CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged); + if (IncrRestore) { + Send(DataShard.ActorId, new TEvents::TEvWakeup); + } + Become(&TThis::StateMain); } /// Main STATEFN(StateMain) { - return StateBase(ev); + switch (ev->GetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvNoMoreData, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvChangeExchange::TEvNoMoreData::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + NoMoreData = true; + + if (AllReady()) { + Send(DataShard.ActorId, new TEvChangeExchange::TEvAllSent()); + } } void Resolve() override { @@ -718,7 +755,7 @@ class TAsyncIndexChangeSenderMain } IActor* CreateSender(ui64 partitionId) const override { - return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IndexTablePathId, TagMap); + return new TAsyncIndexChangeSenderShard(SelfId(), DataShard, partitionId, IncrRestore ? PathId : IndexTablePathId, TagMap, IncrRestore); } void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { @@ -740,6 +777,10 @@ class TAsyncIndexChangeSenderMain void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); OnReady(ev->Get()->PartitionId); + + if (NoMoreData && AllReady()) { + Send(DataShard.ActorId, new TEvChangeExchange::TEvAllSent()); + } } void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { @@ -774,12 +815,13 @@ class TAsyncIndexChangeSenderMain return NKikimrServices::TActivity::CHANGE_SENDER_ASYNC_INDEX_ACTOR_MAIN; } - explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) + explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId, bool incrRestore = false) : TActorBootstrapped() , TBaseChangeSender(this, this, this, dataShard.ActorId, indexPathId) , DataShard(dataShard) , UserTableId(userTableId) , IndexTableVersion(0) + , IncrRestore(incrRestore) { } @@ -813,6 +855,7 @@ class TAsyncIndexChangeSenderMain const TDataShardId DataShard; const TTableId UserTableId; mutable TMaybe LogPrefix; + bool NoMoreData = false; THashMap MainColumnToTag; TMap TagMap; // from main to index @@ -820,11 +863,15 @@ class TAsyncIndexChangeSenderMain TPathId IndexTablePathId; ui64 IndexTableVersion; THolder KeyDesc; - + bool IncrRestore; }; // TAsyncIndexChangeSenderMain IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId) { return new TAsyncIndexChangeSenderMain(dataShard, userTableId, indexPathId); } +IActor* CreateIncrRestoreChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId) { + return new TAsyncIndexChangeSenderMain(dataShard, userTableId, restoreTargetPathId, true); +} + } diff --git a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp index 137233eb7508..197a5d10a859 100644 --- a/ydb/core/tx/datashard/check_scheme_tx_unit.cpp +++ b/ydb/core/tx/datashard/check_scheme_tx_unit.cpp @@ -380,6 +380,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx) case TSchemaOperation::ETypeDropCdcStream: res = CheckDropCdcStream(activeTx); break; + case TSchemaOperation::ETypeRestoreIncrementalBackupSrc: + res = true; // FIXME: CheckDropCdcStream(activeTx); + break; default: LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Unknown scheme tx type detected at tablet " diff --git a/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp new file mode 100644 index 000000000000..e2211a3957b5 --- /dev/null +++ b/ydb/core/tx/datashard/create_restore_incremental_backup_src_unit.cpp @@ -0,0 +1,300 @@ +#include "defs.h" +#include "execution_unit_ctors.h" +#include "datashard_active_transaction.h" +#include "datashard_impl.h" +#include "export_iface.h" +#include "cdc_stream_scan.h" + +#include +#include +#include +#include + +#define EXPORT_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) +#define EXPORT_LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP, "[Export] [" << LogPrefix() << "] " << stream) + +namespace NKikimr { +namespace NDataShard { + +using namespace NKikimrTxDataShard; +using namespace NExportScan; + +/// + +THolder CreateDirectReplicationScan(TDataShard& self, const ::NKikimrSchemeOp::TRestoreIncrementalBackup& incrBackup, ui64 txId) { + TPathId tablePathId = PathIdFromPathId(incrBackup.GetSrcPathId()); + TPathId dstTablePathId = PathIdFromPathId(incrBackup.GetDstPathId()); + return self.CreateVolatileStreamScan(tablePathId, dstTablePathId, txId); +} + +/// + +class TRestoreIncrementalBackupSrcUnit : public TExecutionUnit { + THolder AddSender; +protected: + bool IsRelevant(TActiveTransaction* tx) const { + return tx->GetSchemeTx().HasRestoreIncrementalBackupSrc(); + } + + bool IsWaiting(TOperation::TPtr op) const { + return op->IsWaitingForScan() || op->IsWaitingForRestart(); + } + + void SetWaiting(TOperation::TPtr op) { + op->SetWaitingForScanFlag(); + } + + void ResetWaiting(TOperation::TPtr op) { + op->ResetWaitingForScanFlag(); + op->ResetWaitingForRestartFlag(); + } + + void Abort(TOperation::TPtr op, const TActorContext& ctx, const TString& error) { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, error); + + BuildResult(op)->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, error); + ResetWaiting(op); + + Cancel(tx, ctx); + } + + bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + Y_ABORT_UNLESS(!DataShard.RestoreStarted, "uh-oh"); + + Y_ABORT_UNLESS(tx->GetSchemeTx().HasRestoreIncrementalBackupSrc()); + const auto& restoreSrc = tx->GetSchemeTx().GetRestoreIncrementalBackupSrc(); + + const ui64 tableId = restoreSrc.GetSrcPathId().GetLocalId(); + Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); + + const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + Y_ABORT_UNLESS(txc.DB.GetScheme().GetTableInfo(localTableId)); + + Y_ABORT_UNLESS(restoreSrc.HasDstPathId()); + // const TPathId dstTableId = PathIdFromPathId(restoreSrc.GetDstPathId()); + + auto* appData = AppData(ctx); + const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; + std::shared_ptr<::NKikimr::NDataShard::IExport> exp; // TODO: decouple from export + Y_UNUSED(exp, appData, columns); + + // if (auto* restoreFactory = appData->DataShardRestoreIncrementalBackupFactory) { + // std::shared_ptr(restoreFactory->CreateRestore(restoreSrc, columns)).swap(exp); + // } else { + // std::shared_ptr(new TTableExport(restoreSrc, columns)).swap(exp); + // /* + // Abort(op, ctx, "Restore incremental backup are disabled"); + // return false; + // */ + // } + + // auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { + // return exp->CreateUploader(self, txId); + // }; + + // THolder buffer{exp->CreateBuffer()}; + // + THolder scan{CreateDirectReplicationScan(DataShard, restoreSrc, op->GetTxId())}; + + // FIXME: + + const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); + const auto taskPrio = appData->DataShardConfig.GetBackupTaskPriority(); + + ui64 readAheadLo = appData->DataShardConfig.GetBackupReadAheadLo(); + if (ui64 readAheadLoOverride = DataShard.GetBackupReadAheadLoOverride(); readAheadLoOverride > 0) { + readAheadLo = readAheadLoOverride; + } + + ui64 readAheadHi = appData->DataShardConfig.GetBackupReadAheadHi(); + if (ui64 readAheadHiOverride = DataShard.GetBackupReadAheadHiOverride(); readAheadHiOverride > 0) { + readAheadHi = readAheadHiOverride; + } + + DataShard.RestoreStarted = true; + + tx->SetScanTask(DataShard.QueueScan(localTableId, scan.Release(), op->GetTxId(), + TScanOptions() + .SetResourceBroker(taskName, taskPrio) + .SetReadAhead(readAheadLo, readAheadHi) + .SetReadPrio(TScanOptions::EReadPrio::Low) + )); + + // AddSender.Reset(new TEvChangeExchange::TEvAddSender( + // TTableId(DataShard.GetPathOwnerId(), tableId), + // TEvChangeExchange::ESenderType::IncrRestore, + // dstTableId + // )); + + // if (AddSender) { + // ctx.Send(DataShard.GetChangeSender(), AddSender.Release()); + // } + + return true; + } + + bool HasResult(TOperation::TPtr op) const { + return op->HasScanResult(); + } + + bool ProcessResult(TOperation::TPtr op, const TActorContext&) { + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + auto* result = CheckedCast(op->ScanResult().Get()); + bool done = true; + + switch (result->Outcome) { + case EExportOutcome::Success: + case EExportOutcome::Error: + if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) { + schemeOp->Success = result->Outcome == EExportOutcome::Success; + schemeOp->Error = std::move(result->Error); + schemeOp->BytesProcessed = result->BytesRead; + schemeOp->RowsProcessed = result->RowsRead; + } else { + Y_FAIL_S("Cannot find schema tx: " << op->GetTxId()); + } + break; + case EExportOutcome::Aborted: + done = false; + break; + } + + op->SetScanResult(nullptr); + tx->SetScanTask(0); + + return done; + } + + void Cancel(TActiveTransaction* tx, const TActorContext&) { + if (!tx->GetScanTask()) { + return; + } + + const ui64 tableId = tx->GetSchemeTx().GetBackup().GetTableId(); + + Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId)); + const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + + DataShard.CancelScan(localTableId, tx->GetScanTask()); + tx->SetScanTask(0); + } + + void PersistResult(TOperation::TPtr op, TTransactionContext& txc) { + auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId()); + Y_ABORT_UNLESS(schemeOp); + + NIceDb::TNiceDb db(txc.DB); + DataShard.PersistSchemeTxResult(db, *schemeOp); + } + + EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override final { + Y_ABORT_UNLESS(op->IsSchemeTx()); + + TActiveTransaction* tx = dynamic_cast(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + const TString msg = TStringBuilder() << "Got2 " << "<" << tx->IsSchemeTx() << ">" << tx->GetTxBody() << " tx"; + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, msg); + + if (!IsRelevant(tx)) { + return EExecutionStatus::Executed; + } + + if (!IsWaiting(op)) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Starting a " << GetKind() << " operation" + << " at " << DataShard.TabletID()); + + if (!Run(op, txc, ctx)) { + return EExecutionStatus::Executed; + } + + SetWaiting(op); + Y_DEBUG_ABORT_UNLESS(!HasResult(op)); + } + + if (HasResult(op)) { + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "" << GetKind() << " complete" + << " at " << DataShard.TabletID()); + + ResetWaiting(op); + if (ProcessResult(op, ctx)) { + PersistResult(op, txc); + } else { + Y_DEBUG_ABORT_UNLESS(!HasResult(op)); + op->SetWaitingForRestartFlag(); + ctx.Schedule(TDuration::Seconds(1), new TDataShard::TEvPrivate::TEvRestartOperation(op->GetTxId())); + } + } + + while (op->HasPendingInputEvents()) { + ProcessEvent(op->InputEvents().front(), op, ctx); + op->InputEvents().pop(); + } + + if (IsWaiting(op)) { + return EExecutionStatus::Continue; + } + + return EExecutionStatus::Executed; + } + + bool IsReadyToExecute(TOperation::TPtr op) const override final { + if (!IsWaiting(op)) { + return true; + } + + if (HasResult(op)) { + return true; + } + + if (op->HasPendingInputEvents()) { + return true; + } + + return false; + } + + void Complete(TOperation::TPtr, const TActorContext&) override final { + } + + + void Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, TOperation::TPtr op, const TActorContext& ctx) { + Y_UNUSED(ev, op, ctx); + ResetWaiting(op); + } + + void ProcessEvent(TAutoPtr& ev, TOperation::TPtr op, const TActorContext& ctx) { + switch (ev->GetTypeRewrite()) { + OHFunc(TEvDataShard::TEvRestoreFinished, Handle); + // OHFunc(TEvCancel, Handle); + } + Y_UNUSED(op, ctx); + } + +public: + TRestoreIncrementalBackupSrcUnit(TDataShard& self, TPipeline& pipeline) + : TExecutionUnit(EExecutionUnitKind::RestoreIncrementalBackupSrc, false, self, pipeline) + { + } + +}; // TRestoreIncrementalBackupSrcUnit + +THolder CreateRestoreIncrementalBackupSrcUnit(TDataShard& self, TPipeline& pipeline) { + return THolder(new TRestoreIncrementalBackupSrcUnit(self, pipeline)); +} + +} // NDataShard +} // NKikimr diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 8c305a3a5ceb..60422aef8f37 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -332,6 +332,8 @@ struct TEvDataShard { EvSampleKRequest, EvSampleKResponse, + EvRestoreFinished, + EvEnd }; @@ -1716,6 +1718,18 @@ struct TEvDataShard { Record.SetErrorDescription(error); } }; + + struct TEvRestoreFinished + : public TEventLocal + { + TEvRestoreFinished(ui64 txId) + : TxId(txId) + { } + + ui64 TxId; + // todo restore tx id + persist + }; }; IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index d5130891d25d..ed82e56ee321 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -366,7 +366,7 @@ void TActiveTransaction::FillTxData(TDataShard *self, if (DataTx->HasStreamResponse()) SetStreamSink(DataTx->GetSink()); } else if (IsSchemeTx()) { - BuildSchemeTx(); + Y_ABORT_UNLESS(BuildSchemeTx()); } else if (IsSnapshotTx()) { BuildSnapshotTx(); } else if (IsDistributedEraseTx()) { @@ -440,7 +440,8 @@ bool TActiveTransaction::BuildSchemeTx() + (ui32)SchemeTx->HasCreateCdcStreamNotice() + (ui32)SchemeTx->HasAlterCdcStreamNotice() + (ui32)SchemeTx->HasDropCdcStreamNotice() - + (ui32)SchemeTx->HasMoveIndex(); + + (ui32)SchemeTx->HasMoveIndex() + + (ui32)SchemeTx->HasRestoreIncrementalBackupSrc(); if (count != 1) return false; @@ -476,6 +477,8 @@ bool TActiveTransaction::BuildSchemeTx() SchemeTxType = TSchemaOperation::ETypeDropCdcStream; else if (SchemeTx->HasMoveIndex()) SchemeTxType = TSchemaOperation::ETypeMoveIndex; + else if (SchemeTx->HasRestoreIncrementalBackupSrc()) + SchemeTxType = TSchemaOperation::ETypeRestoreIncrementalBackupSrc; else SchemeTxType = TSchemaOperation::ETypeUnknown; @@ -858,6 +861,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded) plan.push_back(EExecutionUnitKind::CreateCdcStream); plan.push_back(EExecutionUnitKind::AlterCdcStream); plan.push_back(EExecutionUnitKind::DropCdcStream); + plan.push_back(EExecutionUnitKind::RestoreIncrementalBackupSrc); plan.push_back(EExecutionUnitKind::CompleteOperation); plan.push_back(EExecutionUnitKind::CompletedOperations); } else { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 356fae934f2e..f86a1eb9b4de 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -54,6 +54,7 @@ struct TSchemaOperation { ETypeAlterCdcStream = 14, ETypeDropCdcStream = 15, ETypeMoveIndex = 16, + ETypeRestoreIncrementalBackupSrc = 17, ETypeUnknown = Max() }; diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index 01daef6e42c4..6734a7e3064f 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -305,7 +305,14 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { return false; } - txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId())); + if (!MvccReadWriteVersion) { + auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); + Y_DEBUG_ABORT_UNLESS(readVersion == writeVersion); + MvccReadWriteVersion = writeVersion; + Pipeline.AddCommittingOp(*MvccReadWriteVersion); + } + + txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, *MvccReadWriteVersion); Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB); tableInfo.Stats.UpdateTime = TAppData::TimeProvider->Now(); AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_OK); @@ -314,8 +321,9 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { } public: - explicit TTxApplyChangeRecords(TDataShard* self, TEvChangeExchange::TEvApplyRecords::TPtr ev) + explicit TTxApplyChangeRecords(TDataShard* self, TPipeline& pipeline, TEvChangeExchange::TEvApplyRecords::TPtr ev) : TTransactionBase(self) + , Pipeline(pipeline) , Ev(std::move(ev)) , Status(new TEvChangeExchange::TEvStatus) { @@ -392,6 +400,10 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { void Complete(const TActorContext& ctx) override { Y_ABORT_UNLESS(Status); + if (MvccReadWriteVersion) { + Pipeline.RemoveCommittingOp(*MvccReadWriteVersion); + } + if (Status->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_OK) { Self->IncCounter(COUNTER_CHANGE_EXCHANGE_SUCCESSFUL_APPLY); } else { @@ -402,6 +414,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { } private: + TPipeline& Pipeline; TEvChangeExchange::TEvApplyRecords::TPtr Ev; THolder Status; @@ -410,7 +423,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase { TVector Key; TVector Value; - + std::optional MvccReadWriteVersion; }; // TTxApplyChangeRecords void TDataShard::StartCollectingChangeExchangeHandshakes(const TActorContext& ctx) { @@ -446,7 +459,7 @@ void TDataShard::Handle(TEvChangeExchange::TEvApplyRecords::TPtr& ev, const TAct << ": origin# " << ev->Get()->Record.GetOrigin() << ", generation# " << ev->Get()->Record.GetGeneration() << ", at tablet# " << TabletID()); - Execute(new TTxApplyChangeRecords(this, ev), ctx); + Execute(new TTxApplyChangeRecords(this, Pipeline, ev), ctx); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 18c07fc3a342..b13baec04abf 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -315,6 +315,7 @@ class TDataShard friend class TS3DownloadsManager; friend class TS3Downloader; template friend class TBackupRestoreUnitBase; + friend class TRestoreIncrementalBackupSrcUnit; friend struct TSetupSysLocks; friend class TDataShardLocksDb; @@ -1391,6 +1392,8 @@ class TDataShard void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvRestoreFinished::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -2125,6 +2128,11 @@ class TDataShard return LogThrottlers[type]; }; + THolder CreateVolatileStreamScan( + TPathId tablePathId, + const TPathId& streamPathId, + ui64 txId); + private: /// class TLoanReturnTracker { @@ -2962,6 +2970,9 @@ class TDataShard ui32 StatisticsScanTableId = 0; ui64 StatisticsScanId = 0; + bool RestoreStarted = false; + bool RestoreFinished = false; + public: auto& GetLockChangeRecords() { return LockChangeRecords; @@ -3153,6 +3164,7 @@ class TDataShard HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle); HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle); HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); + HFunc(TEvDataShard::TEvRestoreFinished, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index a5646fa379ff..aa84ff06917f 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1435,6 +1435,9 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: tx->Orbit = std::move(ev->Get()->Orbit); tx->OperationSpan = std::move(operationSpan); + const TString msg = TStringBuilder() << "Got " << "<" << tx->IsSchemeTx() << ">" << rec.GetTxBody() << " tx"; + LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, msg); + auto malformed = [&](const TStringBuf txType, const TString& txBody) { const TString error = TStringBuilder() << "Malformed " << txType << " tx" << " at tablet " << Self->TabletID() diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index bbe48c388aaa..90059482104c 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -150,6 +150,8 @@ THolder CreateExecutionUnit(EExecutionUnitKind kind, return CreateReadUnit(dataShard, pipeline); case EExecutionUnitKind::ExecuteWrite: return CreateExecuteWriteUnit(dataShard, pipeline); + case EExecutionUnitKind::RestoreIncrementalBackupSrc: + return CreateRestoreIncrementalBackupSrcUnit(dataShard, pipeline); default: Y_FAIL_S("Unexpected execution kind " << kind << " (" << (ui32)kind << ")"); } diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index 5034770ab221..8699d57db729 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -75,6 +75,7 @@ THolder CreateAlterCdcStreamUnit(TDataShard &dataShard, TPipelin THolder CreateDropCdcStreamUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateCheckReadUnit(TDataShard &dataShard, TPipeline &pipeline); THolder CreateReadUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder CreateRestoreIncrementalBackupSrcUnit(TDataShard &dataShard, TPipeline &pipeline); } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index 84d92380d3e0..40ed2f832b43 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -75,6 +75,7 @@ enum class EExecutionUnitKind: ui32 { AlterCdcStream, DropCdcStream, MoveIndex, + RestoreIncrementalBackupSrc, Count, Unspecified }; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index afe17818cf8a..3caa07837bf9 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -42,6 +42,7 @@ SRCS( create_persistent_snapshot_unit.cpp create_table_unit.cpp create_volatile_snapshot_unit.cpp + create_restore_incremental_backup_src_unit.cpp datashard__cancel_tx_proposal.cpp datashard__column_stats.cpp datashard__compact_borrowed.cpp diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index 5bbac37f23bf..8a8a53293d98 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -92,6 +92,13 @@ class TWorker: public TActorBootstrapped { { } + explicit TActorInfo(const TActorId& actorId) + : ActorId(actorId) + , InitDone(true) + , CreateAttempt(0) + { + } + operator TActorId() const { return ActorId; } @@ -101,7 +108,10 @@ class TWorker: public TActorBootstrapped { } void Register(IActorOps* ops) { - ActorId = ops->RegisterWithSameMailbox(CreateFn()); + if (CreateFn) { + ActorId = ops->RegisterWithSameMailbox(CreateFn()); + } + ops->Send(ActorId, new TEvWorker::TEvHandshake()); InitDone = false; ++CreateAttempt; @@ -274,6 +284,17 @@ class TWorker: public TActorBootstrapped { { } + explicit TWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn) + : Parent(parent) + , Reader(preparedReader) + , Writer(std::move(createWriterFn)) + , Lag(TDuration::Zero()) + { + } + void Bootstrap() { for (auto* actor : {&Reader, &Writer}) { actor->Register(this); @@ -314,4 +335,12 @@ IActor* CreateWorker( return new TWorker(parent, std::move(createReaderFn), std::move(createWriterFn)); } +IActor* CreateWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn) +{ + return new TWorker(parent, preparedReader, std::move(createWriterFn)); +} + } diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index ec2fd4197230..dcc9be90156a 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -75,6 +75,11 @@ IActor* CreateWorker( std::function&& createReaderFn, std::function&& createWriterFn); +IActor* CreateWorker( + const TActorId& parent, + const TActorId& preparedReader, + std::function&& createWriterFn); + } Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TEvWorker::TEvData::TRecord, o, x) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 14ec42f43168..537c6829333e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -487,7 +487,7 @@ class TDone: public TSubOperationState { TString DebugHint() const override { return TStringBuilder() << "TDone" - << " opId# " << OperationId; + << " opId# " << OperationId << " "; } public: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index d80e40b6c954..44b92ca7ec4d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -112,6 +112,432 @@ void DoCreateAlterTable( result.push_back(CreateAlterTable(NextPartId(opId, result), outTx)); } +/////// +/////// + +namespace NIncrBackup { + +class TConfigurePartsAtTable: public TSubOperationState { + TString DebugHint() const override { + return TStringBuilder() + << "NIncrBackupState::TConfigurePartsAtTable" + << " operationId: " << OperationId; + } + + static bool IsExpectedTxType(TTxState::ETxType txType) { + switch (txType) { + case TTxState::TxCreateCdcStreamAtTable: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: + case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: + case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: + return true; + default: + return false; + } + } + +protected: + // FIXME + void FillNotice( + const TPathId& pathId, + NKikimrTxDataShard::TFlatSchemeTransaction& tx, + TOperationContext& context) const + { + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); + auto table = context.SS->Tables.at(pathId); + + tx.MutableRestoreIncrementalBackupSrc()->CopyFrom(RestoreOp); + + // TODO: copy op to notice + } + +public: + explicit TConfigurePartsAtTable(TOperationId id, const NKikimrSchemeOp::TRestoreIncrementalBackup& restoreOp) + : OperationId(id) + , RestoreOp(restoreOp) + { + IgnoreMessages(DebugHint(), {}); + } + + bool ProgressState(TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << context.SS->TabletID()); + + auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + const auto& pathId = txState->TargetPathId; + + if (NTableState::CheckPartitioningChangedForTableModification(*txState, context)) { + NTableState::UpdatePartitioningForTableModification(OperationId, *txState, context); + } + + NKikimrTxDataShard::TFlatSchemeTransaction tx; + context.SS->FillSeqNo(tx, context.SS->StartRound(*txState)); + FillNotice(pathId, tx, context); + + txState->ClearShardsInProgress(); + Y_ABORT_UNLESS(txState->Shards.size()); + + for (ui32 i = 0; i < txState->Shards.size(); ++i) { + const auto& idx = txState->Shards[i].Idx; + const auto datashardId = context.SS->ShardInfos[idx].TabletID; + auto ev = context.SS->MakeDataShardProposal(pathId, OperationId, tx.SerializeAsString(), context.Ctx); + context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, ev.Release()); + } + + txState->UpdateShardsInProgress(TTxState::ConfigureParts); + return false; + } + + bool HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply " << ev->Get()->ToString() + << ", at schemeshard: " << context.SS->TabletID()); + + if (!NTableState::CollectProposeTransactionResults(OperationId, ev, context)) { + return false; + } + + return true; + } + +private: + const TOperationId OperationId; + const NKikimrSchemeOp::TRestoreIncrementalBackup RestoreOp; +}; // TConfigurePartsAtTable + +class TProposeAtTable: public TSubOperationState { + TString DebugHint() const override { + return TStringBuilder() + << "NIncrBackupState::TProposeAtTable" + << " operationId: " << OperationId; + } + + static bool IsExpectedTxType(TTxState::ETxType txType) { + switch (txType) { + case TTxState::TxCreateCdcStreamAtTable: + case TTxState::TxCreateCdcStreamAtTableWithInitialScan: + case TTxState::TxAlterCdcStreamAtTable: + case TTxState::TxAlterCdcStreamAtTableDropSnapshot: + case TTxState::TxDropCdcStreamAtTable: + case TTxState::TxDropCdcStreamAtTableDropSnapshot: + return true; + default: + return false; + } + } + +public: + explicit TProposeAtTable(TOperationId id) + : OperationId(id) + { + IgnoreMessages(DebugHint(), {TEvDataShard::TEvProposeTransactionResult::EventType}); + } + + bool ProgressState(TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " ProgressState" + << ", at schemeshard: " << context.SS->TabletID()); + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + + TSet shardSet; + for (const auto& shard : txState->Shards) { + Y_ABORT_UNLESS(context.SS->ShardInfos.contains(shard.Idx)); + shardSet.insert(context.SS->ShardInfos.at(shard.Idx).TabletID); + } + + context.OnComplete.ProposeToCoordinator(OperationId, txState->TargetPathId, txState->MinStep, shardSet); + return false; + } + + bool HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " TEvDataShard::TEvSchemaChanged" + << " triggers early, save it" + << ", at schemeshard: " << context.SS->TabletID()); + + NTableState::CollectSchemaChanged(OperationId, ev, context); + return false; + } + + bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvOperationPlan" + << ", step: " << ev->Get()->StepId + << ", at schemeshard: " << context.SS->TabletID()); + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType)); + const auto& pathId = txState->TargetPathId; + + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); + auto table = context.SS->Tables.at(pathId); + + table->AlterVersion += 1; + + NIceDb::TNiceDb db(context.GetDB()); + context.SS->PersistTableAlterVersion(db, pathId, table); + + context.SS->ClearDescribePathCaches(path); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts); + + const auto step = TStepId(ev->Get()->StepId); + context.SS->SnapshotsStepIds[OperationId.GetTxId()] = step; + context.SS->PersistSnapshotStepId(db, OperationId.GetTxId(), step); + + context.SS->TabletCounters->Simple()[COUNTER_SNAPSHOTS_COUNT].Add(1); + return true; + } + +protected: + const TOperationId OperationId; + +}; // TProposeAtTable + +class TDoneWithInitialScan: public TDone { +public: + using TDone::TDone; + + bool ProgressState(TOperationContext& context) override { + if (!TDone::ProgressState(context)) { + return false; + } + + const auto* txState = context.SS->FindTx(OperationId); + Y_ABORT_UNLESS(txState); + Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan); + const auto& pathId = txState->TargetPathId; + + Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId)); + auto path = context.SS->PathsById.at(pathId); + + TMaybe streamPathId; + for (const auto& [_, childPathId] : path->GetChildren()) { + Y_ABORT_UNLESS(context.SS->PathsById.contains(childPathId)); + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->CreateTxId != OperationId.GetTxId()) { + continue; + } + + Y_ABORT_UNLESS(childPath->IsCdcStream() && !childPath->Dropped()); + Y_ABORT_UNLESS(context.SS->CdcStreams.contains(childPathId)); + auto stream = context.SS->CdcStreams.at(childPathId); + + Y_ABORT_UNLESS(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan); + Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan" + << ": found# " << *streamPathId + << ", another# " << childPathId); + streamPathId = childPathId; + } + + // if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) { + // return true; + // } + + // FIXME(+active) + // Y_ABORT_UNLESS(streamPathId); + // context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunCdcStreamScan(*streamPathId)); + + return true; + } + +}; // TDoneWithInitialScan + +class TNewRestoreFromAtTable: public TSubOperation { + static TTxState::ETxState NextState() { + return TTxState::ConfigureParts; + } + + TTxState::ETxState NextState(TTxState::ETxState state) const override { + switch (state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return TTxState::Propose; + case TTxState::Propose: + return TTxState::ProposedWaitParts; + case TTxState::ProposedWaitParts: + return TTxState::Done; + default: + return TTxState::Invalid; + } + } + + TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override { + switch (state) { + case TTxState::Waiting: + case TTxState::ConfigureParts: + return MakeHolder(OperationId, Transaction.GetRestoreIncrementalBackup()); + case TTxState::Propose: + return MakeHolder(OperationId); + case TTxState::ProposedWaitParts: + return MakeHolder(OperationId); + case TTxState::Done: + return MakeHolder(OperationId); + default: + return nullptr; + } + } + +public: + explicit TNewRestoreFromAtTable(TOperationId id, const TTxTransaction& tx) + : TSubOperation(id, tx) + { + } + + explicit TNewRestoreFromAtTable(TOperationId id, TTxState::ETxState state) + : TSubOperation(id, state) + { + } + + THolder Propose(const TString&, TOperationContext& context) override { + const auto& workingDir = Transaction.GetWorkingDir(); + const auto& op = Transaction.GetRestoreIncrementalBackup(); + const auto& tableName = op.GetSrcTableName(); + const auto& dstTableName = op.GetDstTableName(); + + // LOG_N("TNewRestoreFromAtTable Propose" + // << ": opId# " << OperationId + // << ", stream# " << workingDir << "/" << tableName << "/" << streamName); + + auto result = MakeHolder( + NKikimrScheme::StatusAccepted, + ui64(OperationId.GetTxId()), + context.SS->TabletID()); + + const auto workingDirPath = TPath::Resolve(workingDir, context.SS); + // { + // const auto checks = workingDirPath.Check(); + // checks + // .NotUnderDomainUpgrade() + // .IsAtLocalSchemeShard() + // .IsResolved() + // .NotDeleted() + // .IsLikeDirectory() + // .NotUnderDeleting(); + + // if (checks && !workingDirPath.IsTableIndex()) { + // checks.IsCommonSensePath(); + // } + + // if (!checks) { + // result->SetError(checks.GetStatus(), checks.GetError()); + // return result; + // } + // } + + const auto tablePath = workingDirPath.Child(tableName); + // { + // const auto checks = tablePath.Check(); + // checks + // .NotEmpty() + // .NotUnderDomainUpgrade() + // .IsAtLocalSchemeShard() + // .IsResolved() + // .NotDeleted() + // .IsTable() + // .NotAsyncReplicaTable() + // .NotUnderDeleting(); + + // if (checks) { + // if (!tablePath.IsInsideTableIndexPath()) { + // checks.IsCommonSensePath(); + // } + // checks.IsUnderTheSameOperation(OperationId.GetTxId()); // lock op + // } + + // if (!checks) { + // result->SetError(checks.GetStatus(), checks.GetError()); + // return result; + // } + // } + const auto dstTablePath = workingDirPath.Child(dstTableName); + + TString errStr; + if (!context.SS->CheckApplyIf(Transaction, errStr)) { + result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr); + return result; + } + + // NKikimrScheme::EStatus status; + // if (!context.SS->CanCreateSnapshot(tablePath.Base()->PathId, OperationId.GetTxId(), status, errStr)) { + // result->SetError(status, errStr); + // return result; + // } + + auto guard = context.DbGuard(); + context.MemChanges.GrabPath(context.SS, tablePath.Base()->PathId); + context.MemChanges.GrabNewTxState(context.SS, OperationId); + + context.DbChanges.PersistPath(tablePath.Base()->PathId); + context.DbChanges.PersistTxState(OperationId); + + // context.MemChanges.GrabNewTableSnapshot(context.SS, tablePath.Base()->PathId, OperationId.GetTxId()); + // context.DbChanges.PersistTableSnapshot(tablePath.Base()->PathId, OperationId.GetTxId()); + + // context.SS->TablesWithSnapshots.emplace(tablePath.Base()->PathId, OperationId.GetTxId()); + // context.SS->SnapshotTables[OperationId.GetTxId()].insert(tablePath.Base()->PathId); + + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + Y_ABORT_UNLESS(table->AlterVersion != 0); + Y_ABORT_UNLESS(!table->AlterData); + + const auto txType = TTxState::TxCreateCdcStreamAtTableWithInitialScan; + + Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); + auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); + txState.State = TTxState::ConfigureParts; + + tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; + tablePath.Base()->LastTxId = OperationId.GetTxId(); + + for (const auto& splitOpId : table->GetSplitOpsInFlight()) { + context.OnComplete.Dependence(splitOpId.GetTxId(), OperationId.GetTxId()); + } + + context.OnComplete.ActivateTx(OperationId); + + SetState(NextState()); + return result; + } + + void AbortPropose(TOperationContext& context) override { + LOG_N("TNewRestoreFromAtTable AbortPropose" + << ": opId# " << OperationId); + } + + void AbortUnsafe(TTxId txId, TOperationContext& context) override { + LOG_N("TNewRestoreFromAtTable AbortUnsafe" + << ": opId# " << OperationId + << ", txId# " << txId); + context.OnComplete.DoneOperation(OperationId); + } + +}; // TNewRestoreFromAtTable + +} // namespace NIncrBackup + +/////// +/////// + TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup); @@ -161,18 +587,29 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c } } - NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; - createCdcStreamOp.SetTableName(srcTableName); - auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); - streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup); - streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); - streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); + // NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + // createCdcStreamOp.SetTableName(srcTableName); + // auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + // streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); + // streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup); + // streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + // streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); TVector result; DoCreateLock(opId, workingDirPath, srcTablePath, false, result); + { + auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); + outTx.MutableRestoreIncrementalBackup()->CopyFrom(restoreOp); + auto& restoreOp = *outTx.MutableRestoreIncrementalBackup(); + PathIdFromPathId(srcTablePath.Base()->PathId, restoreOp.MutableSrcPathId()); + PathIdFromPathId(dstTablePath.Base()->PathId, restoreOp.MutableDstPathId()); + result.push_back(MakeSubOperation(NextPartId(opId, result), outTx)); + } + + /* + DoCreateAlterTable(opId, dstTablePath, result); NCdc::DoCreateStream( @@ -194,6 +631,8 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c acceptExisted, result); + */ + return result; }