diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index d3a5e8589cd1..978d9a2cbbd4 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -40,7 +40,6 @@ ydb/core/kqp/ut/service [*/*] chunk chunk
 ydb/core/kqp/ut/tx KqpSinkTx.OlapInvalidateOnError
 ydb/core/kqp/ut/yql KqpScripting.StreamExecuteYqlScriptScanOperationTmeoutBruteForce
 ydb/core/statistics/aggregator/ut AnalyzeColumnshard.AnalyzeRebootColumnShard
-ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.ComplexRestoreBackupCollection+WithIncremental
 ydb/core/tx/schemeshard/ut_login_large TSchemeShardLoginLargeTest.RemoveLogin_Many
 ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
 ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index d42f0b9b539f..7fb2fc0f94a6 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1844,6 +1844,8 @@ enum EPathState {
     EPathStateRestore = 10;
     EPathStateMoving = 11;
     EPathStateOutgoingIncrementalRestore = 12;
+    EPathStateAwaitingOutgoingIncrementalRestore = 13;
+    EPathStateIncomingIncrementalRestore = 14;
 }
 
 message TPathVersion {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index bd56a183c2dc..d7a41b643fe4 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1727,6 +1727,46 @@ message TEvCdcStreamScanResponse {
     optional TStats Stats = 6;
 }
 
+message TEvRestoreScanRequest {
+    message TLimits {
+        optional uint32 BatchMaxBytes = 1 [default = 512000];
+        optional uint32 BatchMinRows = 2 [default = 10];
+        optional uint32 BatchMaxRows = 3 [default = 1000];
+    };
+
+    optional NKikimrProto.TPathID TablePathId = 1; // which table should be scanned
+    optional uint64 TableSchemaVersion = 2;
+    optional NKikimrProto.TPathID StreamPathId = 3;
+    optional uint64 SnapshotStep = 4;
+    optional uint64 SnapshotTxId = 5;
+    optional TLimits Limits = 6;
+}
+
+message TEvRestoreScanResponse {
+    enum EStatus {
+        PENDING = 0;
+        ACCEPTED = 1;
+        IN_PROGRESS = 2;
+        DONE = 3;
+        BAD_REQUEST = 4;
+        SCHEME_ERROR = 5;
+        OVERLOADED = 6;
+        ABORTED = 7;
+    }
+
+    message TStats {
+        optional uint64 RowsProcessed = 1;
+        optional uint64 BytesProcessed = 2;
+    }
+
+    optional uint64 TabletId = 1;
+    optional NKikimrProto.TPathID TablePathId = 2;
+    optional NKikimrProto.TPathID StreamPathId = 3;
+    optional EStatus Status = 4;
+    optional string ErrorDescription = 5;
+    optional TStats Stats = 6;
+}
+
 message TComputeShardingPolicy {
     repeated string ColumnNames = 1;
     optional uint32 ShardsCount = 2 [default = 0];
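Note: the request/response pair above mirrors the existing CDC stream scan protocol (TEvCdcStreamScanRequest/TEvCdcStreamScanResponse, visible at the top of this hunk). As a rough illustration of the intended flow, not code from this patch (the datashard side is not implemented here), a shard acknowledging a scan request with the helper constructor added to datashard.h below might look like:

    // Hypothetical datashard-side handler, sketch only.
    void TDataShard::Handle(TEvDataShard::TEvRestoreScanRequest::TPtr& ev, const TActorContext& ctx) {
        const auto& record = ev->Get()->Record;
        // Echo the table/stream path ids back and acknowledge; a later
        // response with EStatus::DONE would carry TStats
        // (RowsProcessed/BytesProcessed) for billing on the schemeshard side.
        ctx.Send(ev->Sender, new TEvDataShard::TEvRestoreScanResponse(
            record, TabletID(), NKikimrTxDataShard::TEvRestoreScanResponse::ACCEPTED));
    }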
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 7d2bd5291ba0..e32b77d9a625 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -352,6 +352,9 @@ namespace TEvDataShard {
         EvPrefixKMeansRequest,
         EvPrefixKMeansResponse,
 
+        EvRestoreScanRequest,
+        EvRestoreScanResponse,
+
         EvEnd
     };
 
@@ -1812,6 +1815,32 @@ namespace TEvDataShard {
         }
     };
 
+    struct TEvRestoreScanRequest
+        : public TEventPB<TEvRestoreScanRequest,
+                          NKikimrTxDataShard::TEvRestoreScanRequest,
+                          EvRestoreScanRequest>
+    {
+    };
+
+    struct TEvRestoreScanResponse
+        : public TEventPB<TEvRestoreScanResponse,
+                          NKikimrTxDataShard::TEvRestoreScanResponse,
+                          EvRestoreScanResponse>
+    {
+        TEvRestoreScanResponse() = default;
+
+        explicit TEvRestoreScanResponse(
+            const NKikimrTxDataShard::TEvRestoreScanRequest& request, ui64 tabletId,
+            NKikimrTxDataShard::TEvRestoreScanResponse::EStatus status, const TString& error = {})
+        {
+            Record.SetTabletId(tabletId);
+            Record.MutableTablePathId()->CopyFrom(request.GetTablePathId());
+            Record.MutableStreamPathId()->CopyFrom(request.GetStreamPathId());
+            Record.SetStatus(status);
+            Record.SetErrorDescription(error);
+        }
+    };
+
     struct TEvInMemoryStateRequest
         : public TEventPB<TEvInMemoryStateRequest,
                           NKikimrTxDataShard::TEvInMemoryStateRequest,
                           EvInMemoryStateRequest>
[...]
diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
[...]
+    Y_UNIT_TEST_TWIN(ComplexRestoreBackupCollection, WithIncremental) {
[...]
+        auto& runtime = *server->GetRuntime();
+        const auto edgeActor = runtime.AllocateEdgeActor();
+
+        SetupLogging(runtime);
+        InitRoot(server, edgeActor);
+        CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable());
+        CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable());
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table1` (key, value) VALUES
+              (1, 10)
+            , (2, 20)
+            , (3, 30)
+            ;
+        )");
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table2` (key, value) VALUES
+              (4, 10)
+            , (5, 20)
+            , (6, 30)
+            ;
+        )");
+
+        ExecSQL(server, edgeActor, R"(
+            CREATE BACKUP COLLECTION `MyCollection`
+            ( TABLE `/Root/Table1`
+            , TABLE `/Root/Table2`
+            )
+            WITH
+            ( STORAGE = 'cluster'
+            , INCREMENTAL_BACKUP_ENABLED = 'true'
+            );
+        )", false);
+
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table1` (key, value) VALUES
+            (2, 200);
+        )");
+
+        ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table1` WHERE key=1;)");
+
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table2` (key, value) VALUES
+            (5, 5000);
+        )");
+
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        auto expected1 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
+        auto expected2 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table2` ORDER BY key)");
+
+        ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false);
+        ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table2`;)", false);
+
+        ExecSQL(server, edgeActor, R"(RESTORE `MyCollection`;)", false);
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        auto actual1 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
+        auto actual2 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table2` ORDER BY key)");
+
+        UNIT_ASSERT_VALUES_EQUAL(expected1, actual1);
+        UNIT_ASSERT_VALUES_EQUAL(expected2, actual2);
+    }
+
 } // Y_UNIT_TEST_SUITE(IncrementalBackup)
 
 } // NKikimr
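The test pins the restore contract down to snapshot equality: the ORDER BY selects are captured as strings before the tables are dropped and must compare equal after RESTORE. The trailing `false` passed to ExecSQL appears to mark the statement as a scheme (non-DML) query in this test helper, and the SimulateSleep calls give the asynchronous backup/restore machinery time to settle. Reduced to its core:

    // Verification idiom used above (all names come from the test itself).
    auto expected = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
    // ... DROP TABLE + RESTORE `MyCollection` + settle ...
    auto actual = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
    UNIT_ASSERT_VALUES_EQUAL(expected, actual); // string compare is deterministic thanks to ORDER BY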
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
index 03d8f033eeea..5e4c468810fe 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
@@ -68,8 +68,8 @@ class TConfigurePartsAtTable : public TSubOperationState {
         auto table = context.SS->Tables.at(pathId);
 
         auto& op = *tx.MutableCreateIncrementalRestoreSrc();
-        op.MutableSrcPathId()->CopyFrom(RestoreOp.GetSrcPathIds(LoopStep));
-        op.SetSrcTablePath(RestoreOp.GetSrcTablePaths(LoopStep));
+        // op.MutableSrcPathId()->CopyFrom(RestoreOp.GetSrcPathIds(LoopStep));
+        // op.SetSrcTablePath(RestoreOp.GetSrcTablePaths(LoopStep));
         pathId.ToProto(op.MutableDstPathId());
         op.SetDstTablePath(RestoreOp.GetDstTablePath());
     }
@@ -77,11 +77,9 @@
 public:
     explicit TConfigurePartsAtTable(
             TOperationId id,
-            const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups& restoreOp,
-            ui64 loopStep)
+            const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups& restoreOp)
         : OperationId(id)
         , RestoreOp(restoreOp)
-        , LoopStep(loopStep)
     {
         LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " Constructed op# " << restoreOp.DebugString());
         IgnoreMessages(DebugHint(), {});
@@ -135,7 +133,6 @@ class TConfigurePartsAtTable : public TSubOperationState {
 private:
     const TOperationId OperationId;
     const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups RestoreOp;
-    const ui64 LoopStep;
 }; // TConfigurePartsAtTable
 
 class TProposeAtTable : public TSubOperationState {
@@ -262,17 +259,63 @@ class TDone: public TSubOperationState {
         const auto* txState = context.SS->FindTx(OperationId);
         Y_ABORT_UNLESS(txState);
         Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));
-        Y_ABORT_UNLESS(txState->LoopStep == RestoreOp.SrcPathIdsSize());
+        // Y_ABORT_UNLESS(txState->LoopStep == RestoreOp.SrcPathIdsSize());
         Y_ABORT_UNLESS(txState->TargetPathId == TPathId::FromProto(RestoreOp.GetSrcPathIds(RestoreOp.SrcPathIdsSize() - 1)));
 
         for (const auto& pathId : RestoreOp.GetSrcPathIds()) {
             context.OnComplete.ReleasePathState(OperationId, TPathId::FromProto(pathId), TPathElement::EPathState::EPathStateNoChanges);
         }
 
+        context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunRestoreScan(RestoreOp));
+
         context.OnComplete.DoneOperation(OperationId);
 
         return true;
     }
 
+    // bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override { // WIP
+    //     LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier");
+
+    //     // if (!TDone::Process(context)) {
+    //     //     return false;
+    //     // }
+
+    //     const auto* txState = context.SS->FindTx(OperationId);
+    //     Y_ABORT_UNLESS(txState);
+    //     Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan);
+    //     const auto& pathId = txState->TargetPathId;
+
+    //     Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId));
+    //     auto path = context.SS->PathsById.at(pathId);
+
+    //     TMaybe<TPathId> streamPathId;
+    //     for (const auto& [_, childPathId] : path->GetChildren()) {
+    //         Y_ABORT_UNLESS(context.SS->PathsById.contains(childPathId));
+    //         auto childPath = context.SS->PathsById.at(childPathId);
+
+    //         if (childPath->CreateTxId != OperationId.GetTxId()) {
+    //             continue;
+    //         }
+
+    //         Y_ABORT_UNLESS(childPath->IsCdcStream() && !childPath->Dropped());
+    //         Y_ABORT_UNLESS(context.SS->CdcStreams.contains(childPathId));
+    //         auto stream = context.SS->CdcStreams.at(childPathId);
+
+    //         Y_ABORT_UNLESS(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan);
+    //         Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan"
+    //             << ": found# " << *streamPathId
+    //             << ", another# " << childPathId);
+    //         streamPathId = childPathId;
+    //     }
+
+    //     if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) {
+    //         return true;
+    //     }
+
+    //     Y_ABORT_UNLESS(streamPathId);
+
+    //     return true;
+    // }
+
 private:
     const TOperationId OperationId;
     const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups RestoreOp;
@@ -340,7 +383,7 @@ class TNewRestoreFromAtTable : public TSubOperation {
         Y_ABORT("unreachable");
     }
 
-    TTxState::ETxState NextState(TTxState::ETxState state, TOperationContext& context) const {
+    TTxState::ETxState NextState(TTxState::ETxState state, TOperationContext&) const {
         switch (state) {
         case TTxState::Waiting:
             return TTxState::CopyTableBarrier;
         case TTxState::CopyTableBarrier:
@@ -350,37 +393,23 @@ class TNewRestoreFromAtTable : public TSubOperation {
             return TTxState::Propose;
         case TTxState::Propose:
             return TTxState::ProposedWaitParts;
-        case TTxState::ProposedWaitParts: {
-            auto* txState = context.SS->FindTx(OperationId);
-            Y_ABORT_UNLESS(txState);
-            ++(txState->LoopStep);
-            if (txState->LoopStep < Transaction.GetRestoreMultipleIncrementalBackups().SrcPathIdsSize()) {
-                txState->TargetPathId = TPathId::FromProto(Transaction.GetRestoreMultipleIncrementalBackups().GetSrcPathIds(txState->LoopStep));
-                txState->TxShardsListFinalized = false;
-                // TODO preserve TxState
-                return TTxState::ConfigureParts;
-            }
+        case TTxState::ProposedWaitParts:
             return TTxState::Done;
-        }
         default:
             return TTxState::Invalid;
         }
     }
 
-    TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state, TOperationContext& context) override {
+    TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state, TOperationContext&) override {
         switch (state) {
         case TTxState::Waiting:
         case TTxState::CopyTableBarrier:
             return MakeHolder<TCopyTableBarrier>(OperationId);
-        case TTxState::ConfigureParts: {
-            auto* txState = context.SS->FindTx(OperationId);
-            Y_ABORT_UNLESS(txState);
-            return MakeHolder<TConfigurePartsAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups(), txState->LoopStep);
-        }
+        case TTxState::ConfigureParts:
+            return MakeHolder<TConfigurePartsAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
         case TTxState::Propose:
             return MakeHolder<TProposeAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
         case TTxState::ProposedWaitParts:
-            // TODO: check the right next state is always chosen
             return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
         case TTxState::Done:
             return MakeHolder<TDone>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
@@ -510,7 +539,7 @@ class TNewRestoreFromAtTable : public TSubOperation {
         Y_ABORT_UNLESS(table->AlterVersion != 0);
         Y_ABORT_UNLESS(!table->AlterData);
 
-        tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateOutgoingIncrementalRestore;
+        tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAwaitingOutgoingIncrementalRestore;
         tablePath.Base()->LastTxId = OperationId.GetTxId();
 
         for (const auto& splitOpId : table->GetSplitOpsInFlight()) {
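With LoopStep gone, the operation no longer cycles back to ConfigureParts once per incremental backup; it walks a straight chain of states and hands the per-backup work off to the new restore scan (the TEvRunRestoreScan sent from TDone above). A self-contained sketch (local names only, not patch code) that mirrors the simplified NextState so the chain can be sanity-checked:

    #include <cassert>

    enum class EState { Waiting, CopyTableBarrier, ConfigureParts, Propose, ProposedWaitParts, Done, Invalid };

    // Mirrors the simplified NextState above.
    EState Next(EState s) {
        switch (s) {
            case EState::Waiting:           return EState::CopyTableBarrier;
            case EState::CopyTableBarrier:  return EState::ConfigureParts;
            case EState::ConfigureParts:    return EState::Propose;
            case EState::Propose:           return EState::ProposedWaitParts;
            case EState::ProposedWaitParts: return EState::Done;
            default:                        return EState::Invalid;
        }
    }

    int main() {
        EState s = EState::Waiting;
        int hops = 0;
        while (s != EState::Done) {
            s = Next(s);
            ++hops;
        }
        assert(hops == 5); // one linear pass; no per-backup loop anymore
    }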
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 416cd2adb78a..1e596c22627e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1510,6 +1510,27 @@ class TSchemeShard
     void RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx);
     // } // NCdcStreamScan
 
+    // namespace NRestoreScan {
+    struct TRestoreScan {
+        struct TTxProgress;
+    };
+
+    TDedicatedPipePool<TEvDataShard::TEvRestoreScanRequest> RestoreScanPipes;
+
+    NTabletFlatExecutor::ITransaction* CreateTxProgressRestoreScan(TEvPrivate::TEvRunRestoreScan::TPtr& ev);
+    NTabletFlatExecutor::ITransaction* CreateTxProgressRestoreScan(TEvDataShard::TEvRestoreScanResponse::TPtr& ev);
+    NTabletFlatExecutor::ITransaction* CreatePipeRetry(TTabletId tabletId);
+
+    void Handle(TEvPrivate::TEvRunRestoreScan::TPtr& ev, const TActorContext& ctx);
+    void Handle(TEvDataShard::TEvRestoreScanResponse::TPtr& ev, const TActorContext& ctx);
+
+    void ResumeRestoreScans(const TVector<TPathId>& ids, const TActorContext& ctx);
+
+    void PersistRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx,
+        const TCdcStreamInfo::TShardStatus& status);
+    void RemoveRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx);
+    // } // NRestoreScan
+
     // statistics
     TTabletId StatisticsAggregatorId;
     TActorId SAPipeClientId;
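This block mirrors the NCdcStreamScan section directly above it in schemeshard_impl.h. RestoreScanPipes keys dedicated pipes by (stream path id, tablet id); assuming the same semantics as the existing CdcStreamScanPipes pool, the transaction added in the new file below drives it like this:

    // Recap of how TTxProgress (next file) uses the pool; not new behavior.
    Self->RestoreScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx); // open-or-reuse pipe, send scan request
    Self->RestoreScanPipes.Close(streamPathId, tabletId, ctx);              // shard finished or is being re-queued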
diff --git a/ydb/core/tx/schemeshard/schemeshard_incremental_restore.cpp b/ydb/core/tx/schemeshard/schemeshard_incremental_restore.cpp
new file mode 100644
index 000000000000..540868b45d5d
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard_incremental_restore.cpp
@@ -0,0 +1,413 @@
+#include "schemeshard_billing_helpers.h"
+#include "schemeshard_impl.h"
+
+#include <ydb/core/metering/metering.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+#include <util/generic/maybe.h>
+
+#if defined LOG_D || \
+    defined LOG_W || \
+    defined LOG_E
+#error log macro redefinition
+#endif
+
+#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RestoreScan] " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RestoreScan] " << stream)
+#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RestoreScan] " << stream)
+#define LOG_E(stream) LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[RestoreScan] " << stream)
+
+namespace NKikimr::NSchemeShard {
+
+using namespace NTabletFlatExecutor;
+
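+// TRestoreScanFinalizer exists because a schemeshard local transaction cannot
+// block on TxId allocation: TTxProgress prepares the AlterCdcStream proposal
+// without a TxId (the "template without txId" below), and this actor obtains
+// a TxId from tx proxy, patches it in, and sends the proposal back to
+// SchemeShard as an ordinary TEvModifySchemeTransaction.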
+class TRestoreScanFinalizer: public TActorBootstrapped<TRestoreScanFinalizer> {
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::SCHEMESHARD_CDC_STREAM_SCAN_FINALIZER;
+    }
+
+    explicit TRestoreScanFinalizer(
+            const TActorId& ssActorId,
+            THolder<TEvSchemeShard::TEvModifySchemeTransaction>&& req)
+        : SSActorId(ssActorId)
+        , Request(std::move(req)) // template without txId
+    {
+    }
+
+    void Bootstrap() {
+        AllocateTxId();
+        Become(&TRestoreScanFinalizer::StateWork);
+    }
+
+    STATEFN(StateWork) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle)
+            sFunc(TEvents::TEvPoison, PassAway);
+        }
+    }
+
+private:
+    void AllocateTxId() {
+        Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+    }
+
+    void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+        Request->Record.SetTxId(ev->Get()->TxId);
+        Send(SSActorId, Request.Release());
+        TActorBootstrapped::PassAway();
+    }
+
+private:
+    const TActorId SSActorId;
+    THolder<TEvSchemeShard::TEvModifySchemeTransaction> Request;
+
+}; // TRestoreScanFinalizer
+
+struct TSchemeShard::TRestoreScan::TTxProgress: public TTransactionBase<TSchemeShard> {
+    // params
+    TEvPrivate::TEvRunRestoreScan::TPtr RunRestoreScan = nullptr;
+    TEvDataShard::TEvRestoreScanResponse::TPtr RestoreScanResponse = nullptr;
+    struct {
+        TPathId StreamPathId;
+        TTabletId TabletId;
+
+        explicit operator bool() const { return StreamPathId && TabletId; }
+    } PipeRetry;
+
+    // side effects
+    TDeque<std::tuple<TPathId, TTabletId, THolder<TEvDataShard::TEvRestoreScanRequest>>> ScanRequests;
+    TPathId StreamToProgress;
+    THolder<NMetering::TEvMetering::TEvWriteMeteringJson> Metering;
+    THolder<TEvSchemeShard::TEvModifySchemeTransaction> Finalize;
+
+public:
+    explicit TTxProgress(TSelf* self, TEvPrivate::TEvRunRestoreScan::TPtr& ev)
+        : TBase(self)
+        , RunRestoreScan(ev)
+    {
+    }
+
+    explicit TTxProgress(TSelf* self, TEvDataShard::TEvRestoreScanResponse::TPtr& ev)
+        : TBase(self)
+        , RestoreScanResponse(ev)
+    {
+    }
+
+    explicit TTxProgress(TSelf* self, const TPathId& streamPathId, TTabletId tabletId)
+        : TBase(self)
+        , PipeRetry({streamPathId, tabletId})
+    {
+    }
+
+    TTxType GetTxType() const override {
+        return TXTYPE_CDC_STREAM_SCAN_PROGRESS;
+    }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+        if (RunRestoreScan) {
+            return OnRunRestoreScan(txc, ctx);
+        } else if (RestoreScanResponse) {
+            return OnRestoreScanResponse(txc, ctx);
+        } else if (PipeRetry) {
+            return OnPipeRetry(txc, ctx);
+        } else {
+            Y_ABORT("unreachable");
+        }
+    }
+
+    void Complete(const TActorContext& ctx) override {
+        for (auto& [streamPathId, tabletId, ev] : ScanRequests) {
+            Self->RestoreScanPipes.Send(streamPathId, tabletId, std::move(ev), ctx);
+        }
+
+        if (StreamToProgress) {
+            ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunRestoreScan(StreamToProgress));
+        }
+
+        if (Metering) {
+            ctx.Send(NMetering::MakeMeteringServiceID(), Metering.Release());
+        }
+
+        if (Finalize) {
+            ctx.Register(new TRestoreScanFinalizer(ctx.SelfID, std::move(Finalize)));
+        }
+    }
+
+private:
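+    // OnRunRestoreScan: on the first run every partition of the source table
+    // is registered as PENDING; shards are then promoted from PendingShards to
+    // InProgressShards up to MaxCdcInitialScanShardsInFlight, one
+    // TEvRestoreScanRequest per shard, pinned to the stream's creation
+    // snapshot (StepCreated/CreateTxId). Once DoneShards covers ScanShards,
+    // the AlterCdcStream "GetReady" proposal is prepared for the finalizer.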
+    bool OnRunRestoreScan(TTransactionContext& txc, const TActorContext& ctx) {
+        const auto& streamPathId = RunRestoreScan->Get()->StreamPathId;
+
+        LOG_D("Run"
+            << ": streamPathId# " << streamPathId);
+
+        if (!Self->CdcStreams.contains(streamPathId)) {
+            LOG_W("Cannot run"
+                << ": streamPathId# " << streamPathId
+                << ", reason# " << "stream doesn't exist");
+            return true;
+        }
+
+        auto streamInfo = Self->CdcStreams.at(streamPathId);
+        if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+            LOG_W("Cannot run"
+                << ": streamPathId# " << streamPathId
+                << ", reason# " << "unexpected state");
+            return true;
+        }
+
+        Y_ABORT_UNLESS(Self->PathsById.contains(streamPathId));
+        auto streamPath = Self->PathsById.at(streamPathId);
+
+        const auto& tablePathId = Self->PathsById.at(streamPathId)->ParentPathId;
+
+        Y_ABORT_UNLESS(Self->Tables.contains(tablePathId));
+        auto table = Self->Tables.at(tablePathId);
+
+        if (streamInfo->ScanShards.empty()) {
+            NIceDb::TNiceDb db(txc.DB);
+            for (const auto& shard : table->GetPartitions()) {
+                // const auto status = TCdcStreamInfo::TShardStatus(NKikimrTxDataShard::TEvRestoreScanResponse::PENDING);
+                const auto status = TCdcStreamInfo::TShardStatus(NKikimrTxDataShard::TEvCdcStreamScanResponse::PENDING);
+                streamInfo->ScanShards.emplace(shard.ShardIdx, status);
+                streamInfo->PendingShards.insert(shard.ShardIdx);
+                Self->PersistRestoreScanShardStatus(db, streamPathId, shard.ShardIdx, status);
+            }
+        }
+
+        while (!streamInfo->PendingShards.empty()) {
+            if (streamInfo->InProgressShards.size() >= Self->MaxCdcInitialScanShardsInFlight) {
+                break;
+            }
+
+            auto it = streamInfo->PendingShards.begin();
+
+            Y_ABORT_UNLESS(Self->ShardInfos.contains(*it));
+            const auto tabletId = Self->ShardInfos.at(*it).TabletID;
+
+            streamInfo->InProgressShards.insert(*it);
+            streamInfo->PendingShards.erase(it);
+
+            auto ev = MakeHolder<TEvDataShard::TEvRestoreScanRequest>();
+            tablePathId.ToProto(ev->Record.MutableTablePathId());
+            ev->Record.SetTableSchemaVersion(table->AlterVersion);
+            streamPathId.ToProto(ev->Record.MutableStreamPathId());
+            ev->Record.SetSnapshotStep(ui64(streamPath->StepCreated));
+            ev->Record.SetSnapshotTxId(ui64(streamPath->CreateTxId));
+
+            ScanRequests.emplace_back(streamPathId, tabletId, std::move(ev));
+        }
+
+        if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) {
+            const auto path = TPath::Init(streamPathId, Self);
+
+            Finalize = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>();
+            auto& tx = *Finalize->Record.AddTransaction();
+            tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream);
+            tx.SetWorkingDir(path.Parent().Parent().PathString()); // stream -> table -> working dir
+            tx.SetFailOnExist(false);
+
+            auto& op = *tx.MutableAlterCdcStream();
+            op.SetTableName(path.Parent().LeafName());
+            op.SetStreamName(path.LeafName());
+            op.MutableGetReady()->SetLockTxId(ui64(streamPath->CreateTxId));
+
+            tx.MutableLockGuard()->SetOwnerTxId(ui64(streamPath->CreateTxId));
+        }
+
+        return true;
+    }
+
+    bool OnRestoreScanResponse(TTransactionContext& txc, const TActorContext& ctx) {
+        const auto& record = RestoreScanResponse->Get()->Record;
+
+        LOG_D("Response"
+            << ": ev# " << record.ShortDebugString());
+
+        // FIXME
+
+        Y_UNUSED(txc, ctx, record);
+
+        // const auto streamPathId = TPathId::FromProto(record.GetStreamPathId());
+        // if (!Self->CdcStreams.contains(streamPathId)) {
+        //     LOG_W("Cannot process response"
+        //         << ": streamPathId# " << streamPathId
+        //         << ", reason# " << "stream doesn't exist");
+        //     return true;
+        // }
+
+        // auto streamInfo = Self->CdcStreams.at(streamPathId);
+        // if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+        //     LOG_W("Cannot process response"
+        //         << ": streamPathId# " << streamPathId
+        //         << ", reason# " << "unexpected state");
+        //     return true;
+        // }
+
+        // const auto tabletId = TTabletId(record.GetTabletId());
+        // const auto shardIdx = Self->GetShardIdx(tabletId);
+        // if (shardIdx == InvalidShardIdx) {
+        //     LOG_E("Cannot process response"
+        //         << ": streamPathId# " << streamPathId
+        //         << ", tabletId# " << tabletId
+        //         << ", reason# " << "tablet not found");
+        //     return true;
+        // }
+
+        // auto it = streamInfo->ScanShards.find(shardIdx);
+        // if (it == streamInfo->ScanShards.end()) {
+        //     LOG_E("Cannot process response"
+        //         << ": streamPathId# " << streamPathId
+        //         << ", shardIdx# " << shardIdx
+        //         << ", reason# " << "shard not found");
+        //     return true;
+        // }
+
+        // auto& status = it->second;
+        // if (!streamInfo->InProgressShards.contains(shardIdx)) {
+        //     LOG_W("Shard status mismatch"
+        //         << ": streamPathId# " << streamPathId
+        //         << ", shardIdx# " << shardIdx
+        //         << ", got# " << record.GetStatus()
+        //         << ", current# " << status.Status);
+        //     return true;
+        // }
+
+        // switch (record.GetStatus()) {
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::ACCEPTED:
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::IN_PROGRESS:
+        //     break;
+
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::DONE:
+        //     status.Status = record.GetStatus();
+        //     streamInfo->DoneShards.insert(shardIdx);
+        //     streamInfo->InProgressShards.erase(shardIdx);
+        //     Self->RestoreScanPipes.Close(streamPathId, tabletId, ctx);
+        //     StreamToProgress = streamPathId;
+        //     Bill(streamPathId, shardIdx, TRUCalculator::ReadTable(record.GetStats().GetBytesProcessed()), ctx);
+        //     break;
+
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::OVERLOADED:
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::ABORTED:
+        //     streamInfo->PendingShards.insert(shardIdx);
+        //     streamInfo->InProgressShards.erase(shardIdx);
+        //     Self->RestoreScanPipes.Close(streamPathId, tabletId, ctx);
+        //     StreamToProgress = streamPathId;
+        //     break;
+
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::BAD_REQUEST:
+        // case NKikimrTxDataShard::TEvRestoreScanResponse::SCHEME_ERROR:
+        //     Y_ABORT("unreachable");
+
+        // default:
+        //     LOG_E("Unexpected response status"
+        //         << ": status# " << static_cast<ui32>(record.GetStatus())
+        //         << ", error# " << record.GetErrorDescription());
+        //     return true;
+        // }
+
+        // NIceDb::TNiceDb db(txc.DB);
+        // Self->PersistRestoreScanShardStatus(db, streamPathId, shardIdx, status);
+
+        // if (streamInfo->DoneShards.size() == streamInfo->ScanShards.size()) {
+        //     StreamToProgress = streamPathId;
+        // }
+
+        return true;
+    }
+
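+    // Intended response handling (per the commented block above): ACCEPTED and
+    // IN_PROGRESS are no-ops; DONE moves the shard to DoneShards, closes its
+    // pipe and bills the processed bytes; OVERLOADED and ABORTED re-queue the
+    // shard into PendingShards for another attempt; BAD_REQUEST and
+    // SCHEME_ERROR are treated as unreachable.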
+    bool OnPipeRetry(TTransactionContext&, const TActorContext& ctx) {
+        const auto& streamPathId = PipeRetry.StreamPathId;
+        const auto& tabletId = PipeRetry.TabletId;
+
+        LOG_D("Pipe retry"
+            << ": streamPathId# " << streamPathId
+            << ", tabletId# " << tabletId);
+
+        if (!Self->CdcStreams.contains(streamPathId)) {
+            LOG_W("Cannot retry"
+                << ": streamPathId# " << streamPathId
+                << ", reason# " << "stream doesn't exist");
+            return true;
+        }
+
+        auto streamInfo = Self->CdcStreams.at(streamPathId);
+        if (streamInfo->State != TCdcStreamInfo::EState::ECdcStreamStateScan) {
+            LOG_W("Cannot retry"
+                << ": streamPathId# " << streamPathId
+                << ", reason# " << "unexpected state");
+            return true;
+        }
+
+        const auto shardIdx = Self->GetShardIdx(tabletId);
+        if (shardIdx == InvalidShardIdx) {
+            LOG_E("Cannot retry"
+                << ": streamPathId# " << streamPathId
+                << ", tabletId# " << tabletId
+                << ", reason# " << "tablet not found");
+            return true;
+        }
+
+        auto it = streamInfo->InProgressShards.find(shardIdx);
+        if (it == streamInfo->InProgressShards.end()) {
+            LOG_E("Cannot retry"
+                << ": streamPathId# " << streamPathId
+                << ", shardIdx# " << shardIdx
+                << ", reason# " << "shard not found");
+            return true;
+        }
+
+        streamInfo->PendingShards.insert(*it);
+        streamInfo->InProgressShards.erase(it);
+        Self->RestoreScanPipes.Close(streamPathId, tabletId, ctx);
+        StreamToProgress = streamPathId;
+
+        return true;
+    }
+};
+
+ITransaction* TSchemeShard::CreateTxProgressRestoreScan(TEvPrivate::TEvRunRestoreScan::TPtr& ev) {
+    return new TRestoreScan::TTxProgress(this, ev);
+}
+
+ITransaction* TSchemeShard::CreateTxProgressRestoreScan(TEvDataShard::TEvRestoreScanResponse::TPtr& ev) {
+    return new TRestoreScan::TTxProgress(this, ev);
+}
+
+ITransaction* TSchemeShard::CreatePipeRetry(TTabletId tabletId) {
+    // TODO
+    return nullptr;
+    // return new TRestoreScan::TTxProgress(this, tabletId);
+}
+
+void TSchemeShard::Handle(TEvPrivate::TEvRunRestoreScan::TPtr& ev, const TActorContext& ctx) {
+    Execute(CreateTxProgressRestoreScan(ev), ctx);
+}
+
+void TSchemeShard::Handle(TEvDataShard::TEvRestoreScanResponse::TPtr& ev, const TActorContext& ctx) {
+    Execute(CreateTxProgressRestoreScan(ev), ctx);
+}
+
+void TSchemeShard::ResumeRestoreScans(const TVector<TPathId>& ids, const TActorContext& ctx) {
+    for (const auto& id : ids) {
+        Send(ctx.SelfID, new TEvPrivate::TEvRunRestoreScan(id));
+    }
+}
+
+void TSchemeShard::PersistRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId,
+        const TShardIdx& shardIdx, const TCdcStreamInfo::TShardStatus& status)
+{
+    // FIXME
+
+    Y_UNUSED(db, streamPathId, shardIdx, status);
+
+    // db.Table<Schema::RestoreScanShardStatus>()
+    //     .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId())
+    //     .Update(
+    //         NIceDb::TUpdate<Schema::RestoreScanShardStatus::Status>(status.Status)
+    //     );
+}
+
+void TSchemeShard::RemoveRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx) {
+    db.Table<Schema::RestoreScanShardStatus>()
+        .Key(streamPathId.OwnerId, streamPathId.LocalPathId, shardIdx.GetOwnerId(), shardIdx.GetLocalId())
+        .Delete();
+}
+
+}
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index d6bcba2d812b..2e2ce6431e60 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -42,6 +42,7 @@ namespace TEvPrivate {
         EvRetryNodeSubscribe,
         EvRunDataErasure,
         EvRunTenantDataErasure,
+        EvRunRestoreScan,
         EvEnd
     };
 
@@ -254,6 +255,21 @@ namespace TEvPrivate {
         {}
     };
 
+    struct TEvRunRestoreScan: public TEventLocal<TEvRunRestoreScan, EvRunRestoreScan> {
+        const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups RestoreOp;
+        const TPathId StreamPathId;
+
+        TEvRunRestoreScan(const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups& restoreOp)
+            : RestoreOp(restoreOp)
+        {}
+
+        // Needed by TTxProgress, which reads StreamPathId and re-posts the
+        // event per stream (see schemeshard_incremental_restore.cpp).
+        explicit TEvRunRestoreScan(const TPathId& streamPathId)
+            : StreamPathId(streamPathId)
+        {}
+    };
+
     struct TEvSendBaseStatsToSA: public TEventLocal<TEvSendBaseStatsToSA, EvSendBaseStatsToSA> {
     };
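For reference, the two send sites of TEvRunRestoreScan in this patch, which is why the event carries both payloads:

    context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunRestoreScan(RestoreOp)); // TDone: kick off scans after the restore op
    ctx.Send(ctx.SelfID, new TEvPrivate::TEvRunRestoreScan(StreamToProgress));                   // TTxProgress::Complete: advance a single stream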
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 0b651a61dc80..7777662c1bbc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -2028,6 +2028,26 @@ struct Schema : NIceDb::Schema {
         >;
     };
 
+    struct RestoreScanShardStatus : Table<119> {
+        // path id of cdc stream
+        struct OwnerPathId : Column<1, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
+        struct LocalPathId : Column<2, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
+
+        // shard idx of datashard
+        struct OwnerShardIdx : Column<3, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
+        struct LocalShardIdx : Column<4, NScheme::NTypeIds::Uint64> { using Type = TLocalShardIdx; };
+
+        struct Status : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrTxDataShard::TEvRestoreScanResponse::EStatus; };
+
+        using TKey = TableKey<OwnerPathId, LocalPathId, OwnerShardIdx, LocalShardIdx>;
+
+        using TColumns = TableColumns<
+            OwnerPathId,
+            LocalPathId,
+            OwnerShardIdx,
+            LocalShardIdx,
+            Status
+        >;
+    };
+
     using TTables = SchemaTables<
         Paths,
         TxInFlight,
@@ -2144,7 +2164,8 @@ struct Schema : NIceDb::Schema {
         DataErasureGenerations,
         WaitingDataErasureTenants,
         TenantDataErasureGenerations,
-        WaitingDataErasureShards
+        WaitingDataErasureShards,
+        RestoreScanShardStatus
     >;
 
     static constexpr ui64 SysParam_NextPathId = 1;
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index c68a0494d7ac..43dc0e40ee8c 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -219,6 +219,7 @@ SRCS(
     schemeshard_build_index_tx_base.cpp
     schemeshard_cdc_stream_common.cpp
    schemeshard_cdc_stream_scan.cpp
+    schemeshard_incremental_restore.cpp
    schemeshard_domain_links.cpp
    schemeshard_domain_links.h
    schemeshard_effective_acl.cpp
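The RestoreScanShardStatus table is what would let scans survive SchemeShard restarts: per-shard statuses are persisted on every transition, and ResumeRestoreScans can re-post unfinished streams on init. A hedged sketch of the read-back, following the usual NIceDb rowset idiom; the enclosing init transaction and the rebuild logic are assumptions, since this patch leaves the persistence path FIXMEd:

    auto rowset = db.Table<Schema::RestoreScanShardStatus>().Range().Select();
    if (!rowset.IsReady()) {
        return false; // let the executor load the page and restart the transaction
    }
    while (!rowset.EndOfSet()) {
        const auto streamPathId = TPathId(
            rowset.GetValue<Schema::RestoreScanShardStatus::OwnerPathId>(),
            rowset.GetValue<Schema::RestoreScanShardStatus::LocalPathId>());
        // ... rebuild ScanShards/PendingShards for streamPathId ...
        if (!rowset.Next()) {
            return false;
        }
    }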