From 636f707805f04c56f36c9737708f260b99b43841 Mon Sep 17 00:00:00 2001
From: Innokentii Mokin
Date: Fri, 31 Oct 2025 00:52:20 +0000
Subject: [PATCH 01/15] WIP

---
 ydb/core/kqp/host/kqp_gateway_proxy.cpp       |   3 +-
 ydb/core/kqp/provider/yql_kikimr_exec.cpp     |   7 +
 ydb/core/kqp/provider/yql_kikimr_gateway.h    |   3 +-
 ydb/core/kqp/provider/yql_kikimr_type_ann.cpp |   1 +
 ydb/core/protos/flat_scheme_op.proto          |   2 +-
 .../datashard/change_collector_cdc_stream.cpp |  30 +-
 .../tx/datashard/create_cdc_stream_unit.cpp   |   8 +
 ydb/core/tx/datashard/datashard_impl.h        |  12 +-
 .../datashard_ut_incremental_backup.cpp       | 636 ++++++++++++++++++
 .../schemeshard__backup_collection_common.cpp |  49 ++
 ...chemeshard__operation_alter_cdc_stream.cpp |   9 +-
 ...rd__operation_backup_backup_collection.cpp |  68 ++
 ...n_backup_incremental_backup_collection.cpp |  68 ++
 ...hemeshard__operation_common_cdc_stream.cpp |  38 +-
 ...hemeshard__operation_create_cdc_stream.cpp |  11 +-
 ...schemeshard__operation_drop_cdc_stream.cpp |   9 +-
 .../schemeshard_path_describer.cpp            |   5 +
 .../ut_backup_collection.cpp                  | 613 +++++++++++++++++
 18 files changed, 1541 insertions(+), 31 deletions(-)

diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 5d9ffa1aea0f..f51127ea03bd 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -1368,7 +1368,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
         op.SetPrefix(settings.Prefix);
 
         if (settings.Settings.IncrementalBackupEnabled) {
-            op.MutableIncrementalBackupConfig();
+            auto* config = op.MutableIncrementalBackupConfig();
+            config->SetOmitIndexes(settings.Settings.OmitIndexes);
         }
 
         auto errOpt = std::visit(
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index a66b5b9c2a8b..9c5eb19fc7f4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1021,6 +1021,13 @@ namespace {
                         "INCREMENTAL_BACKUP_ENABLED must be true or false"));
                     return false;
                 }
+            } else if (name == "omit_indexes") {
+                auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
+                if (!TryFromString(value, dstSettings.OmitIndexes)) {
+                    ctx.AddError(TIssue(ctx.GetPosition(pos),
+                        "OMIT_INDEXES must be true or false"));
+                    return false;
+                }
             } else if (name == "storage") {
                 auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
                 if (to_lower(value) != "cluster") {
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 67f252f3e7a6..63494ce2747c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -1052,7 +1052,8 @@ struct TAnalyzeSettings {
 };
 
 struct TBackupCollectionSettings {
-    bool IncrementalBackupEnabled;
+    bool IncrementalBackupEnabled = false;
+    bool OmitIndexes = false;
 };
 
 struct TCreateBackupCollectionSettings {
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 9c1c88e2b47b..49336ab9ba4b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -2448,6 +2448,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
     const THashSet<TString> supportedSettings = {
         "incremental_backup_enabled",
         "storage",
+        "omit_indexes",
     };
 
     if (!CheckBackupCollectionSettings(node.BackupCollectionSettings(), supportedSettings, ctx)) {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index fc045eb78f03..9e2708fdcac2 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -2312,7 +2312,7 @@ message TBackupCollectionDescription {
     }
 
     message TIncrementalBackupConfig {
-
+        optional bool OmitIndexes = 1 [default = false];
     }
 
     oneof Entries {
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index 4dbff2ca29f3..136a589200cc 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -123,8 +123,15 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
     }
 
     bool value = false;
-    for (const auto& [_, tableInfo] : Self->GetUserTables()) {
-        for (const auto& [_, streamInfo] : tableInfo->CdcStreams) {
+    const auto& userTables = Self->GetUserTables();
+    Cerr << "CDC_DEBUG: NeedToReadKeys checking " << userTables.size() << " user tables" << Endl;
+    for (const auto& [tableLocalPathId, tableInfo] : userTables) {
+        Cerr << "CDC_DEBUG: Table LocalPathId=" << tableLocalPathId
+            << " has " << tableInfo->CdcStreams.size() << " CDC streams" << Endl;
+        for (const auto& [streamPathId, streamInfo] : tableInfo->CdcStreams) {
+            Cerr << "CDC_DEBUG: Stream PathId=" << streamPathId
+                << " State=" << static_cast<int>(streamInfo.State)
+                << " Mode=" << static_cast<int>(streamInfo.Mode) << Endl;
             if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
                 continue;
             }
@@ -145,15 +152,29 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
     }
 
     CachedNeedToReadKeys = value;
+    Cerr << "CDC_DEBUG: NeedToReadKeys returning " << value << Endl;
     return *CachedNeedToReadKeys;
 }
 
 bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
         TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates)
 {
-    Y_ENSURE(Self->IsUserTable(tableId), "Unknown table: " << tableId);
+    Cerr << "CDC_DEBUG: Collect called for TableId OwnerId=" << tableId.PathId.OwnerId
+        << " LocalPathId=" << tableId.PathId.LocalPathId
+        << " RowOp=" << static_cast<int>(rop) << Endl;
+
+    if (!Self->IsUserTable(tableId)) {
+        const auto& userTables = Self->GetUserTables();
+        Cerr << "CDC_DEBUG: IsUserTable returned FALSE! TableId not in UserTables map." << Endl;
+        Cerr << "CDC_DEBUG: UserTables contains " << userTables.size() << " tables:" << Endl;
+        for (const auto& [localPathId, _] : userTables) {
+            Cerr << "CDC_DEBUG:   LocalPathId=" << localPathId << Endl;
+        }
+        Y_ENSURE(false, "Unknown table: " << tableId);
+    }
     auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
+    Cerr << "CDC_DEBUG: Found user table with " << userTable->CdcStreams.size() << " CDC streams" << Endl;
 
     const auto& keyTags = userTable->KeyColumnIds;
     const auto& keyTypes = userTable->KeyColumnTypes;
     const auto valueTags = MakeValueTags(userTable->Columns);
@@ -173,6 +194,9 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
     }
 
     for (const auto& [pathId, stream] : userTable->CdcStreams) {
+        Cerr << "CDC_DEBUG: Processing CDC stream PathId=" << pathId
+            << " State=" << static_cast<int>(stream.State)
+            << " Mode=" << static_cast<int>(stream.Mode) << Endl;
         TMaybe<TRowState> initialState;
         TMaybe<TRowVersion> snapshotVersion;
 
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index 2b5f02812aaa..d13fb4eeac28 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -43,9 +43,17 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
         const auto version = params.GetTableSchemaVersion();
         Y_ENSURE(version);
 
+        Cerr << "CDC_DEBUG: CreateCdcStreamUnit creating CDC stream on table PathId=" << pathId
+            << " (OwnerId=" << pathId.OwnerId << " LocalPathId=" << pathId.LocalPathId << ")"
+            << " streamPathId=" << streamPathId
+            << " State=" << static_cast<int>(streamDesc.GetState())
+            << " Mode=" << static_cast<int>(streamDesc.GetMode()) << Endl;
+
         auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
         TDataShardLocksDb locksDb(DataShard, txc);
         DataShard.AddUserTable(pathId, tableInfo, &locksDb);
+
+        Cerr << "CDC_DEBUG: Added table to TableInfos with " << tableInfo->CdcStreams.size() << " CDC streams" << Endl;
 
         if (tableInfo->NeedSchemaSnapshots()) {
             DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index af995803df29..6fe413cde3f6 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1700,8 +1700,16 @@ class TDataShard
     }
 
     bool IsUserTable(const TTableId& tableId) const {
-        return (TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end())
-                && !TSysTables::IsSystemTable(tableId);
+        bool inTableInfos = TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end();
+        bool isSystemTable = TSysTables::IsSystemTable(tableId);
+        bool result = inTableInfos && !isSystemTable;
+        if (!result) {
+            Cerr << "CDC_DEBUG: IsUserTable returning FALSE for TableId OwnerId=" << tableId.PathId.OwnerId
+                << " LocalPathId=" << tableId.PathId.LocalPathId
+                << " inTableInfos=" << inTableInfos
+                << " isSystemTable=" << isSystemTable << Endl;
+        }
+        return result;
     }
 
     const THashMap<ui64, TUserTable::TCPtr>& GetUserTables() const { return TableInfos; }
diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
index 646aea407ba7..42074e0d83aa 100644
--- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
@@ -211,6 +211,63 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
         return proto;
     }
 
+    TString FindIncrementalBackupDir(TTestActorRuntime& runtime, const TActorId& sender, const TString& collectionPath) {
+        auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
+        request->Record.MutableDescribePath()->SetPath(collectionPath);
+        request->Record.MutableDescribePath()->MutableOptions()->SetReturnChildren(true);
+        runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+        auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(sender);
+
+        UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess);
+
+        const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription();
+        for (ui32 i = 0; i < pathDescription.ChildrenSize(); ++i) {
+            const auto& child = pathDescription.GetChildren(i);
+            if (child.GetName().EndsWith("_incremental")) {
+                return child.GetName();
+            }
+        }
+        return "";
+    }
+
+    struct TCdcMetadata {
+        bool IsDelete;
+        TVector<ui32> UpdatedColumns;
+        TVector<ui32> ErasedColumns;
+    };
+
+    TCdcMetadata ParseCdcMetadata(const TString& bytesValue) {
+        TCdcMetadata result;
+        result.IsDelete = false;
+
+        // The bytes contain protobuf-encoded CDC metadata.
+        // For Update mode CDC:
+        //   - updates have \020\000 (value columns present)
+        //   - deletes have \020\001 (erase operation)
+
+        if (bytesValue.find("\020\001") != TString::npos) {
+            result.IsDelete = true;
+        }
+
+        // Parse column tags from the metadata.
+        // Format: \010 <tag> \020 <flags>
+        for (size_t i = 0; i < bytesValue.size(); ++i) {
+            if (bytesValue[i] == '\010' && i + 1 < bytesValue.size()) {
+                ui32 tag = static_cast<ui8>(bytesValue[i + 1]);
+                if (i + 2 < bytesValue.size() && bytesValue[i + 2] == '\020') {
+                    ui8 flags = i + 3 < bytesValue.size() ? static_cast<ui8>(bytesValue[i + 3]) : 0;
+                    if (flags & 1) {
+                        result.ErasedColumns.push_back(tag);
+                    } else {
+                        result.UpdatedColumns.push_back(tag);
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
     NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
         auto keyCell = TCell::Make(key);
         auto valueCell = TCell::Make(value);
@@ -2519,6 +2576,585 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
         }
     }
 
+    Y_UNIT_TEST(IncrementalBackupWithIndexes) {
+        TPortManager portManager;
+        TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+            .SetUseRealThreads(false)
+            .SetDomainName("Root")
+            .SetEnableChangefeedInitialScan(true)
+            .SetEnableBackupService(true)
+            .SetEnableRealSystemViewPaths(false)
+            .SetEnableDataColumnForIndexTable(true)
+        );
+
+        auto& runtime = *server->GetRuntime();
+        const auto edgeActor = runtime.AllocateEdgeActor();
+
+        SetupLogging(runtime);
+        InitRoot(server, edgeActor);
+
+        // Create table with one global sync index
+        TShardedTableOptions opts;
+        opts.Columns({
+            {"key", "Uint32", true, false},
+            {"value", "Uint32", false, false}
+        });
+        opts.Indexes({
+            TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
+        });
+        CreateShardedTable(server, edgeActor, "/Root", "Table", opts);
+
+        // Create backup collection with incremental enabled
+        ExecSQL(server, edgeActor, R"(
+            CREATE BACKUP COLLECTION `MyCollection`
+                ( TABLE `/Root/Table`
+                )
+            WITH
+                ( STORAGE = 'cluster'
+                , INCREMENTAL_BACKUP_ENABLED = 'true'
+                );
+        )", false);
+
+        // Perform full backup (creates CDC streams on main table and index tables)
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);
+
+        // Wait for CDC streams to be fully activated on all tables (including index tables)
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        // Debug: Verify CDC streams are created on both main table and index table
+        Cerr << "CDC_DEBUG: Checking CDC streams after full backup..." << Endl;
+        auto mainTableCdcCheck = Navigate(runtime, edgeActor, "/Root/Table");
+        Cerr << "CDC_DEBUG: Main table Navigate result Status=" << mainTableCdcCheck->ResultSet.size() << " entries" << Endl;
+
+        auto indexTableCdcCheck = Navigate(runtime, edgeActor, "/Root/Table/ByValue/indexImplTable");
+        Cerr << "CDC_DEBUG: Index impl table Navigate result Status=" << indexTableCdcCheck->ResultSet.size() << " entries" << Endl;
+
+        // Insert initial data
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES
+              (1, 100)
+            , (2, 200)
+            , (3, 300)
+            ;
+        )");
+
+        // Update row: (2, 200) -> (2, 250)
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250);
+        )");
+
+        // Delete row: (3, 300)
+        ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)");
+
+        // Insert new row: (4, 400)
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (4, 400);
+        )");
+
+        // Debug: Verify index table has data before backup
+        auto indexDataBeforeBackup = KqpSimpleExec(runtime, R"(
+            SELECT value, key FROM `/Root/Table/ByValue/indexImplTable`
+            ORDER BY value
+        )");
+        Cerr << "CDC_DEBUG: Index table data BEFORE incremental backup: " << indexDataBeforeBackup << Endl;
+
+        // Debug: Check which tables are in datashard TableInfos
+        Cerr << "CDC_DEBUG: About to wait for CDC to capture changes..." << Endl;
+
+        // Wait for CDC streams to capture all changes (including on index tables)
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        // Take incremental backup
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+        // Wait for backup operation to complete and CDC offload (including index tables)
+        SimulateSleep(server, TDuration::Seconds(10));
+
+        // Find the incremental backup directory (timestamp-dependent)
+        TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection");
+        UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory");
+        Cerr << "Found incremental backup directory: " << incrBackupDir << Endl;
+
+        // Verify main table backup contains correct data
+        TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table";
+        auto mainTableBackup = KqpSimpleExec(runtime, TStringBuilder() << R"(
+            SELECT key, value FROM `)" << mainTablePath << R"(`
+            ORDER BY key
+        )");
+
+        // The incremental backup contains only changed rows: (2, 250) update,
+        // (3, NULL) delete tombstone, (4, 400) insert; key 1 is unchanged and has no entry.
+        Cerr << "Main table backup: " << mainTableBackup << Endl;
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos,
+            "Main table backup should contain updated key 2");
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos,
+            "Main table backup should contain new value 250");
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 3") != TString::npos,
+            "Main table backup should contain deleted key 3");
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 4") != TString::npos,
+            "Main table backup should contain new key 4");
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 400") != TString::npos,
+            "Main table backup should contain new value 400");
+
+        // Verify index backup table exists and contains correct data
+        TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir <<
"/__ydb_backup_meta/indexes/Table/ByValue"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + ORDER BY value + )"); + + Cerr << "CDC_DEBUG: Index backup result: " << indexBackup << Endl; + + // Index should contain changes: + // - (200, 2) - old value deleted due to update + // - (250, 2) - new value from update + // - (300, 3) - tombstone for deleted row + // - (400, 4) - new insert + UNIT_ASSERT_C(indexBackup.find("uint32_value: 200") != TString::npos, + "Index backup should contain old value 200 (deleted)"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 250") != TString::npos, + "Index backup should contain new value 250"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 300") != TString::npos, + "Index backup should contain deleted value 300"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 400") != TString::npos, + "Index backup should contain new value 400"); + } + + Y_UNIT_TEST(IncrementalBackupWithCoveringIndex) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with covering index: ByAge on age, covering name + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"name"}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + // Create backup collection with incremental enabled + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Perform full backup (creates CDC streams on main table and index tables) + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + // Wait for CDC streams to be fully activated on all tables (including index tables) + SimulateSleep(server, TDuration::Seconds(1)); + + // Debug: Capture index implementation table state after full backup (should be empty) + auto indexImplTableInitial = KqpSimpleExec(runtime, R"( + SELECT * FROM `/Root/Table/ByAge/indexImplTable` + )"); + + // Insert initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES + (1, 'Alice', 30u, 5000u) + , (2, 'Bob', 25u, 4000u) + ; + )"); + + // Debug: Capture index implementation table after initial insert + auto indexImplTableAfterInsert = KqpSimpleExec(runtime, R"( + SELECT * FROM `/Root/Table/ByAge/indexImplTable` + )"); + + // Update covered column: name changes (should appear in index) + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 5000u); + )"); + + // Update non-covered column: salary changes (should NOT appear in index backup as a separate change) + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 6000u); + )"); + + // Update indexed column: age changes (creates tombstone for 
old + new entry) + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (2, 'Bob', 26u, 4000u); + )"); + + // Delete row + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=2;)"); + + // Wait for CDC streams to capture all changes (including on index tables) + SimulateSleep(server, TDuration::Seconds(1)); + + // Take incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + // Wait for backup operation to complete and CDC offload (including index tables) + SimulateSleep(server, TDuration::Seconds(10)); + + // Find the incremental backup directory (timestamp-dependent) + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + Cerr << "Found incremental backup directory: " << incrBackupDir << Endl; + + // Debug: Check main table backup first + TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table"; + auto mainBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << mainTablePath << R"(` + )"); + Cerr << "Main table backup: " << mainBackup << Endl; + + // Verify index backup table exists and contains covered column data + TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + )"); + + // Debug: Check actual index implementation table + auto indexImplTableFinal = KqpSimpleExec(runtime, R"( + SELECT * FROM `/Root/Table/ByAge/indexImplTable` + )"); + + Cerr << "=== DEBUG: Index Tables State ===" << Endl; + Cerr << "1. After full backup (should be empty): " << indexImplTableInitial << Endl; + Cerr << "2. After initial insert (Alice, Bob): " << indexImplTableAfterInsert << Endl; + Cerr << "3. Final state (physical table): " << indexImplTableFinal << Endl; + Cerr << "4. 
Incremental backup (CDC captured): " << indexBackup << Endl; + Cerr << "=================================" << Endl; + + // Incremental backup should contain (with CDC compaction): + // - INSERT for (30, 1) with "Alice2" (final state after name update) + // - Tombstone (DELETE) for (25, 2) - Bob's old age entry, compacted with initial INSERT + // - Tombstone (DELETE) for (26, 2) - Bob's new age entry, compacted with INSERT from age change + // + // Note: Bob's name won't appear because: + // - Initial INSERT (25, 2, "Bob") was compacted with DELETE (25, 2) → just tombstone + // - Age change INSERT (26, 2, "Bob") was compacted with DELETE (26, 2) → just tombstone + // + // The salary change should not create a separate index entry since salary is not indexed or covered + + UNIT_ASSERT_C(indexBackup.find("uint32_value: 30") != TString::npos, + "Index backup should contain age 30"); + UNIT_ASSERT_C(indexBackup.find("Alice") != TString::npos, + "Index backup should contain Alice2 from covering column name update"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 25") != TString::npos, + "Index backup should contain tombstone for age 25"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 26") != TString::npos, + "Index backup should contain tombstone for age 26"); + + // Verify tombstones have NULL for covering column (correct behavior for DELETEs) + // and INSERT has the actual covering column value + UNIT_ASSERT_C(indexBackup.find("null_flag_value: NULL_VALUE") != TString::npos, + "Index backup tombstones should have NULL for covering columns"); + + // Parse and verify CDC metadata + // The backup contains 3 records, let's verify their metadata + size_t pos = 0; + int deleteCount = 0; + int insertCount = 0; + + while ((pos = indexBackup.find("bytes_value: \"", pos)) != TString::npos) { + pos += 14; // Skip 'bytes_value: "' + size_t endPos = indexBackup.find("\"", pos); + if (endPos == TString::npos) break; + + TString metadataStr = indexBackup.substr(pos, endPos - pos); + // Unescape the string + TString unescaped; + for (size_t i = 0; i < metadataStr.size(); ++i) { + if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) { + // Octal escape \nnn + ui8 val = ((metadataStr[i+1] - '0') << 6) | + ((metadataStr[i+2] - '0') << 3) | + (metadataStr[i+3] - '0'); + unescaped += static_cast(val); + i += 3; + } else { + unescaped += metadataStr[i]; + } + } + + auto metadata = ParseCdcMetadata(unescaped); + if (metadata.IsDelete) { + deleteCount++; + } else { + insertCount++; + } + + pos = endPos + 1; + } + + Cerr << "CDC metadata: " << deleteCount << " DELETEs, " << insertCount << " INSERTs" << Endl; + + UNIT_ASSERT_EQUAL_C(deleteCount, 2, "Should have 2 DELETE operations (tombstones for age 25 and 26)"); + UNIT_ASSERT_EQUAL_C(insertCount, 1, "Should have 1 INSERT operation (for Alice2)"); + } + + Y_UNIT_TEST(IncrementalBackupMultipleIndexes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with 3 indexes: simple, covering, composite + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, 
false}, + {"age", "Uint32", false, false}, + {"city", "Utf8", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByName", {"name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"salary"}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByCity", {"city", "name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + // Create backup collection with incremental enabled + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Perform full backup (creates CDC streams on main table and index tables) + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + // Wait for SchemeBoard updates to propagate to KQP metadata cache. + // CDC stream creation increments table schema versions (v1 -> v2), which is published + // to SchemeBoard asynchronously. KQP needs time to refresh its metadata cache before + // we can query the tables (main + indexes). Without this wait, KQP will fail with + // "schema version mismatch during metadata loading" error. + SimulateSleep(server, TDuration::Seconds(5)); + + // Insert initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES + (1, 'Alice', 30u, 'NYC', 5000u) + , (2, 'Bob', 25u, 'LA', 4000u) + ; + )"); + + // Update name: affects ByName and ByCity + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (1, 'Alice2', 30u, 'NYC', 5000u); + )"); + + // Update age: affects ByAge + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (2, 'Bob', 26u, 'LA', 4000u); + )"); + + // Delete: affects all indexes + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); + + // Insert new: affects all indexes + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (3, 'Carol', 28u, 'SF', 5500u); + )"); + + // Debug: Verify index tables have data before backup + auto byNameIndexData = KqpSimpleExec(runtime, R"( + SELECT name, key FROM `/Root/Table/ByName/indexImplTable` + ORDER BY name + )"); + Cerr << "ByName index table data BEFORE incremental backup: " << byNameIndexData << Endl; + + // Wait for CDC streams to capture all changes (including on index tables) + SimulateSleep(server, TDuration::Seconds(1)); + + // Take incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + // Wait for backup operation to complete and CDC offload (including index tables) + SimulateSleep(server, TDuration::Seconds(10)); + + // Find the incremental backup directory (timestamp-dependent) + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + Cerr << "Found incremental backup directory: " << incrBackupDir << Endl; + + // Verify ByName index backup + TString byNamePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByName"; + auto byNameBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byNamePath << R"(` + )"); + Cerr << "ByName index backup: " << byNameBackup << Endl; + 
UNIT_ASSERT_C(byNameBackup.find("Alice") != TString::npos, + "ByName backup should contain Alice (deleted)"); + UNIT_ASSERT_C(byNameBackup.find("Alice2") != TString::npos, + "ByName backup should contain Alice2 (updated)"); + UNIT_ASSERT_C(byNameBackup.find("Carol") != TString::npos, + "ByName backup should contain Carol (new)"); + + // Verify ByAge index backup (covering index with salary) + TString byAgePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto byAgeBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byAgePath << R"(` + )"); + Cerr << "ByAge index backup: " << byAgeBackup << Endl; + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 30") != TString::npos, + "ByAge backup should contain age 30 (deleted)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 25") != TString::npos || + byAgeBackup.find("uint32_value: 26") != TString::npos, + "ByAge backup should contain age change (25 or 26)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 28") != TString::npos, + "ByAge backup should contain age 28 (new)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 5500") != TString::npos, + "ByAge backup should contain covered salary 5500"); + + // Verify ByCity index backup (composite key) + TString byCityPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByCity"; + auto byCityBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byCityPath << R"(` + )"); + Cerr << "ByCity index backup: " << byCityBackup << Endl; + UNIT_ASSERT_C(byCityBackup.find("NYC") != TString::npos, + "ByCity backup should contain NYC"); + UNIT_ASSERT_C(byCityBackup.find("Alice") != TString::npos, + "ByCity backup should contain Alice (part of composite key)"); + UNIT_ASSERT_C(byCityBackup.find("Alice2") != TString::npos, + "ByCity backup should contain Alice2 (updated composite key)"); + UNIT_ASSERT_C(byCityBackup.find("SF") != TString::npos, + "ByCity backup should contain SF (new)"); + UNIT_ASSERT_C(byCityBackup.find("Carol") != TString::npos, + "ByCity backup should contain Carol (new composite key)"); + } + + Y_UNIT_TEST(OmitIndexesIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with one global sync index + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + // Create backup collection with OmitIndexes: true + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + , OMIT_INDEXES = 'true' + ); + )", false); + + // Perform full backup + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + // Insert and modify data + ExecSQL(server, edgeActor, R"( + 
UPSERT INTO `/Root/Table` (key, value) VALUES
+              (1, 100)
+            , (2, 200)
+            , (3, 300)
+            ;
+        )");
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250);
+        )");
+
+        ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)");
+
+        // Take incremental backup
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+        SimulateSleep(server, TDuration::Seconds(5));
+
+        // Verify main table backup exists
+        auto mainTableBackup = KqpSimpleExec(runtime, R"(
+            SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table`
+            ORDER BY key
+        )");
+
+        Cerr << "Main table backup (with OmitIndexes): " << mainTableBackup << Endl;
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos,
+            "Main table backup should exist with OmitIndexes flag");
+        UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos,
+            "Main table backup should contain updated value");
+
+        // Verify the index backup directory does NOT exist:
+        // a query against the index backup table should fail or return nothing
+        bool indexBackupExists = true;
+        try {
+            auto indexBackup = KqpSimpleExec(runtime, R"(
+                SELECT * FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/__ydb_backup_meta/indexes/Table/ByValue`
+            )");
+            // If we got here without an exception, check whether the result is empty or an error
+            if (indexBackup.empty() || indexBackup.find("ERROR") != TString::npos ||
+                indexBackup.find("not found") != TString::npos || indexBackup.find("doesn't exist") != TString::npos) {
+                indexBackupExists = false;
+            }
+            Cerr << "Index backup query result (should not exist): " << indexBackup << Endl;
+        } catch (...) {
+            // Expected - index backup table should not exist
+            indexBackupExists = false;
+        }
+
+        UNIT_ASSERT_C(!indexBackupExists,
+            "Index backup should NOT exist when OmitIndexes flag is set");
+    }
+
 } // Y_UNIT_TEST_SUITE(IncrementalBackup)
 } // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
index 28c22af19ece..3f6f01a4e9b7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
@@ -130,6 +130,55 @@ std::optional<THashMap<TString, THashSet<TString>>> GetBackupRequiredPaths(
         }
     }
 
+    // Add index backup metadata directories if incremental backup is enabled
+    bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
+    bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+
+    if (incrBackupEnabled && !omitIndexes) {
+        for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+            const auto tablePath = TPath::Resolve(item.GetPath(), context.SS);
+
+            // Skip if path is not resolved or not a table
+            auto checks = tablePath.Check();
+            checks.IsResolved().IsTable();
+            if (!checks) {
+                continue;
+            }
+
+            std::pair<TString, TString> paths;
+            TString err;
+            if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) {
+                continue;
+            }
+            auto& relativeItemPath = paths.second;
+
+            // Check if table has indexes
+            for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+                auto childPath = context.SS->PathsById.at(childPathId);
+
+                if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
+                    continue;
+                }
+
+                auto indexInfo = context.SS->Indexes.at(childPathId);
+                if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                    continue;
+                }
+
+                // Add the required PARENT directory path for the index backup:
+                //   {targetDir}/__ydb_backup_meta/indexes/{table_path}
+                // The backup table for the index is created inside this directory
+                // under the index name.
+                TString indexBackupParentPath = JoinPath({
+                    targetDir,
+                    "__ydb_backup_meta",
+                    "indexes",
+                    relativeItemPath
+                });
+                collectionPaths.emplace(indexBackupParentPath);
+            }
+        }
+    }
+
     return paths;
 }
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
index 92155f564df4..ec562aea920a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
@@ -635,13 +635,8 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
         result.push_back(DropLock(NextPartId(opId, result), outTx));
     }
 
-    if (workingDirPath.IsTableIndex()) {
-        auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
-        outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
-        outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
-
-        result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
-    }
+    // Note: for ALTER CDC stream on index impl tables we don't create an AlterTableIndex
+    // sub-operation; the index version sync happens directly in the CDC stream operation handlers.
 
     return result;
 }
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
index 261afefbbec5..d69f17cd38be 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
@@ -2,6 +2,8 @@
 #include "schemeshard__op_traits.h"
 #include "schemeshard__operation_common.h"
 #include "schemeshard__operation_create_cdc_stream.h"
+#include "schemeshard__operation_part.h"
+#include "schemeshard_utils.h"
 
 #include "schemeshard_impl.h"
 
@@ -64,6 +66,8 @@ TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, con
     Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
     const auto& bc = context.SS->BackupCollections[bcPath->PathId];
     bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
+    // Check if indexes should be omitted (only relevant when incremental backup is enabled)
+    bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
 
     TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl";
     for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
@@ -155,6 +159,70 @@ TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, con
             NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, table, boundaries, false);
         }
+
+        // Create CDC streams for indexes (only if incremental backup is enabled and indexes are not omitted)
+        if (!omitIndexes) {
+            for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+                const auto tablePath = TPath::Resolve(item.GetPath(), context.SS);
+
+                // Iterate through table's children to find indexes
+                for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+                    auto childPath = context.SS->PathsById.at(childPathId);
+
+                    // Skip non-index children (CDC streams, etc.)
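+                    // (Filtering by path type first matters: context.SS->Indexes only
+                    // has entries for index paths, so .at() on a non-index child would throw.)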
+                    if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
+                        continue;
+                    }
+
+                    // Get index info and filter for global sync only
+                    auto indexInfo = context.SS->Indexes.at(childPathId);
+                    if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                        continue;
+                    }
+
+                    // Get index implementation table (single child of index)
+                    auto indexPath = TPath::Init(childPathId, context.SS);
+                    Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
+                    auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+
+                    auto indexTablePath = indexPath.Child(implTableName);
+                    auto indexTable = context.SS->Tables.at(implTablePathId);
+
+                    // Create CDC stream on index impl table
+                    NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp;
+                    createCdcStreamOp.SetTableName(implTableName); // Use relative name only
+                    auto& streamDescription = *createCdcStreamOp.MutableStreamDescription();
+                    streamDescription.SetName(streamName); // Same stream name as main table
+                    streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate);
+                    streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);
+
+                    // Create CDC stream metadata (schemeshard side)
+                    NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false);
+
+                    // Create AtTable operation to notify datashard (without schema change)
+                    {
+                        auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
+                        auto& cdcOp = *outTx.MutableCreateCdcStream();
+                        cdcOp.CopyFrom(createCdcStreamOp);
+                        result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false));
+                    }
+
+                    // Create PQ part for index CDC stream
+                    TVector<TString> boundaries;
+                    const auto& partitions = indexTable->GetPartitions();
+                    boundaries.reserve(partitions.size() - 1);
+                    for (ui32 i = 0; i < partitions.size(); ++i) {
+                        const auto& partition = partitions.at(i);
+                        if (i != partitions.size() - 1) {
+                            boundaries.push_back(partition.EndOfRange);
+                        }
+                    }
+
+                    const auto streamPath = indexTablePath.Child(streamName);
+                    NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false);
+                }
+            }
+        }
     }
 
     return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
index e08dcf9c0e17..c2da2d2bbd55 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
@@ -223,6 +223,74 @@ TVector<ISubOperation::TPtr> CreateBackupIncrementalBackupCollection(TOperationI
         streams.push_back(stream);
     }
 
+    // Process indexes if they are not omitted
+    bool omitIndexes = bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+    if (!omitIndexes) {
+        for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+            const auto tablePath = TPath::Resolve(item.GetPath(), context.SS);
+            auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+            std::pair<TString, TString> paths;
+            TString err;
+            if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) {
+                result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)};
+                return result;
+            }
+            auto& relativeItemPath = paths.second;
+
+            // Iterate through table's children to find indexes
+            for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+                auto childPath = context.SS->PathsById.at(childPathId);
+
+                // Skip non-index children (CDC streams, etc.)
+                if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
+                    continue;
+                }
+
+                // Get index info and filter for global sync only
+                auto indexInfo = context.SS->Indexes.at(childPathId);
+                if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                    continue;
+                }
+
+                // Get index implementation table (single child of index)
+                auto indexPath = TPath::Init(childPathId, context.SS);
+                Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
+                auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+
+                // Build relative path to index impl table (relative to working dir)
+                TString indexImplTableRelPath = JoinPath({relativeItemPath, childName, implTableName});
+
+                // Create AlterContinuousBackup for index impl table
+                NKikimrSchemeOp::TModifyScheme modifyScheme;
+                modifyScheme.SetWorkingDir(tx.GetWorkingDir());
+                modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterContinuousBackup);
+                modifyScheme.SetInternal(true);
+
+                auto& cb = *modifyScheme.MutableAlterContinuousBackup();
+                cb.SetTableName(indexImplTableRelPath); // Relative path: table1/index1/indexImplTable
+
+                auto& ib = *cb.MutableTakeIncrementalBackup();
+                // Destination: {backup_collection}/{timestamp}_inc/__ydb_backup_meta/indexes/{table_path}/{index_name}
+                TString dstPath = JoinPath({
+                    tx.GetBackupIncrementalBackupCollection().GetName(),
+                    tx.GetBackupIncrementalBackupCollection().GetTargetDir(),
+                    "__ydb_backup_meta",
+                    "indexes",
+                    relativeItemPath, // Relative table path (e.g., "table1")
+                    childName         // Index name (e.g., "index1")
+                });
+                ib.SetDstPath(dstPath);
+
+                TPathId stream;
+                if (!CreateAlterContinuousBackup(opId, modifyScheme, context, result, stream)) {
+                    return result;
+                }
+                streams.push_back(stream);
+            }
+        }
+    }
+
     CreateLongIncrementalBackupOp(opId, bcPath, result, streams);
 
     return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
index 2cb60ead5641..11f3a7bae6e2 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
@@ -124,9 +124,43 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
     Y_ABORT_UNLESS(context.SS->Tables.contains(pathId));
     auto table = context.SS->Tables.at(pathId);
-    table->AlterVersion += 1;
-
     NIceDb::TNiceDb db(context.GetDB());
+
+    // Check if this is an index implementation table.
+    // If so, we need to sync the parent index version to match the impl table version.
+    TPathId parentPathId = path->ParentPathId;
+    if (parentPathId && context.SS->PathsById.contains(parentPathId)) {
+        auto parentPath = context.SS->PathsById.at(parentPathId);
+        if (parentPath->IsTableIndex()) {
+            // This is an index impl table, sync parent index version directly
+            Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId));
+            auto index = context.SS->Indexes.at(parentPathId);
+
+            // Set index version to match impl table version (which will be incremented below)
+            index->AlterVersion = table->AlterVersion + 1;
+
+            // Persist the index version update directly to database
+            db.Table<Schema::TableIndex>().Key(parentPathId.LocalPathId).Update(
+                NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
+            );
+
+            // Clear caches and publish for the index
+            context.SS->ClearDescribePathCaches(parentPath);
+            context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId);
+
+            LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                DebugHint() << " Synced parent index version with impl table"
+                            << ", indexPathId: " << parentPathId
+                            << ", indexName: " << parentPath->Name
+                            << ", newVersion: " << index->AlterVersion
+                            << ", at schemeshard: " << context.SS->SelfTabletId());
+        }
+    }
+
+    // Increment and persist the table's AlterVersion.
+    // This happens AFTER the datashards have been notified and are waiting for the plan.
+    table->AlterVersion += 1;
     context.SS->PersistTableAlterVersion(db, pathId, table);
 
     context.SS->ClearDescribePathCaches(path);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 211bb3654e0d..a98d34b2111c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -984,13 +984,10 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
         DoCreateLock(result, opId, workingDirPath, tablePath);
     }
 
-    if (workingDirPath.IsTableIndex()) {
-        auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
-        outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
-        outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
-
-        result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
-    }
+    // Note: for CDC streams on index impl tables we don't create an AlterTableIndex
+    // sub-operation. The index version is synced directly in TProposeAtTable::HandleReply
+    // to avoid the complexity of managing AlterData for what is essentially just a
+    // version sync (not an actual index schema change).
 
     Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
     auto table = context.SS->Tables.at(tablePath.Base()->PathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
index 593de4172b0a..865b67891336 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
@@ -583,13 +583,8 @@ void DoDropStream(
         result.push_back(DropLock(NextPartId(opId, result), outTx));
     }
 
-    if (workingDirPath.IsTableIndex()) {
-        auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
-        outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
-        outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
-
-        result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx));
-    }
+    // Note: for DROP CDC stream on index impl tables we don't create an AlterTableIndex
+    // sub-operation; no index version sync is needed for drops (no schema change to track).
 
     for (const auto& streamPath : streamPaths) {
         auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl);
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 73d2ae506b1d..97adfd1afa99 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1390,6 +1390,11 @@ void TSchemeShard::DescribeTable(
 {
     Y_UNUSED(typeRegistry);
 
+    LOG_NOTICE_S(TlsActivationContext->AsActorContext(),
NKikimrServices::FLAT_TX_SCHEMESHARD, + "DescribeTable publishing schema version" + << " AlterVersion=" << tableInfo.AlterVersion + << " for SchemeBoard"); + entry->SetTableSchemaVersion(tableInfo.AlterVersion); FillColumns(tableInfo, *entry->MutableColumns()); FillKeyColumns(tableInfo, *entry->MutableKeyColumnNames(), *entry->MutableKeyColumnIds()); diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp index 6fc1a58f0946..12f6bd34a930 100644 --- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp @@ -1699,4 +1699,617 @@ Y_UNIT_TEST_SUITE(TBackupCollectionTests) { // Verify it was created TestLs(runtime, "/MyRoot/.backups/collections/TestCollection/__ydb_backup_meta", false, NLs::PathExist); } + + Y_UNIT_TEST(SingleTableWithGlobalSyncIndex) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithIndex" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with one global sync covering index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithIndex" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream exists on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true); + UNIT_ASSERT(mainTableDesc.GetPathDescription().HasTable()); + + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + bool foundMainTableCdc = false; + TString mainTableCdcName; + + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = tableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + foundMainTableCdc = true; + mainTableCdcName = cdcStream.GetName(); + Cerr << "Found main table CDC stream: " << mainTableCdcName << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainTableCdc, "Main table should have CDC stream with '_continuousBackupImpl' suffix"); + + // Verify CDC stream exists on index implementation table + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true); + UNIT_ASSERT(indexDesc.GetPathDescription().HasTableIndex()); + + // Get index implementation table (first child of index) + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true); + 
UNIT_ASSERT(indexImplTableDesc.GetPathDescription().HasTable());
+
+        const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+        bool foundIndexCdc = false;
+        TString indexCdcName;
+
+        for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+            const auto& cdcStream = indexTableDesc.GetCdcStreams(i);
+            if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                foundIndexCdc = true;
+                indexCdcName = cdcStream.GetName();
+                Cerr << "Found index CDC stream: " << indexCdcName << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream with '_continuousBackupImpl' suffix");
+
+        // Verify CDC stream names match pattern and use same timestamp
+        UNIT_ASSERT_VALUES_EQUAL(mainTableCdcName, indexCdcName);
+        UNIT_ASSERT_C(mainTableCdcName.Contains("Z") && mainTableCdcName.EndsWith("_continuousBackupImpl"),
+            "CDC stream name should have X.509 timestamp format (YYYYMMDDHHMMSSZ_continuousBackupImpl)");
+    }
+
+    Y_UNIT_TEST(SingleTableWithMultipleGlobalSyncIndexes) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableWithMultipleIndexes"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with multiple global sync indexes
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+              Name: "TableWithMultipleIndexes"
+              Columns { Name: "key" Type: "Uint32" }
+              Columns { Name: "value1" Type: "Utf8" }
+              Columns { Name: "value2" Type: "Uint64" }
+              KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+              Name: "Value1Index"
+              KeyColumnNames: ["value1"]
+              Type: EIndexTypeGlobal
+            }
+            IndexDescription {
+              Name: "Value2Index"
+              KeyColumnNames: ["value2"]
+              DataColumnNames: ["value1"]
+              Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC stream on main table
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMultipleIndexes", true, true);
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+        TString mainCdcName;
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            const auto& cdcStream = tableDesc.GetCdcStreams(i);
+            if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                mainCdcName = cdcStream.GetName();
+                break;
+            }
+        }
+        UNIT_ASSERT_C(!mainCdcName.empty(), "Main table should have CDC stream");
+
+        // Verify CDC streams on both indexes
+        TVector<TString> indexNames = {"Value1Index", "Value2Index"};
+        TVector<TString> indexCdcNames;
+
+        for (const auto& indexName : indexNames) {
+            auto indexDesc = DescribePrivatePath(runtime,
+                "/MyRoot/TableWithMultipleIndexes/" + indexName, true, true);
+            UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+            TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+            auto indexImplTableDesc = DescribePrivatePath(runtime,
+                "/MyRoot/TableWithMultipleIndexes/" + indexName + "/" +
indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = indexTableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + indexCdcNames.push_back(cdcStream.GetName()); + foundCdc = true; + Cerr << "Found CDC stream on " << indexName << ": " << cdcStream.GetName() << Endl; + break; + } + } + UNIT_ASSERT_C(foundCdc, "Index " + indexName + " should have CDC stream"); + } + + // Verify all streams use the same timestamp + UNIT_ASSERT_VALUES_EQUAL(indexCdcNames.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[0]); + UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[1]); + } + + Y_UNIT_TEST(TableWithMixedIndexTypes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithMixedIndexes" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync + async indexes + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithMixedIndexes" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value1" Type: "Utf8" } + Columns { Name: "value2" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SyncIndex" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobal + } + IndexDescription { + Name: "AsyncIndex" + KeyColumnNames: ["value2"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream"); + + // Verify CDC stream on global sync index ONLY + auto syncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/SyncIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(syncIndexDesc.GetPathDescription().ChildrenSize(), 1); + TString syncImplTableName = syncIndexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto syncImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMixedIndexes/SyncIndex/" + syncImplTableName, true, true); + const auto& syncTableDesc = syncImplTableDesc.GetPathDescription().GetTable(); + + bool foundSyncCdc = false; + for (size_t i = 0; i < syncTableDesc.CdcStreamsSize(); ++i) { + if (syncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundSyncCdc = true; + Cerr << "Found CDC stream on SyncIndex (expected)" << Endl; + break; + } + } + UNIT_ASSERT_C(foundSyncCdc, "Global 
sync index should have CDC stream"); + + // Verify NO CDC stream on async index + auto asyncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/AsyncIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(asyncIndexDesc.GetPathDescription().ChildrenSize(), 1); + TString asyncImplTableName = asyncIndexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto asyncImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMixedIndexes/AsyncIndex/" + asyncImplTableName, true, true); + const auto& asyncTableDesc = asyncImplTableDesc.GetPathDescription().GetTable(); + + bool foundAsyncCdc = false; + for (size_t i = 0; i < asyncTableDesc.CdcStreamsSize(); ++i) { + if (asyncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundAsyncCdc = true; + break; + } + } + UNIT_ASSERT_C(!foundAsyncCdc, "Async index should NOT have CDC stream"); + } + + Y_UNIT_TEST(MultipleTablesWithIndexes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection with 2 tables + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/Table1" + } + Entries { + Type: ETypeTable + Path: "/MyRoot/Table2" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create Table1 with index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table1" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index1" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Create Table2 with index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table2" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index2" + KeyColumnNames: ["data"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC streams on both main tables + TVector tables = {"Table1", "Table2"}; + TVector indexes = {"Index1", "Index2"}; + + for (size_t tableIdx = 0; tableIdx < tables.size(); ++tableIdx) { + const auto& tableName = tables[tableIdx]; + const auto& indexName = indexes[tableIdx]; + + // Check main table CDC + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName, true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + Cerr << "Found CDC stream on " << tableName << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, tableName + " should have CDC stream"); + + // Check index CDC + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName + "/" + indexName, true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + 
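+            // Editorial note: this relies on a global index path having exactly
+            // one child, its implementation table (named "indexImplTable" by
+            // convention), which is why child 0 is taken below.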
TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/" + tableName + "/" + indexName + "/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + Cerr << "Found CDC stream on " << indexName << Endl; + break; + } + } + UNIT_ASSERT_C(foundIndexCdc, indexName + " should have CDC stream"); + } + } + + Y_UNIT_TEST(IncrementalBackupWithIndexes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableForIncremental" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableForIncremental" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup (creates CDC streams) + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC streams were created for both main table and index + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + Cerr << "Found CDC stream on main table" << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream after full backup"); + + // Verify CDC stream on index + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental/ValueIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableForIncremental/ValueIndex/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + Cerr << "Found CDC stream on index implementation table" << Endl; + break; + } + } + UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream after full backup"); + + runtime.AdvanceCurrentTime(TDuration::Seconds(1)); + + // Execute incremental backup (rotates CDC, creates backup tables) 
+ TestBackupIncrementalBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify backup collection structure + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1), { + NLs::PathExist, + NLs::IsBackupCollection, + NLs::ChildrenCount(2), // full + incremental + }); + + // Find the incremental backup directory (should end with "_incremental") + auto collectionDesc = DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1, true, true); + TString incrBackupDir; + for (size_t i = 0; i < collectionDesc.GetPathDescription().ChildrenSize(); ++i) { + const auto& child = collectionDesc.GetPathDescription().GetChildren(i); + Cerr << "Child: " << child.GetName() << " PathState: " << child.GetPathState() << Endl; + if (child.GetName().EndsWith("_incremental")) { + incrBackupDir = child.GetName(); + break; + } + } + UNIT_ASSERT_C(!incrBackupDir.empty(), "Should find incremental backup directory"); + + // Verify backup table for main table exists + TestDescribeResult(DescribePath(runtime, + "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir + "/TableForIncremental"), { + NLs::PathExist, + NLs::IsTable, + }); + + // Verify index backup table exists in __ydb_backup_meta/indexes/TableForIncremental/ValueIndex + TString indexBackupPath = "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir + + "/__ydb_backup_meta/indexes/TableForIncremental/ValueIndex"; + TestDescribeResult(DescribePath(runtime, indexBackupPath), { + NLs::PathExist, + NLs::IsTable, + }); + + Cerr << "SUCCESS: Full backup created CDC streams for both main table and index" << Endl; + Cerr << " Incremental backup created backup tables for both main table and index" << Endl; + Cerr << " Index backup table verified at: " << indexBackupPath << Endl; + } + + Y_UNIT_TEST(OmitIndexesFlag) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection WITH OmitIndexes flag set + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithIndex" + } + } + Cluster: {} + IncrementalBackupConfig { + OmitIndexes: true + } + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithIndex" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream exists on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + 
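+                // Editorial note: OmitIndexes only suppresses streams on index
+                // impl tables; the main table's backup stream must still appear,
+                // which is what this loop asserts.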
foundMainCdc = true; + Cerr << "Found CDC stream on main table (expected)" << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream even with OmitIndexes=true"); + + // Verify NO CDC stream on index (because OmitIndexes is true) + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + break; + } + } + UNIT_ASSERT_C(!foundIndexCdc, "Index should NOT have CDC stream when OmitIndexes=true"); + + Cerr << "SUCCESS: OmitIndexes flag works correctly - main table has CDC, index does not" << Endl; + } } // TBackupCollectionTests From a63a924aeb8e0daf639545b8ee25f9adcda0effb Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 00:53:57 +0000 Subject: [PATCH 02/15] WIP --- ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 97adfd1afa99..73d2ae506b1d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1390,11 +1390,6 @@ void TSchemeShard::DescribeTable( { Y_UNUSED(typeRegistry); - LOG_NOTICE_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD, - "DescribeTable publishing schema version" - << " AlterVersion=" << tableInfo.AlterVersion - << " for SchemeBoard"); - entry->SetTableSchemaVersion(tableInfo.AlterVersion); FillColumns(tableInfo, *entry->MutableColumns()); FillKeyColumns(tableInfo, *entry->MutableKeyColumnNames(), *entry->MutableKeyColumnIds()); From cbad1d93b01d90057775b75b2bbc46c34b56dcf6 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 01:22:47 +0000 Subject: [PATCH 03/15] WIP --- ydb/core/tx/datashard/datashard_impl.h | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 6fe413cde3f6..af995803df29 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1700,16 +1700,8 @@ class TDataShard } bool IsUserTable(const TTableId& tableId) const { - bool inTableInfos = TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end(); - bool isSystemTable = TSysTables::IsSystemTable(tableId); - bool result = inTableInfos && !isSystemTable; - if (!result) { - Cerr << "CDC_DEBUG: IsUserTable returning FALSE for TableId OwnerId=" << tableId.PathId.OwnerId - << " LocalPathId=" << tableId.PathId.LocalPathId - << " inTableInfos=" << inTableInfos - << " isSystemTable=" << isSystemTable << Endl; - } - return result; + return (TableInfos.find(tableId.PathId.LocalPathId) != TableInfos.end()) + && !TSysTables::IsSystemTable(tableId); } const THashMap &GetUserTables() const { return TableInfos; } From 69a689b6232be116411d5860a566abc5bb7a8984 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin 
Date: Fri, 31 Oct 2025 01:24:17 +0000 Subject: [PATCH 04/15] WIP --- ydb/core/tx/datashard/create_cdc_stream_unit.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index d13fb4eeac28..2b5f02812aaa 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -43,17 +43,9 @@ class TCreateCdcStreamUnit : public TExecutionUnit { const auto version = params.GetTableSchemaVersion(); Y_ENSURE(version); - Cerr << "CDC_DEBUG: CreateCdcStreamUnit creating CDC stream on table PathId=" << pathId - << " (OwnerId=" << pathId.OwnerId << " LocalPathId=" << pathId.LocalPathId << ")" - << " streamPathId=" << streamPathId - << " State=" << static_cast(streamDesc.GetState()) - << " Mode=" << static_cast(streamDesc.GetMode()) << Endl; - auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc); TDataShardLocksDb locksDb(DataShard, txc); DataShard.AddUserTable(pathId, tableInfo, &locksDb); - - Cerr << "CDC_DEBUG: Added table to TableInfos with " << tableInfo->CdcStreams.size() << " CDC streams" << Endl; if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); From d8ca10fe2e0703e27b1f9ff4109451ead68e8a77 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 01:27:47 +0000 Subject: [PATCH 05/15] WIP --- .../datashard/change_collector_cdc_stream.cpp | 30 ++----------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 136a589200cc..4dbff2ca29f3 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -123,15 +123,8 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { } bool value = false; - const auto& userTables = Self->GetUserTables(); - Cerr << "CDC_DEBUG: NeedToReadKeys checking " << userTables.size() << " user tables" << Endl; - for (const auto& [tableLocalPathId, tableInfo] : userTables) { - Cerr << "CDC_DEBUG: Table LocalPathId=" << tableLocalPathId - << " has " << tableInfo->CdcStreams.size() << " CDC streams" << Endl; - for (const auto& [streamPathId, streamInfo] : tableInfo->CdcStreams) { - Cerr << "CDC_DEBUG: Stream PathId=" << streamPathId - << " State=" << static_cast(streamInfo.State) - << " Mode=" << static_cast(streamInfo.Mode) << Endl; + for (const auto& [_, tableInfo] : Self->GetUserTables()) { + for (const auto& [_, streamInfo] : tableInfo->CdcStreams) { if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { continue; } @@ -152,29 +145,15 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { } CachedNeedToReadKeys = value; - Cerr << "CDC_DEBUG: NeedToReadKeys returning " << value << Endl; return *CachedNeedToReadKeys; } bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, TArrayRef key, TArrayRef updates) { - Cerr << "CDC_DEBUG: Collect called for TableId OwnerId=" << tableId.PathId.OwnerId - << " LocalPathId=" << tableId.PathId.LocalPathId - << " RowOp=" << static_cast(rop) << Endl; - - if (!Self->IsUserTable(tableId)) { - const auto& userTables = Self->GetUserTables(); - Cerr << "CDC_DEBUG: IsUserTable returned FALSE! TableId not in UserTables map." 
<< Endl; - Cerr << "CDC_DEBUG: UserTables contains " << userTables.size() << " tables:" << Endl; - for (const auto& [localPathId, _] : userTables) { - Cerr << "CDC_DEBUG: LocalPathId=" << localPathId << Endl; - } - Y_ENSURE(false, "Unknown table: " << tableId); - } + Y_ENSURE(Self->IsUserTable(tableId), "Unknown table: " << tableId); auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); - Cerr << "CDC_DEBUG: Found user table with " << userTable->CdcStreams.size() << " CDC streams" << Endl; const auto& keyTags = userTable->KeyColumnIds; const auto& keyTypes = userTable->KeyColumnTypes; const auto valueTags = MakeValueTags(userTable->Columns); @@ -194,9 +173,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } for (const auto& [pathId, stream] : userTable->CdcStreams) { - Cerr << "CDC_DEBUG: Processing CDC stream PathId=" << pathId - << " State=" << static_cast(stream.State) - << " Mode=" << static_cast(stream.Mode) << Endl; TMaybe initialState; TMaybe snapshotVersion; From 39ad3c51c34c5d66cf42206bcf8b2a7c24f87d14 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 01:43:10 +0000 Subject: [PATCH 06/15] WIP --- .../schemeshard__operation_alter_cdc_stream.cpp | 9 +++++++-- .../schemeshard__operation_create_cdc_stream.cpp | 11 +++++++---- .../schemeshard__operation_drop_cdc_stream.cpp | 9 +++++++-- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index ec562aea920a..92155f564df4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -635,8 +635,13 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr result.push_back(DropLock(NextPartId(opId, result), outTx)); } - // Note: For ALTER CDC streams on index impl tables, we don't create an AlterTableIndex sub-operation - // The index version sync happens in the CDC stream operation handlers directly + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index a98d34b2111c..211bb3654e0d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -984,10 +984,13 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran DoCreateLock(result, opId, workingDirPath, tablePath); } - // Note: For CDC streams on index impl tables, we don't create an AlterTableIndex sub-operation - // The index version will be synced directly in TProposeAtTable::HandleReply to avoid - // the complexity of managing AlterData for what is essentially just a version sync - // (not an actual index schema change) + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + 
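+        // Editorial note: the AlterTableIndex sub-operation keeps the parent
+        // index's schema version in step with the impl table being altered by
+        // the CDC operation; EIndexStateReady leaves the index state unchanged.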
outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); auto table = context.SS->Tables.at(tablePath.Base()->PathId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 865b67891336..593de4172b0a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -583,8 +583,13 @@ void DoDropStream( result.push_back(DropLock(NextPartId(opId, result), outTx)); } - // Note: For DROP CDC streams on index impl tables, we don't create an AlterTableIndex sub-operation - // The index version sync is not needed for drops (no schema changes to track) + if (workingDirPath.IsTableIndex()) { + auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); + outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); + outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); + + result.push_back(CreateAlterTableIndex(NextPartId(opId, result), outTx)); + } for (const auto& streamPath : streamPaths) { auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl); From 862b23a387c7693086b3b7553812fc41e71bddf1 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 01:43:37 +0000 Subject: [PATCH 07/15] WIP --- .../schemeshard__operation_alter_cdc_stream.cpp | 2 +- .../schemeshard__operation_create_cdc_stream.cpp | 2 +- .../schemeshard__operation_drop_cdc_stream.cpp | 10 +++++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 92155f564df4..29ae29f10b2a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -635,7 +635,7 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr result.push_back(DropLock(NextPartId(opId, result), outTx)); } - if (workingDirPath.IsTableIndex()) { + if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 211bb3654e0d..d839638f95eb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -984,7 +984,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran DoCreateLock(result, opId, workingDirPath, tablePath); } - if (workingDirPath.IsTableIndex()) { + if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), 
NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 593de4172b0a..5c7474cea89b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -583,7 +583,15 @@ void DoDropStream( result.push_back(DropLock(NextPartId(opId, result), outTx)); } - if (workingDirPath.IsTableIndex()) { + bool hasContinuousBackupStream = false; + for (const auto& streamPath : streamPaths) { + if (streamPath.Base()->Name.EndsWith("_continuousBackupImpl")) { + hasContinuousBackupStream = true; + break; + } + } + + if (workingDirPath.IsTableIndex() && !hasContinuousBackupStream) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); From 52c57d350350ca0b174eeba3911b02cb02add8cb Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 01:58:51 +0000 Subject: [PATCH 08/15] WIP --- ...rd__operation_backup_backup_collection.cpp | 80 +++++++++---------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp index d69f17cd38be..cfc99dd16c4c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp @@ -66,7 +66,6 @@ TVector CreateBackupBackupCollection(TOperationId opId, con Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); const auto& bc = context.SS->BackupCollections[bcPath->PathId]; bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig(); - // Check if indexes should be omitted (only relevant when incremental backup is enabled) bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes(); TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl"; @@ -160,7 +159,6 @@ TVector CreateBackupBackupCollection(TOperationId opId, con NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, table, boundaries, false); } - // Create CDC streams for indexes (only if incremental backup is enabled and indexes are not omitted) if (!omitIndexes) { for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { const auto tablePath = TPath::Resolve(item.GetPath(), context.SS); @@ -175,51 +173,51 @@ TVector CreateBackupBackupCollection(TOperationId opId, con } // Get index info and filter for global sync only + // We need more complex logic for vector indexes in future auto indexInfo = context.SS->Indexes.at(childPathId); if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) { continue; } - // Get index implementation table (single child of index) - auto indexPath = TPath::Init(childPathId, context.SS); - Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1); - auto [implTableName, implTablePathId] = 
*indexPath.Base()->GetChildren().begin(); - - auto indexTablePath = indexPath.Child(implTableName); - auto indexTable = context.SS->Tables.at(implTablePathId); - - // Create CDC stream on index impl table - NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; - createCdcStreamOp.SetTableName(implTableName); // Use relative name only - auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); - streamDescription.SetName(streamName); // Same stream name as main table - streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); - streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); - - // Create CDC stream metadata (schemeshard side) - NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false); - - // Create AtTable operation to notify datashard (without schema change) - { - auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); - auto& cdcOp = *outTx.MutableCreateCdcStream(); - cdcOp.CopyFrom(createCdcStreamOp); - result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false)); - } - - // Create PQ part for index CDC stream - TVector boundaries; - const auto& partitions = indexTable->GetPartitions(); - boundaries.reserve(partitions.size() - 1); - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& partition = partitions.at(i); - if (i != partitions.size() - 1) { - boundaries.push_back(partition.EndOfRange); + // Get index implementation table (the only child of index) + auto indexPath = TPath::Init(childPathId, context.SS); + Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1); + auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin(); + + auto indexTablePath = indexPath.Child(implTableName); + auto indexTable = context.SS->Tables.at(implTablePathId); + + // Create CDC stream on index impl table + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(implTableName); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(streamName); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + + NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false); + + // Create AtTable operation to notify datashard (without schema change) + { + auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); + auto& cdcOp = *outTx.MutableCreateCdcStream(); + cdcOp.CopyFrom(createCdcStreamOp); + result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false)); } - } - - const auto streamPath = indexTablePath.Child(streamName); - NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false); + + // Create PQ part for index CDC stream + TVector boundaries; + const auto& partitions = indexTable->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + + const auto streamPath = indexTablePath.Child(streamName); + NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false); } } } From a0999740f25ceeb664ccf53b5b9b102d0d4a1880 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin 
Date: Fri, 31 Oct 2025 02:51:26 +0000 Subject: [PATCH 09/15] WIP --- ydb/core/tx/schemeshard/schemeshard__init.cpp | 12 +++++++ ...chemeshard__operation_alter_cdc_stream.cpp | 1 + ...hemeshard__operation_common_cdc_stream.cpp | 36 +++++++++++++------ ...hemeshard__operation_create_cdc_stream.cpp | 19 ++++++++++ ...schemeshard__operation_drop_cdc_stream.cpp | 16 +++++++++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 10 ++++++ 6 files changed, 83 insertions(+), 11 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 4cb2e5ff16a9..c26815a43e20 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3673,6 +3673,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase { Y_ABORT_UNLESS(deserializeRes); txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); } + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable || + txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan || + txState.TxType == TTxState::TxAlterCdcStreamAtTable || + txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot || + txState.TxType == TTxState::TxDropCdcStreamAtTable || + txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + if (!extraData.empty()) { + NKikimrSchemeOp::TGenericTxInFlyExtraData proto; + bool deserializeRes = ParseFromStringNoSizeLimit(proto, extraData); + Y_ABORT_UNLESS(deserializeRes); + txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); + } } Y_ABORT_UNLESS(txState.TxType != TTxState::TxInvalid); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 29ae29f10b2a..2e68d878d6ab 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -459,6 +459,7 @@ class TAlterCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + txState.CdcPathId = streamPath.Base()->PathId; // Store CDC stream PathId for later use tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp index 11f3a7bae6e2..d8e4566bfebb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp @@ -124,32 +124,49 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera Y_ABORT_UNLESS(context.SS->Tables.contains(pathId)); auto table = context.SS->Tables.at(pathId); + table->AlterVersion += 1; + NIceDb::TNiceDb db(context.GetDB()); - // Check if this is an index implementation table + bool isContinuousBackupStream = false; + if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) { + auto cdcPath = context.SS->PathsById.at(txState->CdcPathId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " Checking CDC stream name" + << ", cdcPathId: " << txState->CdcPathId + << ", streamName: " << cdcPath->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); 
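+    // Editorial note: continuous backup streams are recognized solely by the
+    // "_continuousBackupImpl" name suffix at this point in the patch series;
+    // no dedicated flag is stored on the stream descriptor.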
+ if (cdcPath->Name.EndsWith("_continuousBackupImpl")) { + isContinuousBackupStream = true; + } + } else { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " CdcPathId not found" + << ", cdcPathId: " << txState->CdcPathId + << ", at schemeshard: " << context.SS->SelfTabletId()); + } + // Check if this is an index implementation table // If so, we need to sync the parent index version to match the impl table version + // Do this ONLY for continuous backup operations TPathId parentPathId = path->ParentPathId; - if (parentPathId && context.SS->PathsById.contains(parentPathId)) { + if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) { auto parentPath = context.SS->PathsById.at(parentPathId); if (parentPath->IsTableIndex()) { - // This is an index impl table, sync parent index version directly Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId)); auto index = context.SS->Indexes.at(parentPathId); - // Set index version to match impl table version (which will be incremented below) - index->AlterVersion = table->AlterVersion + 1; + index->AlterVersion = table->AlterVersion; // Persist the index version update directly to database db.Table().Key(parentPathId.LocalPathId).Update( NIceDb::TUpdate(index->AlterVersion) ); - // Clear caches and publish for the index context.SS->ClearDescribePathCaches(parentPath); context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId); - - LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " Synced parent index version with impl table" << ", indexPathId: " << parentPathId << ", indexName: " << parentPath->Name @@ -158,9 +175,6 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera } } - // Increment and persist the table's AlterVersion - // This happens AFTER DataShards have been notified and are waiting for the plan - table->AlterVersion += 1; context.SS->PersistTableAlterVersion(db, pathId, table); context.SS->ClearDescribePathCaches(path); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index d839638f95eb..2ade3854dede 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -322,6 +322,13 @@ class TNewCdcStream: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateCdcStream, streamPath.Base()->PathId); + txState.CdcPathId = streamPath.Base()->PathId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "DoNewStream: Set CdcPathId" + << ", operationId: " << OperationId + << ", cdcPathId: " << streamPath.Base()->PathId + << ", streamName: " << streamPath.Base()->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); txState.State = TTxState::Propose; streamPath.Base()->PathState = NKikimrSchemeOp::EPathStateCreate; @@ -582,6 +589,18 @@ class TNewCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + + // Set CdcPathId for continuous backup detection + auto streamPath = tablePath.Child(streamName); + if (streamPath.IsResolved()) { + txState.CdcPathId = streamPath.Base()->PathId; + 
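+            // Editorial note: CdcPathId survives schemeshard restarts because
+            // this patch also serializes it via TGenericTxInFlyExtraData in
+            // PersistTxState and restores it in TTxInit (see the
+            // schemeshard_impl.cpp and schemeshard__init.cpp hunks below).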
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TNewCdcStreamAtTable: Set CdcPathId" + << ", operationId: " << OperationId + << ", cdcPathId: " << streamPath.Base()->PathId + << ", streamName: " << streamName + << ", at schemeshard: " << context.SS->SelfTabletId()); + } tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 5c7474cea89b..e445f7d154cd 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -191,6 +191,7 @@ class TDropCdcStream: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, TTxState::TxDropCdcStream, streamPath.Base()->PathId); + txState.CdcPathId = streamPath.Base()->PathId; txState.State = TTxState::Propose; txState.MinStep = TStepId(1); @@ -453,6 +454,21 @@ class TDropCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + + // Set CdcPathId for continuous backup detection + // If dropping multiple streams, check if any is continuous backup + for (const auto& streamPath : streamPaths) { + if (streamPath.Base()->Name.EndsWith("_continuousBackupImpl")) { + txState.CdcPathId = streamPath.Base()->PathId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TDropCdcStreamAtTable: Set CdcPathId for continuous backup" + << ", operationId: " << OperationId + << ", cdcPathId: " << streamPath.Base()->PathId + << ", streamName: " << streamPath.Base()->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); + break; + } + } tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4d2a59663922..f40c6282fa6e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2643,6 +2643,16 @@ void TSchemeShard::PersistTxState(NIceDb::TNiceDb& db, const TOperationId opId) txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId()); bool serializeRes = proto.SerializeToString(&extraData); Y_ABORT_UNLESS(serializeRes); + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable || + txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan || + txState.TxType == TTxState::TxAlterCdcStreamAtTable || + txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot || + txState.TxType == TTxState::TxDropCdcStreamAtTable || + txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + NKikimrSchemeOp::TGenericTxInFlyExtraData proto; + txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId()); + bool serializeRes = proto.SerializeToString(&extraData); + Y_ABORT_UNLESS(serializeRes); } db.Table().Key(opId.GetTxId(), opId.GetSubTxId()).Update( From ce5da8f9eca4ffb211b58cc5e1a71f015d30f430 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 02:55:05 +0000 Subject: [PATCH 10/15] WIP --- .../schemeshard__operation_drop_cdc_stream.cpp | 15 --------------- 1 file changed, 15 deletions(-) diff --git 
a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index e445f7d154cd..11b82835a7ba 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -454,21 +454,6 @@ class TDropCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; - - // Set CdcPathId for continuous backup detection - // If dropping multiple streams, check if any is continuous backup - for (const auto& streamPath : streamPaths) { - if (streamPath.Base()->Name.EndsWith("_continuousBackupImpl")) { - txState.CdcPathId = streamPath.Base()->PathId; - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TDropCdcStreamAtTable: Set CdcPathId for continuous backup" - << ", operationId: " << OperationId - << ", cdcPathId: " << streamPath.Base()->PathId - << ", streamName: " << streamPath.Base()->Name - << ", at schemeshard: " << context.SS->SelfTabletId()); - break; - } - } tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); From 43cd17c8a58e28cd156b58873e20cf204c35cf0d Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 31 Oct 2025 06:53:05 +0000 Subject: [PATCH 11/15] fix --- .../schemeshard__operation_backup_backup_collection.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp index 556e7408a04c..c49d615221a5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp @@ -69,10 +69,6 @@ TVector CreateBackupBackupCollection(TOperationId opId, con bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes(); TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl"; - bool omitIndexes = bc->Description.HasOmitIndexes() - ? 
bc->Description.GetOmitIndexes() - : false; - for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { auto& desc = *copyTables.Add(); desc.SetSrcPath(item.GetPath()); From 96af589a3c3d7740081955df0fe84da0a7b35651 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Sat, 1 Nov 2025 02:48:09 +0000 Subject: [PATCH 12/15] WIP --- .../datashard_ut_incremental_backup.cpp | 235 ++++-------------- ...rd__operation_backup_backup_collection.cpp | 5 +- 2 files changed, 50 insertions(+), 190 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 42074e0d83aa..51ad1ecf66df 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -39,6 +39,11 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { using TCdcStream = TShardedTableOptions::TCdcStream; + struct CdcOperationCounts { + int Deletes = 0; + int Inserts = 0; + }; + static NKikimrPQ::TPQConfig DefaultPQConfig() { NKikimrPQ::TPQConfig pqConfig; pqConfig.SetEnabled(true); @@ -268,6 +273,42 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return result; } + CdcOperationCounts CountCdcOperations(const TString& backup) { + CdcOperationCounts counts; + size_t pos = 0; + + while ((pos = backup.find("bytes_value: \"", pos)) != TString::npos) { + pos += 14; + size_t endPos = backup.find("\"", pos); + if (endPos == TString::npos) break; + + TString metadataStr = backup.substr(pos, endPos - pos); + TString unescaped; + for (size_t i = 0; i < metadataStr.size(); ++i) { + if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) { + ui8 val = ((metadataStr[i+1] - '0') << 6) | + ((metadataStr[i+2] - '0') << 3) | + (metadataStr[i+3] - '0'); + unescaped += static_cast(val); + i += 3; + } else { + unescaped += metadataStr[i]; + } + } + + auto metadata = ParseCdcMetadata(unescaped); + if (metadata.IsDelete) { + counts.Deletes++; + } else { + counts.Inserts++; + } + + pos = endPos + 1; + } + + return counts; + } + NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector& tags = {2}) { auto keyCell = TCell::Make(key); auto valueCell = TCell::Make(value); @@ -2593,7 +2634,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { SetupLogging(runtime); InitRoot(server, edgeActor); - // Create table with one global sync index TShardedTableOptions opts; opts.Columns({ {"key", "Uint32", true, false}, @@ -2604,7 +2644,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { }); CreateShardedTable(server, edgeActor, "/Root", "Table", opts); - // Create backup collection with incremental enabled ExecSQL(server, edgeActor, R"( CREATE BACKUP COLLECTION `MyCollection` ( TABLE `/Root/Table` @@ -2615,21 +2654,9 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ); )", false); - // Perform full backup (creates CDC streams on main table and index tables) ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); - - // Wait for CDC streams to be fully activated on all tables (including index tables) SimulateSleep(server, TDuration::Seconds(1)); - // Debug: Verify CDC streams are created on both main table and index table - Cerr << "CDC_DEBUG: Checking CDC streams after full backup..." 
<< Endl; - auto mainTableCdcCheck = Navigate(runtime, edgeActor, "/Root/Table"); - Cerr << "CDC_DEBUG: Main table Navigate result Status=" << mainTableCdcCheck->ResultSet.size() << " entries" << Endl; - - auto indexTableCdcCheck = Navigate(runtime, edgeActor, "/Root/Table/ByValue/indexImplTable"); - Cerr << "CDC_DEBUG: Index impl table Navigate result Status=" << indexTableCdcCheck->ResultSet.size() << " entries" << Endl; - - // Insert initial data ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, value) VALUES (1, 100) @@ -2638,53 +2665,31 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ; )"); - // Update row: (2, 200) -> (2, 250) ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250); )"); - // Delete row: (3, 300) ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)"); - // Insert new row: (4, 400) ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, value) VALUES (4, 400); )"); - // Debug: Verify index table has data before backup - auto indexDataBeforeBackup = KqpSimpleExec(runtime, R"( - SELECT value, key FROM `/Root/Table/ByValue/indexImplTable` - ORDER BY value - )"); - Cerr << "CDC_DEBUG: Index table data BEFORE incremental backup: " << indexDataBeforeBackup << Endl; - - // Debug: Check which tables are in datashard TableInfos - Cerr << "CDC_DEBUG: About to wait for CDC to capture changes..." << Endl; - - // Wait for CDC streams to capture all changes (including on index tables) SimulateSleep(server, TDuration::Seconds(1)); - // Take incremental backup ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); - // Wait for backup operation to complete and CDC offload (including index tables) SimulateSleep(server, TDuration::Seconds(10)); - // Find the incremental backup directory (timestamp-dependent) TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); - Cerr << "Found incremental backup directory: " << incrBackupDir << Endl; - // Verify main table backup contains correct data TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table"; auto mainTableBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( SELECT key, value FROM `)" << mainTablePath << R"(` ORDER BY key )"); - // Should contain: (1, NULL) - not changed but tombstone or no entry, (2, 250), (3, NULL), (4, 400) - // Actually, incremental backup only contains changes, so: (2, 250), (3, NULL - tombstone), (4, 400) - Cerr << "Main table backup: " << mainTableBackup << Endl; UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, "Main table backup should contain updated key 2"); UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, @@ -2696,20 +2701,12 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 400") != TString::npos, "Main table backup should contain new value 400"); - // Verify index backup table exists and contains correct data TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByValue"; auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( SELECT * FROM `)" << indexBackupPath << R"(` ORDER BY value )"); - Cerr << "CDC_DEBUG: Index backup result: " << indexBackup << Endl; - - // Index should contain changes: - // - (200, 2) - old value deleted due to 
update - // - (250, 2) - new value from update - // - (300, 3) - tombstone for deleted row - // - (400, 4) - new insert UNIT_ASSERT_C(indexBackup.find("uint32_value: 200") != TString::npos, "Index backup should contain old value 200 (deleted)"); UNIT_ASSERT_C(indexBackup.find("uint32_value: 250") != TString::npos, @@ -2737,7 +2734,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { SetupLogging(runtime); InitRoot(server, edgeActor); - // Create table with covering index: ByAge on age, covering name TShardedTableOptions opts; opts.Columns({ {"key", "Uint32", true, false}, @@ -2750,7 +2746,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { }); CreateShardedTable(server, edgeActor, "/Root", "Table", opts); - // Create backup collection with incremental enabled ExecSQL(server, edgeActor, R"( CREATE BACKUP COLLECTION `MyCollection` ( TABLE `/Root/Table` @@ -2761,18 +2756,9 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ); )", false); - // Perform full backup (creates CDC streams on main table and index tables) ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); - - // Wait for CDC streams to be fully activated on all tables (including index tables) SimulateSleep(server, TDuration::Seconds(1)); - // Debug: Capture index implementation table state after full backup (should be empty) - auto indexImplTableInitial = KqpSimpleExec(runtime, R"( - SELECT * FROM `/Root/Table/ByAge/indexImplTable` - )"); - - // Insert initial data ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice', 30u, 5000u) @@ -2780,79 +2766,34 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ; )"); - // Debug: Capture index implementation table after initial insert - auto indexImplTableAfterInsert = KqpSimpleExec(runtime, R"( - SELECT * FROM `/Root/Table/ByAge/indexImplTable` - )"); - - // Update covered column: name changes (should appear in index) ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 5000u); )"); - // Update non-covered column: salary changes (should NOT appear in index backup as a separate change) ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 6000u); )"); - // Update indexed column: age changes (creates tombstone for old + new entry) ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (2, 'Bob', 26u, 4000u); )"); - // Delete row ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=2;)"); - // Wait for CDC streams to capture all changes (including on index tables) SimulateSleep(server, TDuration::Seconds(1)); - // Take incremental backup ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); - // Wait for backup operation to complete and CDC offload (including index tables) SimulateSleep(server, TDuration::Seconds(10)); - // Find the incremental backup directory (timestamp-dependent) TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); - Cerr << "Found incremental backup directory: " << incrBackupDir << Endl; - - // Debug: Check main table backup first - TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table"; - auto mainBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( - SELECT * FROM `)" << mainTablePath << R"(` - )"); - Cerr << "Main table backup: " << mainBackup << Endl; - // Verify 
index backup table exists and contains covered column data
         TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge";
         auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"(
             SELECT * FROM `)" << indexBackupPath << R"(`
         )");
 
-        // Debug: Check actual index implementation table
-        auto indexImplTableFinal = KqpSimpleExec(runtime, R"(
-            SELECT * FROM `/Root/Table/ByAge/indexImplTable`
-        )");
-
-        Cerr << "=== DEBUG: Index Tables State ===" << Endl;
-        Cerr << "1. After full backup (should be empty): " << indexImplTableInitial << Endl;
-        Cerr << "2. After initial insert (Alice, Bob): " << indexImplTableAfterInsert << Endl;
-        Cerr << "3. Final state (physical table): " << indexImplTableFinal << Endl;
-        Cerr << "4. Incremental backup (CDC captured): " << indexBackup << Endl;
-        Cerr << "=================================" << Endl;
-
-        // Incremental backup should contain (with CDC compaction):
-        // - INSERT for (30, 1) with "Alice2" (final state after name update)
-        // - Tombstone (DELETE) for (25, 2) - Bob's old age entry, compacted with initial INSERT
-        // - Tombstone (DELETE) for (26, 2) - Bob's new age entry, compacted with INSERT from age change
-        //
-        // Note: Bob's name won't appear because:
-        // - Initial INSERT (25, 2, "Bob") was compacted with DELETE (25, 2) → just tombstone
-        // - Age change INSERT (26, 2, "Bob") was compacted with DELETE (26, 2) → just tombstone
-        //
-        // The salary change should not create a separate index entry since salary is not indexed or covered
-
         UNIT_ASSERT_C(indexBackup.find("uint32_value: 30") != TString::npos,
             "Index backup should contain age 30");
         UNIT_ASSERT_C(indexBackup.find("Alice") != TString::npos,
@@ -2861,53 +2802,14 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
             "Index backup should contain tombstone for age 25");
         UNIT_ASSERT_C(indexBackup.find("uint32_value: 26") != TString::npos,
             "Index backup should contain tombstone for age 26");
-
-        // Verify tombstones have NULL for covering column (correct behavior for DELETEs)
-        // and INSERT has the actual covering column value
         UNIT_ASSERT_C(indexBackup.find("null_flag_value: NULL_VALUE") != TString::npos,
             "Index backup tombstones should have NULL for covering columns");
 
-        // Parse and verify CDC metadata
-        // The backup contains 3 records, let's verify their metadata
-        size_t pos = 0;
-        int deleteCount = 0;
-        int insertCount = 0;
-
-        while ((pos = indexBackup.find("bytes_value: \"", pos)) != TString::npos) {
-            pos += 14; // Skip 'bytes_value: "'
-            size_t endPos = indexBackup.find("\"", pos);
-            if (endPos == TString::npos) break;
-
-            TString metadataStr = indexBackup.substr(pos, endPos - pos);
-            // Unescape the string
-            TString unescaped;
-            for (size_t i = 0; i < metadataStr.size(); ++i) {
-                if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) {
-                    // Octal escape \nnn
-                    ui8 val = ((metadataStr[i+1] - '0') << 6) |
-                              ((metadataStr[i+2] - '0') << 3) |
-                              (metadataStr[i+3] - '0');
-                    unescaped += static_cast<char>(val);
-                    i += 3;
-                } else {
-                    unescaped += metadataStr[i];
-                }
-            }
-
-            auto metadata = ParseCdcMetadata(unescaped);
-            if (metadata.IsDelete) {
-                deleteCount++;
-            } else {
-                insertCount++;
-            }
-
-            pos = endPos + 1;
-        }
+        auto counts = CountCdcOperations(indexBackup);
+        Cerr << "CDC metadata: " << counts.Deletes << " DELETEs, " << counts.Inserts << " INSERTs" << Endl;
 
-        Cerr << "CDC metadata: " << deleteCount << " DELETEs, " << insertCount << " INSERTs" << Endl;
-
-        UNIT_ASSERT_EQUAL_C(deleteCount, 2, "Should have 
2 DELETE operations (tombstones for age 25 and 26)"); - UNIT_ASSERT_EQUAL_C(insertCount, 1, "Should have 1 INSERT operation (for Alice2)"); + UNIT_ASSERT_EQUAL_C(counts.Deletes, 2, "Should have 2 DELETE operations (tombstones for age 25 and 26)"); + UNIT_ASSERT_EQUAL_C(counts.Inserts, 1, "Should have 1 INSERT operation (for Alice2)"); } Y_UNIT_TEST(IncrementalBackupMultipleIndexes) { @@ -2927,7 +2829,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { SetupLogging(runtime); InitRoot(server, edgeActor); - // Create table with 3 indexes: simple, covering, composite TShardedTableOptions opts; opts.Columns({ {"key", "Uint32", true, false}, @@ -2943,7 +2844,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { }); CreateShardedTable(server, edgeActor, "/Root", "Table", opts); - // Create backup collection with incremental enabled ExecSQL(server, edgeActor, R"( CREATE BACKUP COLLECTION `MyCollection` ( TABLE `/Root/Table` @@ -2954,17 +2854,9 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ); )", false); - // Perform full backup (creates CDC streams on main table and index tables) ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); - - // Wait for SchemeBoard updates to propagate to KQP metadata cache. - // CDC stream creation increments table schema versions (v1 -> v2), which is published - // to SchemeBoard asynchronously. KQP needs time to refresh its metadata cache before - // we can query the tables (main + indexes). Without this wait, KQP will fail with - // "schema version mismatch during metadata loading" error. SimulateSleep(server, TDuration::Seconds(5)); - // Insert initial data ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (1, 'Alice', 30u, 'NYC', 5000u) @@ -2972,51 +2864,33 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ; )"); - // Update name: affects ByName and ByCity ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (1, 'Alice2', 30u, 'NYC', 5000u); )"); - // Update age: affects ByAge ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (2, 'Bob', 26u, 'LA', 4000u); )"); - // Delete: affects all indexes ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); - // Insert new: affects all indexes ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (3, 'Carol', 28u, 'SF', 5500u); )"); - // Debug: Verify index tables have data before backup - auto byNameIndexData = KqpSimpleExec(runtime, R"( - SELECT name, key FROM `/Root/Table/ByName/indexImplTable` - ORDER BY name - )"); - Cerr << "ByName index table data BEFORE incremental backup: " << byNameIndexData << Endl; - - // Wait for CDC streams to capture all changes (including on index tables) SimulateSleep(server, TDuration::Seconds(1)); - // Take incremental backup ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); - // Wait for backup operation to complete and CDC offload (including index tables) SimulateSleep(server, TDuration::Seconds(10)); - // Find the incremental backup directory (timestamp-dependent) TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); - Cerr << "Found incremental backup directory: " << incrBackupDir << Endl; - // Verify ByName index backup TString byNamePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << 
"/__ydb_backup_meta/indexes/Table/ByName"; auto byNameBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( SELECT * FROM `)" << byNamePath << R"(` )"); - Cerr << "ByName index backup: " << byNameBackup << Endl; UNIT_ASSERT_C(byNameBackup.find("Alice") != TString::npos, "ByName backup should contain Alice (deleted)"); UNIT_ASSERT_C(byNameBackup.find("Alice2") != TString::npos, @@ -3024,12 +2898,10 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(byNameBackup.find("Carol") != TString::npos, "ByName backup should contain Carol (new)"); - // Verify ByAge index backup (covering index with salary) TString byAgePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; auto byAgeBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( SELECT * FROM `)" << byAgePath << R"(` )"); - Cerr << "ByAge index backup: " << byAgeBackup << Endl; UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 30") != TString::npos, "ByAge backup should contain age 30 (deleted)"); UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 25") != TString::npos || @@ -3040,12 +2912,10 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 5500") != TString::npos, "ByAge backup should contain covered salary 5500"); - // Verify ByCity index backup (composite key) TString byCityPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByCity"; auto byCityBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( SELECT * FROM `)" << byCityPath << R"(` )"); - Cerr << "ByCity index backup: " << byCityBackup << Endl; UNIT_ASSERT_C(byCityBackup.find("NYC") != TString::npos, "ByCity backup should contain NYC"); UNIT_ASSERT_C(byCityBackup.find("Alice") != TString::npos, @@ -3075,7 +2945,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { SetupLogging(runtime); InitRoot(server, edgeActor); - // Create table with one global sync index TShardedTableOptions opts; opts.Columns({ {"key", "Uint32", true, false}, @@ -3086,7 +2955,6 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { }); CreateShardedTable(server, edgeActor, "/Root", "Table", opts); - // Create backup collection with OmitIndexes: true ExecSQL(server, edgeActor, R"( CREATE BACKUP COLLECTION `MyCollection` ( TABLE `/Root/Table` @@ -3098,10 +2966,8 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ); )", false); - // Perform full backup ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); - // Insert and modify data ExecSQL(server, edgeActor, R"( UPSERT INTO `/Root/Table` (key, value) VALUES (1, 100) @@ -3116,43 +2982,34 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)"); - // Take incremental backup ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); SimulateSleep(server, TDuration::Seconds(5)); - // Verify main table backup exists auto mainTableBackup = KqpSimpleExec(runtime, R"( SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` ORDER BY key )"); - Cerr << "Main table backup (with OmitIndexes): " << mainTableBackup << Endl; UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, "Main table backup should exist with OmitIndexes flag"); UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, "Main table backup should contain updated value"); - // Verify index backup directory does NOT exist - // Attempt to query index backup table - should fail or return empty bool 
indexBackupExists = true;
         try {
             auto indexBackup = KqpSimpleExec(runtime, R"(
                 SELECT * FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/__ydb_backup_meta/indexes/Table/ByValue`
             )");
-            // If we got here without exception, check if it's empty or doesn't exist
             if (indexBackup.empty() || indexBackup.find("ERROR") != TString::npos ||
                 indexBackup.find("not found") != TString::npos ||
                 indexBackup.find("doesn't exist") != TString::npos) {
                 indexBackupExists = false;
             }
-            Cerr << "Index backup query result (should not exist): " << indexBackup << Endl;
         } catch (...) {
-            // Expected - index backup table should not exist
             indexBackupExists = false;
         }
 
-        UNIT_ASSERT_C(!indexBackupExists,
-            "Index backup should NOT exist when OmitIndexes flag is set");
+        UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set");
     }
 
 } // Y_UNIT_TEST_SUITE(IncrementalBackup)
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
index c49d615221a5..c398292699df 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
@@ -66,7 +66,10 @@ TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, con
     Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
     const auto& bc = context.SS->BackupCollections[bcPath->PathId];
     bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
-    bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+
+    bool omitIndexes = bc->Description.GetOmitIndexes() ||
+                       (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes());
+
     TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl";
 
     for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {

From 1bb9f92d830777633db379d8844d0a5d44ec988f Mon Sep 17 00:00:00 2001
From: Innokentii Mokin
Date: Sat, 1 Nov 2025 08:13:59 +0000
Subject: [PATCH 13/15] WIP

---
 .../schemeshard/schemeshard__backup_collection_common.cpp | 7 ++++++-
 ydb/core/tx/schemeshard/ut_base/ut_base.cpp               | 5 ++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
index 3f6f01a4e9b7..4133cf277783 100644
--- a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
@@ -132,7 +132,12 @@ std::optional<THashMap<TString, THashSet<TString>>> GetBackupRequiredPaths(
 
     // Add index backup metadata directories if incremental backup is enabled
     bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
-    bool omitIndexes = incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+
+    // Check OmitIndexes from two possible locations:
+    // 1. Top-level OmitIndexes field (for full backups)
+    // 2. 
IncrementalBackupConfig.OmitIndexes (for incremental backups)
+    bool omitIndexes = bc->Description.GetOmitIndexes() ||
+                       (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes());
 
     if (incrBackupEnabled && !omitIndexes) {
         for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index 6fb1434ae528..a0eda192a306 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -11924,9 +11924,12 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
 
         auto backupDirName = descr.GetChildren(0).GetName().c_str();
 
+        // When WithIncremental=true, __ydb_backup_meta directory is created for index backups
+        // So we expect 3 children: Table1, DirA, and __ydb_backup_meta
+        // When WithIncremental=false, we expect 2 children: Table1 and DirA
         TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), {
             NLs::PathExist,
-            NLs::ChildrenCount(2),
+            NLs::ChildrenCount(WithIncremental ? 3 : 2),
             NLs::Finished,
         });
 
From 91aa06e47261e68d302c37f4fab09afd0c375532 Mon Sep 17 00:00:00 2001
From: Innokentii Mokin
Date: Sat, 1 Nov 2025 11:05:43 +0000
Subject: [PATCH 14/15] WIP

---
 ydb/core/tx/schemeshard/ut_base/ut_base.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index a0eda192a306..11aad2f7cf3b 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -11993,9 +11993,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
         }
     }
 
+    // Incremental backup directory contains Table1, DirA, and __ydb_backup_meta
     TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName)), {
         NLs::PathExist,
-        NLs::ChildrenCount(2),
+        NLs::ChildrenCount(3),
         NLs::Finished,
     });

From 4722e043ef4d92970fd81af1b0505a734e8c8a87 Mon Sep 17 00:00:00 2001
From: Innokentii Mokin
Date: Wed, 5 Nov 2025 11:33:12 +0000
Subject: [PATCH 15/15] address PR issues

---
 .../tx/schemeshard/schemeshard__backup_collection_common.cpp  | 5 +++++
 .../schemeshard__operation_backup_backup_collection.cpp       | 5 +++++
 ...shard__operation_backup_incremental_backup_collection.cpp  | 5 +++++
 3 files changed, 15 insertions(+)

diff --git a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
index 4133cf277783..a44e5e18897c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
@@ -164,6 +164,11 @@ std::optional<THashMap<TString, THashSet<TString>>> GetBackupRequiredPaths(
         if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
             continue;
         }
+
+        // Skip deleted indexes
+        if (childPath->Dropped()) {
+            continue;
+        }
 
         auto indexInfo = context.SS->Indexes.at(childPathId);
         if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
index c398292699df..60752a3d91ce 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
@@ -183,6 +183,11 @@
             continue;
         }
 
+            // Skip 
deleted indexes
+            if (childPath->Dropped()) {
+                continue;
+            }
+
             // Get index info and filter for global sync only
             // We need more complex logic for vector indexes in future
             auto indexInfo = context.SS->Indexes.at(childPathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
index c2da2d2bbd55..6b49d60002e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
@@ -246,6 +246,11 @@ TVector<ISubOperation::TPtr> CreateBackupIncrementalBackupCollection(TOperationI
         if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
             continue;
         }
+
+        // Skip deleted indexes
+        if (childPath->Dropped()) {
+            continue;
+        }
 
         // Get index info and filter for global sync only
         auto indexInfo = context.SS->Indexes.at(childPathId);
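
Note: PATCH 15 applies the same guard three times — in GetBackupRequiredPaths, CreateBackupBackupCollection, and CreateBackupIncrementalBackupCollection — skipping dropped index children and (per the surrounding code) keeping only global sync indexes. A minimal standalone sketch of that filter as a single shared predicate follows. It is illustrative only and not part of this patch series: IsIndexEligibleForBackup and the simplified stand-in types are assumptions for illustration and do not match the real NKikimr::NSchemeShard definitions, where the check runs over TPathElement children via TOperationContext.

    #include <iostream>
    #include <vector>

    // Simplified stand-ins for the schemeshard path/index attributes the
    // three call sites inspect (assumed shapes, not the real types).
    enum class EPathType { Table, TableIndex };
    enum class EIndexType { Global, GlobalAsync };
    enum class EPathState { Active, Dropped };

    struct TChildPath {
        EPathType PathType;
        EIndexType IndexType;
        EPathState State;
        bool Dropped() const { return State == EPathState::Dropped; }
    };

    // Hypothetical shared predicate mirroring the duplicated guard:
    // back up a child only if it is a live (non-dropped) global sync index.
    bool IsIndexEligibleForBackup(const TChildPath& child) {
        if (child.PathType != EPathType::TableIndex) {
            return false;
        }
        if (child.Dropped()) {
            return false; // skip deleted indexes, as PATCH 15 does
        }
        return child.IndexType == EIndexType::Global; // global sync only
    }

    int main() {
        std::vector<TChildPath> children = {
            {EPathType::TableIndex, EIndexType::Global, EPathState::Active},      // eligible
            {EPathType::TableIndex, EIndexType::Global, EPathState::Dropped},     // skipped: dropped
            {EPathType::TableIndex, EIndexType::GlobalAsync, EPathState::Active}, // skipped: async
        };
        for (const auto& child : children) {
            std::cout << IsIndexEligibleForBackup(child) << '\n'; // prints 1, 0, 0
        }
    }

Factoring the check this way would keep the three call sites from drifting apart once the "more complex logic for vector indexes" mentioned in the surrounding comment is added.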