diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index 32eb352fd000..462380077f6e 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -58,12 +58,16 @@ class TLocalPartitionReader Send(PQTablet, CreateGetOffsetRequest().Release()); } + void HandleInit() { + Send(PQTablet, CreateGetOffsetRequest().Release()); + } + void HandleInit(TEvPersQueue::TEvResponse::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto& record = ev->Get()->Record; if (record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - // TODO reschedule - Y_ABORT("Unimplemented!"); + Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup); + return; } Y_VERIFY_S(record.GetErrorCode() == NPersQueue::NErrorCode::OK, "Unimplemented!"); Y_VERIFY_S(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdGetClientOffsetResult(), "Unimplemented!"); @@ -85,7 +89,7 @@ class TLocalPartitionReader auto& read = *req.MutableCmdRead(); read.SetOffset(Offset); read.SetClientId(OFFLOAD_ACTOR_CLIENT_ID); - read.SetTimeoutMs(0); + read.SetTimeoutMs(1000); read.SetBytes(1_MB); return request; @@ -114,6 +118,12 @@ class TLocalPartitionReader const auto& readResult = record.GetPartitionResponse().GetCmdReadResult(); + if (!readResult.ResultSize()) { + Y_ABORT_UNLESS(PQTablet); + Send(PQTablet, CreateReadRequest().Release()); + return; + } + auto gotOffset = Offset; TVector records(::Reserve(readResult.ResultSize())); @@ -147,6 +157,7 @@ class TLocalPartitionReader switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, HandleInit); hFunc(TEvPersQueue::TEvResponse, HandleInit); + sFunc(TEvents::TEvWakeup, HandleInit); sFunc(TEvents::TEvPoison, PassAway); default: Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 157e30c75edc..b4708e07e7e7 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -7,6 +7,27 @@ namespace NKikimr::NBackup::NImpl { +class TChangeRecord; + +} + +namespace NKikimr { + +template <> +struct TChangeRecordBuilderContextTrait { + bool Restore; + + TChangeRecordBuilderContextTrait(bool restore) + : Restore(restore) + {} + + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait& other) = default; +}; + +} + +namespace NKikimr::NBackup::NImpl { + class TChangeRecord: public NChangeExchange::TChangeRecordBase { friend class TChangeRecordBuilder; @@ -30,24 +51,81 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { return SourceId; } - void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { - record.SetSourceOffset(GetOrder()); - // TODO: fill WriteTxId - - record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); - - auto& upsert = *record.MutableUpsert(); - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; - upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); - } - void Serialize( NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, - TChangeRecordBuilderContextTrait&) const + TChangeRecordBuilderContextTrait& ctx) const { - return Serialize(record); + if (!ctx.Restore) { + record.SetSourceOffset(GetOrder()); + // TODO: fill WriteTxId + + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + auto& upsert = *record.MutableUpsert(); + + switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: { + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + auto it = Schema->ValueColumns.find("__incrBackupImpl_deleted"); + upsert.AddTags(it->second.Tag); + + TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData(); + const TCell cell = TCell::Make(false); + Y_ABORT_UNLESS(TSerializedCellVec::UnsafeAppendCell(cell, serializedCellVec), "uh-oh"); + + upsert.SetData(serializedCellVec); + break; + } + case NKikimrChangeExchange::TDataChange::kErase: { + size_t size = Schema->ValueColumns.size(); + TVector tags; + TVector cells; + + tags.reserve(size); + cells.reserve(size); + + for (const auto& [name, value] : Schema->ValueColumns) { + tags.push_back(value.Tag); + if (name != "__incrBackupImpl_deleted") { + cells.emplace_back(); + } else { + cells.emplace_back(TCell::Make(true)); + } + } + + *upsert.MutableTags() = {tags.begin(), tags.end()}; + upsert.SetData(TSerializedCellVec::Serialize(cells)); + + break; + } + case NKikimrChangeExchange::TDataChange::kReset: + default: + Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); + } + } else { + record.SetSourceOffset(GetOrder()); + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: { + auto& upsert = *record.MutableUpsert(); + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + break; + } + case NKikimrChangeExchange::TDataChange::kErase: + record.MutableErase(); + break; + case NKikimrChangeExchange::TDataChange::kReset: + default: + Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); + } + + } } ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override { @@ -156,8 +234,8 @@ Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord::TPtr, out, va namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId) { - return new NReplication::NService::TLocalTableWriter(tablePathId); +IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore) { + return new NReplication::NService::TLocalTableWriter(tablePathId, restore); } } diff --git a/ydb/core/backup/impl/table_writer.h b/ydb/core/backup/impl/table_writer.h index e5ca1dbc60b1..5c3611ccf576 100644 --- a/ydb/core/backup/impl/table_writer.h +++ b/ydb/core/backup/impl/table_writer.h @@ -8,6 +8,6 @@ namespace NKikimr { namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId); +IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore = false); } diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp index 05f481e60c89..693213dea88c 100644 --- a/ydb/core/grpc_services/rpc_describe_table.cpp +++ b/ydb/core/grpc_services/rpc_describe_table.cpp @@ -106,6 +106,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor LogPrefix; TActorId Worker; @@ -60,14 +59,19 @@ class TOffloadActor : ParentTablet(parentTablet) , Partition(partition) , Config(config) - , DstPathId(PathIdFromPathId(config.GetIncrementalBackup().GetDstPathId())) {} void Bootstrap() { auto* workerActor = CreateWorker( SelfId(), [=]() -> IActor* { return NBackup::NImpl::CreateLocalPartitionReader(ParentTablet, Partition); }, - [=]() -> IActor* { return NBackup::NImpl::CreateLocalTableWriter(DstPathId); }); + [=]() -> IActor* { + if (Config.HasIncrementalBackup()) { + return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId())); + } else { + return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId()), true); + } + }); Worker = TActivationContext::Register(workerActor); Become(&TOffloadActor::StateWork); @@ -75,9 +79,9 @@ class TOffloadActor STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { - default: - Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() - << " event: " << ev->ToString()); + // default: + // Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() + // << " event: " << ev->ToString()); } } }; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 495277081072..7e99f90329f7 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -366,7 +366,8 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req }; TString user = record.HasUser() ? record.GetUser() : ""; - + RespondWithACL(request, NKikimrPQ::EAccess::ALLOWED, "", ctx); + return; if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) { if (!Consumers.contains(user)) { RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 15f8bb5d7f2b..26f739c4d19d 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -220,6 +220,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateResourcePool = 174 [(CounterOpts) = {Name: "InFlightOps/CreateResourcePool"}]; COUNTER_IN_FLIGHT_OPS_TxDropResourcePool = 175 [(CounterOpts) = {Name: "InFlightOps/DropResourcePool"}]; COUNTER_IN_FLIGHT_OPS_TxAlterResourcePool = 176 [(CounterOpts) = {Name: "InFlightOps/AlterResourcePool"}]; + + COUNTER_IN_FLIGHT_OPS_TxRestoreIncrementalBackup = 177 [(CounterOpts) = {Name: "InFlightOps/RestoreIncrementalBackup"}]; } enum ECumulativeCounters { @@ -351,6 +353,8 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateResourcePool = 103 [(CounterOpts) = {Name: "FinishedOps/CreateResourcePool"}]; COUNTER_FINISHED_OPS_TxDropResourcePool = 104 [(CounterOpts) = {Name: "FinishedOps/DropResourcePool"}]; COUNTER_FINISHED_OPS_TxAlterResourcePool = 105 [(CounterOpts) = {Name: "FinishedOps/AlterResourcePool"}]; + + COUNTER_FINISHED_OPS_TxRestoreIncrementalBackup = 106 [(CounterOpts) = {Name: "FinishedOps/RestoreIncrementalBackup"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 864ec8e74d0d..4257227fb405 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -344,6 +344,7 @@ message TTableReplicationConfig { enum EReplicationMode { REPLICATION_MODE_NONE = 0; REPLICATION_MODE_READ_ONLY = 1; + REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP = 2; } enum EConsistency { @@ -395,6 +396,8 @@ message TTableDescription { optional TTableReplicationConfig ReplicationConfig = 40; optional bool Temporary = 41; + + optional bool IncrementalBackup = 42; } message TDictionaryEncodingSettings { @@ -874,6 +877,7 @@ enum ECdcStreamMode { ECdcStreamModeNewImage = 3; ECdcStreamModeOldImage = 4; ECdcStreamModeNewAndOldImages = 5; + ECdcStreamModeRestoreIncBackup = 6; } enum ECdcStreamFormat { @@ -947,6 +951,7 @@ message TAlterContinuousBackup { } message TTakeIncrementalBackup { + optional string DstPath = 1; } oneof Action { @@ -959,6 +964,11 @@ message TDropContinuousBackup { optional string TableName = 1; } +message TRestoreIncrementalBackup { + optional string SrcTableName = 1; + optional string DstTableName = 2; +} + enum EIndexType { EIndexTypeInvalid = 0; EIndexTypeGlobal = 1; @@ -1589,6 +1599,8 @@ enum EOperationType { ESchemeOpCreateResourcePool = 100; ESchemeOpDropResourcePool = 101; ESchemeOpAlterResourcePool = 102; + + ESchemeOpRestoreIncrementalBackup = 103; } message TApplyIf { @@ -1771,6 +1783,8 @@ message TModifyScheme { optional bool AllowCreateInTempDir = 71 [default = false]; optional TResourcePoolDescription CreateResourcePool = 72; + + optional TRestoreIncrementalBackup RestoreIncrementalBackup = 73; } message TCopySequence { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 60e1a87eb6fa..6a8be152c5f3 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -311,8 +311,13 @@ message TOffloadConfig { optional NKikimrProto.TPathID DstPathId = 2; } + message TIncrementalRestore { + optional NKikimrProto.TPathID DstPathId = 1; + } + oneof Strategy { TIncrementalBackup IncrementalBackup = 1; + TIncrementalRestore IncrementalRestore = 2; } } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index b5380f7032fb..b864ef45e426 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1854,6 +1854,7 @@ message TEvApplyReplicationChangesResult { REASON_SCHEME_ERROR = 2; REASON_BAD_REQUEST = 3; REASON_UNEXPECTED_ROW_OPERATION = 4; + REASON_OUTDATED_SCHEME = 5; } optional EStatus Status = 1; diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp index 2ee86265b1a4..e1acc55e099e 100644 --- a/ydb/core/scheme/scheme_tablecell.cpp +++ b/ydb/core/scheme/scheme_tablecell.cpp @@ -238,6 +238,40 @@ namespace { } } + +bool TSerializedCellVec::UnsafeAppendCell(const TCell& cell, TString& serializedCellVec) { + const char* buf = serializedCellVec.data(); + const char* bufEnd = serializedCellVec.data() + serializedCellVec.size(); + + if (!serializedCellVec) { + TSerializedCellVec::Serialize(serializedCellVec, {cell}); + return true; + } + + if (Y_UNLIKELY(bufEnd - buf < static_cast(sizeof(ui16)))) { + return false; + } + + ui16 cellCount = ReadUnaligned(buf); + cellCount += 1; + + serializedCellVec.ReserveAndResize(serializedCellVec.size() + sizeof(ui32) + cell.Size()); + + char* mutableBuf = serializedCellVec.Detach(); + char* oldBufEnd = mutableBuf + (bufEnd - buf); + + WriteUnaligned(mutableBuf, cellCount); + + TCellHeader header(cell.Size(), cell.IsNull()); + WriteUnaligned(oldBufEnd, header.RawValue); + oldBufEnd += sizeof(header); + if (cell.Size() > 0) { + cell.CopyDataInto(oldBufEnd); + } + + return true; +} + TSerializedCellVec::TSerializedCellVec(TConstArrayRef cells) { SerializeCellVec(cells, Buf, &Cells); diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index b7635a424725..ef61969652f8 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -552,6 +552,9 @@ class TSerializedCellVec { return std::move(Buf); } + // read headers, assuming the buf is correct and append additional cell to the end + static bool UnsafeAppendCell(const TCell& cell, TString& serializedCellVec); + private: bool DoTryParse(const TString& data); diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 2f67fe5a79ac..ebc1a09d4a1d 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -208,6 +208,31 @@ class TDataShard::TTxCdcStreamScanProgress return updates; } + static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector updates(::Reserve(cells.size() - 1)); + + bool skipped = false; + Y_ABORT_UNLESS(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__incrBackupImpl_deleted") { + // FIXME assert + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } + skipped = true; + continue; + } + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + Y_ABORT_UNLESS(skipped); + + return updates; + } + static TRowState MakeRow(TArrayRef cells) { TRowState row(cells.size()); @@ -307,6 +332,15 @@ class TDataShard::TTxCdcStreamScanProgress case NKikimrSchemeOp::ECdcStreamModeUpdate: Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); break; + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: { + auto updates = MakeRestoreUpdates(v.GetCells(), valueTags, table); + if (updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } + break; + } case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: { const auto newImage = MakeRow(v.GetCells()); diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 30dfb5b8b4ba..0a95fbc8dac2 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -59,6 +59,33 @@ namespace { return result; } + std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TArrayRef types, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector result(::Reserve(cells.size() - 1)); + + Y_ABORT_UNLESS(cells.size() == tags.size()); + Y_ABORT_UNLESS(cells.size() == types.size()); + + bool skipped = false; + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__incrBackupImpl_deleted") { + // FIXME assert + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } + skipped = true; + continue; + } + result.emplace_back(tags.at(pos), ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), types.at(pos))); + } + Y_ABORT_UNLESS(skipped); + + return result; + } + auto MakeTagToPos(TArrayRef tags) { THashMap result; @@ -132,6 +159,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { switch (streamInfo.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: case NKikimrSchemeOp::ECdcStreamModeUpdate: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: @@ -227,6 +255,15 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, case NKikimrSchemeOp::ECdcStreamModeUpdate: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeUpdates(**initialState, valueTags, valueTypes)); break; + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: { + auto updates = MakeRestoreUpdates(**initialState, valueTags, valueTypes, userTable); + if (updates) { + Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, *updates); + } else { + Persist(tableId, pathId, ERowOp::Erase, key, keyTags, {}); + } + break; + } case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, nullptr, &*initialState, valueTags); @@ -244,6 +281,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, Persist(tableId, pathId, rop, key, keyTags, {}); break; case NKikimrSchemeOp::ECdcStreamModeUpdate: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: Persist(tableId, pathId, rop, key, keyTags, updates); break; case NKikimrSchemeOp::ECdcStreamModeNewImage: diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 92533d7befde..54bb72f3667e 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -34,7 +34,8 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseIsReplicated()) { Result = MakeHolder( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + TStringBuilder() << "table is not replicated"); return true; } @@ -65,7 +66,9 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_SCHEME_ERROR, + tableId.GetSchemaVersion() < userTable.GetTableSchemaVersion() + ? NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_OUTDATED_SCHEME + : NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_SCHEME_ERROR, std::move(error)); return true; } diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 8584c716597c..12122dad6afd 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -490,6 +490,10 @@ void TUserTable::ApplyAlter( } } + if (delta.HasReplicationConfig()) { + ReplicationConfig = delta.GetReplicationConfig(); + } + // Most settings are applied to both main and shadow table TStackVec tids; tids.push_back(LocalTid); diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index ee801ebe4c52..72f9823ef2ac 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -67,6 +67,7 @@ TMaybe GenYdbScheme( FillKeyBloomFilter(scheme, tableDesc); FillReadReplicasSettings(scheme, tableDesc); FillSequenceDescription(scheme, tableDesc); + FillBackupDescription(scheme, tableDesc); return scheme; } diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h index f94091d02ce3..b43b5565f3a8 100644 --- a/ydb/core/tx/replication/service/json_change_record.h +++ b/ydb/core/tx/replication/service/json_change_record.h @@ -139,6 +139,10 @@ struct TChangeRecordBuilderContextTrait { TChangeRecordBuilderContextTrait() : MemoryPool(256) {} + + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait&) + : MemoryPool(256) + {}; }; } diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h index 1fd77232fb2d..cf2c4f4ce8c8 100644 --- a/ydb/core/tx/replication/service/table_writer_impl.h +++ b/ydb/core/tx/replication/service/table_writer_impl.h @@ -178,10 +178,15 @@ class TTablePartitionWriter: public TActorBootstrapped builderContext) : Parent(parent) , TabletId(tabletId) , TableId(tableId) + , BuilderContext(builderContext) { } @@ -206,7 +211,6 @@ class TTablePartitionWriter: public TActorBootstrapped BuilderContext; - }; // TTablePartitionWriter template @@ -430,7 +434,11 @@ class TLocalTableWriter } IActor* CreateSender(ui64 partitionId) const override { - return new TTablePartitionWriter(this->SelfId(), partitionId, TTableId(this->PathId, Schema->Version)); + return new TTablePartitionWriter( + this->SelfId(), + partitionId, + TTableId(this->PathId, Schema->Version), + BuilderContext); } const TVector& GetPartitions() const override { return KeyDesc->GetPartitions(); } @@ -521,9 +529,11 @@ class TLocalTableWriter return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER; } - explicit TLocalTableWriter(const TPathId& tablePathId) + template + explicit TLocalTableWriter(const TPathId& tablePathId, TArgs... args) : TBase(&TThis::StateWork) , TBaseSender(this, this, this, TActorId(), tablePathId) + , BuilderContext(args...) { } @@ -554,6 +564,7 @@ class TLocalTableWriter TMap PendingRecords; + TChangeRecordBuilderContextTrait BuilderContext; }; // TLocalTableWriter } // namespace NKikimr::NReplication::NService diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 0b77b71a9656..6dc5a91d341f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -307,7 +307,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { return true; } - typedef std::tuple TTableRec; + typedef std::tuple TTableRec; typedef TDeque TTableRows; template @@ -323,7 +323,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase { rowSet.template GetValueOrDefault(false), rowSet.template GetValueOrDefault(), rowSet.template GetValueOrDefault(false), - rowSet.template GetValueOrDefault("") + rowSet.template GetValueOrDefault(""), + rowSet.template GetValueOrDefault(false) ); } @@ -1830,6 +1831,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { } tableInfo->IsBackup = std::get<8>(rec); + tableInfo->IsIncrementalBackup = std::get<12>(rec); Self->Tables[pathId] = tableInfo; Self->IncrementPathDbRefCount(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index ee7de50fa0a1..68504676510d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -906,6 +906,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: return CreateBackup(NextPartId(), txState); case TTxState::ETxType::TxRestore: return CreateRestore(NextPartId(), txState); + case TTxState::ETxType::TxRestoreIncrementalBackup: + Y_ABORT("TODO: implement"); case TTxState::ETxType::TxDropTable: return CreateDropTable(NextPartId(), txState); case TTxState::ETxType::TxCreateTableIndex: @@ -1343,6 +1345,8 @@ TVector TOperation::ConstructParts(const TTxTransaction& tx case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: return {CreateAlterResourcePool(NextPartId(), tx)}; + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + return CreateRestoreIncrementalBackup(NextPartId(), tx, context); } Y_UNREACHABLE(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 24f9c7c0d0f8..9fbc5fbafc5e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -45,6 +45,7 @@ void DoCreateIncBackupTable(const TOperationId& opId, const TPath& dst, NKikimrS auto& desc = *outTx.MutableCreateTable(); desc.CopyFrom(tableDesc); desc.SetName(dst.LeafName()); + desc.SetIncrementalBackup(true); auto& replicationConfig = *desc.MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); @@ -78,7 +79,7 @@ TVector CreateAlterContinuousBackup(TOperationId opId, cons const auto topicPath = streamPath.Child("streamImpl"); TTopicInfo::TPtr topic = context.SS->Topics.at(topicPath.Base()->PathId); - const auto backupTablePath = tablePath.Child("incBackupImpl"); + const auto backupTablePath = workingDirPath.Child(cbOp.GetTakeIncrementalBackup().GetDstPath()); const NScheme::TTypeRegistry* typeRegistry = AppData(context.Ctx)->TypeRegistry; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index a615d6b68a4a..2a0ea16d3b45 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -202,6 +202,7 @@ class TNewCdcStream: public TSubOperation { case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; case NKikimrSchemeOp::ECdcStreamModeUpdate: if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) { @@ -937,6 +938,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; default: return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp new file mode 100644 index 000000000000..aa7dcfb011e4 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -0,0 +1,194 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include "schemeshard__operation_create_cdc_stream.h" + +namespace { + +const char* IB_RESTORE_CDC_STREAM_NAME = "__ib_restore_stream"; + +} + +#define LOG_D(stream) LOG_DEBUG_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) + +namespace NKikimr::NSchemeShard { + +void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, + TVector& result) +{ + auto outTx = TransactionTemplate(workingDirPath.PathString(), + NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); + outTx.SetFailOnExist(false); + outTx.SetInternal(true); + auto cfg = outTx.MutableLockConfig(); + cfg->SetName(tablePath.LeafName()); + cfg->SetAllowIndexImplLock(allowIndexImplLock); + + result.push_back(CreateLock(NextPartId(opId, result), outTx)); +} + +void DoCreatePqPart( + const TOperationId& opId, + const TPath& streamPath, + const TString& streamName, + const TIntrusivePtr table, + const TPathId dstPathId, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TVector& boundaries, + const bool acceptExisted, + TVector& result) +{ + auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); + outTx.SetFailOnExist(!acceptExisted); + + auto& desc = *outTx.MutableCreatePersQueueGroup(); + desc.SetName("streamImpl"); + desc.SetTotalGroupCount(op.HasTopicPartitions() ? op.GetTopicPartitions() : table->GetPartitions().size()); + desc.SetPartitionPerTablet(2); + + auto& pqConfig = *desc.MutablePQTabletConfig(); + pqConfig.SetTopicName(streamName); + pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString()); + pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + + auto& partitionConfig = *pqConfig.MutablePartitionConfig(); + partitionConfig.SetLifetimeSeconds(op.GetRetentionPeriodSeconds()); + partitionConfig.SetWriteSpeedInBytesPerSecond(1_MB); // TODO: configurable write speed + partitionConfig.SetBurstSize(1_MB); // TODO: configurable burst + partitionConfig.SetMaxCountInPartition(Max()); + + for (const auto& tag : table->KeyColumnIds) { + Y_ABORT_UNLESS(table->Columns.contains(tag)); + const auto& column = table->Columns.at(tag); + + auto& keyComponent = *pqConfig.AddPartitionKeySchema(); + keyComponent.SetName(column.Name); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.PType, column.PTypeMod); + keyComponent.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *keyComponent.MutableTypeInfo() = *columnType.TypeInfo; + } + } + + for (const auto& serialized : boundaries) { + TSerializedCellVec endKey(serialized); + Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size()); + + TString errStr; + auto& boundary = *desc.AddPartitionBoundaries(); + for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) { + const auto& cell = endKey.GetCells()[ki]; + const auto tag = table->KeyColumnIds.at(ki); + Y_ABORT_UNLESS(table->Columns.contains(tag)); + const auto typeId = table->Columns.at(tag).PType; + const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr); + Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data()); + } + } + + auto& ir = *pqConfig.MutableOffloadConfig()->MutableIncrementalRestore(); + auto* pathId = ir.MutableDstPathId(); + PathIdFromPathId(dstPathId, pathId); + + result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); +} + +void DoCreateAlterTable( + const TOperationId& opId, + const TPath& dstTablePath, + TVector& result) +{ + auto outTx = TransactionTemplate(dstTablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable); + auto& desc = *outTx.MutableAlterTable(); + + PathIdFromPathId(dstTablePath.Base()->PathId, desc.MutablePathId()); + + auto& replicationConfig = *desc.MutableReplicationConfig(); + replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP); + replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK); + + result.push_back(CreateAlterTable(NextPartId(opId, result), outTx)); +} + + +TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup); + + LOG_N("CreateRestoreIncrementalBackup" + << ": opId# " << opId + << ", tx# " << tx.ShortDebugString()); + + const auto acceptExisted = !tx.GetFailOnExist(); + const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); + const auto& restoreOp = tx.GetRestoreIncrementalBackup(); + const auto& srcTableName = restoreOp.GetSrcTableName(); + const auto& dstTableName = restoreOp.GetDstTableName(); + const auto dstTablePath = workingDirPath.Child(dstTableName); + + const auto checksResult = NCdc::DoNewStreamPathChecks(opId, workingDirPath, srcTableName, IB_RESTORE_CDC_STREAM_NAME, acceptExisted); + if (std::holds_alternative(checksResult)) { + return {std::get(checksResult)}; + } + + const auto [srcTablePath, streamPath] = std::get(checksResult); + + Y_ABORT_UNLESS(context.SS->Tables.contains(srcTablePath.Base()->PathId)); + auto srcTable = context.SS->Tables.at(srcTablePath.Base()->PathId); + + TString errStr; + if (!context.SS->CheckApplyIf(tx, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)}; + } + + if (!context.SS->CheckLocks(srcTablePath.Base()->PathId, tx, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)}; + } + + // check dst locks + // lock dst + // start replication read on dst + + TVector boundaries; + const auto& partitions = srcTable->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(srcTableName); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); + + TVector result; + + DoCreateLock(opId, workingDirPath, srcTablePath, false, result); + + DoCreateAlterTable(opId, dstTablePath, result); + + NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, srcTablePath, acceptExisted, true, {}, result); + DoCreatePqPart( + opId, + streamPath, + IB_RESTORE_CDC_STREAM_NAME, + srcTable, + dstTablePath.Base()->PathId, + createCdcStreamOp, + boundaries, + acceptExisted, + result); + + return result; +} + +} // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index 029e72fe4c3f..fc64ffd76378 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -559,7 +559,8 @@ class TCreateTable: public TSubOperation { const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; const TSchemeLimits& limits = domainInfo->GetSchemeLimits(); - TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, context.SS->EnableTableDatetime64, errStr, LocalSequences); + TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData( + nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, context.SS->EnableTableDatetime64, errStr, LocalSequences); if (!alterData.Get()) { result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 386dcb20768a..08c76fe7d540 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -620,5 +620,7 @@ ISubOperation::TPtr CreateAlterResourcePool(TOperationId id, TTxState::ETxState ISubOperation::TPtr CreateDropResourcePool(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropResourcePool(TOperationId id, TTxState::ETxState state); +TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context); + } } diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 9c2005264923..a39168f7597e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -235,12 +235,16 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) { return "ALTER TABLE ALTER CONTINUOUS BACKUP"; case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup: return "ALTER TABLE DROP CONTINUOUS BACKUP"; + // case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool: return "CREATE RESOURCE POOL"; case NKikimrSchemeOp::EOperationType::ESchemeOpDropResourcePool: return "DROP RESOURCE POOL"; case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: return "ALTER RESOURCE POOL"; + // + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + return "RESTORE"; } Y_ABORT("switch should cover all operation types"); } @@ -544,6 +548,10 @@ TVector ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: result.emplace_back(tx.GetCreateResourcePool().GetName()); break; + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetRestoreIncrementalBackup().GetSrcTableName()})); + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetRestoreIncrementalBackup().GetDstTableName()})); + break; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 05098b99d693..26a27e59c749 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1581,6 +1581,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxBackup: return TPathElement::EPathState::EPathStateBackup; case TTxState::TxRestore: + case TTxState::TxRestoreIncrementalBackup: return TPathElement::EPathState::EPathStateRestore; case TTxState::TxUpgradeSubDomain: return TPathElement::EPathState::EPathStateUpgrade; @@ -2645,7 +2646,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(tableInfo->IsBackup), NIceDb::TUpdate(replicationConfig), NIceDb::TUpdate(tableInfo->IsTemporary), - NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString()), + NIceDb::TUpdate(tableInfo->IsIncrementalBackup)); } else { db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate(tableInfo->NextColumnId), @@ -2657,7 +2659,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(tableInfo->IsBackup), NIceDb::TUpdate(replicationConfig), NIceDb::TUpdate(tableInfo->IsTemporary), - NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString()), + NIceDb::TUpdate(tableInfo->IsIncrementalBackup)); } for (auto col : tableInfo->Columns) { @@ -6538,6 +6541,8 @@ TString TSchemeShard::FillAlterTableTxBody(TPathId pathId, TShardIdx shardIdx, T *patch); } + proto->MutableReplicationConfig()->CopyFrom(alterData->TableDescriptionFull->GetReplicationConfig()); + TString txBody; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); return txBody; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 54bc391e5c6a..e21dc2f1f5f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -532,6 +532,8 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( return nullptr; } break; + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP: + break; default: errStr = "Unknown replication mode"; return nullptr; @@ -541,6 +543,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( } alterData->IsBackup = op.GetIsBackup(); + alterData->IsIncrementalBackup = op.GetIncrementalBackup(); if (source && op.KeyColumnNamesSize() == 0) return alterData; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index b5297bf20037..281b38922d99 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -381,6 +381,7 @@ struct TTableInfo : public TSimpleRefCount { THashMap Columns; TVector KeyColumnIds; bool IsBackup = false; + bool IsIncrementalBackup = true; NKikimrSchemeOp::TTableDescription TableDescriptionDiff; TMaybeFail TableDescriptionFull; @@ -426,6 +427,7 @@ struct TTableInfo : public TSimpleRefCount { bool IsBackup = false; bool IsTemporary = false; TActorId OwnerActorId; + bool IsIncrementalBackup = false; TAlterTableInfo::TPtr AlterData; @@ -533,6 +535,7 @@ struct TTableInfo : public TSimpleRefCount { , Columns(std::move(alterData.Columns)) , KeyColumnIds(std::move(alterData.KeyColumnIds)) , IsBackup(alterData.IsBackup) + , IsIncrementalBackup(alterData.IsIncrementalBackup) { TableDescription.Swap(alterData.TableDescriptionFull.Get()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index af1413ee1000..17531fb67179 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1193,6 +1193,8 @@ void TSchemeShard::DescribeTable(const TTableInfo::TPtr tableInfo, const NScheme } entry->SetIsBackup(tableInfo->IsBackup); + + entry->SetIncrementalBackup(tableInfo->IsIncrementalBackup); } void TSchemeShard::DescribeTableIndex(const TPathId& pathId, const TString& name, diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index ac7cce9805c8..0f64921d3934 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -129,6 +129,7 @@ struct Schema : NIceDb::Schema { struct ReplicationConfig : Column<10, NScheme::NTypeIds::String> {}; struct IsTemporary : Column<11, NScheme::NTypeIds::Bool> {}; struct OwnerActorId : Column<12, NScheme::NTypeIds::String> {}; // deprecated + struct IsIncrementalBackup : Column<13, NScheme::NTypeIds::Bool> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -143,7 +144,8 @@ struct Schema : NIceDb::Schema { IsBackup, ReplicationConfig, IsTemporary, - OwnerActorId + OwnerActorId, + IsIncrementalBackup >; }; @@ -162,6 +164,7 @@ struct Schema : NIceDb::Schema { struct ReplicationConfig : Column<11, NScheme::NTypeIds::String> {}; struct IsTemporary : Column<12, NScheme::NTypeIds::Bool> {}; struct OwnerActorId : Column<13, NScheme::NTypeIds::String> {}; // deprecated + struct IsIncrementalBackup : Column<14, NScheme::NTypeIds::Bool> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -177,7 +180,8 @@ struct Schema : NIceDb::Schema { IsBackup, ReplicationConfig, IsTemporary, - OwnerActorId + OwnerActorId, + IsIncrementalBackup >; }; diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index aab23b08df34..00ea21e3dfb7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -137,6 +137,7 @@ struct TTxState { item(TxCreateResourcePool, 91) \ item(TxDropResourcePool, 92) \ item(TxAlterResourcePool, 93) \ + item(TxRestoreIncrementalBackup, 94) \ // TX_STATE_TYPE_ENUM @@ -400,6 +401,7 @@ struct TTxState { case TxMergeTablePartition: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: @@ -505,6 +507,7 @@ struct TTxState { case TxMergeTablePartition: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: @@ -610,6 +613,7 @@ struct TTxState { case TxModifyACL: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 975514b27a12..631c0da8329f 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -232,6 +232,7 @@ SRCS( schemeshard_validate_ttl.cpp operation_queue_timer.h user_attributes.cpp + schemeshard__operation_create_restore_incremental_backup.cpp ) GENERATE_ENUM_SERIALIZATION(schemeshard_info_types.h) diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index b373ccbc83d6..99fa357a3fb2 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -356,6 +356,9 @@ struct TBaseSchemeReq: public TActorBootstrapped { case NKikimrSchemeOp::ESchemeOpAlterResourcePool: return *modifyScheme.MutableCreateResourcePool()->MutableName(); + + case NKikimrSchemeOp::ESchemeOpRestoreIncrementalBackup: + return *modifyScheme.MutableRestoreIncrementalBackup()->MutableSrcTableName(); } } @@ -615,6 +618,7 @@ struct TBaseSchemeReq: public TActorBootstrapped { case NKikimrSchemeOp::ESchemeOpAlterContinuousBackup: case NKikimrSchemeOp::ESchemeOpDropContinuousBackup: case NKikimrSchemeOp::ESchemeOpAlterResourcePool: + case NKikimrSchemeOp::ESchemeOpRestoreIncrementalBackup: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); toResolve.Path = Merge(workingDir, SplitPath(GetPathNameForScheme(pbModifyScheme))); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index ed18ba73f2f8..227846800727 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1417,6 +1417,10 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, return false; } + if (in.incremental_backup()) { + tableDesc->SetIncrementalBackup(true); + } + return true; } @@ -1468,4 +1472,16 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS } } +void FillBackupDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in) { + if (in.GetIncrementalBackup()) { + out.set_incremental_backup(true); + } +} + +void FillBackupDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in) { + if (in.GetIncrementalBackup()) { + out.set_incremental_backup(true); + } +} + } // namespace NKikimr diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 197b4581015d..89d93bc72318 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -137,4 +137,12 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); +// out +void FillBackupDescription(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TTableDescription& in); + +// out +void FillBackupDescription(Ydb::Table::DescribeTableResult& out, + const NKikimrSchemeOp::TTableDescription& in); + } // namespace NKikimr diff --git a/ydb/core/ydb_convert/table_settings.cpp b/ydb/core/ydb_convert/table_settings.cpp index 9ee281f3f648..28063416400b 100644 --- a/ydb/core/ydb_convert/table_settings.cpp +++ b/ydb/core/ydb_convert/table_settings.cpp @@ -241,6 +241,8 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& tableDesc, tableDesc.SetTemporary(proto.Gettemporary()); + tableDesc.SetIncrementalBackup(proto.incremental_backup()); + return true; } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index a84139216d32..5407adf17acf 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -573,6 +573,8 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; + // Is incremental backup table + bool incremental_backup = 21; } message CreateTableResponse { @@ -774,6 +776,8 @@ message DescribeTableResult { bool temporary = 17; // Is table column or row oriented StoreType store_type = 18; + // Is incremental backup table + bool incremental_backup = 21; } message Query { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 523ad219d50f..e98e5ed14b1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -278,6 +278,7 @@ class TTableDescription::TImpl { , PartitioningSettings_(proto.partitioning_settings()) , HasStorageSettings_(proto.has_storage_settings()) , HasPartitioningSettings_(proto.has_partitioning_settings()) + , IsIncrementalBackup_(proto.incremental_backup()) { // primary key for (const auto& pk : proto.primary_key()) { @@ -626,6 +627,10 @@ class TTableDescription::TImpl { return ReadReplicasSettings_; } + bool IsIncrementalBackup() const { + return IsIncrementalBackup_; + } + private: Ydb::Table::DescribeTableResult Proto_; TStorageSettings StorageSettings_; @@ -652,6 +657,7 @@ class TTableDescription::TImpl { bool HasStorageSettings_ = false; bool HasPartitioningSettings_ = false; EStoreType StoreType_ = EStoreType::Row; + bool IsIncrementalBackup_ = false; }; TTableDescription::TTableDescription() @@ -880,6 +886,10 @@ const Ydb::Table::DescribeTableResult& TTableDescription::GetProto() const { return Impl_->GetProto(); } +bool TTableDescription::IsIncrementalBackup() const { + return Impl_->IsIncrementalBackup(); +} + void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) const { for (const auto& column : Impl_->GetColumns()) { auto& protoColumn = *request.add_columns(); @@ -962,6 +972,10 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con break; } } + + if (Impl_->IsIncrementalBackup()) { + request.set_incremental_backup(true); + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index d76c0ff2974b..afb237532598 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -567,6 +567,8 @@ class TTableDescription { // Returns read replicas settings of the table TMaybe GetReadReplicasSettings() const; + bool IsIncrementalBackup() const; + // Fills CreateTableRequest proto from this description void SerializeTo(Ydb::Table::CreateTableRequest& request) const; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 91bf31dd8895..439c2488a91d 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -200,6 +200,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( )) { return false; } + return true; if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { //TODO : add here checking of client-service-type password. Provide it via API-call. if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) {