diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 5d9ffa1aea0f..f51127ea03bd 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1368,7 +1368,8 @@ class TKqpGatewayProxy : public IKikimrGateway { op.SetPrefix(settings.Prefix); if (settings.Settings.IncrementalBackupEnabled) { - op.MutableIncrementalBackupConfig(); + auto* config = op.MutableIncrementalBackupConfig(); + config->SetOmitIndexes(settings.Settings.OmitIndexes); } auto errOpt = std::visit( diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index a66b5b9c2a8b..9c5eb19fc7f4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1021,6 +1021,13 @@ namespace { "INCREMENTAL_BACKUP_ENABLED must be true or false")); return false; } + } else if (name == "omit_indexes") { + auto value = ToString(setting.Value().Cast().Literal().Cast().Value()); + if (!TryFromString(value, dstSettings.OmitIndexes)) { + ctx.AddError(TIssue(ctx.GetPosition(pos), + "OMIT_INDEXES must be true or false")); + return false; + } } else if (name == "storage") { auto value = ToString(setting.Value().Cast().Literal().Cast().Value()); if (to_lower(value) != "cluster") { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 67f252f3e7a6..63494ce2747c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -1052,7 +1052,8 @@ struct TAnalyzeSettings { }; struct TBackupCollectionSettings { - bool IncrementalBackupEnabled; + bool IncrementalBackupEnabled = false; + bool OmitIndexes = false; }; struct TCreateBackupCollectionSettings { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 9c1c88e2b47b..49336ab9ba4b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -2448,6 +2448,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over const THashSet supportedSettings = { "incremental_backup_enabled", "storage", + "omit_indexes", }; if (!CheckBackupCollectionSettings(node.BackupCollectionSettings(), supportedSettings, ctx)) { diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 4fc19bc9647c..6eaa71c40e15 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -2312,7 +2312,7 @@ message TBackupCollectionDescription { } message TIncrementalBackupConfig { - + optional bool OmitIndexes = 1 [default = false]; } oneof Entries { diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 646aea407ba7..51ad1ecf66df 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -39,6 +39,11 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { using TCdcStream = TShardedTableOptions::TCdcStream; + struct CdcOperationCounts { + int Deletes = 0; + int Inserts = 0; + }; + static NKikimrPQ::TPQConfig DefaultPQConfig() { NKikimrPQ::TPQConfig pqConfig; pqConfig.SetEnabled(true); @@ -211,6 +216,99 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { return proto; } + TString FindIncrementalBackupDir(TTestActorRuntime& runtime, const TActorId& sender, const TString& collectionPath) { + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(collectionPath); + request->Record.MutableDescribePath()->MutableOptions()->SetReturnChildren(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow(sender); + + UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess); + + const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription(); + for (ui32 i = 0; i < pathDescription.ChildrenSize(); ++i) { + const auto& child = pathDescription.GetChildren(i); + if (child.GetName().EndsWith("_incremental")) { + return child.GetName(); + } + } + return ""; + } + + struct TCdcMetadata { + bool IsDelete; + TVector UpdatedColumns; + TVector ErasedColumns; + }; + + TCdcMetadata ParseCdcMetadata(const TString& bytesValue) { + TCdcMetadata result; + result.IsDelete = false; + + // The bytes contain protobuf-encoded CDC metadata + // For Update mode CDC: + // - Updates have \020\000 (indicating value columns present) + // - Deletes have \020\001 (indicating erase operation) + + if (bytesValue.find("\020\001") != TString::npos) { + result.IsDelete = true; + } + + // Parse column tags from the metadata + // Format: \010\020 + for (size_t i = 0; i < bytesValue.size(); ++i) { + if (bytesValue[i] == '\010' && i + 1 < bytesValue.size()) { + ui32 tag = static_cast(bytesValue[i + 1]); + if (i + 2 < bytesValue.size() && bytesValue[i + 2] == '\020') { + ui8 flags = i + 3 < bytesValue.size() ? static_cast(bytesValue[i + 3]) : 0; + if (flags & 1) { + result.ErasedColumns.push_back(tag); + } else { + result.UpdatedColumns.push_back(tag); + } + } + } + } + + return result; + } + + CdcOperationCounts CountCdcOperations(const TString& backup) { + CdcOperationCounts counts; + size_t pos = 0; + + while ((pos = backup.find("bytes_value: \"", pos)) != TString::npos) { + pos += 14; + size_t endPos = backup.find("\"", pos); + if (endPos == TString::npos) break; + + TString metadataStr = backup.substr(pos, endPos - pos); + TString unescaped; + for (size_t i = 0; i < metadataStr.size(); ++i) { + if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) { + ui8 val = ((metadataStr[i+1] - '0') << 6) | + ((metadataStr[i+2] - '0') << 3) | + (metadataStr[i+3] - '0'); + unescaped += static_cast(val); + i += 3; + } else { + unescaped += metadataStr[i]; + } + } + + auto metadata = ParseCdcMetadata(unescaped); + if (metadata.IsDelete) { + counts.Deletes++; + } else { + counts.Inserts++; + } + + pos = endPos + 1; + } + + return counts; + } + NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector& tags = {2}) { auto keyCell = TCell::Make(key); auto valueCell = TCell::Make(value); @@ -2519,6 +2617,401 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { } } + Y_UNIT_TEST(IncrementalBackupWithIndexes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100) + , (2, 200) + , (3, 300) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (4, 400); + )"); + + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(10)); + + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + + TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table"; + auto mainTableBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT key, value FROM `)" << mainTablePath << R"(` + ORDER BY key + )"); + + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, + "Main table backup should contain updated key 2"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, + "Main table backup should contain new value 250"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 3") != TString::npos, + "Main table backup should contain deleted key 3"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 4") != TString::npos, + "Main table backup should contain new key 4"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 400") != TString::npos, + "Main table backup should contain new value 400"); + + TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByValue"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + ORDER BY value + )"); + + UNIT_ASSERT_C(indexBackup.find("uint32_value: 200") != TString::npos, + "Index backup should contain old value 200 (deleted)"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 250") != TString::npos, + "Index backup should contain new value 250"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 300") != TString::npos, + "Index backup should contain deleted value 300"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 400") != TString::npos, + "Index backup should contain new value 400"); + } + + Y_UNIT_TEST(IncrementalBackupWithCoveringIndex) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"name"}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES + (1, 'Alice', 30u, 5000u) + , (2, 'Bob', 25u, 4000u) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 5000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 6000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (2, 'Bob', 26u, 4000u); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=2;)"); + + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(10)); + + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + + TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + )"); + + UNIT_ASSERT_C(indexBackup.find("uint32_value: 30") != TString::npos, + "Index backup should contain age 30"); + UNIT_ASSERT_C(indexBackup.find("Alice") != TString::npos, + "Index backup should contain Alice2 from covering column name update"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 25") != TString::npos, + "Index backup should contain tombstone for age 25"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 26") != TString::npos, + "Index backup should contain tombstone for age 26"); + UNIT_ASSERT_C(indexBackup.find("null_flag_value: NULL_VALUE") != TString::npos, + "Index backup tombstones should have NULL for covering columns"); + + auto counts = CountCdcOperations(indexBackup); + Cerr << "CDC metadata: " << counts.Deletes << " DELETEs, " << counts.Inserts << " INSERTs" << Endl; + + UNIT_ASSERT_EQUAL_C(counts.Deletes, 2, "Should have 2 DELETE operations (tombstones for age 25 and 26)"); + UNIT_ASSERT_EQUAL_C(counts.Inserts, 1, "Should have 1 INSERT operation (for Alice2)"); + } + + Y_UNIT_TEST(IncrementalBackupMultipleIndexes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false}, + {"city", "Utf8", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByName", {"name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"salary"}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByCity", {"city", "name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES + (1, 'Alice', 30u, 'NYC', 5000u) + , (2, 'Bob', 25u, 'LA', 4000u) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (1, 'Alice2', 30u, 'NYC', 5000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (2, 'Bob', 26u, 'LA', 4000u); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (3, 'Carol', 28u, 'SF', 5500u); + )"); + + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(10)); + + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + + TString byNamePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByName"; + auto byNameBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byNamePath << R"(` + )"); + UNIT_ASSERT_C(byNameBackup.find("Alice") != TString::npos, + "ByName backup should contain Alice (deleted)"); + UNIT_ASSERT_C(byNameBackup.find("Alice2") != TString::npos, + "ByName backup should contain Alice2 (updated)"); + UNIT_ASSERT_C(byNameBackup.find("Carol") != TString::npos, + "ByName backup should contain Carol (new)"); + + TString byAgePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto byAgeBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byAgePath << R"(` + )"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 30") != TString::npos, + "ByAge backup should contain age 30 (deleted)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 25") != TString::npos || + byAgeBackup.find("uint32_value: 26") != TString::npos, + "ByAge backup should contain age change (25 or 26)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 28") != TString::npos, + "ByAge backup should contain age 28 (new)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 5500") != TString::npos, + "ByAge backup should contain covered salary 5500"); + + TString byCityPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByCity"; + auto byCityBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byCityPath << R"(` + )"); + UNIT_ASSERT_C(byCityBackup.find("NYC") != TString::npos, + "ByCity backup should contain NYC"); + UNIT_ASSERT_C(byCityBackup.find("Alice") != TString::npos, + "ByCity backup should contain Alice (part of composite key)"); + UNIT_ASSERT_C(byCityBackup.find("Alice2") != TString::npos, + "ByCity backup should contain Alice2 (updated composite key)"); + UNIT_ASSERT_C(byCityBackup.find("SF") != TString::npos, + "ByCity backup should contain SF (new)"); + UNIT_ASSERT_C(byCityBackup.find("Carol") != TString::npos, + "ByCity backup should contain Carol (new composite key)"); + } + + Y_UNIT_TEST(OmitIndexesIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + , OMIT_INDEXES = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100) + , (2, 200) + , (3, 300) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)"); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(5)); + + auto mainTableBackup = KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` + ORDER BY key + )"); + + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, + "Main table backup should exist with OmitIndexes flag"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, + "Main table backup should contain updated value"); + + bool indexBackupExists = true; + try { + auto indexBackup = KqpSimpleExec(runtime, R"( + SELECT * FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/__ydb_backup_meta/indexes/Table/ByValue` + )"); + if (indexBackup.empty() || indexBackup.find("ERROR") != TString::npos || + indexBackup.find("not found") != TString::npos || indexBackup.find("doesn't exist") != TString::npos) { + indexBackupExists = false; + } + } catch (...) { + indexBackupExists = false; + } + + UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set"); + } + } // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp index 28c22af19ece..a44e5e18897c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp @@ -130,6 +130,65 @@ std::optional>> GetBackupRequiredPaths( } } + // Add index backup metadata directories if incremental backup is enabled + bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig(); + + // Check OmitIndexes from two possible locations: + // 1. Top-level OmitIndexes field (for full backups) + // 2. IncrementalBackupConfig.OmitIndexes (for incremental backups) + bool omitIndexes = bc->Description.GetOmitIndexes() || + (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes()); + + if (incrBackupEnabled && !omitIndexes) { + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + const auto tablePath = TPath::Resolve(item.GetPath(), context.SS); + + // Skip if path is not resolved or not a table + auto checks = tablePath.Check(); + checks.IsResolved().IsTable(); + if (!checks) { + continue; + } + + std::pair paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) { + continue; + } + auto& relativeItemPath = paths.second; + + // Check if table has indexes + for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) { + continue; + } + + // Skip deleted indexes + if (childPath->Dropped()) { + continue; + } + + auto indexInfo = context.SS->Indexes.at(childPathId); + if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) { + continue; + } + + // Add required PARENT directory path for index backup: + // {targetDir}/__ydb_backup_meta/indexes/{table_path} + // The index name will be the table name created within this directory + TString indexBackupParentPath = JoinPath({ + targetDir, + "__ydb_backup_meta", + "indexes", + relativeItemPath + }); + collectionPaths.emplace(indexBackupParentPath); + } + } + } + return paths; } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 4cb2e5ff16a9..c26815a43e20 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3673,6 +3673,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase { Y_ABORT_UNLESS(deserializeRes); txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); } + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable || + txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan || + txState.TxType == TTxState::TxAlterCdcStreamAtTable || + txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot || + txState.TxType == TTxState::TxDropCdcStreamAtTable || + txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + if (!extraData.empty()) { + NKikimrSchemeOp::TGenericTxInFlyExtraData proto; + bool deserializeRes = ParseFromStringNoSizeLimit(proto, extraData); + Y_ABORT_UNLESS(deserializeRes); + txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); + } } Y_ABORT_UNLESS(txState.TxType != TTxState::TxInvalid); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 92155f564df4..2e68d878d6ab 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -459,6 +459,7 @@ class TAlterCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + txState.CdcPathId = streamPath.Base()->PathId; // Store CDC stream PathId for later use tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); @@ -635,7 +636,7 @@ TVector CreateAlterCdcStream(TOperationId opId, const TTxTr result.push_back(DropLock(NextPartId(opId, result), outTx)); } - if (workingDirPath.IsTableIndex()) { + if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp index eb6891a41f4f..60752a3d91ce 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp @@ -2,6 +2,8 @@ #include "schemeshard__op_traits.h" #include "schemeshard__operation_common.h" #include "schemeshard__operation_create_cdc_stream.h" +#include "schemeshard__operation_part.h" +#include "schemeshard_utils.h" #include "schemeshard_impl.h" @@ -64,12 +66,12 @@ TVector CreateBackupBackupCollection(TOperationId opId, con Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); const auto& bc = context.SS->BackupCollections[bcPath->PathId]; bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig(); + + bool omitIndexes = bc->Description.GetOmitIndexes() || + (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes()); + TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl"; - bool omitIndexes = bc->Description.HasOmitIndexes() - ? bc->Description.GetOmitIndexes() - : false; - for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { auto& desc = *copyTables.Add(); desc.SetSrcPath(item.GetPath()); @@ -167,6 +169,74 @@ TVector CreateBackupBackupCollection(TOperationId opId, con NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, table, boundaries, false); } + + if (!omitIndexes) { + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + const auto tablePath = TPath::Resolve(item.GetPath(), context.SS); + + // Iterate through table's children to find indexes + for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + + // Skip non-index children (CDC streams, etc.) + if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) { + continue; + } + + // Skip deleted indexes + if (childPath->Dropped()) { + continue; + } + + // Get index info and filter for global sync only + // We need more complex logic for vector indexes in future + auto indexInfo = context.SS->Indexes.at(childPathId); + if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) { + continue; + } + + // Get index implementation table (the only child of index) + auto indexPath = TPath::Init(childPathId, context.SS); + Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1); + auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin(); + + auto indexTablePath = indexPath.Child(implTableName); + auto indexTable = context.SS->Tables.at(implTablePathId); + + // Create CDC stream on index impl table + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(implTableName); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(streamName); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + + NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false); + + // Create AtTable operation to notify datashard (without schema change) + { + auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); + auto& cdcOp = *outTx.MutableCreateCdcStream(); + cdcOp.CopyFrom(createCdcStreamOp); + result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false)); + } + + // Create PQ part for index CDC stream + TVector boundaries; + const auto& partitions = indexTable->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + + const auto streamPath = indexTablePath.Child(streamName); + NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false); + } + } + } } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp index e08dcf9c0e17..6b49d60002e9 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp @@ -223,6 +223,79 @@ TVector CreateBackupIncrementalBackupCollection(TOperationI streams.push_back(stream); } + // Process indexes if they are not omitted + bool omitIndexes = bc->Description.GetIncrementalBackupConfig().GetOmitIndexes(); + if (!omitIndexes) { + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + const auto tablePath = TPath::Resolve(item.GetPath(), context.SS); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + std::pair paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) { + result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)}; + return result; + } + auto& relativeItemPath = paths.second; + + // Iterate through table's children to find indexes + for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + + // Skip non-index children (CDC streams, etc.) + if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) { + continue; + } + + // Skip deleted indexes + if (childPath->Dropped()) { + continue; + } + + // Get index info and filter for global sync only + auto indexInfo = context.SS->Indexes.at(childPathId); + if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) { + continue; + } + + // Get index implementation table (single child of index) + auto indexPath = TPath::Init(childPathId, context.SS); + Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1); + auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin(); + + // Build relative path to index impl table (relative to working dir) + TString indexImplTableRelPath = JoinPath({relativeItemPath, childName, implTableName}); + + // Create AlterContinuousBackup for index impl table + NKikimrSchemeOp::TModifyScheme modifyScheme; + modifyScheme.SetWorkingDir(tx.GetWorkingDir()); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterContinuousBackup); + modifyScheme.SetInternal(true); + + auto& cb = *modifyScheme.MutableAlterContinuousBackup(); + cb.SetTableName(indexImplTableRelPath); // Relative path: table1/index1/indexImplTable + + auto& ib = *cb.MutableTakeIncrementalBackup(); + // Destination: {backup_collection}/{timestamp}_inc/__ydb_backup_meta/indexes/{table_path}/{index_name} + TString dstPath = JoinPath({ + tx.GetBackupIncrementalBackupCollection().GetName(), + tx.GetBackupIncrementalBackupCollection().GetTargetDir(), + "__ydb_backup_meta", + "indexes", + relativeItemPath, // Relative table path (e.g., "table1") + childName // Index name (e.g., "index1") + }); + ib.SetDstPath(dstPath); + + TPathId stream; + if (!CreateAlterContinuousBackup(opId, modifyScheme, context, result, stream)) { + return result; + } + streams.push_back(stream); + } + } + } + CreateLongIncrementalBackupOp(opId, bcPath, result, streams); return result; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp index 2cb60ead5641..d8e4566bfebb 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp @@ -127,6 +127,54 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera table->AlterVersion += 1; NIceDb::TNiceDb db(context.GetDB()); + + bool isContinuousBackupStream = false; + if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) { + auto cdcPath = context.SS->PathsById.at(txState->CdcPathId); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " Checking CDC stream name" + << ", cdcPathId: " << txState->CdcPathId + << ", streamName: " << cdcPath->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); + if (cdcPath->Name.EndsWith("_continuousBackupImpl")) { + isContinuousBackupStream = true; + } + } else { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " CdcPathId not found" + << ", cdcPathId: " << txState->CdcPathId + << ", at schemeshard: " << context.SS->SelfTabletId()); + } + + // Check if this is an index implementation table + // If so, we need to sync the parent index version to match the impl table version + // Do this ONLY for continuous backup operations + TPathId parentPathId = path->ParentPathId; + if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) { + auto parentPath = context.SS->PathsById.at(parentPathId); + if (parentPath->IsTableIndex()) { + Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId)); + auto index = context.SS->Indexes.at(parentPathId); + + index->AlterVersion = table->AlterVersion; + + // Persist the index version update directly to database + db.Table().Key(parentPathId.LocalPathId).Update( + NIceDb::TUpdate(index->AlterVersion) + ); + + context.SS->ClearDescribePathCaches(parentPath); + context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " Synced parent index version with impl table" + << ", indexPathId: " << parentPathId + << ", indexName: " << parentPath->Name + << ", newVersion: " << index->AlterVersion + << ", at schemeshard: " << context.SS->SelfTabletId()); + } + } + context.SS->PersistTableAlterVersion(db, pathId, table); context.SS->ClearDescribePathCaches(path); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 211bb3654e0d..2ade3854dede 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -322,6 +322,13 @@ class TNewCdcStream: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateCdcStream, streamPath.Base()->PathId); + txState.CdcPathId = streamPath.Base()->PathId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "DoNewStream: Set CdcPathId" + << ", operationId: " << OperationId + << ", cdcPathId: " << streamPath.Base()->PathId + << ", streamName: " << streamPath.Base()->Name + << ", at schemeshard: " << context.SS->SelfTabletId()); txState.State = TTxState::Propose; streamPath.Base()->PathState = NKikimrSchemeOp::EPathStateCreate; @@ -582,6 +589,18 @@ class TNewCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + + // Set CdcPathId for continuous backup detection + auto streamPath = tablePath.Child(streamName); + if (streamPath.IsResolved()) { + txState.CdcPathId = streamPath.Base()->PathId; + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TNewCdcStreamAtTable: Set CdcPathId" + << ", operationId: " << OperationId + << ", cdcPathId: " << streamPath.Base()->PathId + << ", streamName: " << streamName + << ", at schemeshard: " << context.SS->SelfTabletId()); + } tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); @@ -984,7 +1003,7 @@ TVector CreateNewCdcStream(TOperationId opId, const TTxTran DoCreateLock(result, opId, workingDirPath, tablePath); } - if (workingDirPath.IsTableIndex()) { + if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp index 593de4172b0a..11b82835a7ba 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp @@ -191,6 +191,7 @@ class TDropCdcStream: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, TTxState::TxDropCdcStream, streamPath.Base()->PathId); + txState.CdcPathId = streamPath.Base()->PathId; txState.State = TTxState::Propose; txState.MinStep = TStepId(1); @@ -583,7 +584,15 @@ void DoDropStream( result.push_back(DropLock(NextPartId(opId, result), outTx)); } - if (workingDirPath.IsTableIndex()) { + bool hasContinuousBackupStream = false; + for (const auto& streamPath : streamPaths) { + if (streamPath.Base()->Name.EndsWith("_continuousBackupImpl")) { + hasContinuousBackupStream = true; + break; + } + } + + if (workingDirPath.IsTableIndex() && !hasContinuousBackupStream) { auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex); outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName()); outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index a6c07f17acb8..0ced223da83a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2643,6 +2643,16 @@ void TSchemeShard::PersistTxState(NIceDb::TNiceDb& db, const TOperationId opId) txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId()); bool serializeRes = proto.SerializeToString(&extraData); Y_ABORT_UNLESS(serializeRes); + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable || + txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan || + txState.TxType == TTxState::TxAlterCdcStreamAtTable || + txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot || + txState.TxType == TTxState::TxDropCdcStreamAtTable || + txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + NKikimrSchemeOp::TGenericTxInFlyExtraData proto; + txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId()); + bool serializeRes = proto.SerializeToString(&extraData); + Y_ABORT_UNLESS(serializeRes); } db.Table().Key(opId.GetTxId(), opId.GetSubTxId()).Update( diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp index 966a0273d239..1b4521e464da 100644 --- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp @@ -1700,6 +1700,619 @@ Y_UNIT_TEST_SUITE(TBackupCollectionTests) { TestLs(runtime, "/MyRoot/.backups/collections/TestCollection/__ydb_backup_meta", false, NLs::PathExist); } + Y_UNIT_TEST(SingleTableWithGlobalSyncIndex) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithIndex" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with one global sync covering index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithIndex" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream exists on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true); + UNIT_ASSERT(mainTableDesc.GetPathDescription().HasTable()); + + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + bool foundMainTableCdc = false; + TString mainTableCdcName; + + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = tableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + foundMainTableCdc = true; + mainTableCdcName = cdcStream.GetName(); + Cerr << "Found main table CDC stream: " << mainTableCdcName << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainTableCdc, "Main table should have CDC stream with '_continuousBackupImpl' suffix"); + + // Verify CDC stream exists on index implementation table + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true); + UNIT_ASSERT(indexDesc.GetPathDescription().HasTableIndex()); + + // Get index implementation table (first child of index) + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true); + UNIT_ASSERT(indexImplTableDesc.GetPathDescription().HasTable()); + + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + bool foundIndexCdc = false; + TString indexCdcName; + + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = indexTableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + indexCdcName = cdcStream.GetName(); + Cerr << "Found index CDC stream: " << indexCdcName << Endl; + break; + } + } + UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream with '_continuousBackupImpl' suffix"); + + // Verify CDC stream names match pattern and use same timestamp + UNIT_ASSERT_VALUES_EQUAL(mainTableCdcName, indexCdcName); + UNIT_ASSERT_C(mainTableCdcName.Contains("Z") && mainTableCdcName.EndsWith("_continuousBackupImpl"), + "CDC stream name should have X.509 timestamp format (YYYYMMDDHHMMSSZ_continuousBackupImpl)"); + } + + Y_UNIT_TEST(SingleTableWithMultipleGlobalSyncIndexes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithMultipleIndexes" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with multiple global sync indexes + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithMultipleIndexes" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value1" Type: "Utf8" } + Columns { Name: "value2" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Value1Index" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobal + } + IndexDescription { + Name: "Value2Index" + KeyColumnNames: ["value2"] + DataColumnNames: ["value1"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMultipleIndexes", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + TString mainCdcName; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = tableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + mainCdcName = cdcStream.GetName(); + break; + } + } + UNIT_ASSERT_C(!mainCdcName.empty(), "Main table should have CDC stream"); + + // Verify CDC streams on both indexes + TVector indexNames = {"Value1Index", "Value2Index"}; + TVector indexCdcNames; + + for (const auto& indexName : indexNames) { + auto indexDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMultipleIndexes/" + indexName, true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMultipleIndexes/" + indexName + "/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + const auto& cdcStream = indexTableDesc.GetCdcStreams(i); + if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) { + indexCdcNames.push_back(cdcStream.GetName()); + foundCdc = true; + Cerr << "Found CDC stream on " << indexName << ": " << cdcStream.GetName() << Endl; + break; + } + } + UNIT_ASSERT_C(foundCdc, "Index " + indexName + " should have CDC stream"); + } + + // Verify all streams use the same timestamp + UNIT_ASSERT_VALUES_EQUAL(indexCdcNames.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[0]); + UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[1]); + } + + Y_UNIT_TEST(TableWithMixedIndexTypes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithMixedIndexes" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync + async indexes + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithMixedIndexes" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value1" Type: "Utf8" } + Columns { Name: "value2" Type: "Uint64" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "SyncIndex" + KeyColumnNames: ["value1"] + Type: EIndexTypeGlobal + } + IndexDescription { + Name: "AsyncIndex" + KeyColumnNames: ["value2"] + Type: EIndexTypeGlobalAsync + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream"); + + // Verify CDC stream on global sync index ONLY + auto syncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/SyncIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(syncIndexDesc.GetPathDescription().ChildrenSize(), 1); + TString syncImplTableName = syncIndexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto syncImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMixedIndexes/SyncIndex/" + syncImplTableName, true, true); + const auto& syncTableDesc = syncImplTableDesc.GetPathDescription().GetTable(); + + bool foundSyncCdc = false; + for (size_t i = 0; i < syncTableDesc.CdcStreamsSize(); ++i) { + if (syncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundSyncCdc = true; + Cerr << "Found CDC stream on SyncIndex (expected)" << Endl; + break; + } + } + UNIT_ASSERT_C(foundSyncCdc, "Global sync index should have CDC stream"); + + // Verify NO CDC stream on async index + auto asyncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/AsyncIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(asyncIndexDesc.GetPathDescription().ChildrenSize(), 1); + TString asyncImplTableName = asyncIndexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto asyncImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithMixedIndexes/AsyncIndex/" + asyncImplTableName, true, true); + const auto& asyncTableDesc = asyncImplTableDesc.GetPathDescription().GetTable(); + + bool foundAsyncCdc = false; + for (size_t i = 0; i < asyncTableDesc.CdcStreamsSize(); ++i) { + if (asyncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundAsyncCdc = true; + break; + } + } + UNIT_ASSERT_C(!foundAsyncCdc, "Async index should NOT have CDC stream"); + } + + Y_UNIT_TEST(MultipleTablesWithIndexes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection with 2 tables + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/Table1" + } + Entries { + Type: ETypeTable + Path: "/MyRoot/Table2" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create Table1 with index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table1" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index1" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Create Table2 with index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Table2" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "Index2" + KeyColumnNames: ["data"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC streams on both main tables + TVector tables = {"Table1", "Table2"}; + TVector indexes = {"Index1", "Index2"}; + + for (size_t tableIdx = 0; tableIdx < tables.size(); ++tableIdx) { + const auto& tableName = tables[tableIdx]; + const auto& indexName = indexes[tableIdx]; + + // Check main table CDC + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName, true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + Cerr << "Found CDC stream on " << tableName << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, tableName + " should have CDC stream"); + + // Check index CDC + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName + "/" + indexName, true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/" + tableName + "/" + indexName + "/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + Cerr << "Found CDC stream on " << indexName << Endl; + break; + } + } + UNIT_ASSERT_C(foundIndexCdc, indexName + " should have CDC stream"); + } + } + + Y_UNIT_TEST(IncrementalBackupWithIndexes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableForIncremental" + } + } + Cluster: {} + IncrementalBackupConfig: {} + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableForIncremental" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup (creates CDC streams) + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC streams were created for both main table and index + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + Cerr << "Found CDC stream on main table" << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream after full backup"); + + // Verify CDC stream on index + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental/ValueIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableForIncremental/ValueIndex/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + Cerr << "Found CDC stream on index implementation table" << Endl; + break; + } + } + UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream after full backup"); + + runtime.AdvanceCurrentTime(TDuration::Seconds(1)); + + // Execute incremental backup (rotates CDC, creates backup tables) + TestBackupIncrementalBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify backup collection structure + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1), { + NLs::PathExist, + NLs::IsBackupCollection, + NLs::ChildrenCount(2), // full + incremental + }); + + // Find the incremental backup directory (should end with "_incremental") + auto collectionDesc = DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1, true, true); + TString incrBackupDir; + for (size_t i = 0; i < collectionDesc.GetPathDescription().ChildrenSize(); ++i) { + const auto& child = collectionDesc.GetPathDescription().GetChildren(i); + Cerr << "Child: " << child.GetName() << " PathState: " << child.GetPathState() << Endl; + if (child.GetName().EndsWith("_incremental")) { + incrBackupDir = child.GetName(); + break; + } + } + UNIT_ASSERT_C(!incrBackupDir.empty(), "Should find incremental backup directory"); + + // Verify backup table for main table exists + TestDescribeResult(DescribePath(runtime, + "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir + "/TableForIncremental"), { + NLs::PathExist, + NLs::IsTable, + }); + + // Verify index backup table exists in __ydb_backup_meta/indexes/TableForIncremental/ValueIndex + TString indexBackupPath = "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir + + "/__ydb_backup_meta/indexes/TableForIncremental/ValueIndex"; + TestDescribeResult(DescribePath(runtime, indexBackupPath), { + NLs::PathExist, + NLs::IsTable, + }); + + Cerr << "SUCCESS: Full backup created CDC streams for both main table and index" << Endl; + Cerr << " Incremental backup created backup tables for both main table and index" << Endl; + Cerr << " Index backup table verified at: " << indexBackupPath << Endl; + } + + Y_UNIT_TEST(OmitIndexesFlag) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + PrepareDirs(runtime, env, txId); + + // Create incremental backup collection WITH OmitIndexes flag set + TString collectionSettings = R"( + Name: ")" DEFAULT_NAME_1 R"(" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/TableWithIndex" + } + } + Cluster: {} + IncrementalBackupConfig { + OmitIndexes: true + } + )"; + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings); + env.TestWaitNotification(runtime, txId); + + // Create table with global sync index + TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "TableWithIndex" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "ValueIndex" + KeyColumnNames: ["value"] + Type: EIndexTypeGlobal + } + )"); + env.TestWaitNotification(runtime, txId); + + // Execute full backup + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + // Verify CDC stream exists on main table + auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true); + const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable(); + + bool foundMainCdc = false; + for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) { + if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundMainCdc = true; + Cerr << "Found CDC stream on main table (expected)" << Endl; + break; + } + } + UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream even with OmitIndexes=true"); + + // Verify NO CDC stream on index (because OmitIndexes is true) + auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true); + UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1); + TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName(); + + auto indexImplTableDesc = DescribePrivatePath(runtime, + "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true); + const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable(); + + bool foundIndexCdc = false; + for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) { + if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) { + foundIndexCdc = true; + break; + } + } + UNIT_ASSERT_C(!foundIndexCdc, "Index should NOT have CDC stream when OmitIndexes=true"); + + Cerr << "SUCCESS: OmitIndexes flag works correctly - main table has CDC, index does not" << Endl; + } + Y_UNIT_TEST(BackupWithIndexes) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 6fb1434ae528..11aad2f7cf3b 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -11924,9 +11924,12 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { auto backupDirName = descr.GetChildren(0).GetName().c_str(); + // When WithIncremental=true, __ydb_backup_meta directory is created for index backups + // So we expect 3 children: Table1, DirA, and __ydb_backup_meta + // When WithIncremental=false, we expect 2 children: Table1 and DirA TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), { NLs::PathExist, - NLs::ChildrenCount(2), + NLs::ChildrenCount(WithIncremental ? 3 : 2), NLs::Finished, }); @@ -11990,9 +11993,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { } } + // Incremental backup directory contains Table1, DirA, and __ydb_backup_meta TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName)), { NLs::PathExist, - NLs::ChildrenCount(2), + NLs::ChildrenCount(3), NLs::Finished, });