Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ ydb/core/kqp/ut/service [*/*] chunk chunk
ydb/core/kqp/ut/tx KqpSinkTx.OlapInvalidateOnError
ydb/core/kqp/ut/yql KqpScripting.StreamExecuteYqlScriptScanOperationTmeoutBruteForce
ydb/core/statistics/aggregator/ut AnalyzeColumnshard.AnalyzeRebootColumnShard
ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.ComplexRestoreBackupCollection+WithIncremental
ydb/core/tx/schemeshard/ut_login_large TSchemeShardLoginLargeTest.RemoveLogin_Many
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,8 @@ enum EPathState {
EPathStateRestore = 10;
EPathStateMoving = 11;
EPathStateOutgoingIncrementalRestore = 12;
EPathStateAwaitingOutgoingIncrementalRestore = 13;
EPathStateIncomingIncrementalRestore = 14;
}

message TPathVersion {
Expand Down
40 changes: 40 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,46 @@ message TEvCdcStreamScanResponse {
optional TStats Stats = 6;
}

message TEvRestoreScanRequest {
message TLimits {
optional uint32 BatchMaxBytes = 1 [default = 512000];
optional uint32 BatchMinRows = 2 [default = 10];
optional uint32 BatchMaxRows = 3 [default = 1000];
};

optional NKikimrProto.TPathID TablePathId = 1; // which table should be scanned
optional uint64 TableSchemaVersion = 2;
optional NKikimrProto.TPathID StreamPathId = 3;
optional uint64 SnapshotStep = 4;
optional uint64 SnapshotTxId = 5;
optional TLimits Limits = 6;
}

message TEvRestoreScanResponse {
enum EStatus {
PENDING = 0;
ACCEPTED = 1;
IN_PROGRESS = 2;
DONE = 3;
BAD_REQUEST = 4;
SCHEME_ERROR = 5;
OVERLOADED = 6;
ABORTED = 7;
}

message TStats {
optional uint64 RowsProcessed = 1;
optional uint64 BytesProcessed = 2;
}

optional uint64 TabletId = 1;
optional NKikimrProto.TPathID TablePathId = 2;
optional NKikimrProto.TPathID StreamPathId = 3;
optional EStatus Status = 4;
optional string ErrorDescription = 5;
optional TStats Stats = 6;
}

message TComputeShardingPolicy {
repeated string ColumnNames = 1;
optional uint32 ShardsCount = 2 [default = 0];
Expand Down
29 changes: 29 additions & 0 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ namespace TEvDataShard {
EvPrefixKMeansRequest,
EvPrefixKMeansResponse,

EvRestoreScanRequest,
EvRestoreScanResponse,

EvEnd
};

Expand Down Expand Up @@ -1812,6 +1815,32 @@ namespace TEvDataShard {
}
};

struct TEvRestoreScanRequest
: public TEventPB<TEvRestoreScanRequest,
NKikimrTxDataShard::TEvRestoreScanRequest,
EvRestoreScanRequest>
{
};

struct TEvRestoreScanResponse
: public TEventPB<TEvRestoreScanResponse,
NKikimrTxDataShard::TEvRestoreScanResponse,
EvRestoreScanResponse>
{
TEvRestoreScanResponse() = default;

explicit TEvRestoreScanResponse(
const NKikimrTxDataShard::TEvRestoreScanRequest& request, ui64 tabletId,
NKikimrTxDataShard::TEvRestoreScanResponse::EStatus status, const TString& error = {})
{
Record.SetTabletId(tabletId);
Record.MutableTablePathId()->CopyFrom(request.GetTablePathId());
Record.MutableStreamPathId()->CopyFrom(request.GetStreamPathId());
Record.SetStatus(status);
Record.SetErrorDescription(error);
}
};

struct TEvInMemoryStateRequest
: public TEventPB<TEvInMemoryStateRequest,
NKikimrTxDataShard::TEvInMemoryStateRequest,
Expand Down
83 changes: 83 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,89 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
UNIT_ASSERT_VALUES_EQUAL(expected, actual);
}

Y_UNIT_TEST(E2EBackupCollectionMany) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
.SetEnableBackupService(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table1", SimpleTable());
CreateShardedTable(server, edgeActor, "/Root", "Table2", SimpleTable());

ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, value) VALUES
(1, 10)
, (2, 20)
, (3, 30)
;
)");

ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table2` (key, value) VALUES
(4, 10)
, (5, 20)
, (6, 30)
;
)");

ExecSQL(server, edgeActor, R"(
CREATE BACKUP COLLECTION `MyCollection`
( TABLE `/Root/Table1`
, TABLE `/Root/Table2`
)
WITH
( STORAGE = 'cluster'
, INCREMENTAL_BACKUP_ENABLED = 'true'
);
)", false);

ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);

ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table1` (key, value) VALUES
(2, 200);
)");

ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table1` WHERE key=1;)");

ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);

SimulateSleep(server, TDuration::Seconds(1));

ExecSQL(server, edgeActor, R"(
UPSERT INTO `/Root/Table2` (key, value) VALUES
(5, 5000);
)");

ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);

SimulateSleep(server, TDuration::Seconds(1));

auto expected1 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
auto expected2 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table2` ORDER BY key)");

ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false);
ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table2`;)", false);

ExecSQL(server, edgeActor, R"(RESTORE `MyCollection`;)", false);

SimulateSleep(server, TDuration::Seconds(1));

auto actual1 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table1` ORDER BY key)");
auto actual2 = KqpSimpleExec(runtime, R"(SELECT key, value FROM `/Root/Table2` ORDER BY key)");

UNIT_ASSERT_VALUES_EQUAL(expected1, actual1);
UNIT_ASSERT_VALUES_EQUAL(expected2, actual2);
}

} // Y_UNIT_TEST_SUITE(IncrementalBackup)

} // NKikimr
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,18 @@ class TConfigurePartsAtTable : public TSubOperationState {
auto table = context.SS->Tables.at(pathId);

auto& op = *tx.MutableCreateIncrementalRestoreSrc();
op.MutableSrcPathId()->CopyFrom(RestoreOp.GetSrcPathIds(LoopStep));
op.SetSrcTablePath(RestoreOp.GetSrcTablePaths(LoopStep));
// op.MutableSrcPathId()->CopyFrom(RestoreOp.GetSrcPathIds(LoopStep));
// op.SetSrcTablePath(RestoreOp.GetSrcTablePaths(LoopStep));
pathId.ToProto(op.MutableDstPathId());
op.SetDstTablePath(RestoreOp.GetDstTablePath());
}

public:
explicit TConfigurePartsAtTable(
TOperationId id,
const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups& restoreOp,
ui64 loopStep)
const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups& restoreOp)
: OperationId(id)
, RestoreOp(restoreOp)
, LoopStep(loopStep)
{
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " Constructed op# " << restoreOp.DebugString());
IgnoreMessages(DebugHint(), {});
Expand Down Expand Up @@ -135,7 +133,6 @@ class TConfigurePartsAtTable : public TSubOperationState {
private:
const TOperationId OperationId;
const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups RestoreOp;
const ui64 LoopStep;
}; // TConfigurePartsAtTable

class TProposeAtTable : public TSubOperationState {
Expand Down Expand Up @@ -262,17 +259,63 @@ class TDone: public TSubOperationState {
const auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));
Y_ABORT_UNLESS(txState->LoopStep == RestoreOp.SrcPathIdsSize());
// Y_ABORT_UNLESS(txState->LoopStep == RestoreOp.SrcPathIdsSize());
Y_ABORT_UNLESS(txState->TargetPathId == TPathId::FromProto(RestoreOp.GetSrcPathIds(RestoreOp.SrcPathIdsSize() - 1)));

for (const auto& pathId : RestoreOp.GetSrcPathIds()) {
context.OnComplete.ReleasePathState(OperationId, TPathId::FromProto(pathId), TPathElement::EPathState::EPathStateNoChanges);
}

context.OnComplete.Send(context.SS->SelfId(), new TEvPrivate::TEvRunRestoreScan(RestoreOp));

context.OnComplete.DoneOperation(OperationId);
return true;
}

// bool HandleReply(TEvPrivate::TEvCompleteBarrier::TPtr&, TOperationContext& context) override { // WIP
// LOG_I(DebugHint() << "HandleReply TEvCompleteBarrier");

// // if (!TDone::Process(context)) {
// // return false;
// // }

// const auto* txState = context.SS->FindTx(OperationId);
// Y_ABORT_UNLESS(txState);
// Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan);
// const auto& pathId = txState->TargetPathId;

// Y_ABORT_UNLESS(context.SS->PathsById.contains(pathId));
// auto path = context.SS->PathsById.at(pathId);

// TMaybe<TPathId> streamPathId;
// for (const auto& [_, childPathId] : path->GetChildren()) {
// Y_ABORT_UNLESS(context.SS->PathsById.contains(childPathId));
// auto childPath = context.SS->PathsById.at(childPathId);

// if (childPath->CreateTxId != OperationId.GetTxId()) {
// continue;
// }

// Y_ABORT_UNLESS(childPath->IsCdcStream() && !childPath->Dropped());
// Y_ABORT_UNLESS(context.SS->CdcStreams.contains(childPathId));
// auto stream = context.SS->CdcStreams.at(childPathId);

// Y_ABORT_UNLESS(stream->State == TCdcStreamInfo::EState::ECdcStreamStateScan);
// Y_VERIFY_S(!streamPathId, "Too many cdc streams are planned to fill with initial scan"
// << ": found# " << *streamPathId
// << ", another# " << childPathId);
// streamPathId = childPathId;
// }

// if (AppData()->DisableCdcAutoSwitchingToReadyStateForTests) {
// return true;
// }

// Y_ABORT_UNLESS(streamPathId);

// return true;
// }

private:
const TOperationId OperationId;
const NKikimrSchemeOp::TRestoreMultipleIncrementalBackups RestoreOp;
Expand Down Expand Up @@ -340,7 +383,7 @@ class TNewRestoreFromAtTable : public TSubOperation {
Y_ABORT("unreachable");
}

TTxState::ETxState NextState(TTxState::ETxState state, TOperationContext& context) const {
TTxState::ETxState NextState(TTxState::ETxState state, TOperationContext&) const {
switch (state) {
case TTxState::Waiting:
return TTxState::CopyTableBarrier;
Expand All @@ -350,37 +393,23 @@ class TNewRestoreFromAtTable : public TSubOperation {
return TTxState::Propose;
case TTxState::Propose:
return TTxState::ProposedWaitParts;
case TTxState::ProposedWaitParts: {
auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
++(txState->LoopStep);
if (txState->LoopStep < Transaction.GetRestoreMultipleIncrementalBackups().SrcPathIdsSize()) {
txState->TargetPathId = TPathId::FromProto(Transaction.GetRestoreMultipleIncrementalBackups().GetSrcPathIds(txState->LoopStep));
txState->TxShardsListFinalized = false;
// TODO preserve TxState
return TTxState::ConfigureParts;
}
case TTxState::ProposedWaitParts:
return TTxState::Done;
}
default:
return TTxState::Invalid;
}
}

TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state, TOperationContext& context) override {
TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state, TOperationContext&) override {
switch (state) {
case TTxState::Waiting:
case TTxState::CopyTableBarrier:
return MakeHolder<NIncrRestore::TCopyTableBarrier>(OperationId);
case TTxState::ConfigureParts: {
auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
return MakeHolder<NIncrRestore::TConfigurePartsAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups(), txState->LoopStep);
}
case TTxState::ConfigureParts:
return MakeHolder<NIncrRestore::TConfigurePartsAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
case TTxState::Propose:
return MakeHolder<NIncrRestore::TProposeAtTable>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
case TTxState::ProposedWaitParts:
// TODO: check the right next state always choosen
return MakeHolder<NTableState::TProposedWaitParts>(OperationId);
case TTxState::Done:
return MakeHolder<NIncrRestore::TDone>(OperationId, Transaction.GetRestoreMultipleIncrementalBackups());
Expand Down Expand Up @@ -510,7 +539,7 @@ class TNewRestoreFromAtTable : public TSubOperation {
Y_ABORT_UNLESS(table->AlterVersion != 0);
Y_ABORT_UNLESS(!table->AlterData);

tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateOutgoingIncrementalRestore;
tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAwaitingOutgoingIncrementalRestore;
tablePath.Base()->LastTxId = OperationId.GetTxId();

for (const auto& splitOpId : table->GetSplitOpsInFlight()) {
Expand Down
21 changes: 21 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,27 @@ class TSchemeShard
void RemoveCdcStreamScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx);
// } // NCdcStreamScan

// namespace NRestoreScan {
struct TRestoreScan {
struct TTxProgress;
};

TDedicatedPipePool<TPathId> RestoreScanPipes;

NTabletFlatExecutor::ITransaction* CreateTxProgressRestoreScan(TEvPrivate::TEvRunRestoreScan::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressRestoreScan(TEvDataShard::TEvRestoreScanResponse::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreatePipeRetry(TTabletId tabletId);

void Handle(TEvPrivate::TEvRunRestoreScan::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvRestoreScanResponse::TPtr& ev, const TActorContext& ctx);

void ResumeRestoreScans(const TVector<TPathId>& ids, const TActorContext& ctx);

void PersistRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx,
const TCdcStreamInfo::TShardStatus& status);
void RemoveRestoreScanShardStatus(NIceDb::TNiceDb& db, const TPathId& streamPathId, const TShardIdx& shardIdx);
// } // NRestoreScan

// statistics
TTabletId StatisticsAggregatorId;
TActorId SAPipeClientId;
Expand Down
Loading