Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ class TDone: public TSubOperationState {
const auto* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(IsExpectedTxType(txState->TxType));
Y_ABORT_UNLESS(txState->TargetPathId == TPathId::FromProto(RestoreOp.GetSrcPathIds(RestoreOp.SrcPathIdsSize() - 1)));

for (const auto& pathId : RestoreOp.GetSrcPathIds()) {
context.OnComplete.ReleasePathState(OperationId, TPathId::FromProto(pathId), TPathElement::EPathState::EPathStateNoChanges);
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6875,6 +6875,8 @@ void TSchemeShard::Handle(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev, con
return Execute(CreateTxProgressExport(ev), ctx);
} else if (Imports.contains(id)) {
return Execute(CreateTxProgressImport(ev), ctx);
} else if (IncrementalRestoreContexts.contains(id)) {
return Execute(CreateTxProgressIncrementalRestore(ev), ctx);
} else if (IndexBuilds.contains(TIndexBuildId(id))) {
return Execute(CreateTxReply(ev), ctx);
}
Expand All @@ -6899,6 +6901,8 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr
return Execute(CreateTxProgressExport(ev), ctx);
} else if (TxIdToImport.contains(txId)) {
return Execute(CreateTxProgressImport(ev), ctx);
} else if (TxIdToIncrementalRestore.contains(txId)) {
return Execute(CreateTxProgressIncrementalRestore(ev), ctx);
} else if (TxIdToIndexBuilds.contains(txId)) {
return Execute(CreateTxReply(ev), ctx);
} else if (BackgroundCleaningTxToDirPathId.contains(txId)) {
Expand Down Expand Up @@ -6953,6 +6957,10 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvNotifyTxCompletionResult::TPtr& ev,
Execute(CreateTxProgressImport(txId), ctx);
executed = true;
}
if (TxIdToIncrementalRestore.contains(txId)) {
Execute(CreateTxProgressIncrementalRestore(txId), ctx);
executed = true;
}
if (TxIdToIndexBuilds.contains(txId)) {
Execute(CreateTxReply(txId), ctx);
executed = true;
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,18 @@ class TSchemeShard
// This set is needed to kill all the running scheme uploaders on SchemeShard death.
THashSet<TActorId> RunningExportSchemeUploaders;

// Incremental restore transaction tracking
THashMap<TTxId, ui64> TxIdToIncrementalRestore;

// Context storage for incremental restore transactions
struct TIncrementalRestoreContext {
TPathId DestinationTablePathId;
TString DestinationTablePath;
ui64 OriginalOperationId;
TPathId BackupCollectionPathId;
};
THashMap<ui64, TIncrementalRestoreContext> IncrementalRestoreContexts;

void FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo& exportInfo);

static void PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo& exportInfo);
Expand Down Expand Up @@ -1534,6 +1546,14 @@ class TSchemeShard

// Incremental Restore Scan
NTabletFlatExecutor::ITransaction* CreateTxProgressIncrementalRestore(TEvPrivate::TEvRunIncrementalRestore::TPtr& ev);

// Transaction lifecycle constructor functions
NTabletFlatExecutor::ITransaction* CreateTxProgressIncrementalRestore(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressIncrementalRestore(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev);
NTabletFlatExecutor::ITransaction* CreateTxProgressIncrementalRestore(TTxId completedTxId);

NTabletFlatExecutor::ITransaction* CreateTxIncrementalRestoreResponse(TEvDataShard::TEvProposeTransactionResult::TPtr& ev);

void Handle(TEvPrivate::TEvRunIncrementalRestore::TPtr& ev, const TActorContext& ctx);

void ResumeCdcStreamScans(const TVector<TPathId>& ids, const TActorContext& ctx);
Expand Down
Loading