From 7603cc21bd046f944b3dde4d40d4f0f5374bd2d9 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 1 Jul 2024 10:44:27 +0000 Subject: [PATCH 1/9] add incremental backup flag for table --- ydb/core/grpc_services/rpc_describe_table.cpp | 1 + ydb/core/persqueue/offload_actor.cpp | 6 +++--- ydb/core/protos/flat_scheme_op.proto | 3 +++ ydb/core/tx/datashard/export_common.cpp | 1 + ydb/core/tx/schemeshard/schemeshard__init.cpp | 6 ++++-- ...eshard__operation_alter_continuous_backup.cpp | 3 ++- .../schemeshard__operation_create_table.cpp | 3 ++- ydb/core/tx/schemeshard/schemeshard_impl.cpp | 6 ++++-- .../tx/schemeshard/schemeshard_info_types.cpp | 1 + ydb/core/tx/schemeshard/schemeshard_info_types.h | 3 +++ .../schemeshard/schemeshard_path_describer.cpp | 2 ++ ydb/core/tx/schemeshard/schemeshard_schema.h | 8 ++++++-- ydb/core/ydb_convert/table_description.cpp | 16 ++++++++++++++++ ydb/core/ydb_convert/table_description.h | 8 ++++++++ ydb/public/api/protos/ydb_table.proto | 4 ++++ ydb/public/sdk/cpp/client/ydb_table/table.cpp | 10 ++++++++++ 16 files changed, 70 insertions(+), 11 deletions(-) diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp index 05f481e60c89..693213dea88c 100644 --- a/ydb/core/grpc_services/rpc_describe_table.cpp +++ b/ydb/core/grpc_services/rpc_describe_table.cpp @@ -106,6 +106,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActorGetTypeRewrite()) { - default: - Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() - << " event: " << ev->ToString()); + // default: + // Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() + // << " event: " << ev->ToString()); } } }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 864ec8e74d0d..91917356dc59 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -395,6 +395,8 @@ message TTableDescription { optional TTableReplicationConfig ReplicationConfig = 40; optional bool Temporary = 41; + + optional bool IncrementalBackup = 42; } message TDictionaryEncodingSettings { @@ -947,6 +949,7 @@ message TAlterContinuousBackup { } message TTakeIncrementalBackup { + optional string DstPath = 1; } oneof Action { diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index ee801ebe4c52..72f9823ef2ac 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -67,6 +67,7 @@ TMaybe GenYdbScheme( FillKeyBloomFilter(scheme, tableDesc); FillReadReplicasSettings(scheme, tableDesc); FillSequenceDescription(scheme, tableDesc); + FillBackupDescription(scheme, tableDesc); return scheme; } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 0b77b71a9656..6dc5a91d341f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -307,7 +307,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { return true; } - typedef std::tuple TTableRec; + typedef std::tuple TTableRec; typedef TDeque TTableRows; template @@ -323,7 +323,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase { rowSet.template GetValueOrDefault(false), rowSet.template GetValueOrDefault(), rowSet.template GetValueOrDefault(false), - rowSet.template GetValueOrDefault("") + rowSet.template GetValueOrDefault(""), + rowSet.template GetValueOrDefault(false) ); } @@ -1830,6 +1831,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { } tableInfo->IsBackup = std::get<8>(rec); + tableInfo->IsIncrementalBackup = std::get<12>(rec); Self->Tables[pathId] = tableInfo; Self->IncrementPathDbRefCount(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index 24f9c7c0d0f8..9fbc5fbafc5e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -45,6 +45,7 @@ void DoCreateIncBackupTable(const TOperationId& opId, const TPath& dst, NKikimrS auto& desc = *outTx.MutableCreateTable(); desc.CopyFrom(tableDesc); desc.SetName(dst.LeafName()); + desc.SetIncrementalBackup(true); auto& replicationConfig = *desc.MutableReplicationConfig(); replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_READ_ONLY); @@ -78,7 +79,7 @@ TVector CreateAlterContinuousBackup(TOperationId opId, cons const auto topicPath = streamPath.Child("streamImpl"); TTopicInfo::TPtr topic = context.SS->Topics.at(topicPath.Base()->PathId); - const auto backupTablePath = tablePath.Child("incBackupImpl"); + const auto backupTablePath = workingDirPath.Child(cbOp.GetTakeIncrementalBackup().GetDstPath()); const NScheme::TTypeRegistry* typeRegistry = AppData(context.Ctx)->TypeRegistry; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index 029e72fe4c3f..fc64ffd76378 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -559,7 +559,8 @@ class TCreateTable: public TSubOperation { const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry; const TSchemeLimits& limits = domainInfo->GetSchemeLimits(); - TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, context.SS->EnableTableDatetime64, errStr, LocalSequences); + TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData( + nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, context.SS->EnableTableDatetime64, errStr, LocalSequences); if (!alterData.Get()) { result->SetError(NKikimrScheme::StatusSchemeError, errStr); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 05098b99d693..bf6ae6c2b15a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2645,7 +2645,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(tableInfo->IsBackup), NIceDb::TUpdate(replicationConfig), NIceDb::TUpdate(tableInfo->IsTemporary), - NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString()), + NIceDb::TUpdate(tableInfo->IsIncrementalBackup)); } else { db.Table().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate(tableInfo->NextColumnId), @@ -2657,7 +2658,8 @@ void TSchemeShard::PersistTableAltered(NIceDb::TNiceDb& db, const TPathId pathId NIceDb::TUpdate(tableInfo->IsBackup), NIceDb::TUpdate(replicationConfig), NIceDb::TUpdate(tableInfo->IsTemporary), - NIceDb::TUpdate(tableInfo->OwnerActorId.ToString())); + NIceDb::TUpdate(tableInfo->OwnerActorId.ToString()), + NIceDb::TUpdate(tableInfo->IsIncrementalBackup)); } for (auto col : tableInfo->Columns) { diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 54bc391e5c6a..072cae7d0553 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -541,6 +541,7 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( } alterData->IsBackup = op.GetIsBackup(); + alterData->IsIncrementalBackup = op.GetIncrementalBackup(); if (source && op.KeyColumnNamesSize() == 0) return alterData; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index b5297bf20037..281b38922d99 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -381,6 +381,7 @@ struct TTableInfo : public TSimpleRefCount { THashMap Columns; TVector KeyColumnIds; bool IsBackup = false; + bool IsIncrementalBackup = true; NKikimrSchemeOp::TTableDescription TableDescriptionDiff; TMaybeFail TableDescriptionFull; @@ -426,6 +427,7 @@ struct TTableInfo : public TSimpleRefCount { bool IsBackup = false; bool IsTemporary = false; TActorId OwnerActorId; + bool IsIncrementalBackup = false; TAlterTableInfo::TPtr AlterData; @@ -533,6 +535,7 @@ struct TTableInfo : public TSimpleRefCount { , Columns(std::move(alterData.Columns)) , KeyColumnIds(std::move(alterData.KeyColumnIds)) , IsBackup(alterData.IsBackup) + , IsIncrementalBackup(alterData.IsIncrementalBackup) { TableDescription.Swap(alterData.TableDescriptionFull.Get()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index af1413ee1000..17531fb67179 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1193,6 +1193,8 @@ void TSchemeShard::DescribeTable(const TTableInfo::TPtr tableInfo, const NScheme } entry->SetIsBackup(tableInfo->IsBackup); + + entry->SetIncrementalBackup(tableInfo->IsIncrementalBackup); } void TSchemeShard::DescribeTableIndex(const TPathId& pathId, const TString& name, diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index ac7cce9805c8..0f64921d3934 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -129,6 +129,7 @@ struct Schema : NIceDb::Schema { struct ReplicationConfig : Column<10, NScheme::NTypeIds::String> {}; struct IsTemporary : Column<11, NScheme::NTypeIds::Bool> {}; struct OwnerActorId : Column<12, NScheme::NTypeIds::String> {}; // deprecated + struct IsIncrementalBackup : Column<13, NScheme::NTypeIds::Bool> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -143,7 +144,8 @@ struct Schema : NIceDb::Schema { IsBackup, ReplicationConfig, IsTemporary, - OwnerActorId + OwnerActorId, + IsIncrementalBackup >; }; @@ -162,6 +164,7 @@ struct Schema : NIceDb::Schema { struct ReplicationConfig : Column<11, NScheme::NTypeIds::String> {}; struct IsTemporary : Column<12, NScheme::NTypeIds::Bool> {}; struct OwnerActorId : Column<13, NScheme::NTypeIds::String> {}; // deprecated + struct IsIncrementalBackup : Column<14, NScheme::NTypeIds::Bool> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -177,7 +180,8 @@ struct Schema : NIceDb::Schema { IsBackup, ReplicationConfig, IsTemporary, - OwnerActorId + OwnerActorId, + IsIncrementalBackup >; }; diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index ed18ba73f2f8..227846800727 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1417,6 +1417,10 @@ bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out, return false; } + if (in.incremental_backup()) { + tableDesc->SetIncrementalBackup(true); + } + return true; } @@ -1468,4 +1472,16 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS } } +void FillBackupDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in) { + if (in.GetIncrementalBackup()) { + out.set_incremental_backup(true); + } +} + +void FillBackupDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in) { + if (in.GetIncrementalBackup()) { + out.set_incremental_backup(true); + } +} + } // namespace NKikimr diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 197b4581015d..89d93bc72318 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -137,4 +137,12 @@ void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, void FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); +// out +void FillBackupDescription(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TTableDescription& in); + +// out +void FillBackupDescription(Ydb::Table::DescribeTableResult& out, + const NKikimrSchemeOp::TTableDescription& in); + } // namespace NKikimr diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index a84139216d32..5407adf17acf 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -573,6 +573,8 @@ message CreateTableRequest { bool temporary = 19; // Is table column or row oriented StoreType store_type = 20; + // Is incremental backup table + bool incremental_backup = 21; } message CreateTableResponse { @@ -774,6 +776,8 @@ message DescribeTableResult { bool temporary = 17; // Is table column or row oriented StoreType store_type = 18; + // Is incremental backup table + bool incremental_backup = 21; } message Query { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 523ad219d50f..c5b51c3bca4a 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -278,6 +278,7 @@ class TTableDescription::TImpl { , PartitioningSettings_(proto.partitioning_settings()) , HasStorageSettings_(proto.has_storage_settings()) , HasPartitioningSettings_(proto.has_partitioning_settings()) + , IsIncrementalBackup_(proto.incremental_backup()) { // primary key for (const auto& pk : proto.primary_key()) { @@ -626,6 +627,10 @@ class TTableDescription::TImpl { return ReadReplicasSettings_; } + bool IsIncrementalBackup() const { + return IsIncrementalBackup_; + } + private: Ydb::Table::DescribeTableResult Proto_; TStorageSettings StorageSettings_; @@ -652,6 +657,7 @@ class TTableDescription::TImpl { bool HasStorageSettings_ = false; bool HasPartitioningSettings_ = false; EStoreType StoreType_ = EStoreType::Row; + bool IsIncrementalBackup_ = false; }; TTableDescription::TTableDescription() @@ -962,6 +968,10 @@ void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) con break; } } + + if (Impl_->IsIncrementalBackup()) { + request.set_incremental_backup(true); + } } //////////////////////////////////////////////////////////////////////////////// From b07ff37088a487f2d7432f51ee5826377dfe944c Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Mon, 1 Jul 2024 12:10:43 +0000 Subject: [PATCH 2/9] fix inc-backup restore table creation --- ydb/core/ydb_convert/table_settings.cpp | 2 ++ ydb/public/sdk/cpp/client/ydb_table/table.cpp | 4 ++++ ydb/public/sdk/cpp/client/ydb_table/table.h | 2 ++ 3 files changed, 8 insertions(+) diff --git a/ydb/core/ydb_convert/table_settings.cpp b/ydb/core/ydb_convert/table_settings.cpp index 9ee281f3f648..28063416400b 100644 --- a/ydb/core/ydb_convert/table_settings.cpp +++ b/ydb/core/ydb_convert/table_settings.cpp @@ -241,6 +241,8 @@ bool FillCreateTableSettingsDesc(NKikimrSchemeOp::TTableDescription& tableDesc, tableDesc.SetTemporary(proto.Gettemporary()); + tableDesc.SetIncrementalBackup(proto.incremental_backup()); + return true; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index c5b51c3bca4a..e98e5ed14b1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -886,6 +886,10 @@ const Ydb::Table::DescribeTableResult& TTableDescription::GetProto() const { return Impl_->GetProto(); } +bool TTableDescription::IsIncrementalBackup() const { + return Impl_->IsIncrementalBackup(); +} + void TTableDescription::SerializeTo(Ydb::Table::CreateTableRequest& request) const { for (const auto& column : Impl_->GetColumns()) { auto& protoColumn = *request.add_columns(); diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index d76c0ff2974b..afb237532598 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -567,6 +567,8 @@ class TTableDescription { // Returns read replicas settings of the table TMaybe GetReadReplicasSettings() const; + bool IsIncrementalBackup() const; + // Fills CreateTableRequest proto from this description void SerializeTo(Ydb::Table::CreateTableRequest& request) const; From d83b3ce913dbce3ed63d9590ea9cddfd481593ec Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Tue, 2 Jul 2024 13:25:11 +0000 Subject: [PATCH 3/9] WIP --- .../backup/impl/local_partition_reader.cpp | 17 +- ydb/core/backup/impl/table_writer.cpp | 12 +- ydb/core/persqueue/offload_actor.cpp | 8 +- ydb/core/persqueue/read_balancer.cpp | 3 +- ydb/core/protos/counters_schemeshard.proto | 4 + ydb/core/protos/flat_scheme_op.proto | 9 + ydb/core/protos/pqconfig.proto | 5 + .../tx/datashard/datashard_repl_apply.cpp | 52 +++--- .../tx/schemeshard/schemeshard__operation.cpp | 4 + ...tion_create_restore_incremental_backup.cpp | 174 ++++++++++++++++++ .../schemeshard/schemeshard__operation_part.h | 2 + .../schemeshard_audit_log_fragment.cpp | 8 + ydb/core/tx/schemeshard/schemeshard_impl.cpp | 1 + .../tx/schemeshard/schemeshard_tx_infly.h | 4 + ydb/core/tx/schemeshard/ya.make | 1 + ydb/core/tx/tx_proxy/schemereq.cpp | 4 + .../actors/read_init_auth_actor.cpp | 1 + 17 files changed, 273 insertions(+), 36 deletions(-) create mode 100644 ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp diff --git a/ydb/core/backup/impl/local_partition_reader.cpp b/ydb/core/backup/impl/local_partition_reader.cpp index 32eb352fd000..462380077f6e 100644 --- a/ydb/core/backup/impl/local_partition_reader.cpp +++ b/ydb/core/backup/impl/local_partition_reader.cpp @@ -58,12 +58,16 @@ class TLocalPartitionReader Send(PQTablet, CreateGetOffsetRequest().Release()); } + void HandleInit() { + Send(PQTablet, CreateGetOffsetRequest().Release()); + } + void HandleInit(TEvPersQueue::TEvResponse::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); auto& record = ev->Get()->Record; if (record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - // TODO reschedule - Y_ABORT("Unimplemented!"); + Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup); + return; } Y_VERIFY_S(record.GetErrorCode() == NPersQueue::NErrorCode::OK, "Unimplemented!"); Y_VERIFY_S(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdGetClientOffsetResult(), "Unimplemented!"); @@ -85,7 +89,7 @@ class TLocalPartitionReader auto& read = *req.MutableCmdRead(); read.SetOffset(Offset); read.SetClientId(OFFLOAD_ACTOR_CLIENT_ID); - read.SetTimeoutMs(0); + read.SetTimeoutMs(1000); read.SetBytes(1_MB); return request; @@ -114,6 +118,12 @@ class TLocalPartitionReader const auto& readResult = record.GetPartitionResponse().GetCmdReadResult(); + if (!readResult.ResultSize()) { + Y_ABORT_UNLESS(PQTablet); + Send(PQTablet, CreateReadRequest().Release()); + return; + } + auto gotOffset = Offset; TVector records(::Reserve(readResult.ResultSize())); @@ -147,6 +157,7 @@ class TLocalPartitionReader switch (ev->GetTypeRewrite()) { hFunc(TEvWorker::TEvHandshake, HandleInit); hFunc(TEvPersQueue::TEvResponse, HandleInit); + sFunc(TEvents::TEvWakeup, HandleInit); sFunc(TEvents::TEvPoison, PassAway); default: Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite() diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 157e30c75edc..0c185bdb73b8 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -37,9 +37,15 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); auto& upsert = *record.MutableUpsert(); - *upsert.MutableTags() = { - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), - ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + auto tagsSize = ProtoBody.GetCdcDataChange().GetUpsert().TagsSize(); + ui64 i = 0; + for (auto& tag : ProtoBody.GetCdcDataChange().GetUpsert().GetTags()) { + ++i; + if (i == tagsSize) { + break; + } + upsert.AddTags(tag); + } upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); } diff --git a/ydb/core/persqueue/offload_actor.cpp b/ydb/core/persqueue/offload_actor.cpp index 5ece1cbed35d..898e89074fcb 100644 --- a/ydb/core/persqueue/offload_actor.cpp +++ b/ydb/core/persqueue/offload_actor.cpp @@ -34,7 +34,6 @@ class TOffloadActor const TActorId ParentTablet; const ui32 Partition; const NKikimrPQ::TOffloadConfig Config; - const TPathId DstPathId; mutable TMaybe LogPrefix; TActorId Worker; @@ -60,14 +59,17 @@ class TOffloadActor : ParentTablet(parentTablet) , Partition(partition) , Config(config) - , DstPathId(PathIdFromPathId(config.GetIncrementalBackup().GetDstPathId())) {} void Bootstrap() { auto* workerActor = CreateWorker( SelfId(), [=]() -> IActor* { return NBackup::NImpl::CreateLocalPartitionReader(ParentTablet, Partition); }, - [=]() -> IActor* { return NBackup::NImpl::CreateLocalTableWriter(DstPathId); }); + [=]() -> IActor* { return NBackup::NImpl::CreateLocalTableWriter( + Config.HasIncrementalBackup() + ? PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId()) + : PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId())); + }); Worker = TActivationContext::Register(workerActor); Become(&TOffloadActor::StateWork); diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 495277081072..7e99f90329f7 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -366,7 +366,8 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req }; TString user = record.HasUser() ? record.GetUser() : ""; - + RespondWithACL(request, NKikimrPQ::EAccess::ALLOWED, "", ctx); + return; if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) { if (!Consumers.contains(user)) { RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 15f8bb5d7f2b..26f739c4d19d 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -220,6 +220,8 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxCreateResourcePool = 174 [(CounterOpts) = {Name: "InFlightOps/CreateResourcePool"}]; COUNTER_IN_FLIGHT_OPS_TxDropResourcePool = 175 [(CounterOpts) = {Name: "InFlightOps/DropResourcePool"}]; COUNTER_IN_FLIGHT_OPS_TxAlterResourcePool = 176 [(CounterOpts) = {Name: "InFlightOps/AlterResourcePool"}]; + + COUNTER_IN_FLIGHT_OPS_TxRestoreIncrementalBackup = 177 [(CounterOpts) = {Name: "InFlightOps/RestoreIncrementalBackup"}]; } enum ECumulativeCounters { @@ -351,6 +353,8 @@ enum ECumulativeCounters { COUNTER_FINISHED_OPS_TxCreateResourcePool = 103 [(CounterOpts) = {Name: "FinishedOps/CreateResourcePool"}]; COUNTER_FINISHED_OPS_TxDropResourcePool = 104 [(CounterOpts) = {Name: "FinishedOps/DropResourcePool"}]; COUNTER_FINISHED_OPS_TxAlterResourcePool = 105 [(CounterOpts) = {Name: "FinishedOps/AlterResourcePool"}]; + + COUNTER_FINISHED_OPS_TxRestoreIncrementalBackup = 106 [(CounterOpts) = {Name: "FinishedOps/RestoreIncrementalBackup"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 91917356dc59..59e98b0c5013 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -962,6 +962,11 @@ message TDropContinuousBackup { optional string TableName = 1; } +message TRestoreIncrementalBackup { + optional string SrcTableName = 1; + optional string DstTableName = 2; +} + enum EIndexType { EIndexTypeInvalid = 0; EIndexTypeGlobal = 1; @@ -1592,6 +1597,8 @@ enum EOperationType { ESchemeOpCreateResourcePool = 100; ESchemeOpDropResourcePool = 101; ESchemeOpAlterResourcePool = 102; + + ESchemeOpRestoreIncrementalBackup = 103; } message TApplyIf { @@ -1774,6 +1781,8 @@ message TModifyScheme { optional bool AllowCreateInTempDir = 71 [default = false]; optional TResourcePoolDescription CreateResourcePool = 72; + + optional TRestoreIncrementalBackup RestoreIncrementalBackup = 73; } message TCopySequence { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 60e1a87eb6fa..6a8be152c5f3 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -311,8 +311,13 @@ message TOffloadConfig { optional NKikimrProto.TPathID DstPathId = 2; } + message TIncrementalRestore { + optional NKikimrProto.TPathID DstPathId = 1; + } + oneof Strategy { TIncrementalBackup IncrementalBackup = 1; + TIncrementalRestore IncrementalRestore = 2; } } diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 92533d7befde..cc01c35ce815 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -31,12 +31,12 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseIsReplicated()) { - Result = MakeHolder( - NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); - return true; - } + // if (!Self->IsReplicated()) { + // Result = MakeHolder( + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); + // return true; + // } const auto& msg = Ev->Get()->Record; @@ -102,29 +102,29 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( - NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, - "WriteTxId cannot be specified for weak consistency"); - return false; - } - } else { - if (writeTxId == 0) { - Result = MakeHolder( - NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, - "Non-zero WriteTxId must be specified for strong consistency"); - return false; - } - } + // if (userTable.ReplicationConfig.HasWeakConsistency()) { + // if (writeTxId) { + // Result = MakeHolder( + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + // "WriteTxId cannot be specified for weak consistency"); + // return false; + // } + // } else { + // if (writeTxId == 0) { + // Result = MakeHolder( + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + // "Non-zero WriteTxId must be specified for strong consistency"); + // return false; + // } + // } TSerializedCellVec keyCellVec; if (!TSerializedCellVec::TryParse(change.GetKey(), keyCellVec) || @@ -207,8 +207,8 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index ee7de50fa0a1..68504676510d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -906,6 +906,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState:: return CreateBackup(NextPartId(), txState); case TTxState::ETxType::TxRestore: return CreateRestore(NextPartId(), txState); + case TTxState::ETxType::TxRestoreIncrementalBackup: + Y_ABORT("TODO: implement"); case TTxState::ETxType::TxDropTable: return CreateDropTable(NextPartId(), txState); case TTxState::ETxType::TxCreateTableIndex: @@ -1343,6 +1345,8 @@ TVector TOperation::ConstructParts(const TTxTransaction& tx case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: return {CreateAlterResourcePool(NextPartId(), tx)}; + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + return CreateRestoreIncrementalBackup(NextPartId(), tx, context); } Y_UNREACHABLE(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp new file mode 100644 index 000000000000..87566f73575c --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -0,0 +1,174 @@ +#include "schemeshard__operation_part.h" +#include "schemeshard__operation_common.h" +#include "schemeshard_impl.h" + +#include "schemeshard__operation_create_cdc_stream.h" + +namespace { + +const char* IB_RESTORE_CDC_STREAM_NAME = "__ib_restore_stream"; + +} + +#define LOG_D(stream) LOG_DEBUG_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_I(stream) LOG_INFO_S (context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) +#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream) + +namespace NKikimr::NSchemeShard { + +void DoCreateLock(const TOperationId opId, const TPath& workingDirPath, const TPath& tablePath, bool allowIndexImplLock, + TVector& result) +{ + auto outTx = TransactionTemplate(workingDirPath.PathString(), + NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock); + outTx.SetFailOnExist(false); + outTx.SetInternal(true); + auto cfg = outTx.MutableLockConfig(); + cfg->SetName(tablePath.LeafName()); + cfg->SetAllowIndexImplLock(allowIndexImplLock); + + result.push_back(CreateLock(NextPartId(opId, result), outTx)); +} + +void DoCreatePqPart( + const TOperationId& opId, + const TPath& streamPath, + const TString& streamName, + const TIntrusivePtr table, + const TPathId dstPathId, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TVector& boundaries, + const bool acceptExisted, + TVector& result) +{ + auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); + outTx.SetFailOnExist(!acceptExisted); + + auto& desc = *outTx.MutableCreatePersQueueGroup(); + desc.SetName("streamImpl"); + desc.SetTotalGroupCount(op.HasTopicPartitions() ? op.GetTopicPartitions() : table->GetPartitions().size()); + desc.SetPartitionPerTablet(2); + + auto& pqConfig = *desc.MutablePQTabletConfig(); + pqConfig.SetTopicName(streamName); + pqConfig.SetTopicPath(streamPath.Child("streamImpl").PathString()); + pqConfig.SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + + auto& partitionConfig = *pqConfig.MutablePartitionConfig(); + partitionConfig.SetLifetimeSeconds(op.GetRetentionPeriodSeconds()); + partitionConfig.SetWriteSpeedInBytesPerSecond(1_MB); // TODO: configurable write speed + partitionConfig.SetBurstSize(1_MB); // TODO: configurable burst + partitionConfig.SetMaxCountInPartition(Max()); + + for (const auto& tag : table->KeyColumnIds) { + Y_ABORT_UNLESS(table->Columns.contains(tag)); + const auto& column = table->Columns.at(tag); + + auto& keyComponent = *pqConfig.AddPartitionKeySchema(); + keyComponent.SetName(column.Name); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.PType, column.PTypeMod); + keyComponent.SetTypeId(columnType.TypeId); + if (columnType.TypeInfo) { + *keyComponent.MutableTypeInfo() = *columnType.TypeInfo; + } + } + + for (const auto& serialized : boundaries) { + TSerializedCellVec endKey(serialized); + Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size()); + + TString errStr; + auto& boundary = *desc.AddPartitionBoundaries(); + for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) { + const auto& cell = endKey.GetCells()[ki]; + const auto tag = table->KeyColumnIds.at(ki); + Y_ABORT_UNLESS(table->Columns.contains(tag)); + const auto typeId = table->Columns.at(tag).PType; + const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr); + Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data()); + } + } + + auto& ir = *pqConfig.MutableOffloadConfig()->MutableIncrementalRestore(); + auto* pathId = ir.MutableDstPathId(); + PathIdFromPathId(dstPathId, pathId); + + result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); +} + +TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup); + + LOG_N("CreateRestoreIncrementalBackup" + << ": opId# " << opId + << ", tx# " << tx.ShortDebugString()); + + const auto acceptExisted = !tx.GetFailOnExist(); + const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); + const auto& restoreOp = tx.GetRestoreIncrementalBackup(); + const auto& srcTableName = restoreOp.GetSrcTableName(); + const auto& dstTableName = restoreOp.GetDstTableName(); + const auto dstTablePath = workingDirPath.Child(dstTableName); + + const auto checksResult = NCdc::DoNewStreamPathChecks(opId, workingDirPath, srcTableName, IB_RESTORE_CDC_STREAM_NAME, acceptExisted); + if (std::holds_alternative(checksResult)) { + return {std::get(checksResult)}; + } + + const auto [srcTablePath, streamPath] = std::get(checksResult); + + Y_ABORT_UNLESS(context.SS->Tables.contains(srcTablePath.Base()->PathId)); + auto srcTable = context.SS->Tables.at(srcTablePath.Base()->PathId); + + TString errStr; + if (!context.SS->CheckApplyIf(tx, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)}; + } + + if (!context.SS->CheckLocks(srcTablePath.Base()->PathId, tx, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)}; + } + + // check dst locks + // lock dst + // start replication read on dst + + TVector boundaries; + const auto& partitions = srcTable->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(srcTableName); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); + + TVector result; + + DoCreateLock(opId, workingDirPath, srcTablePath, false, result); + + NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, srcTablePath, acceptExisted, true, {}, result); + DoCreatePqPart( + opId, + streamPath, + IB_RESTORE_CDC_STREAM_NAME, + srcTable, + dstTablePath.Base()->PathId, + createCdcStreamOp, + boundaries, + acceptExisted, + result); + + return result; +} + +} // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 386dcb20768a..08c76fe7d540 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -620,5 +620,7 @@ ISubOperation::TPtr CreateAlterResourcePool(TOperationId id, TTxState::ETxState ISubOperation::TPtr CreateDropResourcePool(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropResourcePool(TOperationId id, TTxState::ETxState state); +TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context); + } } diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 9c2005264923..a39168f7597e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -235,12 +235,16 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) { return "ALTER TABLE ALTER CONTINUOUS BACKUP"; case NKikimrSchemeOp::EOperationType::ESchemeOpDropContinuousBackup: return "ALTER TABLE DROP CONTINUOUS BACKUP"; + // case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool: return "CREATE RESOURCE POOL"; case NKikimrSchemeOp::EOperationType::ESchemeOpDropResourcePool: return "DROP RESOURCE POOL"; case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: return "ALTER RESOURCE POOL"; + // + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + return "RESTORE"; } Y_ABORT("switch should cover all operation types"); } @@ -544,6 +548,10 @@ TVector ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpAlterResourcePool: result.emplace_back(tx.GetCreateResourcePool().GetName()); break; + case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetRestoreIncrementalBackup().GetSrcTableName()})); + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetRestoreIncrementalBackup().GetDstTableName()})); + break; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index bf6ae6c2b15a..3d5d1c2783dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1581,6 +1581,7 @@ TPathElement::EPathState TSchemeShard::CalcPathState(TTxState::ETxType txType, T case TTxState::TxBackup: return TPathElement::EPathState::EPathStateBackup; case TTxState::TxRestore: + case TTxState::TxRestoreIncrementalBackup: return TPathElement::EPathState::EPathStateRestore; case TTxState::TxUpgradeSubDomain: return TPathElement::EPathState::EPathStateUpgrade; diff --git a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h index aab23b08df34..00ea21e3dfb7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_tx_infly.h +++ b/ydb/core/tx/schemeshard/schemeshard_tx_infly.h @@ -137,6 +137,7 @@ struct TTxState { item(TxCreateResourcePool, 91) \ item(TxDropResourcePool, 92) \ item(TxAlterResourcePool, 93) \ + item(TxRestoreIncrementalBackup, 94) \ // TX_STATE_TYPE_ENUM @@ -400,6 +401,7 @@ struct TTxState { case TxMergeTablePartition: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: @@ -505,6 +507,7 @@ struct TTxState { case TxMergeTablePartition: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: @@ -610,6 +613,7 @@ struct TTxState { case TxModifyACL: case TxBackup: case TxRestore: + case TxRestoreIncrementalBackup: case TxAlterBlockStoreVolume: case TxAssignBlockStoreVolume: case TxAlterFileStore: diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 975514b27a12..631c0da8329f 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -232,6 +232,7 @@ SRCS( schemeshard_validate_ttl.cpp operation_queue_timer.h user_attributes.cpp + schemeshard__operation_create_restore_incremental_backup.cpp ) GENERATE_ENUM_SERIALIZATION(schemeshard_info_types.h) diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index b373ccbc83d6..99fa357a3fb2 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -356,6 +356,9 @@ struct TBaseSchemeReq: public TActorBootstrapped { case NKikimrSchemeOp::ESchemeOpAlterResourcePool: return *modifyScheme.MutableCreateResourcePool()->MutableName(); + + case NKikimrSchemeOp::ESchemeOpRestoreIncrementalBackup: + return *modifyScheme.MutableRestoreIncrementalBackup()->MutableSrcTableName(); } } @@ -615,6 +618,7 @@ struct TBaseSchemeReq: public TActorBootstrapped { case NKikimrSchemeOp::ESchemeOpAlterContinuousBackup: case NKikimrSchemeOp::ESchemeOpDropContinuousBackup: case NKikimrSchemeOp::ESchemeOpAlterResourcePool: + case NKikimrSchemeOp::ESchemeOpRestoreIncrementalBackup: { auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); toResolve.Path = Merge(workingDir, SplitPath(GetPathNameForScheme(pbModifyScheme))); diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 91bf31dd8895..439c2488a91d 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -200,6 +200,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( )) { return false; } + return true; if (!SkipReadRuleCheck && (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())) { //TODO : add here checking of client-service-type password. Provide it via API-call. if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) { From 4b02509d57441e61a0f79d7ae5a67ddb8094b694 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Wed, 3 Jul 2024 11:25:39 +0000 Subject: [PATCH 4/9] move packing --- ydb/core/backup/impl/table_writer.cpp | 6 ---- ydb/core/protos/flat_scheme_op.proto | 1 + ydb/core/tx/datashard/cdc_stream_scan.cpp | 24 ++++++++++++++++ .../datashard/change_collector_cdc_stream.cpp | 28 +++++++++++++++++++ ...hemeshard__operation_create_cdc_stream.cpp | 2 ++ ...tion_create_restore_incremental_backup.cpp | 2 +- 6 files changed, 56 insertions(+), 7 deletions(-) diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 0c185bdb73b8..3395e0cd1c62 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -37,13 +37,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); auto& upsert = *record.MutableUpsert(); - auto tagsSize = ProtoBody.GetCdcDataChange().GetUpsert().TagsSize(); - ui64 i = 0; for (auto& tag : ProtoBody.GetCdcDataChange().GetUpsert().GetTags()) { - ++i; - if (i == tagsSize) { - break; - } upsert.AddTags(tag); } upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 59e98b0c5013..4f2c30a63b4e 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -876,6 +876,7 @@ enum ECdcStreamMode { ECdcStreamModeNewImage = 3; ECdcStreamModeOldImage = 4; ECdcStreamModeNewAndOldImages = 5; + ECdcStreamModeRestoreIncBackup = 6; } enum ECdcStreamFormat { diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 2f67fe5a79ac..40c5ad11ca04 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -208,6 +208,27 @@ class TDataShard::TTxCdcStreamScanProgress return updates; } + static TVector MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector updates(::Reserve(cells.size() - 1)); + + bool skipped = false; + Y_ABORT_UNLESS(cells.size() == tags.size()); + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__incrBackupImpl_deleted") { + skipped = true; + continue; + } + updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type)); + } + Y_ABORT_UNLESS(skipped); + + return updates; + } + static TRowState MakeRow(TArrayRef cells) { TRowState row(cells.size()); @@ -307,6 +328,9 @@ class TDataShard::TTxCdcStreamScanProgress case NKikimrSchemeOp::ECdcStreamModeUpdate: Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); break; + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: + Serialize(body, ERowOp::Upsert, key, keyTags, MakeRestoreUpdates(v.GetCells(), valueTags, table)); + break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: { const auto newImage = MakeRow(v.GetCells()); diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 30dfb5b8b4ba..c157868f019b 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -59,6 +59,29 @@ namespace { return result; } + auto MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TArrayRef types, TUserTable::TCPtr table) { + Y_ABORT_UNLESS(cells.size() >= 1); + TVector result(::Reserve(cells.size() - 1)); + + Y_ABORT_UNLESS(cells.size() == tags.size()); + Y_ABORT_UNLESS(cells.size() == types.size()); + + bool skipped = false; + for (TPos pos = 0; pos < cells.size(); ++pos) { + const auto tag = tags.at(pos); + auto it = table->Columns.find(tag); + Y_ABORT_UNLESS(it != table->Columns.end()); + if (it->second.Name == "__incrBackupImpl_deleted") { + skipped = true; + continue; + } + result.emplace_back(tags.at(pos), ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), types.at(pos))); + } + Y_ABORT_UNLESS(skipped); + + return result; + } + auto MakeTagToPos(TArrayRef tags) { THashMap result; @@ -132,6 +155,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { switch (streamInfo.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: case NKikimrSchemeOp::ECdcStreamModeUpdate: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: @@ -227,6 +251,9 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, case NKikimrSchemeOp::ECdcStreamModeUpdate: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeUpdates(**initialState, valueTags, valueTypes)); break; + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: + Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeRestoreUpdates(**initialState, valueTags, valueTypes, userTable)); + break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, nullptr, &*initialState, valueTags); @@ -244,6 +271,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, Persist(tableId, pathId, rop, key, keyTags, {}); break; case NKikimrSchemeOp::ECdcStreamModeUpdate: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: Persist(tableId, pathId, rop, key, keyTags, updates); break; case NKikimrSchemeOp::ECdcStreamModeNewImage: diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index a615d6b68a4a..2a0ea16d3b45 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -202,6 +202,7 @@ class TNewCdcStream: public TSubOperation { case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; case NKikimrSchemeOp::ECdcStreamModeUpdate: if (streamDesc.GetFormat() == NKikimrSchemeOp::ECdcStreamFormatDynamoDBStreamsJson) { @@ -937,6 +938,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: break; default: return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index 87566f73575c..065891477e7a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -148,7 +148,7 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c createCdcStreamOp.SetTableName(srcTableName); auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); streamDescription.SetName(IB_RESTORE_CDC_STREAM_NAME); - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup); streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); streamDescription.SetState(NKikimrSchemeOp::ECdcStreamStateScan); From 45564b63d924a4f818924ad0eb4449f2a6594589 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 4 Jul 2024 12:41:46 +0000 Subject: [PATCH 5/9] add erase support --- ydb/core/backup/impl/table_writer.cpp | 104 +++++++++++++++--- ydb/core/backup/impl/table_writer.h | 2 +- ydb/core/persqueue/offload_actor.cpp | 10 +- ydb/core/tx/datashard/cdc_stream_scan.cpp | 16 ++- .../datashard/change_collector_cdc_stream.cpp | 16 ++- ydb/core/tx/datashard/change_record.h | 8 ++ .../replication/service/json_change_record.h | 4 + .../replication/service/table_writer_impl.h | 19 +++- 8 files changed, 147 insertions(+), 32 deletions(-) diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 3395e0cd1c62..4df5e1d6fdd1 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -7,6 +7,27 @@ namespace NKikimr::NBackup::NImpl { +class TChangeRecord; + +} + +namespace NKikimr { + +template <> +struct TChangeRecordBuilderContextTrait { + bool Restore; + + TChangeRecordBuilderContextTrait(bool restore) + : Restore(restore) + {} + + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait& other) = default; +}; + +} + +namespace NKikimr::NBackup::NImpl { + class TChangeRecord: public NChangeExchange::TChangeRecordBase { friend class TChangeRecordBuilder; @@ -30,24 +51,73 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { return SourceId; } - void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const { - record.SetSourceOffset(GetOrder()); - // TODO: fill WriteTxId - - record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); - - auto& upsert = *record.MutableUpsert(); - for (auto& tag : ProtoBody.GetCdcDataChange().GetUpsert().GetTags()) { - upsert.AddTags(tag); - } - upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); - } - void Serialize( NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, - TChangeRecordBuilderContextTrait&) const + TChangeRecordBuilderContextTrait& ctx) const { - return Serialize(record); + if (!ctx.Restore) { + record.SetSourceOffset(GetOrder()); + // TODO: fill WriteTxId + + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + auto& upsert = *record.MutableUpsert(); + + switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + break; + case NKikimrChangeExchange::TDataChange::kErase: { + size_t size = Schema->ValueColumns.size(); + TVector tags; + TVector cells; + + tags.reserve(size); + cells.reserve(size); + + for (const auto& [name, value] : Schema->ValueColumns) { + tags.push_back(value.Tag); + if (name != "__incrBackupImpl_deleted") { + cells.emplace_back(); + } else { + cells.emplace_back(TCell::Make(true)); + } + } + + *upsert.MutableTags() = {tags.begin(), tags.end()}; + upsert.SetData(TSerializedCellVec::Serialize(cells)); + + break; + } + case NKikimrChangeExchange::TDataChange::kReset: + default: + Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); + } + } else { + record.SetSourceOffset(GetOrder()); + record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData()); + + switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: { + auto& upsert = *record.MutableUpsert(); + *upsert.MutableTags() = { + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), + ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; + upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + break; + } + case NKikimrChangeExchange::TDataChange::kErase: + record.MutableErase(); + break; + case NKikimrChangeExchange::TDataChange::kReset: + default: + Y_FAIL_S("Unexpected row operation: " << static_cast(ProtoBody.GetCdcDataChange().GetRowOperationCase())); + } + + } } ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override { @@ -156,8 +226,8 @@ Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord::TPtr, out, va namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId) { - return new NReplication::NService::TLocalTableWriter(tablePathId); +IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore) { + return new NReplication::NService::TLocalTableWriter(tablePathId, restore); } } diff --git a/ydb/core/backup/impl/table_writer.h b/ydb/core/backup/impl/table_writer.h index e5ca1dbc60b1..5c3611ccf576 100644 --- a/ydb/core/backup/impl/table_writer.h +++ b/ydb/core/backup/impl/table_writer.h @@ -8,6 +8,6 @@ namespace NKikimr { namespace NKikimr::NBackup::NImpl { -IActor* CreateLocalTableWriter(const TPathId& tablePathId); +IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore = false); } diff --git a/ydb/core/persqueue/offload_actor.cpp b/ydb/core/persqueue/offload_actor.cpp index 898e89074fcb..10ceb15159c2 100644 --- a/ydb/core/persqueue/offload_actor.cpp +++ b/ydb/core/persqueue/offload_actor.cpp @@ -65,10 +65,12 @@ class TOffloadActor auto* workerActor = CreateWorker( SelfId(), [=]() -> IActor* { return NBackup::NImpl::CreateLocalPartitionReader(ParentTablet, Partition); }, - [=]() -> IActor* { return NBackup::NImpl::CreateLocalTableWriter( - Config.HasIncrementalBackup() - ? PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId()) - : PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId())); + [=]() -> IActor* { + if (Config.HasIncrementalBackup()) { + return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId())); + } else { + return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId()), true); + } }); Worker = TActivationContext::Register(workerActor); diff --git a/ydb/core/tx/datashard/cdc_stream_scan.cpp b/ydb/core/tx/datashard/cdc_stream_scan.cpp index 40c5ad11ca04..ebc1a09d4a1d 100644 --- a/ydb/core/tx/datashard/cdc_stream_scan.cpp +++ b/ydb/core/tx/datashard/cdc_stream_scan.cpp @@ -208,7 +208,7 @@ class TDataShard::TTxCdcStreamScanProgress return updates; } - static TVector MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { + static std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TUserTable::TCPtr table) { Y_ABORT_UNLESS(cells.size() >= 1); TVector updates(::Reserve(cells.size() - 1)); @@ -219,6 +219,10 @@ class TDataShard::TTxCdcStreamScanProgress auto it = table->Columns.find(tag); Y_ABORT_UNLESS(it != table->Columns.end()); if (it->second.Name == "__incrBackupImpl_deleted") { + // FIXME assert + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } skipped = true; continue; } @@ -328,9 +332,15 @@ class TDataShard::TTxCdcStreamScanProgress case NKikimrSchemeOp::ECdcStreamModeUpdate: Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table)); break; - case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: - Serialize(body, ERowOp::Upsert, key, keyTags, MakeRestoreUpdates(v.GetCells(), valueTags, table)); + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: { + auto updates = MakeRestoreUpdates(v.GetCells(), valueTags, table); + if (updates) { + Serialize(body, ERowOp::Upsert, key, keyTags, *updates); + } else { + Serialize(body, ERowOp::Erase, key, keyTags, {}); + } break; + } case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: { const auto newImage = MakeRow(v.GetCells()); diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index c157868f019b..0a95fbc8dac2 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -59,7 +59,7 @@ namespace { return result; } - auto MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TArrayRef types, TUserTable::TCPtr table) { + std::optional> MakeRestoreUpdates(TArrayRef cells, TArrayRef tags, TArrayRef types, TUserTable::TCPtr table) { Y_ABORT_UNLESS(cells.size() >= 1); TVector result(::Reserve(cells.size() - 1)); @@ -72,6 +72,10 @@ namespace { auto it = table->Columns.find(tag); Y_ABORT_UNLESS(it != table->Columns.end()); if (it->second.Name == "__incrBackupImpl_deleted") { + // FIXME assert + if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue()) { + return std::nullopt; + } skipped = true; continue; } @@ -251,9 +255,15 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, case NKikimrSchemeOp::ECdcStreamModeUpdate: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeUpdates(**initialState, valueTags, valueTypes)); break; - case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: - Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeRestoreUpdates(**initialState, valueTags, valueTypes, userTable)); + case NKikimrSchemeOp::ECdcStreamModeRestoreIncBackup: { + auto updates = MakeRestoreUpdates(**initialState, valueTags, valueTypes, userTable); + if (updates) { + Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, *updates); + } else { + Persist(tableId, pathId, ERowOp::Erase, key, keyTags, {}); + } break; + } case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, nullptr, &*initialState, valueTags); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 0366fd2c77db..99fddc9e5bb8 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -37,6 +37,9 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } TUserTable::TCPtr GetSchema() const { return Schema; } + TString GetSourceId() const { + return SourceId; + } void Serialize(NKikimrChangeExchange::TChangeRecord& record) const; @@ -100,6 +103,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { ui64 LockId = 0; ui64 LockOffset = 0; TPathId PathId; + TString SourceId; ui64 SchemaVersion; TPathId TableId; @@ -165,6 +169,10 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilder(*this); } + TSelf& WithSourceId(const TString& sourceId) { + GetRecord()->SourceId = sourceId; + return static_cast(*this); + } }; // TChangeRecordBuilder } diff --git a/ydb/core/tx/replication/service/json_change_record.h b/ydb/core/tx/replication/service/json_change_record.h index f94091d02ce3..b43b5565f3a8 100644 --- a/ydb/core/tx/replication/service/json_change_record.h +++ b/ydb/core/tx/replication/service/json_change_record.h @@ -139,6 +139,10 @@ struct TChangeRecordBuilderContextTrait { TChangeRecordBuilderContextTrait() : MemoryPool(256) {} + + TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait&) + : MemoryPool(256) + {}; }; } diff --git a/ydb/core/tx/replication/service/table_writer_impl.h b/ydb/core/tx/replication/service/table_writer_impl.h index 1fd77232fb2d..cf2c4f4ce8c8 100644 --- a/ydb/core/tx/replication/service/table_writer_impl.h +++ b/ydb/core/tx/replication/service/table_writer_impl.h @@ -178,10 +178,15 @@ class TTablePartitionWriter: public TActorBootstrapped builderContext) : Parent(parent) , TabletId(tabletId) , TableId(tableId) + , BuilderContext(builderContext) { } @@ -206,7 +211,6 @@ class TTablePartitionWriter: public TActorBootstrapped BuilderContext; - }; // TTablePartitionWriter template @@ -430,7 +434,11 @@ class TLocalTableWriter } IActor* CreateSender(ui64 partitionId) const override { - return new TTablePartitionWriter(this->SelfId(), partitionId, TTableId(this->PathId, Schema->Version)); + return new TTablePartitionWriter( + this->SelfId(), + partitionId, + TTableId(this->PathId, Schema->Version), + BuilderContext); } const TVector& GetPartitions() const override { return KeyDesc->GetPartitions(); } @@ -521,9 +529,11 @@ class TLocalTableWriter return NKikimrServices::TActivity::REPLICATION_LOCAL_TABLE_WRITER; } - explicit TLocalTableWriter(const TPathId& tablePathId) + template + explicit TLocalTableWriter(const TPathId& tablePathId, TArgs... args) : TBase(&TThis::StateWork) , TBaseSender(this, this, this, TActorId(), tablePathId) + , BuilderContext(args...) { } @@ -554,6 +564,7 @@ class TLocalTableWriter TMap PendingRecords; + TChangeRecordBuilderContextTrait BuilderContext; }; // TLocalTableWriter } // namespace NKikimr::NReplication::NService From e27946e659cb562b6e0ba1005429c3d979dcebea Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 4 Jul 2024 18:17:00 +0000 Subject: [PATCH 6/9] remove hack --- ydb/core/backup/impl/table_writer.cpp | 12 +++++-- ydb/core/scheme/scheme_tablecell.cpp | 34 +++++++++++++++++++ ydb/core/scheme/scheme_tablecell.h | 3 ++ .../tx/datashard/datashard_repl_apply.cpp | 4 +-- 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/ydb/core/backup/impl/table_writer.cpp b/ydb/core/backup/impl/table_writer.cpp index 4df5e1d6fdd1..b4708e07e7e7 100644 --- a/ydb/core/backup/impl/table_writer.cpp +++ b/ydb/core/backup/impl/table_writer.cpp @@ -64,12 +64,20 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { auto& upsert = *record.MutableUpsert(); switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: + case NKikimrChangeExchange::TDataChange::kUpsert: { *upsert.MutableTags() = { ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(), ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()}; - upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData()); + auto it = Schema->ValueColumns.find("__incrBackupImpl_deleted"); + upsert.AddTags(it->second.Tag); + + TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData(); + const TCell cell = TCell::Make(false); + Y_ABORT_UNLESS(TSerializedCellVec::UnsafeAppendCell(cell, serializedCellVec), "uh-oh"); + + upsert.SetData(serializedCellVec); break; + } case NKikimrChangeExchange::TDataChange::kErase: { size_t size = Schema->ValueColumns.size(); TVector tags; diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp index 2ee86265b1a4..e1acc55e099e 100644 --- a/ydb/core/scheme/scheme_tablecell.cpp +++ b/ydb/core/scheme/scheme_tablecell.cpp @@ -238,6 +238,40 @@ namespace { } } + +bool TSerializedCellVec::UnsafeAppendCell(const TCell& cell, TString& serializedCellVec) { + const char* buf = serializedCellVec.data(); + const char* bufEnd = serializedCellVec.data() + serializedCellVec.size(); + + if (!serializedCellVec) { + TSerializedCellVec::Serialize(serializedCellVec, {cell}); + return true; + } + + if (Y_UNLIKELY(bufEnd - buf < static_cast(sizeof(ui16)))) { + return false; + } + + ui16 cellCount = ReadUnaligned(buf); + cellCount += 1; + + serializedCellVec.ReserveAndResize(serializedCellVec.size() + sizeof(ui32) + cell.Size()); + + char* mutableBuf = serializedCellVec.Detach(); + char* oldBufEnd = mutableBuf + (bufEnd - buf); + + WriteUnaligned(mutableBuf, cellCount); + + TCellHeader header(cell.Size(), cell.IsNull()); + WriteUnaligned(oldBufEnd, header.RawValue); + oldBufEnd += sizeof(header); + if (cell.Size() > 0) { + cell.CopyDataInto(oldBufEnd); + } + + return true; +} + TSerializedCellVec::TSerializedCellVec(TConstArrayRef cells) { SerializeCellVec(cells, Buf, &Cells); diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index b7635a424725..ef61969652f8 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -552,6 +552,9 @@ class TSerializedCellVec { return std::move(Buf); } + // read headers, assuming the buf is correct and append additional cell to the end + static bool UnsafeAppendCell(const TCell& cell, TString& serializedCellVec); + private: bool DoTryParse(const TString& data); diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index cc01c35ce815..97859b0cc774 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -207,8 +207,8 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, From 959fd0c725d8899bd01189fd415633d409eaedd6 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 4 Jul 2024 18:18:28 +0000 Subject: [PATCH 7/9] remove duct tape --- ydb/core/tx/datashard/change_record.h | 9 --------- 1 file changed, 9 deletions(-) diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 99fddc9e5bb8..4e97b872a898 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -37,9 +37,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } TUserTable::TCPtr GetSchema() const { return Schema; } - TString GetSourceId() const { - return SourceId; - } void Serialize(NKikimrChangeExchange::TChangeRecord& record) const; @@ -103,7 +100,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase { ui64 LockId = 0; ui64 LockOffset = 0; TPathId PathId; - TString SourceId; ui64 SchemaVersion; TPathId TableId; @@ -168,11 +164,6 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilderSchema = schema; return static_cast(*this); } - - TSelf& WithSourceId(const TString& sourceId) { - GetRecord()->SourceId = sourceId; - return static_cast(*this); - } }; // TChangeRecordBuilder } From fc32f6b219077ef9dde4e7525bf5f0b607599e7a Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Thu, 4 Jul 2024 18:18:56 +0000 Subject: [PATCH 8/9] remove duct tape --- ydb/core/tx/datashard/change_record.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 4e97b872a898..0366fd2c77db 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -164,6 +164,7 @@ class TChangeRecordBuilder: public NChangeExchange::TChangeRecordBuilderSchema = schema; return static_cast(*this); } + }; // TChangeRecordBuilder } From 8c1c666ba81cd0f277e77f1a5bba8b20a94a916a Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 5 Jul 2024 10:29:58 +0000 Subject: [PATCH 9/9] remove duct tape --- ydb/core/protos/flat_scheme_op.proto | 1 + ydb/core/protos/tx_datashard.proto | 1 + .../tx/datashard/datashard_repl_apply.cpp | 53 ++++++++++--------- .../tx/datashard/datashard_user_table.cpp | 4 ++ ...tion_create_restore_incremental_backup.cpp | 20 +++++++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 + .../tx/schemeshard/schemeshard_info_types.cpp | 2 + 7 files changed, 58 insertions(+), 25 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 4f2c30a63b4e..4257227fb405 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -344,6 +344,7 @@ message TTableReplicationConfig { enum EReplicationMode { REPLICATION_MODE_NONE = 0; REPLICATION_MODE_READ_ONLY = 1; + REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP = 2; } enum EConsistency { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index b5380f7032fb..b864ef45e426 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1854,6 +1854,7 @@ message TEvApplyReplicationChangesResult { REASON_SCHEME_ERROR = 2; REASON_BAD_REQUEST = 3; REASON_UNEXPECTED_ROW_OPERATION = 4; + REASON_OUTDATED_SCHEME = 5; } optional EStatus Status = 1; diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 97859b0cc774..54bb72f3667e 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -31,12 +31,13 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBaseIsReplicated()) { - // Result = MakeHolder( - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST); - // return true; - // } + if (!Self->IsReplicated()) { + Result = MakeHolder( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + TStringBuilder() << "table is not replicated"); + return true; + } const auto& msg = Ev->Get()->Record; @@ -65,7 +66,9 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_SCHEME_ERROR, + tableId.GetSchemaVersion() < userTable.GetTableSchemaVersion() + ? NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_OUTDATED_SCHEME + : NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_SCHEME_ERROR, std::move(error)); return true; } @@ -102,29 +105,29 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase( - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, - // "WriteTxId cannot be specified for weak consistency"); - // return false; - // } - // } else { - // if (writeTxId == 0) { - // Result = MakeHolder( - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, - // NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, - // "Non-zero WriteTxId must be specified for strong consistency"); - // return false; - // } - // } + if (userTable.ReplicationConfig.HasWeakConsistency()) { + if (writeTxId) { + Result = MakeHolder( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + "WriteTxId cannot be specified for weak consistency"); + return false; + } + } else { + if (writeTxId == 0) { + Result = MakeHolder( + NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED, + NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST, + "Non-zero WriteTxId must be specified for strong consistency"); + return false; + } + } TSerializedCellVec keyCellVec; if (!TSerializedCellVec::TryParse(change.GetKey(), keyCellVec) || diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 8584c716597c..12122dad6afd 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -490,6 +490,10 @@ void TUserTable::ApplyAlter( } } + if (delta.HasReplicationConfig()) { + ReplicationConfig = delta.GetReplicationConfig(); + } + // Most settings are applied to both main and shadow table TStackVec tids; tids.push_back(LocalTid); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp index 065891477e7a..aa7dcfb011e4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp @@ -96,6 +96,24 @@ void DoCreatePqPart( result.push_back(CreateNewPQ(NextPartId(opId, result), outTx)); } +void DoCreateAlterTable( + const TOperationId& opId, + const TPath& dstTablePath, + TVector& result) +{ + auto outTx = TransactionTemplate(dstTablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable); + auto& desc = *outTx.MutableAlterTable(); + + PathIdFromPathId(dstTablePath.Base()->PathId, desc.MutablePathId()); + + auto& replicationConfig = *desc.MutableReplicationConfig(); + replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP); + replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK); + + result.push_back(CreateAlterTable(NextPartId(opId, result), outTx)); +} + + TVector CreateRestoreIncrementalBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackup); @@ -156,6 +174,8 @@ TVector CreateRestoreIncrementalBackup(TOperationId opId, c DoCreateLock(opId, workingDirPath, srcTablePath, false, result); + DoCreateAlterTable(opId, dstTablePath, result); + NCdc::DoCreateStream(createCdcStreamOp, opId, workingDirPath, srcTablePath, acceptExisted, true, {}, result); DoCreatePqPart( opId, diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 3d5d1c2783dc..26a27e59c749 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6541,6 +6541,8 @@ TString TSchemeShard::FillAlterTableTxBody(TPathId pathId, TShardIdx shardIdx, T *patch); } + proto->MutableReplicationConfig()->CopyFrom(alterData->TableDescriptionFull->GetReplicationConfig()); + TString txBody; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); return txBody; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 072cae7d0553..e21dc2f1f5f4 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -532,6 +532,8 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData( return nullptr; } break; + case NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP: + break; default: errStr = "Unknown replication mode"; return nullptr;