diff --git a/INCREMENTAL_RESTORE_INDEXES_PLAN.md b/INCREMENTAL_RESTORE_INDEXES_PLAN.md
new file mode 100644
index 000000000000..9553602f1d83
--- /dev/null
+++ b/INCREMENTAL_RESTORE_INDEXES_PLAN.md
@@ -0,0 +1,311 @@
+# Implementation Plan: Incremental Restore for Indexes
+
+## Overview
+
+This document describes the implementation plan for adding incremental restore support for indexes in YDB, extending the existing incremental backup/restore functionality implemented in branch `feature/incr-backup/indexes-support-003`.
+
+## Current State Analysis
+
+### Backup Implementation (Already Done)
+- When backing up tables, the system now also backs up global indexes (if `OmitIndexes` is false)
+- For each table, it discovers child indexes of type `EIndexTypeGlobal`
+- Creates `AlterContinuousBackup` operations for index implementation tables
+- Stores index backups at: `{backup_collection}/{timestamp}_incremental/__ydb_backup_meta/indexes/{table_path}/{index_name}`
+- **Location**: `ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp:240-311`
+
+### Restore Implementation (Tables Only - Current Gap)
+- Currently restores only tables, not indexes
+- Uses `ESchemeOpRestoreMultipleIncrementalBackups` operation
+- Creates separate restore transactions for each table
+- **Location**: `ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp:512-599`
+
+## Architecture Design
+
+### Key Principles
+
+1. **Parallel Execution**: Tables and indexes restore data simultaneously
+   - Multiple `ESchemeOpRestoreMultipleIncrementalBackups` operations created
+   - Each operation targets different shards (table shards vs index impl table shards)
+   - No artificial dependencies or sequencing between tables and indexes
+
+2. **Unified Tracking**: Same tracking mechanism for both tables and indexes
+   - Both added to `InProgressOperations`
+   - Both tracked in `TableOperations` map
+   - Both contribute to completion check
+
+3. **Atomic Activation**: Finalization happens together
+   - `AreAllCurrentOperationsComplete()` waits for ALL operations (tables + indexes)
+   - `FinalizeIncrementalRestoreOperation` activates everything atomically
+   - No partial visibility (either all visible or none)
+
+## Implementation Phases
+
+### Phase 1: Parallel Discovery & Operation Creation
+
+**File**: `ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp`
+
+#### Task 1.1: Extend `CreateIncrementalRestoreOperation`
+
+Add index discovery after table operation creation (after line ~596):
+
+```cpp
+void TSchemeShard::CreateIncrementalRestoreOperation(
+    const TPathId& backupCollectionPathId,
+    ui64 operationId,
+    const TString& backupName,
+    const TActorContext& ctx) {
+
+    // ... existing table discovery and operation creation ...
+
+    // NEW: Discover and create index restore operations IN PARALLEL
+    DiscoverAndCreateIndexRestoreOperations(
+        backupCollectionPathId,
+        operationId,
+        backupName,
+        bcPath,
+        backupCollectionInfo,
+        ctx
+    );
+}
+```
+
+#### Task 1.2: Implement index discovery helper
+
+```cpp
+void TSchemeShard::DiscoverAndCreateIndexRestoreOperations(
+    const TPathId& backupCollectionPathId,
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TActorContext& ctx);
+```
+
+**Logic**:
+1. Check if indexes were backed up (`OmitIndexes` flag)
+2. Resolve path to index metadata: `{backup}/__ydb_backup_meta/indexes`
+3. Iterate through table directories under `indexes/`
+4. 
For each index in each table directory: + - Find corresponding target table path + - Validate index exists on target table + - Create restore operation + +#### Task 1.3: Create individual index restore operations + +```cpp +void TSchemeShard::CreateSingleIndexRestoreOperation( + ui64 operationId, + const TString& backupName, + const TPath& bcPath, + const TString& relativeTablePath, + const TString& indexName, + const TString& targetTablePath, + const TActorContext& ctx); +``` + +**Logic**: +1. Validate target table exists and has the specified index +2. Find index implementation table (child of index path) +3. Construct source path: `{backup}/__ydb_backup_meta/indexes/{table}/{index}` +4. Construct destination path: `{table}/{index}/indexImplTable` +5. Create `ESchemeOpRestoreMultipleIncrementalBackups` operation +6. Track operation in `InProgressOperations` (same as table operations) +7. Track expected shards for index impl table +8. Send restore request + +### Phase 2: Helper Functions + +**File**: `ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp` + +#### Task 2.1: Add target table path mapper + +```cpp +TString TSchemeShard::FindTargetTablePath( + const TBackupCollectionInfo::TPtr& backupCollectionInfo, + const TString& relativeTablePath); +``` + +**Logic**: +- Map backup relative path to restore target path +- Use backup collection's `ExplicitEntryList` to find matching table + +### Phase 3: Unified Progress Tracking + +**File**: `ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp` + +#### Task 3.1: Verify existing tracking works + +**No changes needed!** The existing `TTxProgressIncrementalRestore` already handles this: + +```cpp +// Existing code at line ~75: +if (state.AreAllCurrentOperationsComplete()) { + LOG_I("All operations for current incremental backup completed, moving to next"); + // This includes BOTH table and index operations! + state.MarkCurrentIncrementalComplete(); + state.MoveToNextIncremental(); +} +``` + +Both table and index operations are tracked in `InProgressOperations` and complete independently. + +### Phase 4: Synchronized Finalization + +**File**: `ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp` + +#### Task 4.1: Verify atomic activation + +Review `FinalizeIncrementalRestoreOperation` to ensure it atomically activates both tables and indexes by sending the `ESchemeOpIncrementalRestoreFinalize` operation. + +The finalization should: +1. Wait for ALL operations (tables + indexes) to complete +2. Atomically change path states for all involved paths +3. 
Make everything visible/active in a single transaction
+
+### Phase 5: Header Declarations
+
+**File**: `ydb/core/tx/schemeshard/schemeshard_impl.h`
+
+#### Task 5.1: Add method declarations
+
+```cpp
+void DiscoverAndCreateIndexRestoreOperations(
+    const TPathId& backupCollectionPathId,
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TActorContext& ctx);
+
+void CreateSingleIndexRestoreOperation(
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TString& relativeTablePath,
+    const TString& indexName,
+    const TString& targetTablePath,
+    const TActorContext& ctx);
+
+TString FindTargetTablePath(
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TString& relativeTablePath);
+```
+
+#### Task 5.2: Optional tracking enhancement
+
+Add a flag to distinguish index vs table operations (for debugging):
+
+```cpp
+struct TTableOperationState {
+    TOperationId OperationId;
+    THashSet<TShardIdx> ExpectedShards;
+    THashSet<TShardIdx> CompletedShards;
+    bool IsIndexOperation = false; // NEW: for logging/debugging
+};
+```
+
+### Phase 6: Testing
+
+**File**: `ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp`
+
+#### Test Scenarios
+
+1. **Basic index restore**: Backup and restore a table with one global index
+2. **Multiple indexes**: Table with multiple global indexes
+3. **Mixed scenario**: Multiple tables, each with a different number of indexes
+4. **Index data verification**: Verify index data is correctly restored
+5. **Parallel completion**: Verify indexes and tables complete in parallel
+6. **Incremental sequences**: Multiple incremental backups with index changes
+7. **OmitIndexes flag**: Verify restore works gracefully when indexes weren't backed up
+8. **Missing index**: Index in backup but not on target table (should warn and skip)
+
+## Files to Modify
+
+### Core Implementation
+1. **`ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp`**
+   - Add `DiscoverAndCreateIndexRestoreOperations()`
+   - Add `CreateSingleIndexRestoreOperation()`
+   - Add `FindTargetTablePath()` helper
+   - Call from `CreateIncrementalRestoreOperation()`
+
+2. **`ydb/core/tx/schemeshard/schemeshard_impl.h`**
+   - Add method declarations
+   - Optional: Add `IsIndexOperation` flag to tracking structure
+
+### Testing
+3. **`ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp`**
+   - Add comprehensive test suite for index restore scenarios
+
+### No Changes Needed
+- **Proto definitions**: Reuse existing `TRestoreMultipleIncrementalBackups`
+- **Progress tracking**: Already handles mixed operations
+- **Finalization**: Should already be atomic
+
+## Key Implementation Details
+
+### Path Structure
+
+**Backup structure**:
+```
+{backup_collection}/
+  {timestamp}_incremental/
+    table1/                  # Main table data
+    table2/
+    __ydb_backup_meta/
+      indexes/
+        table1/
+          index1/            # Index implementation table data
+          index2/
+        table2/
+          index1/
+```
+
+**Restore mapping**:
+- Source: `{backup}/__ydb_backup_meta/indexes/{table}/{index}`
+- Target: `{table_path}/{index_name}/indexImplTable`
+
+### Index Types
+
+Only `EIndexTypeGlobal` (synchronous) indexes are backed up and restored. Other index types (async, vector, etc.) are excluded.
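+
+For reference, the restore-side discovery can reuse the same filter as the backup-side code added in `schemeshard__backup_collection_common.cpp` later in this diff. A minimal sketch, assuming the same SchemeShard containers (`PathsById`, `Indexes`) and a resolved `tablePath`:
+
+```cpp
+// Sketch: keep only live global sync indexes of the target table.
+for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+    auto childPath = context.SS->PathsById.at(childPathId);
+    if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex || childPath->Dropped()) {
+        continue; // not an index, or the index was dropped
+    }
+    auto indexInfo = context.SS->Indexes.at(childPathId);
+    if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+        continue; // async, vector, and other non-sync types are skipped
+    }
+    // childName identifies an index eligible for restore
+}
+```
+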
+### Error Handling
+
+1. **Missing indexes on target**: Log warning, skip that index, continue with others
+2. **OmitIndexes=true in backup**: Skip index restore entirely, log info message
+3. **Schema mismatches**: Let the restore operation fail naturally (same as table restore)
+4. **Index metadata directory missing**: Log info, continue (valid case if no indexes backed up)
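+
+For illustration, the `FindTargetTablePath` mapper from Task 2.1 could be little more than a scan over the collection's explicit entries. A minimal sketch, not a final implementation — the suffix comparison is an assumption, and production code should normalize both paths the same way `TrySplitPathByDb` does:
+
+```cpp
+TString TSchemeShard::FindTargetTablePath(
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TString& relativeTablePath)
+{
+    for (const auto& item : backupCollectionInfo->Description.GetExplicitEntryList().GetEntries()) {
+        // item.GetPath() is the absolute restore target, e.g. "/Root/Table".
+        if (item.GetPath().EndsWith("/" + relativeTablePath)) {
+            return item.GetPath();
+        }
+    }
+    return {}; // not found: per the error-handling rules above, warn and skip the index
+}
+```
+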
+### Performance Considerations
+
+- Index restores run **in parallel** with table restores
+- Each targets different shards, no contention
+- Completion tracked together, finalization is atomic
+- No artificial sequencing or dependencies
+
+## Estimated Complexity
+
+- **Phase 1 (Parallel Discovery)**: ~400-500 lines
+- **Phase 2 (Helpers)**: ~100-150 lines
+- **Phase 3 (Tracking)**: ~0 lines (already works!)
+- **Phase 4 (Finalization)**: ~50-100 lines (verification/minor tweaks)
+- **Phase 5 (Headers)**: ~50 lines
+- **Phase 6 (Testing)**: ~500-800 lines
+
+**Total estimated effort**: 1-2 weeks
+
+## Success Criteria
+
+1. ✅ Index backups are discovered during restore
+2. ✅ Index restore operations created in parallel with table operations
+3. ✅ All operations (tables + indexes) tracked together
+4. ✅ Atomic activation when all complete
+5. ✅ Data integrity: index data matches expected state after restore
+6. ✅ Tests pass for all scenarios
+7. ✅ Graceful handling of edge cases (missing indexes, OmitIndexes flag, etc.)
+
+## References
+
+- **Base branch**: `Enjection:feature/incr-backup/indexes-support-003`
+- **Backup implementation**: `schemeshard__operation_backup_incremental_backup_collection.cpp:240-311`
+- **Table restore**: `schemeshard_incremental_restore_scan.cpp:512-599`
+- **Progress tracking**: `schemeshard_incremental_restore_scan.cpp:30-200`
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 5d9ffa1aea0f..f51127ea03bd 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -1368,7 +1368,8 @@ class TKqpGatewayProxy : public IKikimrGateway {
        op.SetPrefix(settings.Prefix);

        if (settings.Settings.IncrementalBackupEnabled) {
-            op.MutableIncrementalBackupConfig();
+            auto* config = op.MutableIncrementalBackupConfig();
+            config->SetOmitIndexes(settings.Settings.OmitIndexes);
        }

        auto errOpt = std::visit(
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index a66b5b9c2a8b..9c5eb19fc7f4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1021,6 +1021,13 @@ namespace {
                        "INCREMENTAL_BACKUP_ENABLED must be true or false"));
                    return false;
                }
+            } else if (name == "omit_indexes") {
+                auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
+                if (!TryFromString(value, dstSettings.OmitIndexes)) {
+                    ctx.AddError(TIssue(ctx.GetPosition(pos),
+                        "OMIT_INDEXES must be true or false"));
+                    return false;
+                }
            } else if (name == "storage") {
                auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
                if (to_lower(value) != "cluster") {
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 67f252f3e7a6..63494ce2747c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -1052,7 +1052,8 @@ struct TAnalyzeSettings {
};

struct TBackupCollectionSettings {
-    bool IncrementalBackupEnabled;
+    bool IncrementalBackupEnabled = false;
+    bool OmitIndexes = false;
};

struct TCreateBackupCollectionSettings {
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 9c1c88e2b47b..49336ab9ba4b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -2448,6 +2448,7 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
        const THashSet<TString> supportedSettings = {
            "incremental_backup_enabled",
            "storage",
+            "omit_indexes",
        };

        if (!CheckBackupCollectionSettings(node.BackupCollectionSettings(), supportedSettings, ctx)) {
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 4fc19bc9647c..6eaa71c40e15 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -2312,7 +2312,7 @@ message TBackupCollectionDescription {
    }

    message TIncrementalBackupConfig {
-
+        optional bool OmitIndexes = 1 [default = false];
    }

    oneof Entries {
diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
index 646aea407ba7..1873fa3b4a51 100644
--- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
@@ -39,6 +39,11 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {

    using TCdcStream = TShardedTableOptions::TCdcStream;

+    struct TCdcOperationCounts {
+        int Deletes = 0;
+        int Inserts = 0;
+    };
+
    static NKikimrPQ::TPQConfig DefaultPQConfig() {
        NKikimrPQ::TPQConfig pqConfig;
        pqConfig.SetEnabled(true);
@@ -211,6 +216,99 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
        return proto;
    }

+    TString FindIncrementalBackupDir(TTestActorRuntime& runtime, const TActorId& sender, const TString& collectionPath) {
+        auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
+        request->Record.MutableDescribePath()->SetPath(collectionPath);
+        request->Record.MutableDescribePath()->MutableOptions()->SetReturnChildren(true);
+        runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+        auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(sender);
+
+        UNIT_ASSERT_EQUAL(reply->Get()->GetRecord().GetStatus(), NKikimrScheme::EStatus::StatusSuccess);
+
+        const auto& pathDescription = reply->Get()->GetRecord().GetPathDescription();
+        for (ui32 i = 0; i < pathDescription.ChildrenSize(); ++i) {
+            const auto& child = pathDescription.GetChildren(i);
+            if (child.GetName().EndsWith("_incremental")) {
+                return child.GetName();
+            }
+        }
+        return "";
+    }
+
+    struct TCdcMetadata {
+        bool IsDelete;
+        TVector<ui32> UpdatedColumns;
+        TVector<ui32> ErasedColumns;
+    };
+
+    TCdcMetadata ParseCdcMetadata(const TString& bytesValue) {
+        TCdcMetadata result;
+        result.IsDelete = false;
+
+        // The bytes contain protobuf-encoded CDC metadata
+        // For Update mode CDC:
+        // - Updates have \020\000 (indicating value columns present)
+        // - Deletes have \020\001 (indicating erase operation)
+
+        if (bytesValue.find("\020\001") != TString::npos) {
+            result.IsDelete = true;
+        }
+
+        // Parse column tags from the metadata
+        // Format: \010<tag> \020<flags> (protobuf wire format: field 1 = column tag, field 2 = flags)
+        for (size_t i = 0; i < bytesValue.size(); ++i) {
+            if (bytesValue[i] == '\010' && i + 1 < bytesValue.size()) {
+                ui32 tag = static_cast<ui8>(bytesValue[i + 1]);
+                if (i + 2 < bytesValue.size() && bytesValue[i + 2] == '\020') {
+                    ui8 flags = i + 3 < bytesValue.size() ? static_cast<ui8>(bytesValue[i + 3]) : 0;
+                    if (flags & 1) {
+                        result.ErasedColumns.push_back(tag);
+                    } else {
+                        result.UpdatedColumns.push_back(tag);
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+
+    TCdcOperationCounts CountCdcOperations(const TString& backup) {
+        TCdcOperationCounts counts;
+        size_t pos = 0;
+
+        while ((pos = backup.find("bytes_value: \"", pos)) != TString::npos) {
+            pos += 14; // skip past the `bytes_value: "` prefix
+            size_t endPos = backup.find("\"", pos);
+            if (endPos == TString::npos) break;
+
+            TString metadataStr = backup.substr(pos, endPos - pos);
+            TString unescaped;
+            for (size_t i = 0; i < metadataStr.size(); ++i) {
+                if (metadataStr[i] == '\\' && i + 3 < metadataStr.size()) {
+                    // Decode a three-digit octal escape sequence (\NNN)
+                    ui8 val = ((metadataStr[i+1] - '0') << 6) |
+                              ((metadataStr[i+2] - '0') << 3) |
+                              (metadataStr[i+3] - '0');
+                    unescaped += static_cast<char>(val);
+                    i += 3;
+                } else {
+                    unescaped += metadataStr[i];
+                }
+            }
+
+            auto metadata = ParseCdcMetadata(unescaped);
+            if (metadata.IsDelete) {
+                counts.Deletes++;
+            } else {
+                counts.Inserts++;
+            }
+
+            pos = endPos + 1;
+        }
+
+        return counts;
+    }
+
    NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
        auto keyCell = TCell::Make(key);
        auto valueCell = TCell::Make(value);
@@ -2519,6 +2617,1055 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
        }
    }

+    Y_UNIT_TEST(IncrementalBackupWithIndexes) {
+        TPortManager portManager;
+        TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+            .SetUseRealThreads(false)
+            .SetDomainName("Root")
+            .SetEnableChangefeedInitialScan(true)
+            .SetEnableBackupService(true)
+            .SetEnableRealSystemViewPaths(false)
+            .SetEnableDataColumnForIndexTable(true)
+        );
+
+        auto& runtime = *server->GetRuntime();
+        const auto edgeActor = runtime.AllocateEdgeActor();
+
+        SetupLogging(runtime);
+        InitRoot(server, edgeActor);
+
+        TShardedTableOptions opts;
+        opts.Columns({
+            {"key", "Uint32", true, false},
+            {"value", "Uint32", false, false}
+        });
+        opts.Indexes({
+            TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
+        });
+        CreateShardedTable(server, edgeActor, "/Root", "Table", opts);
+
+        ExecSQL(server, edgeActor, R"(
+            CREATE BACKUP COLLECTION `MyCollection`
+            ( TABLE `/Root/Table`
+            )
+            WITH
+            ( STORAGE = 'cluster'
+            , INCREMENTAL_BACKUP_ENABLED = 'true'
+            );
+        )", false);
+
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES
+              (1, 100)
+            , (2, 200)
+            , (3, 300)
+            ;
+        )");
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250);
+        )");
+
+        ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)");
+
+        ExecSQL(server, edgeActor, R"(
+            UPSERT INTO `/Root/Table` (key, value) VALUES (4, 400);
+        )");
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+        SimulateSleep(server, TDuration::Seconds(10));
+
+        TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection");
+        UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory");
+
+        TString mainTablePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/Table";
+        auto mainTableBackup = KqpSimpleExec(runtime, TStringBuilder() << R"(
+            SELECT key, value FROM `)" << mainTablePath << R"(`
+            ORDER BY key
+        )");
+
+
UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, + "Main table backup should contain updated key 2"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, + "Main table backup should contain new value 250"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 3") != TString::npos, + "Main table backup should contain deleted key 3"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 4") != TString::npos, + "Main table backup should contain new key 4"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 400") != TString::npos, + "Main table backup should contain new value 400"); + + TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByValue"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + ORDER BY value + )"); + + UNIT_ASSERT_C(indexBackup.find("uint32_value: 200") != TString::npos, + "Index backup should contain old value 200 (deleted)"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 250") != TString::npos, + "Index backup should contain new value 250"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 300") != TString::npos, + "Index backup should contain deleted value 300"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 400") != TString::npos, + "Index backup should contain new value 400"); + } + + Y_UNIT_TEST(IncrementalBackupWithCoveringIndex) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"name"}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES + (1, 'Alice', 30u, 5000u) + , (2, 'Bob', 25u, 4000u) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 5000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (1, 'Alice2', 30u, 6000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, salary) VALUES (2, 'Bob', 26u, 4000u); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=2;)"); + + SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(10)); + + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, 
"/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + + TString indexBackupPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto indexBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << indexBackupPath << R"(` + )"); + + UNIT_ASSERT_C(indexBackup.find("uint32_value: 30") != TString::npos, + "Index backup should contain age 30"); + UNIT_ASSERT_C(indexBackup.find("Alice") != TString::npos, + "Index backup should contain Alice2 from covering column name update"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 25") != TString::npos, + "Index backup should contain tombstone for age 25"); + UNIT_ASSERT_C(indexBackup.find("uint32_value: 26") != TString::npos, + "Index backup should contain tombstone for age 26"); + UNIT_ASSERT_C(indexBackup.find("null_flag_value: NULL_VALUE") != TString::npos, + "Index backup tombstones should have NULL for covering columns"); + + auto counts = CountCdcOperations(indexBackup); + Cerr << "CDC metadata: " << counts.Deletes << " DELETEs, " << counts.Inserts << " INSERTs" << Endl; + + UNIT_ASSERT_EQUAL_C(counts.Deletes, 2, "Should have 2 DELETE operations (tombstones for age 25 and 26)"); + UNIT_ASSERT_EQUAL_C(counts.Inserts, 1, "Should have 1 INSERT operation (for Alice2)"); + } + + Y_UNIT_TEST(IncrementalBackupMultipleIndexes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false}, + {"city", "Utf8", false, false}, + {"salary", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByName", {"name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByAge", {"age"}, {"salary"}, NKikimrSchemeOp::EIndexTypeGlobal}, + TShardedTableOptions::TIndex{"ByCity", {"city", "name"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES + (1, 'Alice', 30u, 'NYC', 5000u) + , (2, 'Bob', 25u, 'LA', 4000u) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (1, 'Alice2', 30u, 'NYC', 5000u); + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (2, 'Bob', 26u, 'LA', 4000u); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, name, age, city, salary) VALUES (3, 'Carol', 28u, 'SF', 5500u); + )"); + 
+ SimulateSleep(server, TDuration::Seconds(1)); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(10)); + + TString incrBackupDir = FindIncrementalBackupDir(runtime, edgeActor, "/Root/.backups/collections/MyCollection"); + UNIT_ASSERT_C(!incrBackupDir.empty(), "Could not find incremental backup directory"); + + TString byNamePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByName"; + auto byNameBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byNamePath << R"(` + )"); + UNIT_ASSERT_C(byNameBackup.find("Alice") != TString::npos, + "ByName backup should contain Alice (deleted)"); + UNIT_ASSERT_C(byNameBackup.find("Alice2") != TString::npos, + "ByName backup should contain Alice2 (updated)"); + UNIT_ASSERT_C(byNameBackup.find("Carol") != TString::npos, + "ByName backup should contain Carol (new)"); + + TString byAgePath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByAge"; + auto byAgeBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byAgePath << R"(` + )"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 30") != TString::npos, + "ByAge backup should contain age 30 (deleted)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 25") != TString::npos || + byAgeBackup.find("uint32_value: 26") != TString::npos, + "ByAge backup should contain age change (25 or 26)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 28") != TString::npos, + "ByAge backup should contain age 28 (new)"); + UNIT_ASSERT_C(byAgeBackup.find("uint32_value: 5500") != TString::npos, + "ByAge backup should contain covered salary 5500"); + + TString byCityPath = TStringBuilder() << "/Root/.backups/collections/MyCollection/" << incrBackupDir << "/__ydb_backup_meta/indexes/Table/ByCity"; + auto byCityBackup = KqpSimpleExec(runtime, TStringBuilder() << R"( + SELECT * FROM `)" << byCityPath << R"(` + )"); + UNIT_ASSERT_C(byCityBackup.find("NYC") != TString::npos, + "ByCity backup should contain NYC"); + UNIT_ASSERT_C(byCityBackup.find("Alice") != TString::npos, + "ByCity backup should contain Alice (part of composite key)"); + UNIT_ASSERT_C(byCityBackup.find("Alice2") != TString::npos, + "ByCity backup should contain Alice2 (updated composite key)"); + UNIT_ASSERT_C(byCityBackup.find("SF") != TString::npos, + "ByCity backup should contain SF (new)"); + UNIT_ASSERT_C(byCityBackup.find("Carol") != TString::npos, + "ByCity backup should contain Carol (new composite key)"); + } + + Y_UNIT_TEST(OmitIndexesIncrementalBackup) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + .SetEnableDataColumnForIndexTable(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + TShardedTableOptions opts; + opts.Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false} + }); + opts.Indexes({ + TShardedTableOptions::TIndex{"ByValue", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + }); + CreateShardedTable(server, edgeActor, "/Root", "Table", opts); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP 
COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + , OMIT_INDEXES = 'true' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100) + , (2, 200) + , (3, 300) + ; + )"); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 250); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=3;)"); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(5)); + + auto mainTableBackup = KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` + ORDER BY key + )"); + + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 2") != TString::npos, + "Main table backup should exist with OmitIndexes flag"); + UNIT_ASSERT_C(mainTableBackup.find("uint32_value: 250") != TString::npos, + "Main table backup should contain updated value"); + + bool indexBackupExists = true; + try { + auto indexBackup = KqpSimpleExec(runtime, R"( + SELECT * FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/__ydb_backup_meta/indexes/Table/ByValue` + )"); + if (indexBackup.empty() || indexBackup.find("ERROR") != TString::npos || + indexBackup.find("not found") != TString::npos || indexBackup.find("doesn't exist") != TString::npos) { + indexBackupExists = false; + } + } catch (...) { + indexBackupExists = false; + } + + UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set"); + } + Y_UNIT_TEST(BasicIndexIncrementalRestore) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with one global index + CreateShardedTable(server, edgeActor, "/Root", "TableWithIndex", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"indexed_col", "Uint32", false, false} + }) + .Indexes({ + {"value_index", {"indexed_col"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Insert data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/TableWithIndex` (key, value, indexed_col) VALUES + (1, 10, 100), + (2, 20, 200), + (3, 30, 300); + )"); + + // Create backup collection + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `IndexTestCollection` + ( TABLE `/Root/TableWithIndex` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Create full backup + ExecSQL(server, edgeActor, R"(BACKUP `IndexTestCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + // Modify data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/TableWithIndex` (key, value, indexed_col) VALUES + (4, 40, 400), + (2, 25, 250); + )"); + + // Create incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `IndexTestCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Capture expected state + auto expectedTable = KqpSimpleExec(runtime, R"( + SELECT 
key, value, indexed_col FROM `/Root/TableWithIndex` ORDER BY key + )"); + + auto expectedIndex = KqpSimpleExec(runtime, R"( + SELECT indexed_col FROM `/Root/TableWithIndex` VIEW value_index WHERE indexed_col > 0 ORDER BY indexed_col + )"); + + // Drop table (this also drops index) + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/TableWithIndex`;)", false); + + // Restore from backups + ExecSQL(server, edgeActor, R"(RESTORE `IndexTestCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); + + // Verify table data + auto actualTable = KqpSimpleExec(runtime, R"( + SELECT key, value, indexed_col FROM `/Root/TableWithIndex` ORDER BY key + )"); + UNIT_ASSERT_VALUES_EQUAL(expectedTable, actualTable); + + // Verify index works and has correct data + auto actualIndex = KqpSimpleExec(runtime, R"( + SELECT indexed_col FROM `/Root/TableWithIndex` VIEW value_index WHERE indexed_col > 0 ORDER BY indexed_col + )"); + UNIT_ASSERT_VALUES_EQUAL(expectedIndex, actualIndex); + + // Verify we can query using the index + auto indexQuery = KqpSimpleExec(runtime, R"( + SELECT key, indexed_col FROM `/Root/TableWithIndex` VIEW value_index WHERE indexed_col = 250 + )"); + UNIT_ASSERT_C(indexQuery.find("uint32_value: 2") != TString::npos, "Should find key=2"); + UNIT_ASSERT_C(indexQuery.find("uint32_value: 250") != TString::npos, "Should find indexed_col=250"); + + // Verify index implementation table was restored correctly + auto indexImplTable = KqpSimpleExec(runtime, R"( + SELECT indexed_col, key FROM `/Root/TableWithIndex/value_index/indexImplTable` ORDER BY indexed_col + )"); + // Should have 4 rows after incremental: (100,1), (250,2), (300,3), (400,4) + UNIT_ASSERT_C(indexImplTable.find("uint32_value: 100") != TString::npos, "Index table should have indexed_col=100"); + UNIT_ASSERT_C(indexImplTable.find("uint32_value: 250") != TString::npos, "Index table should have indexed_col=250"); + UNIT_ASSERT_C(indexImplTable.find("uint32_value: 300") != TString::npos, "Index table should have indexed_col=300"); + UNIT_ASSERT_C(indexImplTable.find("uint32_value: 400") != TString::npos, "Index table should have indexed_col=400"); + + // Count rows in index impl table + auto indexRowCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/TableWithIndex/value_index/indexImplTable` + )"); + UNIT_ASSERT_C(indexRowCount.find("uint64_value: 4") != TString::npos, "Index table should have 4 rows"); + } + + Y_UNIT_TEST(MultipleIndexesIncrementalRestore) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with multiple global indexes + CreateShardedTable(server, edgeActor, "/Root", "MultiIndexTable", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value1", "Uint32", false, false}, + {"value2", "Uint32", false, false}, + {"value3", "Uint32", false, false} + }) + .Indexes({ + {"index1", {"value1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}, + {"index2", {"value2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}, + {"index3", {"value3"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Insert data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/MultiIndexTable` 
(key, value1, value2, value3) VALUES + (1, 11, 21, 31), + (2, 12, 22, 32), + (3, 13, 23, 33); + )"); + + // Create backup collection + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MultiIndexCollection` + ( TABLE `/Root/MultiIndexTable` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Create full backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiIndexCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + // Modify data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/MultiIndexTable` (key, value1, value2, value3) VALUES + (4, 14, 24, 34); + )"); + + // Create incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiIndexCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Capture expected state for all indexes + auto expectedTable = KqpSimpleExec(runtime, R"( + SELECT key, value1, value2, value3 FROM `/Root/MultiIndexTable` ORDER BY key + )"); + + // Drop and restore + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/MultiIndexTable`;)", false); + ExecSQL(server, edgeActor, R"(RESTORE `MultiIndexCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); + + // Verify table data + auto actualTable = KqpSimpleExec(runtime, R"( + SELECT key, value1, value2, value3 FROM `/Root/MultiIndexTable` ORDER BY key + )"); + UNIT_ASSERT_VALUES_EQUAL(expectedTable, actualTable); + + // Verify all indexes work + auto index1Query = KqpSimpleExec(runtime, R"( + SELECT key FROM `/Root/MultiIndexTable` VIEW index1 WHERE value1 = 14 + )"); + UNIT_ASSERT_C(index1Query.find("uint32_value: 4") != TString::npos, "Index1 should work"); + + auto index2Query = KqpSimpleExec(runtime, R"( + SELECT key FROM `/Root/MultiIndexTable` VIEW index2 WHERE value2 = 24 + )"); + UNIT_ASSERT_C(index2Query.find("uint32_value: 4") != TString::npos, "Index2 should work"); + + auto index3Query = KqpSimpleExec(runtime, R"( + SELECT key FROM `/Root/MultiIndexTable` VIEW index3 WHERE value3 = 34 + )"); + UNIT_ASSERT_C(index3Query.find("uint32_value: 4") != TString::npos, "Index3 should work"); + + // Verify all index implementation tables were restored + auto index1ImplCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/MultiIndexTable/index1/indexImplTable` + )"); + UNIT_ASSERT_C(index1ImplCount.find("uint64_value: 4") != TString::npos, "Index1 impl table should have 4 rows"); + + auto index2ImplCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/MultiIndexTable/index2/indexImplTable` + )"); + UNIT_ASSERT_C(index2ImplCount.find("uint64_value: 4") != TString::npos, "Index2 impl table should have 4 rows"); + + auto index3ImplCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/MultiIndexTable/index3/indexImplTable` + )"); + UNIT_ASSERT_C(index3ImplCount.find("uint64_value: 4") != TString::npos, "Index3 impl table should have 4 rows"); + + // Verify index3 impl table data (spot check) + auto index3ImplData = KqpSimpleExec(runtime, R"( + SELECT value3, key FROM `/Root/MultiIndexTable/index3/indexImplTable` WHERE value3 = 34 + )"); + UNIT_ASSERT_C(index3ImplData.find("uint32_value: 34") != TString::npos, "Index3 impl should have value3=34"); + UNIT_ASSERT_C(index3ImplData.find("uint32_value: 4") != TString::npos, "Index3 impl should have key=4"); + } + + Y_UNIT_TEST(IndexDataVerificationIncrementalRestore) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + 
.SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with index + CreateShardedTable(server, edgeActor, "/Root", "DataVerifyTable", + TShardedTableOptions() + .Shards(2) + .Columns({ + {"key", "Uint32", true, false}, + {"name", "Utf8", false, false}, + {"age", "Uint32", false, false} + }) + .Indexes({ + {"age_index", {"age"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Insert data across shards + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/DataVerifyTable` (key, name, age) VALUES + (1, 'Alice', 25), + (2, 'Bob', 30), + (11, 'Charlie', 35), + (12, 'David', 40); + )"); + + // Create backup collection + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `DataVerifyCollection` + ( TABLE `/Root/DataVerifyTable` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Full backup + ExecSQL(server, edgeActor, R"(BACKUP `DataVerifyCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + // Modify: update existing records and add new ones + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/DataVerifyTable` (key, name, age) VALUES + (2, 'Bob', 31), -- update in shard 1 + (12, 'David', 41), -- update in shard 2 + (3, 'Eve', 28), -- new in shard 1 + (13, 'Frank', 45); -- new in shard 2 + )"); + + // Delete some records + ExecSQL(server, edgeActor, R"( + DELETE FROM `/Root/DataVerifyTable` WHERE key IN (1, 11); + )"); + + // Incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `DataVerifyCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Verify index has correct data BEFORE restore + auto beforeRestore = KqpSimpleExec(runtime, R"( + SELECT key, name, age FROM `/Root/DataVerifyTable` VIEW age_index WHERE age >= 30 ORDER BY age + )"); + + // Drop and restore + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/DataVerifyTable`;)", false); + ExecSQL(server, edgeActor, R"(RESTORE `DataVerifyCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); + + // Verify index has correct data AFTER restore + auto afterRestore = KqpSimpleExec(runtime, R"( + SELECT key, name, age FROM `/Root/DataVerifyTable` VIEW age_index WHERE age >= 30 ORDER BY age + )"); + + UNIT_ASSERT_VALUES_EQUAL(beforeRestore, afterRestore); + + // Verify specific queries + UNIT_ASSERT_C(afterRestore.find("text_value: \"Bob\"") != TString::npos, "Bob should be present"); + UNIT_ASSERT_C(afterRestore.find("uint32_value: 31") != TString::npos, "Age 31 should be present"); + UNIT_ASSERT_C(afterRestore.find("text_value: \"Alice\"") == TString::npos, "Alice should be deleted"); + UNIT_ASSERT_C(afterRestore.find("text_value: \"Frank\"") != TString::npos, "Frank should be present"); + UNIT_ASSERT_C(afterRestore.find("uint32_value: 45") != TString::npos, "Age 45 should be present"); + + // Verify index implementation table has correct data + auto indexImplData = KqpSimpleExec(runtime, R"( + SELECT age, key, name FROM `/Root/DataVerifyTable/age_index/indexImplTable` ORDER BY age + )"); + // Should have: (28, 3, Eve), (31, 2, Bob), (41, 12, David), (45, 13, Frank) + // Deleted: (25, 1, Alice), (35, 11, Charlie) + UNIT_ASSERT_C(indexImplData.find("uint32_value: 28") != TString::npos, "Index should have age=28"); + 
UNIT_ASSERT_C(indexImplData.find("text_value: \"Eve\"") != TString::npos, "Index should have Eve"); + UNIT_ASSERT_C(indexImplData.find("uint32_value: 31") != TString::npos, "Index should have age=31"); + UNIT_ASSERT_C(indexImplData.find("text_value: \"Bob\"") != TString::npos, "Index should have Bob"); + UNIT_ASSERT_C(indexImplData.find("text_value: \"Alice\"") == TString::npos, "Index should NOT have Alice"); + UNIT_ASSERT_C(indexImplData.find("text_value: \"Charlie\"") == TString::npos, "Index should NOT have Charlie"); + + auto indexImplCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/DataVerifyTable/age_index/indexImplTable` + )"); + UNIT_ASSERT_C(indexImplCount.find("uint64_value: 4") != TString::npos, "Index impl table should have 4 rows"); + } + + Y_UNIT_TEST(MultipleIncrementalBackupsWithIndexes) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + // Create table with index + CreateShardedTable(server, edgeActor, "/Root", "SequenceTable", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"indexed", "Uint32", false, false} + }) + .Indexes({ + {"idx", {"indexed"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + + // Initial data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/SequenceTable` (key, value, indexed) VALUES + (1, 10, 100), + (2, 20, 200); + )"); + + // Create backup collection + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `SequenceCollection` + ( TABLE `/Root/SequenceTable` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + + // Full backup + ExecSQL(server, edgeActor, R"(BACKUP `SequenceCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + + // First incremental: add data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/SequenceTable` (key, value, indexed) VALUES (3, 30, 300); + )"); + ExecSQL(server, edgeActor, R"(BACKUP `SequenceCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Second incremental: update data + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/SequenceTable` (key, value, indexed) VALUES (2, 25, 250); + )"); + ExecSQL(server, edgeActor, R"(BACKUP `SequenceCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Third incremental: delete and add + ExecSQL(server, edgeActor, R"( + DELETE FROM `/Root/SequenceTable` WHERE key = 1; + )"); + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/SequenceTable` (key, value, indexed) VALUES (4, 40, 400); + )"); + ExecSQL(server, edgeActor, R"(BACKUP `SequenceCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + + // Capture expected state + auto expectedTable = KqpSimpleExec(runtime, R"( + SELECT key, value, indexed FROM `/Root/SequenceTable` ORDER BY key + )"); + + auto expectedIndex = KqpSimpleExec(runtime, R"( + SELECT indexed FROM `/Root/SequenceTable` VIEW idx WHERE indexed > 0 ORDER BY indexed + )"); + + // Drop and restore + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/SequenceTable`;)", false); + ExecSQL(server, edgeActor, 
R"(RESTORE `SequenceCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(15)); + + // Verify + auto actualTable = KqpSimpleExec(runtime, R"( + SELECT key, value, indexed FROM `/Root/SequenceTable` ORDER BY key + )"); + UNIT_ASSERT_VALUES_EQUAL(expectedTable, actualTable); + + auto actualIndex = KqpSimpleExec(runtime, R"( + SELECT indexed FROM `/Root/SequenceTable` VIEW idx WHERE indexed > 0 ORDER BY indexed + )"); + UNIT_ASSERT_VALUES_EQUAL(expectedIndex, actualIndex); + + // Verify final state: key 1 deleted, key 2 updated, keys 3 and 4 added + UNIT_ASSERT_C(actualTable.find("uint32_value: 1") == TString::npos, "Key 1 should be deleted"); + UNIT_ASSERT_C(actualTable.find("uint32_value: 25") != TString::npos, "Key 2 should have value 25"); + UNIT_ASSERT_C(actualTable.find("uint32_value: 30") != TString::npos, "Key 3 should exist"); + UNIT_ASSERT_C(actualTable.find("uint32_value: 40") != TString::npos, "Key 4 should exist"); + + // Verify index implementation table reflects all 3 incremental changes + auto indexImplData = KqpSimpleExec(runtime, R"( + SELECT indexed, key FROM `/Root/SequenceTable/idx/indexImplTable` ORDER BY indexed + )"); + // Final state should be: (250, 2), (300, 3), (400, 4) + // Deleted: (100, 1), (200, 2->old value) + UNIT_ASSERT_C(indexImplData.find("uint32_value: 100") == TString::npos, "Index should NOT have indexed=100 (deleted)"); + UNIT_ASSERT_C(indexImplData.find("uint32_value: 200") == TString::npos, "Index should NOT have indexed=200 (updated)"); + UNIT_ASSERT_C(indexImplData.find("uint32_value: 250") != TString::npos, "Index should have indexed=250 (updated value)"); + UNIT_ASSERT_C(indexImplData.find("uint32_value: 300") != TString::npos, "Index should have indexed=300 (added)"); + UNIT_ASSERT_C(indexImplData.find("uint32_value: 400") != TString::npos, "Index should have indexed=400 (added)"); + + auto indexImplCount = KqpSimpleExec(runtime, R"( + SELECT COUNT(*) FROM `/Root/SequenceTable/idx/indexImplTable` + )"); + UNIT_ASSERT_C(indexImplCount.find("uint64_value: 3") != TString::npos, "Index impl table should have 3 rows"); + } + + Y_UNIT_TEST(MultipleTablesWithIndexesIncrementalRestore) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + .SetEnableRealSystemViewPaths(false) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + + Cerr << "===== STEP 1: Creating Table1 with idx1 =====" << Endl; + // Create first table with index + CreateShardedTable(server, edgeActor, "/Root", "Table1", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"val1", "Uint32", false, false} + }) + .Indexes({ + {"idx1", {"val1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + Cerr << "===== Table1 created successfully =====" << Endl; + + Cerr << "===== STEP 2: Creating Table2 with idx2 =====" << Endl; + // Create second table with different index + CreateShardedTable(server, edgeActor, "/Root", "Table2", + TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"val2", "Uint32", false, false} + }) + .Indexes({ + {"idx2", {"val2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal} + })); + Cerr << "===== Table2 created successfully =====" << Endl; + + Cerr << "===== STEP 3: Inserting initial data =====" << Endl; + // Insert 
data into both tables + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, val1) VALUES (1, 100), (2, 200); + UPSERT INTO `/Root/Table2` (key, val2) VALUES (1, 1000), (2, 2000); + )"); + Cerr << "===== Initial data inserted successfully =====" << Endl; + + Cerr << "===== STEP 4: Creating backup collection =====" << Endl; + // Create backup collection with both tables + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MultiTableCollection` + ( TABLE `/Root/Table1` + , TABLE `/Root/Table2` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = 'true' + ); + )", false); + Cerr << "===== Backup collection created successfully =====" << Endl; + + Cerr << "===== STEP 5: Performing full backup =====" << Endl; + // Full backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection`;)", false); + SimulateSleep(server, TDuration::Seconds(1)); + Cerr << "===== Full backup completed successfully =====" << Endl; + + Cerr << "===== STEP 6: Modifying data =====" << Endl; + // Modify both tables + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table1` (key, val1) VALUES (3, 300); + UPSERT INTO `/Root/Table2` (key, val2) VALUES (3, 3000); + )"); + Cerr << "===== Data modified successfully =====" << Endl; + + Cerr << "===== STEP 7: Performing incremental backup =====" << Endl; + // Incremental backup + ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection` INCREMENTAL;)", false); + SimulateSleep(server, TDuration::Seconds(5)); + Cerr << "===== Incremental backup completed successfully =====" << Endl; + + Cerr << "===== STEP 8: Querying Table1 to capture expected state =====" << Endl; + // Capture expected states + auto expected1 = KqpSimpleExec(runtime, R"( + SELECT key, val1 FROM `/Root/Table1` ORDER BY key + )"); + Cerr << "===== Table1 query successful =====" << Endl; + + Cerr << "===== STEP 9: Querying Table2 to capture expected state =====" << Endl; + auto expected2 = KqpSimpleExec(runtime, R"( + SELECT key, val2 FROM `/Root/Table2` ORDER BY key + )"); + Cerr << "===== Table2 query successful =====" << Endl; + + Cerr << "===== STEP 10: Dropping Table1 =====" << Endl; + // Drop both tables + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false); + Cerr << "===== Table1 dropped successfully =====" << Endl; + + Cerr << "===== STEP 11: Dropping Table2 =====" << Endl; + ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table2`;)", false); + Cerr << "===== Table2 dropped successfully =====" << Endl; + + Cerr << "===== STEP 12: Restoring from backup =====" << Endl; + // Restore + ExecSQL(server, edgeActor, R"(RESTORE `MultiTableCollection`;)", false); + runtime.SimulateSleep(TDuration::Seconds(10)); + Cerr << "===== Restore completed successfully =====" << Endl; + + Cerr << "===== STEP 13: Verifying Table1 data =====" << Endl; + // Verify both tables and indexes + auto actual1 = KqpSimpleExec(runtime, R"( + SELECT key, val1 FROM `/Root/Table1` ORDER BY key + )"); + Cerr << "===== Table1 verification query successful =====" << Endl; + + Cerr << "===== STEP 14: Verifying Table2 data =====" << Endl; + auto actual2 = KqpSimpleExec(runtime, R"( + SELECT key, val2 FROM `/Root/Table2` ORDER BY key + )"); + Cerr << "===== Table2 verification query successful =====" << Endl; + + Cerr << "===== STEP 15: Checking data equality for Table1 =====" << Endl; + UNIT_ASSERT_VALUES_EQUAL(expected1, actual1); + Cerr << "===== Table1 data matches expected =====" << Endl; + + Cerr << "===== STEP 16: Checking data equality for Table2 =====" << Endl; + 
UNIT_ASSERT_VALUES_EQUAL(expected2, actual2);
+        Cerr << "===== Table2 data matches expected =====" << Endl;
+
+        Cerr << "===== STEP 17: Verifying idx1 works =====" << Endl;
+        // Verify indexes work
+        auto idx1Query = KqpSimpleExec(runtime, R"(
+            SELECT key FROM `/Root/Table1` VIEW idx1 WHERE val1 = 300
+        )");
+        UNIT_ASSERT_C(idx1Query.find("uint32_value: 3") != TString::npos, "Index idx1 should work");
+        Cerr << "===== idx1 verification successful =====" << Endl;
+
+        Cerr << "===== STEP 18: Verifying idx2 works =====" << Endl;
+        auto idx2Query = KqpSimpleExec(runtime, R"(
+            SELECT key FROM `/Root/Table2` VIEW idx2 WHERE val2 = 3000
+        )");
+        UNIT_ASSERT_C(idx2Query.find("uint32_value: 3") != TString::npos, "Index idx2 should work");
+        Cerr << "===== idx2 verification successful =====" << Endl;
+
+        Cerr << "===== STEP 19: Verifying Table1 index impl table =====" << Endl;
+        // Verify both index implementation tables were restored
+        auto idx1ImplCount = KqpSimpleExec(runtime, R"(
+            SELECT COUNT(*) FROM `/Root/Table1/idx1/indexImplTable`
+        )");
+        UNIT_ASSERT_C(idx1ImplCount.find("uint64_value: 3") != TString::npos, "Table1 index impl should have 3 rows");
+        Cerr << "===== Table1 index impl table verification successful =====" << Endl;
+
+        Cerr << "===== STEP 20: Verifying Table2 index impl table =====" << Endl;
+        auto idx2ImplCount = KqpSimpleExec(runtime, R"(
+            SELECT COUNT(*) FROM `/Root/Table2/idx2/indexImplTable`
+        )");
+        UNIT_ASSERT_C(idx2ImplCount.find("uint64_value: 3") != TString::npos, "Table2 index impl should have 3 rows");
+        Cerr << "===== Table2 index impl table verification successful =====" << Endl;
+
+        Cerr << "===== STEP 21: Verifying Table1 index impl table data =====" << Endl;
+        // Verify index impl tables have correct data
+        auto idx1ImplData = KqpSimpleExec(runtime, R"(
+            SELECT val1, key FROM `/Root/Table1/idx1/indexImplTable` WHERE val1 = 300
+        )");
+        UNIT_ASSERT_C(idx1ImplData.find("uint32_value: 300") != TString::npos, "Table1 index should have val1=300");
+        UNIT_ASSERT_C(idx1ImplData.find("uint32_value: 3") != TString::npos, "Table1 index should have key=3");
+        Cerr << "===== Table1 index impl table data verification successful =====" << Endl;
+
+        Cerr << "===== STEP 22: Verifying Table2 index impl table data =====" << Endl;
+        auto idx2ImplData = KqpSimpleExec(runtime, R"(
+            SELECT val2, key FROM `/Root/Table2/idx2/indexImplTable` WHERE val2 = 3000
+        )");
+        UNIT_ASSERT_C(idx2ImplData.find("uint32_value: 3000") != TString::npos, "Table2 index should have val2=3000");
+        UNIT_ASSERT_C(idx2ImplData.find("uint32_value: 3") != TString::npos, "Table2 index should have key=3");
+        Cerr << "===== Table2 index impl table data verification successful =====" << Endl;
+
+        Cerr << "===== TEST COMPLETED SUCCESSFULLY =====" << Endl;
+    }
+
} // Y_UNIT_TEST_SUITE(IncrementalBackup)
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
index 28c22af19ece..a44e5e18897c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__backup_collection_common.cpp
@@ -130,6 +130,65 @@ std::optional<THashMap<TString, THashSet<TString>>> GetBackupRequiredPaths(
        }
    }

+    // Add index backup metadata directories if incremental backup is enabled
+    bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
+
+    // Check OmitIndexes from two possible locations:
+    // 1. Top-level OmitIndexes field (for full backups)
IncrementalBackupConfig.OmitIndexes (for incremental backups) + bool omitIndexes = bc->Description.GetOmitIndexes() || + (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes()); + + if (incrBackupEnabled && !omitIndexes) { + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + const auto tablePath = TPath::Resolve(item.GetPath(), context.SS); + + // Skip if path is not resolved or not a table + auto checks = tablePath.Check(); + checks.IsResolved().IsTable(); + if (!checks) { + continue; + } + + std::pair paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) { + continue; + } + auto& relativeItemPath = paths.second; + + // Check if table has indexes + for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) { + auto childPath = context.SS->PathsById.at(childPathId); + + if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) { + continue; + } + + // Skip deleted indexes + if (childPath->Dropped()) { + continue; + } + + auto indexInfo = context.SS->Indexes.at(childPathId); + if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) { + continue; + } + + // Add required PARENT directory path for index backup: + // {targetDir}/__ydb_backup_meta/indexes/{table_path} + // The index name will be the table name created within this directory + TString indexBackupParentPath = JoinPath({ + targetDir, + "__ydb_backup_meta", + "indexes", + relativeItemPath + }); + collectionPaths.emplace(indexBackupParentPath); + } + } + } + return paths; } diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 4cb2e5ff16a9..c26815a43e20 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3673,6 +3673,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase { Y_ABORT_UNLESS(deserializeRes); txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); } + } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable || + txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan || + txState.TxType == TTxState::TxAlterCdcStreamAtTable || + txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot || + txState.TxType == TTxState::TxDropCdcStreamAtTable || + txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) { + if (!extraData.empty()) { + NKikimrSchemeOp::TGenericTxInFlyExtraData proto; + bool deserializeRes = ParseFromStringNoSizeLimit(proto, extraData); + Y_ABORT_UNLESS(deserializeRes); + txState.CdcPathId = TPathId::FromProto(proto.GetTxCopyTableExtraData().GetCdcPathId()); + } } Y_ABORT_UNLESS(txState.TxType != TTxState::TxInvalid); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 92155f564df4..2e68d878d6ab 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -459,6 +459,7 @@ class TAlterCdcStreamAtTable: public TSubOperation { Y_ABORT_UNLESS(!context.SS->FindTx(OperationId)); auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId); txState.State = TTxState::ConfigureParts; + txState.CdcPathId = streamPath.Base()->PathId; // Store CDC stream PathId for later use tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter; tablePath.Base()->LastTxId = OperationId.GetTxId(); 
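Aside: the `GetBackupRequiredPaths` hunk above and the `CreateBackupBackupCollection` hunk below both derive the effective OmitIndexes value from two places in the collection description. A minimal sketch of the combined rule, using only the proto accessors that appear in the diff (the free-function wrapper and the include path are illustrative assumptions, not part of the patch):

```cpp
#include <ydb/core/protos/flat_scheme_op.pb.h> // assumed location of NKikimrSchemeOp

// Hypothetical helper, shown only to make the flag resolution explicit.
bool EffectiveOmitIndexes(const NKikimrSchemeOp::TBackupCollectionDescription& desc) {
    // Full backups carry a top-level OmitIndexes field; incremental backups may
    // also set it inside IncrementalBackupConfig. Either location disables
    // index backup.
    const bool incrBackupEnabled = desc.HasIncrementalBackupConfig();
    return desc.GetOmitIndexes()
        || (incrBackupEnabled && desc.GetIncrementalBackupConfig().GetOmitIndexes());
}
```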
@@ -635,7 +636,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr
         result.push_back(DropLock(NextPartId(opId, result), outTx));
     }
 
-    if (workingDirPath.IsTableIndex()) {
+    if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) {
         auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
         outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
         outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
index eb6891a41f4f..60752a3d91ce 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
@@ -2,6 +2,8 @@
 #include "schemeshard__op_traits.h"
 #include "schemeshard__operation_common.h"
 #include "schemeshard__operation_create_cdc_stream.h"
+#include "schemeshard__operation_part.h"
+#include "schemeshard_utils.h"
 #include "schemeshard_impl.h"
 
@@ -64,12 +66,12 @@ TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, con
     Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
     const auto& bc = context.SS->BackupCollections[bcPath->PathId];
     bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
+
+    bool omitIndexes = bc->Description.GetOmitIndexes() ||
+                       (incrBackupEnabled && bc->Description.GetIncrementalBackupConfig().GetOmitIndexes());
+
     TString streamName = NBackup::ToX509String(TlsActivationContext->AsActorContext().Now()) + "_continuousBackupImpl";
 
-    bool omitIndexes = bc->Description.HasOmitIndexes()
-        ? bc->Description.GetOmitIndexes()
-        : false;
-
     for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
         auto& desc = *copyTables.Add();
         desc.SetSrcPath(item.GetPath());
@@ -167,6 +169,74 @@ TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, con
             NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, table, boundaries, false);
         }
+
+        if (!omitIndexes) {
+            for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+                const auto tablePath = TPath::Resolve(item.GetPath(), context.SS);
+
+                // Iterate through table's children to find indexes
+                for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+                    auto childPath = context.SS->PathsById.at(childPathId);
+
+                    // Skip non-index children (CDC streams, etc.)
+                    if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
+                        continue;
+                    }
+
+                    // Skip deleted indexes
+                    if (childPath->Dropped()) {
+                        continue;
+                    }
+
+                    // Get index info and filter for global sync only
+                    // We need more complex logic for vector indexes in the future
+                    auto indexInfo = context.SS->Indexes.at(childPathId);
+                    if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                        continue;
+                    }
+
+                    // Get index implementation table (the only child of index)
+                    auto indexPath = TPath::Init(childPathId, context.SS);
+                    Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
+                    auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+
+                    auto indexTablePath = indexPath.Child(implTableName);
+                    auto indexTable = context.SS->Tables.at(implTablePathId);
+
+                    // Create CDC stream on index impl table
+                    NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp;
+                    createCdcStreamOp.SetTableName(implTableName);
+                    auto& streamDescription = *createCdcStreamOp.MutableStreamDescription();
+                    streamDescription.SetName(streamName);
+                    streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate);
+                    streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);
+
+                    NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, indexTablePath, false, false);
+
+                    // Create AtTable operation to notify datashard (without schema change)
+                    {
+                        auto outTx = TransactionTemplate(indexPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
+                        auto& cdcOp = *outTx.MutableCreateCdcStream();
+                        cdcOp.CopyFrom(createCdcStreamOp);
+                        result.push_back(CreateNewCdcStreamAtTable(NextPartId(opId, result), outTx, false));
+                    }
+
+                    // Create PQ part for index CDC stream
+                    TVector<TString> boundaries;
+                    const auto& partitions = indexTable->GetPartitions();
+                    boundaries.reserve(partitions.size() - 1);
+                    for (ui32 i = 0; i < partitions.size(); ++i) {
+                        const auto& partition = partitions.at(i);
+                        if (i != partitions.size() - 1) {
+                            boundaries.push_back(partition.EndOfRange);
+                        }
+                    }
+
+                    const auto streamPath = indexTablePath.Child(streamName);
+                    NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, streamName, indexTable, boundaries, false);
+                }
+            }
+        }
     }
 
     return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
index e08dcf9c0e17..6b49d60002e9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp
@@ -223,6 +223,79 @@ TVector<ISubOperation::TPtr> CreateBackupIncrementalBackupCollection(TOperationI
         streams.push_back(stream);
     }
 
+    // Process indexes if they are not omitted
+    bool omitIndexes = bc->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+    if (!omitIndexes) {
+        for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+            const auto tablePath = TPath::Resolve(item.GetPath(), context.SS);
+            auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+            std::pair<TString, TString> paths;
+            TString err;
+            if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) {
+                result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)};
+                return result;
+            }
+            auto& relativeItemPath = paths.second;
+
+            // Iterate through table's children to find indexes
+            for (const auto& [childName, childPathId] : tablePath.Base()->GetChildren()) {
+                auto childPath = context.SS->PathsById.at(childPathId);
+
+                // Skip non-index children (CDC streams, etc.)
+                if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
+                    continue;
+                }
+
+                // Skip deleted indexes
+                if (childPath->Dropped()) {
+                    continue;
+                }
+
+                // Get index info and filter for global sync only
+                auto indexInfo = context.SS->Indexes.at(childPathId);
+                if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                    continue;
+                }
+
+                // Get index implementation table (single child of index)
+                auto indexPath = TPath::Init(childPathId, context.SS);
+                Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
+                auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+
+                // Build relative path to index impl table (relative to working dir)
+                TString indexImplTableRelPath = JoinPath({relativeItemPath, childName, implTableName});
+
+                // Create AlterContinuousBackup for index impl table
+                NKikimrSchemeOp::TModifyScheme modifyScheme;
+                modifyScheme.SetWorkingDir(tx.GetWorkingDir());
+                modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterContinuousBackup);
+                modifyScheme.SetInternal(true);
+
+                auto& cb = *modifyScheme.MutableAlterContinuousBackup();
+                cb.SetTableName(indexImplTableRelPath); // Relative path: table1/index1/indexImplTable
+
+                auto& ib = *cb.MutableTakeIncrementalBackup();
+                // Destination: {backup_collection}/{timestamp}_inc/__ydb_backup_meta/indexes/{table_path}/{index_name}
+                TString dstPath = JoinPath({
+                    tx.GetBackupIncrementalBackupCollection().GetName(),
+                    tx.GetBackupIncrementalBackupCollection().GetTargetDir(),
+                    "__ydb_backup_meta",
+                    "indexes",
+                    relativeItemPath, // Relative table path (e.g., "table1")
+                    childName         // Index name (e.g., "index1")
+                });
+                ib.SetDstPath(dstPath);
+
+                TPathId stream;
+                if (!CreateAlterContinuousBackup(opId, modifyScheme, context, result, stream)) {
+                    return result;
+                }
+                streams.push_back(stream);
+            }
+        }
+    }
+
     CreateLongIncrementalBackupOp(opId, bcPath, result, streams);
 
     return result;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
index 2cb60ead5641..d8e4566bfebb 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp
@@ -127,6 +127,54 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
     table->AlterVersion += 1;
 
     NIceDb::TNiceDb db(context.GetDB());
+
+    bool isContinuousBackupStream = false;
+    if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) {
+        auto cdcPath = context.SS->PathsById.at(txState->CdcPathId);
+        LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                    DebugHint() << " Checking CDC stream name"
+                                << ", cdcPathId: " << txState->CdcPathId
+                                << ", streamName: " << cdcPath->Name
+                                << ", at schemeshard: " << context.SS->SelfTabletId());
+        if (cdcPath->Name.EndsWith("_continuousBackupImpl")) {
+            isContinuousBackupStream = true;
+        }
+    } else {
+        LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                    DebugHint() << " CdcPathId not found"
+                                << ", cdcPathId: " << txState->CdcPathId
+                                << ", at schemeshard: " << context.SS->SelfTabletId());
+    }
+
+    // Check if this is an index implementation table.
+    // If so, we need to sync the parent index version to match the impl table version.
+    // Do this ONLY for continuous backup operations.
+    TPathId parentPathId = path->ParentPathId;
+    if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) {
+        auto parentPath = context.SS->PathsById.at(parentPathId);
+        if (parentPath->IsTableIndex()) {
+            Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId));
+            auto index = context.SS->Indexes.at(parentPathId);
+
+            index->AlterVersion = table->AlterVersion;
+
+            // Persist the index version update directly to database
+            db.Table<Schema::TableIndex>().Key(parentPathId.LocalPathId).Update(
+                NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
+            );
+
+            context.SS->ClearDescribePathCaches(parentPath);
+            context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId);
+
+            LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                        DebugHint() << " Synced parent index version with impl table"
+                                    << ", indexPathId: " << parentPathId
+                                    << ", indexName: " << parentPath->Name
+                                    << ", newVersion: " << index->AlterVersion
+                                    << ", at schemeshard: " << context.SS->SelfTabletId());
+        }
+    }
+
     context.SS->PersistTableAlterVersion(db, pathId, table);
 
     context.SS->ClearDescribePathCaches(path);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 211bb3654e0d..2ade3854dede 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -322,6 +322,13 @@ class TNewCdcStream: public TSubOperation {
         Y_ABORT_UNLESS(!context.SS->FindTx(OperationId));
         auto& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateCdcStream, streamPath.Base()->PathId);
+        txState.CdcPathId = streamPath.Base()->PathId;
+        LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                    "DoNewStream: Set CdcPathId"
+                        << ", operationId: " << OperationId
+                        << ", cdcPathId: " << streamPath.Base()->PathId
+                        << ", streamName: " << streamPath.Base()->Name
+                        << ", at schemeshard: " << context.SS->SelfTabletId());
         txState.State = TTxState::Propose;
 
         streamPath.Base()->PathState = NKikimrSchemeOp::EPathStateCreate;
@@ -582,6 +589,18 @@ class TNewCdcStreamAtTable: public TSubOperation {
         Y_ABORT_UNLESS(!context.SS->FindTx(OperationId));
         auto& txState = context.SS->CreateTx(OperationId, txType, tablePath.Base()->PathId);
         txState.State = TTxState::ConfigureParts;
+
+        // Set CdcPathId for continuous backup detection
+        auto streamPath = tablePath.Child(streamName);
+        if (streamPath.IsResolved()) {
+            txState.CdcPathId = streamPath.Base()->PathId;
+            LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                        "TNewCdcStreamAtTable: Set CdcPathId"
+                            << ", operationId: " << OperationId
+                            << ", cdcPathId: " << streamPath.Base()->PathId
+                            << ", streamName: " << streamName
+                            << ", at schemeshard: " << context.SS->SelfTabletId());
+        }
 
         tablePath.Base()->PathState = NKikimrSchemeOp::EPathStateAlter;
         tablePath.Base()->LastTxId = OperationId.GetTxId();
@@ -984,7 +1003,7 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
         DoCreateLock(result, opId, workingDirPath, tablePath);
     }
 
-    if (workingDirPath.IsTableIndex()) {
+    if (workingDirPath.IsTableIndex() && !streamName.EndsWith("_continuousBackupImpl")) {
         auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
         outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
         outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
index 521cc4868509..6662d3dc0ebb 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp
@@ -251,6 +251,46 @@ class TDone: public TSubOperationState {
         Y_ABORT_UNLESS(context.SS->PathsById.contains(txState->TargetPathId));
         auto targetPath = context.SS->PathsById.at(txState->TargetPathId);
 
+        // If we restored an index implementation table, sync the parent index's schema version
+        auto dstTablePath = TPath::Init(txState->TargetPathId, context.SS);
+
+        LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                   "[" << context.SS->SelfTabletId() << "] " << DebugHint()
+                       << " Checking if restored path is index impl table"
+                       << ": path# " << dstTablePath.PathString()
+                       << ", isResolved# " << dstTablePath.IsResolved()
+                       << ", isInsideTableIndexPath# " << dstTablePath.IsInsideTableIndexPath(false));
+
+        if (dstTablePath.IsInsideTableIndexPath(false)) {
+            auto indexPath = dstTablePath.Parent();
+            auto tablePath = indexPath.Parent();
+
+            LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+                       "[" << context.SS->SelfTabletId() << "] " << DebugHint()
+                           << " Syncing index schema version after restore"
+                           << ": index# " << indexPath.PathString()
+                           << ", impl table# " << dstTablePath.PathString()
+                           << ", table# " << tablePath.PathString());
+
+            // Create AlterTableIndex operation to sync schema versions
+            auto request = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>();
+            auto& record = request->Record;
+
+            auto txId = context.SS->GetCachedTxId(context.Ctx);
+            record.SetTxId(ui64(txId));
+
+            auto& alterTx = *record.AddTransaction();
+            alterTx.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
+            alterTx.SetWorkingDir(tablePath.PathString());
+            alterTx.SetInternal(true);
+
+            auto alterIndex = alterTx.MutableAlterTableIndex();
+            alterIndex->SetName(indexPath.LeafName());
+            alterIndex->SetState(NKikimrSchemeOp::EIndexStateReady);
+
+            context.Ctx.Send(context.SS->SelfId(), request.Release());
+        }
+
         context.SS->ClearDescribePathCaches(targetPath);
         context.OnComplete.PublishToSchemeBoard(OperationId, txState->TargetPathId);
         context.OnComplete.ReleasePathState(OperationId, txState->TargetPathId, TPathElement::EPathState::EPathStateNoChanges);
@@ -602,6 +642,9 @@ bool CreateRestoreMultipleIncrementalBackups(
         result.push_back(CreateRestoreIncrementalBackupAtTable(NextPartId(opId, result), outTx));
     }
 
+    // Note: AlterTableIndex for schema version sync is now handled in TDone::ProgressState
+    // after the restore completes, not here (where it would use stale schema versions)
+
     return true;
 }
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
index 593de4172b0a..11b82835a7ba 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_cdc_stream.cpp
@@ -191,6 +191,7 @@ class TDropCdcStream: public TSubOperation {
         Y_ABORT_UNLESS(!context.SS->FindTx(OperationId));
         auto& txState = context.SS->CreateTx(OperationId, TTxState::TxDropCdcStream, streamPath.Base()->PathId);
+        txState.CdcPathId = streamPath.Base()->PathId;
         txState.State = TTxState::Propose;
         txState.MinStep = TStepId(1);
 
@@ -583,7 +584,15 @@ void DoDropStream(
         result.push_back(DropLock(NextPartId(opId, result), outTx));
     }
 
-    if (workingDirPath.IsTableIndex()) {
+    bool hasContinuousBackupStream = false;
+    for (const auto& streamPath : streamPaths) {
+        if (streamPath.Base()->Name.EndsWith("_continuousBackupImpl")) {
+            hasContinuousBackupStream = true;
+            break;
+        }
+    }
+
+    if (workingDirPath.IsTableIndex() && !hasContinuousBackupStream) {
         auto outTx = TransactionTemplate(workingDirPath.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex);
         outTx.MutableAlterTableIndex()->SetName(workingDirPath.LeafName());
         outTx.MutableAlterTableIndex()->SetState(NKikimrSchemeOp::EIndexState::EIndexStateReady);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index a6c07f17acb8..0ced223da83a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2643,6 +2643,16 @@ void TSchemeShard::PersistTxState(NIceDb::TNiceDb& db, const TOperationId opId)
         txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId());
         bool serializeRes = proto.SerializeToString(&extraData);
         Y_ABORT_UNLESS(serializeRes);
+    } else if (txState.TxType == TTxState::TxCreateCdcStreamAtTable ||
+               txState.TxType == TTxState::TxCreateCdcStreamAtTableWithInitialScan ||
+               txState.TxType == TTxState::TxAlterCdcStreamAtTable ||
+               txState.TxType == TTxState::TxAlterCdcStreamAtTableDropSnapshot ||
+               txState.TxType == TTxState::TxDropCdcStreamAtTable ||
+               txState.TxType == TTxState::TxDropCdcStreamAtTableDropSnapshot) {
+        NKikimrSchemeOp::TGenericTxInFlyExtraData proto;
+        txState.CdcPathId.ToProto(proto.MutableTxCopyTableExtraData()->MutableCdcPathId());
+        bool serializeRes = proto.SerializeToString(&extraData);
+        Y_ABORT_UNLESS(serializeRes);
     }
 
     db.Table<Schema::TxInFlightV2>().Key(opId.GetTxId(), opId.GetSubTxId()).Update(
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 226abf154171..0cdfaf4b4b7b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1194,6 +1194,36 @@ class TSchemeShard
     void Handle(TEvDataShard::TEvIncrementalRestoreResponse::TPtr& ev, const TActorContext& ctx);
     void CreateIncrementalRestoreOperation(const TPathId& backupCollectionPathId, ui64 operationId, const TString& backupName, const TActorContext& ctx);
 
+    void DiscoverAndCreateIndexRestoreOperations(
+        const TPathId& backupCollectionPathId,
+        ui64 operationId,
+        const TString& backupName,
+        const TPath& bcPath,
+        const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+        const TActorContext& ctx);
+
+    void DiscoverIndexesRecursive(
+        ui64 operationId,
+        const TString& backupName,
+        const TPath& bcPath,
+        const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+        const TPath& currentPath,
+        const TString& accumulatedRelativePath,
+        const TActorContext& ctx);
+
+    void CreateSingleIndexRestoreOperation(
+        ui64 operationId,
+        const TString& backupName,
+        const TPath& bcPath,
+        const TString& relativeTablePath,
+        const TString& indexName,
+        const TString& targetTablePath,
+        const TActorContext& ctx);
+
+    TString FindTargetTablePath(
+        const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+        const TString& relativeTablePath);
+
     void Handle(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp b/ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp
index e3fc2fbe4e08..8f976f3d0cb4 100644
--- a/ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_incremental_restore_scan.cpp
@@ -594,10 +594,272 @@ void TSchemeShard::CreateIncrementalRestoreOperation(
             LOG_W("Incremental backup path not found: " << incrBackupPathStr);
         }
     }
-
+
+    // Discover and create index restore operations in parallel
+    DiscoverAndCreateIndexRestoreOperations(
+        backupCollectionPathId,
+        operationId,
+        backupName,
+        bcPath,
+        backupCollectionInfo,
+        ctx
+    );
+
     LOG_I("Created separate restore operations for incremental backup: " << backupName);
 }
 
+TString TSchemeShard::FindTargetTablePath(
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TString& relativeTablePath) {
+
+    // Map backup relative path to restore target path using backup collection's ExplicitEntryList
+    for (const auto& item : backupCollectionInfo->Description.GetExplicitEntryList().GetEntries()) {
+        if (item.GetType() != NKikimrSchemeOp::TBackupCollectionDescription_TBackupEntry_EType_ETypeTable) {
+            continue;
+        }
+
+        // Extract the relative part of the item path
+        // Item path is like /Root/db/table1, we need to extract the relative part
+        TString itemPath = item.GetPath();
+
+        // Only accept exact matches or suffixes preceded by a path separator
+        // to avoid false matches (e.g. "/Root/FooBar" should not match "Bar")
+        if (itemPath == relativeTablePath || itemPath.EndsWith("/" + relativeTablePath)) {
+            return itemPath;
+        }
+    }
+
+    return {};
+}
+
+void TSchemeShard::DiscoverIndexesRecursive(
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TPath& currentPath,
+    const TString& accumulatedRelativePath,
+    const TActorContext& ctx) {
+
+    // Try to find target table for current accumulated path
+    TString targetTablePath = FindTargetTablePath(backupCollectionInfo, accumulatedRelativePath);
+
+    if (!targetTablePath.empty()) {
+        // Found target table, children are indexes
+        LOG_I("Found table mapping: " << accumulatedRelativePath << " -> " << targetTablePath);
+
+        for (const auto& [indexName, indexDirPathId] : currentPath.Base()->GetChildren()) {
+            CreateSingleIndexRestoreOperation(
+                operationId,
+                backupName,
+                bcPath,
+                accumulatedRelativePath,
+                indexName,
+                targetTablePath,
+                ctx
+            );
+        }
+    } else {
+        // Not a table yet, descend into children to build up the path
+        for (const auto& [childName, childPathId] : currentPath.Base()->GetChildren()) {
+            auto childPath = TPath::Init(childPathId, this);
+            TString newRelativePath = accumulatedRelativePath.empty()
+                ? childName
+                : accumulatedRelativePath + "/" + childName;
+
+            DiscoverIndexesRecursive(
+                operationId,
+                backupName,
+                bcPath,
+                backupCollectionInfo,
+                childPath,
+                newRelativePath,
+                ctx
+            );
+        }
+    }
+}
+
+void TSchemeShard::DiscoverAndCreateIndexRestoreOperations(
+    const TPathId& /*backupCollectionPathId*/,
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TBackupCollectionInfo::TPtr& backupCollectionInfo,
+    const TActorContext& ctx) {
+
+    // Check if indexes were backed up (OmitIndexes flag)
+    bool omitIndexes = backupCollectionInfo->Description.GetIncrementalBackupConfig().GetOmitIndexes();
+    if (omitIndexes) {
+        LOG_I("Indexes were omitted in backup, skipping index restore");
+        return;
+    }
+
+    // Path to index metadata: {backup}/__ydb_backup_meta/indexes
+    TString indexMetaBasePath = JoinPath({
+        bcPath.PathString(),
+        backupName + "_incremental",
+        "__ydb_backup_meta",
+        "indexes"
+    });
+
+    const TPath& indexMetaPath = TPath::Resolve(indexMetaBasePath, this);
+    if (!indexMetaPath.IsResolved()) {
+        LOG_I("No index metadata found at: " << indexMetaBasePath << " (this is normal if no indexes were backed up)");
+        return;
+    }
+
+    LOG_I("Discovering indexes for restore at: " << indexMetaBasePath);
+
+    // Start recursive discovery from the indexes root with empty accumulated path
+    DiscoverIndexesRecursive(
+        operationId,
+        backupName,
+        bcPath,
+        backupCollectionInfo,
+        indexMetaPath,
+        "", // Start with empty accumulated path
+        ctx
+    );
+}
+
+void TSchemeShard::CreateSingleIndexRestoreOperation(
+    ui64 operationId,
+    const TString& backupName,
+    const TPath& bcPath,
+    const TString& relativeTablePath,
+    const TString& indexName,
+    const TString& targetTablePath,
+    const TActorContext& ctx) {
+
+    LOG_I("CreateSingleIndexRestoreOperation: table=" << targetTablePath
+          << " index=" << indexName
+          << " relativeTablePath=" << relativeTablePath);
+
+    // Validate target table exists
+    const TPath targetTablePathObj = TPath::Resolve(targetTablePath, this);
+    if (!targetTablePathObj.IsResolved() || !targetTablePathObj.Base()->IsTable()) {
+        LOG_W("Target table not found or invalid: " << targetTablePath);
+        return;
+    }
+
+    // Find the index and its impl table
+    TPathId indexPathId;
+    TPathId indexImplTablePathId;
+    bool indexFound = false;
+
+    for (const auto& [childName, childPathId] : targetTablePathObj.Base()->GetChildren()) {
+        if (childName == indexName) {
+            auto childPath = PathsById.at(childPathId);
+            if (childPath->PathType == NKikimrSchemeOp::EPathTypeTableIndex) {
+                indexPathId = childPathId;
+
+                // Get index info to verify it's a global index
+                auto indexInfoIt = Indexes.find(indexPathId);
+                if (indexInfoIt == Indexes.end()) {
+                    LOG_W("Index info not found for pathId: " << indexPathId);
+                    return;
+                }
+
+                auto indexInfo = indexInfoIt->second;
+                if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
+                    LOG_I("Skipping non-global index: " << indexName << " (type=" << indexInfo->Type << ")");
+                    return;
+                }
+
+                // Get index impl table (single child of index)
+                auto indexPath = TPath::Init(indexPathId, this);
+                if (indexPath.Base()->GetChildren().size() == 1) {
+                    auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
+                    indexImplTablePathId = implTablePathId;
+                    indexFound = true;
+                    LOG_I("Found global index '" << indexName << "' with impl table: " << implTableName);
+                    break;
+                } else {
+                    LOG_W("Index '" << indexName << "' has unexpected number of children: "
+                          << indexPath.Base()->GetChildren().size());
+                    return;
+                }
+            }
+        }
+    }
+
+    if (!indexFound) {
+        LOG_W("Index '" << indexName << "' not found on table " << targetTablePath
+              << " - skipping (index may have been dropped)");
+        return;
+    }
+
+    // Source: {backup}/__ydb_backup_meta/indexes/{table}/{index}
+    TString srcIndexBackupPath = JoinPath({
+        bcPath.PathString(),
+        backupName + "_incremental",
+        "__ydb_backup_meta",
+        "indexes",
+        relativeTablePath,
+        indexName
+    });
+
+    const TPath& srcBackupPath = TPath::Resolve(srcIndexBackupPath, this);
+    if (!srcBackupPath.IsResolved()) {
+        LOG_W("Index backup not found at: " << srcIndexBackupPath);
+        return;
+    }
+
+    // Destination: {table}/{index}/indexImplTable
+    auto indexImplTablePath = TPath::Init(indexImplTablePathId, this);
+    TString dstIndexImplPath = indexImplTablePath.PathString();
+
+    LOG_I("Creating index restore operation: " << srcIndexBackupPath << " -> " << dstIndexImplPath);
+
+    // Create restore request (SAME structure as table restore)
+    auto indexRequest = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>();
+    auto& indexRecord = indexRequest->Record;
+
+    TTxId indexTxId = GetCachedTxId(ctx);
+    indexRecord.SetTxId(ui64(indexTxId));
+
+    auto& indexTx = *indexRecord.AddTransaction();
+    indexTx.SetOperationType(NKikimrSchemeOp::ESchemeOpRestoreMultipleIncrementalBackups);
+    indexTx.SetInternal(true);
+    indexTx.SetWorkingDir(bcPath.PathString());
+
+    auto& indexRestore = *indexTx.MutableRestoreMultipleIncrementalBackups();
+    indexRestore.AddSrcTablePaths(srcIndexBackupPath);
+    indexRestore.SetDstTablePath(dstIndexImplPath);
+
+    // Track this operation as part of incremental restore
+    TOperationId indexRestoreOpId(indexTxId, 0);
+    IncrementalRestoreOperationToState[indexRestoreOpId] = operationId;
+    TxIdToIncrementalRestore[indexTxId] = operationId;
+
+    auto stateIt = IncrementalRestoreStates.find(operationId);
+    if (stateIt != IncrementalRestoreStates.end()) {
+        // Add to in-progress operations (will be tracked alongside table operations)
+        stateIt->second.InProgressOperations.insert(indexRestoreOpId);
+
+        // Track expected shards for this index impl table
+        auto& indexOpState = stateIt->second.TableOperations[indexRestoreOpId];
+        indexOpState.OperationId = indexRestoreOpId;
+
+        if (Tables.contains(indexImplTablePathId)) {
+            auto indexImplTable = Tables.at(indexImplTablePathId);
+            for (const auto& [shardIdx, partitionIdx] : indexImplTable->GetShard2PartitionIdx()) {
+                indexOpState.ExpectedShards.insert(shardIdx);
+                stateIt->second.InvolvedShards.insert(shardIdx);
+            }
+            LOG_I("Index operation " << indexRestoreOpId << " expects " << indexOpState.ExpectedShards.size() << " shards");
+        }
+
+        LOG_I("Tracking index operation " << indexRestoreOpId << " for incremental restore " << operationId);
+    }
+
+    // Send the request (parallel with table operations)
+    // Note: AlterTableIndex for schema version sync is handled in TDone::ProgressState after the restore completes
+    LOG_I("Sending index restore operation for: " << dstIndexImplPath);
+    Send(SelfId(), indexRequest.Release());
+}
+
 // Notification function for operation completion
 void TSchemeShard::NotifyIncrementalRestoreOperationCompleted(const TOperationId& operationId, const TActorContext& ctx) {
     // Find which incremental restore this operation belongs to
diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
index 966a0273d239..1b4521e464da 100644
--- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
@@ -1700,6 +1700,619 @@
         TestLs(runtime, "/MyRoot/.backups/collections/TestCollection/__ydb_backup_meta", false, NLs::PathExist);
     }
 
+    Y_UNIT_TEST(SingleTableWithGlobalSyncIndex) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableWithIndex"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with one global sync covering index
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "TableWithIndex"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value" Type: "Utf8" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "ValueIndex"
+                KeyColumnNames: ["value"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC stream exists on main table
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true);
+        UNIT_ASSERT(mainTableDesc.GetPathDescription().HasTable());
+
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+        bool foundMainTableCdc = false;
+        TString mainTableCdcName;
+
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            const auto& cdcStream = tableDesc.GetCdcStreams(i);
+            if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                foundMainTableCdc = true;
+                mainTableCdcName = cdcStream.GetName();
+                Cerr << "Found main table CDC stream: " << mainTableCdcName << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundMainTableCdc, "Main table should have CDC stream with '_continuousBackupImpl' suffix");
+
+        // Verify CDC stream exists on index implementation table
+        auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true);
+        UNIT_ASSERT(indexDesc.GetPathDescription().HasTableIndex());
+
+        // Get index implementation table (first child of index)
+        UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+        TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+        auto indexImplTableDesc = DescribePrivatePath(runtime,
+            "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true);
+        UNIT_ASSERT(indexImplTableDesc.GetPathDescription().HasTable());
+
+        const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+        bool foundIndexCdc = false;
+        TString indexCdcName;
+
+        for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+            const auto& cdcStream = indexTableDesc.GetCdcStreams(i);
+            if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                foundIndexCdc = true;
+                indexCdcName = cdcStream.GetName();
+                Cerr << "Found index CDC stream: " << indexCdcName << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream with '_continuousBackupImpl' suffix");
+
+        // Verify CDC stream names match pattern and use same timestamp
+        UNIT_ASSERT_VALUES_EQUAL(mainTableCdcName, indexCdcName);
+        UNIT_ASSERT_C(mainTableCdcName.Contains("Z") && mainTableCdcName.EndsWith("_continuousBackupImpl"),
+            "CDC stream name should have X.509 timestamp format (YYYYMMDDHHMMSSZ_continuousBackupImpl)");
+    }
+
+    Y_UNIT_TEST(SingleTableWithMultipleGlobalSyncIndexes) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableWithMultipleIndexes"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with multiple global sync indexes
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "TableWithMultipleIndexes"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value1" Type: "Utf8" }
+                Columns { Name: "value2" Type: "Uint64" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "Value1Index"
+                KeyColumnNames: ["value1"]
+                Type: EIndexTypeGlobal
+            }
+            IndexDescription {
+                Name: "Value2Index"
+                KeyColumnNames: ["value2"]
+                DataColumnNames: ["value1"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC stream on main table
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMultipleIndexes", true, true);
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+        TString mainCdcName;
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            const auto& cdcStream = tableDesc.GetCdcStreams(i);
+            if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                mainCdcName = cdcStream.GetName();
+                break;
+            }
+        }
+        UNIT_ASSERT_C(!mainCdcName.empty(), "Main table should have CDC stream");
+
+        // Verify CDC streams on both indexes
+        TVector<TString> indexNames = {"Value1Index", "Value2Index"};
+        TVector<TString> indexCdcNames;
+
+        for (const auto& indexName : indexNames) {
+            auto indexDesc = DescribePrivatePath(runtime,
+                "/MyRoot/TableWithMultipleIndexes/" + indexName, true, true);
+            UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+            TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+            auto indexImplTableDesc = DescribePrivatePath(runtime,
+                "/MyRoot/TableWithMultipleIndexes/" + indexName + "/" + indexImplTableName, true, true);
+            const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+
+            bool foundCdc = false;
+            for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+                const auto& cdcStream = indexTableDesc.GetCdcStreams(i);
+                if (cdcStream.GetName().EndsWith("_continuousBackupImpl")) {
+                    indexCdcNames.push_back(cdcStream.GetName());
+                    foundCdc = true;
+                    Cerr << "Found CDC stream on " << indexName << ": " << cdcStream.GetName() << Endl;
+                    break;
+                }
+            }
+            UNIT_ASSERT_C(foundCdc, "Index " + indexName + " should have CDC stream");
+        }
+
+        // Verify all streams use the same timestamp
+        UNIT_ASSERT_VALUES_EQUAL(indexCdcNames.size(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[0]);
+        UNIT_ASSERT_VALUES_EQUAL(mainCdcName, indexCdcNames[1]);
+    }
+
+    Y_UNIT_TEST(TableWithMixedIndexTypes) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableWithMixedIndexes"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with global sync + async indexes
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "TableWithMixedIndexes"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value1" Type: "Utf8" }
+                Columns { Name: "value2" Type: "Uint64" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "SyncIndex"
+                KeyColumnNames: ["value1"]
+                Type: EIndexTypeGlobal
+            }
+            IndexDescription {
+                Name: "AsyncIndex"
+                KeyColumnNames: ["value2"]
+                Type: EIndexTypeGlobalAsync
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC stream on main table
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes", true, true);
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+        bool foundMainCdc = false;
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundMainCdc = true;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream");
+
+        // Verify CDC stream on global sync index ONLY
+        auto syncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/SyncIndex", true, true);
+        UNIT_ASSERT_VALUES_EQUAL(syncIndexDesc.GetPathDescription().ChildrenSize(), 1);
+        TString syncImplTableName = syncIndexDesc.GetPathDescription().GetChildren(0).GetName();
+
+        auto syncImplTableDesc = DescribePrivatePath(runtime,
+            "/MyRoot/TableWithMixedIndexes/SyncIndex/" + syncImplTableName, true, true);
+        const auto& syncTableDesc = syncImplTableDesc.GetPathDescription().GetTable();
+
+        bool foundSyncCdc = false;
+        for (size_t i = 0; i < syncTableDesc.CdcStreamsSize(); ++i) {
+            if (syncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundSyncCdc = true;
+                Cerr << "Found CDC stream on SyncIndex (expected)" << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundSyncCdc, "Global sync index should have CDC stream");
+
+        // Verify NO CDC stream on async index
+        auto asyncIndexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithMixedIndexes/AsyncIndex", true, true);
+        UNIT_ASSERT_VALUES_EQUAL(asyncIndexDesc.GetPathDescription().ChildrenSize(), 1);
+        TString asyncImplTableName = asyncIndexDesc.GetPathDescription().GetChildren(0).GetName();
+
+        auto asyncImplTableDesc = DescribePrivatePath(runtime,
+            "/MyRoot/TableWithMixedIndexes/AsyncIndex/" + asyncImplTableName, true, true);
+        const auto& asyncTableDesc = asyncImplTableDesc.GetPathDescription().GetTable();
+
+        bool foundAsyncCdc = false;
+        for (size_t i = 0; i < asyncTableDesc.CdcStreamsSize(); ++i) {
+            if (asyncTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundAsyncCdc = true;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(!foundAsyncCdc, "Async index should NOT have CDC stream");
+    }
+
+    Y_UNIT_TEST(MultipleTablesWithIndexes) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection with 2 tables
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/Table1"
+                }
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/Table2"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create Table1 with index
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "Table1"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value" Type: "Utf8" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "Index1"
+                KeyColumnNames: ["value"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Create Table2 with index
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "Table2"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "data" Type: "Utf8" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "Index2"
+                KeyColumnNames: ["data"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC streams on both main tables
+        TVector<TString> tables = {"Table1", "Table2"};
+        TVector<TString> indexes = {"Index1", "Index2"};
+
+        for (size_t tableIdx = 0; tableIdx < tables.size(); ++tableIdx) {
+            const auto& tableName = tables[tableIdx];
+            const auto& indexName = indexes[tableIdx];
+
+            // Check main table CDC
+            auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName, true, true);
+            const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+            bool foundMainCdc = false;
+            for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+                if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                    foundMainCdc = true;
+                    Cerr << "Found CDC stream on " << tableName << Endl;
+                    break;
+                }
+            }
+            UNIT_ASSERT_C(foundMainCdc, tableName + " should have CDC stream");
+
+            // Check index CDC
+            auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/" + tableName + "/" + indexName, true, true);
+            UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+            TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+            auto indexImplTableDesc = DescribePrivatePath(runtime,
+                "/MyRoot/" + tableName + "/" + indexName + "/" + indexImplTableName, true, true);
+            const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+
+            bool foundIndexCdc = false;
+            for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+                if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                    foundIndexCdc = true;
+                    Cerr << "Found CDC stream on " << indexName << Endl;
+                    break;
+                }
+            }
+            UNIT_ASSERT_C(foundIndexCdc, indexName + " should have CDC stream");
+        }
+    }
+
+    Y_UNIT_TEST(IncrementalBackupWithIndexes) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableForIncremental"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig: {}
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with global sync index
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "TableForIncremental"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value" Type: "Utf8" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "ValueIndex"
+                KeyColumnNames: ["value"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup (creates CDC streams)
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC streams were created for both main table and index
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental", true, true);
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+        bool foundMainCdc = false;
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundMainCdc = true;
+                Cerr << "Found CDC stream on main table" << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream after full backup");
+
+        // Verify CDC stream on index
+        auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableForIncremental/ValueIndex", true, true);
+        UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+        TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+        auto indexImplTableDesc = DescribePrivatePath(runtime,
+            "/MyRoot/TableForIncremental/ValueIndex/" + indexImplTableName, true, true);
+        const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+
+        bool foundIndexCdc = false;
+        for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+            if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundIndexCdc = true;
+                Cerr << "Found CDC stream on index implementation table" << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundIndexCdc, "Index implementation table should have CDC stream after full backup");
+
+        runtime.AdvanceCurrentTime(TDuration::Seconds(1));
+
+        // Execute incremental backup (rotates CDC, creates backup tables)
+        TestBackupIncrementalBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify backup collection structure
+        TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1), {
+            NLs::PathExist,
+            NLs::IsBackupCollection,
+            NLs::ChildrenCount(2), // full + incremental
+        });
+
+        // Find the incremental backup directory (should end with "_incremental")
+        auto collectionDesc = DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1, true, true);
+        TString incrBackupDir;
+        for (size_t i = 0; i < collectionDesc.GetPathDescription().ChildrenSize(); ++i) {
+            const auto& child = collectionDesc.GetPathDescription().GetChildren(i);
+            Cerr << "Child: " << child.GetName() << " PathState: " << child.GetPathState() << Endl;
+            if (child.GetName().EndsWith("_incremental")) {
+                incrBackupDir = child.GetName();
+                break;
+            }
+        }
+        UNIT_ASSERT_C(!incrBackupDir.empty(), "Should find incremental backup directory");
+
+        // Verify backup table for main table exists
+        TestDescribeResult(DescribePath(runtime,
+            "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir + "/TableForIncremental"), {
+            NLs::PathExist,
+            NLs::IsTable,
+        });
+
+        // Verify index backup table exists in __ydb_backup_meta/indexes/TableForIncremental/ValueIndex
+        TString indexBackupPath = "/MyRoot/.backups/collections/" DEFAULT_NAME_1 "/" + incrBackupDir +
+            "/__ydb_backup_meta/indexes/TableForIncremental/ValueIndex";
+        TestDescribeResult(DescribePath(runtime, indexBackupPath), {
+            NLs::PathExist,
+            NLs::IsTable,
+        });
+
+        Cerr << "SUCCESS: Full backup created CDC streams for both main table and index" << Endl;
+        Cerr << "         Incremental backup created backup tables for both main table and index" << Endl;
+        Cerr << "         Index backup table verified at: " << indexBackupPath << Endl;
+    }
+
+    Y_UNIT_TEST(OmitIndexesFlag) {
+        TTestBasicRuntime runtime;
+        TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+        ui64 txId = 100;
+
+        SetupLogging(runtime);
+        PrepareDirs(runtime, env, txId);
+
+        // Create incremental backup collection WITH OmitIndexes flag set
+        TString collectionSettings = R"(
+            Name: ")" DEFAULT_NAME_1 R"("
+            ExplicitEntryList {
+                Entries {
+                    Type: ETypeTable
+                    Path: "/MyRoot/TableWithIndex"
+                }
+            }
+            Cluster: {}
+            IncrementalBackupConfig {
+                OmitIndexes: true
+            }
+        )";
+
+        TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", collectionSettings);
+        env.TestWaitNotification(runtime, txId);
+
+        // Create table with global sync index
+        TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+            TableDescription {
+                Name: "TableWithIndex"
+                Columns { Name: "key" Type: "Uint32" }
+                Columns { Name: "value" Type: "Utf8" }
+                KeyColumnNames: ["key"]
+            }
+            IndexDescription {
+                Name: "ValueIndex"
+                KeyColumnNames: ["value"]
+                Type: EIndexTypeGlobal
+            }
+        )");
+        env.TestWaitNotification(runtime, txId);
+
+        // Execute full backup
+        TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+            R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")");
+        env.TestWaitNotification(runtime, txId);
+
+        // Verify CDC stream exists on main table
+        auto mainTableDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex", true, true);
+        const auto& tableDesc = mainTableDesc.GetPathDescription().GetTable();
+
+        bool foundMainCdc = false;
+        for (size_t i = 0; i < tableDesc.CdcStreamsSize(); ++i) {
+            if (tableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundMainCdc = true;
+                Cerr << "Found CDC stream on main table (expected)" << Endl;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(foundMainCdc, "Main table should have CDC stream even with OmitIndexes=true");
+
+        // Verify NO CDC stream on index (because OmitIndexes is true)
+        auto indexDesc = DescribePrivatePath(runtime, "/MyRoot/TableWithIndex/ValueIndex", true, true);
+        UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPathDescription().ChildrenSize(), 1);
+        TString indexImplTableName = indexDesc.GetPathDescription().GetChildren(0).GetName();
+
+        auto indexImplTableDesc = DescribePrivatePath(runtime,
+            "/MyRoot/TableWithIndex/ValueIndex/" + indexImplTableName, true, true);
+        const auto& indexTableDesc = indexImplTableDesc.GetPathDescription().GetTable();
+
+        bool foundIndexCdc = false;
+        for (size_t i = 0; i < indexTableDesc.CdcStreamsSize(); ++i) {
+            if (indexTableDesc.GetCdcStreams(i).GetName().EndsWith("_continuousBackupImpl")) {
+                foundIndexCdc = true;
+                break;
+            }
+        }
+        UNIT_ASSERT_C(!foundIndexCdc, "Index should NOT have CDC stream when OmitIndexes=true");
+
+        Cerr << "SUCCESS: OmitIndexes flag works correctly - main table has CDC, index does not" << Endl;
+    }
+
     Y_UNIT_TEST(BackupWithIndexes) {
         TTestBasicRuntime runtime;
         TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index 6fb1434ae528..11aad2f7cf3b 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -11924,9 +11924,12 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
 
         auto backupDirName = descr.GetChildren(0).GetName().c_str();
 
+        // When WithIncremental=true, __ydb_backup_meta directory is created for index backups,
+        // so we expect 3 children: Table1, DirA, and __ydb_backup_meta.
+        // When WithIncremental=false, we expect 2 children: Table1 and DirA.
         TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), {
             NLs::PathExist,
-            NLs::ChildrenCount(2),
+            NLs::ChildrenCount(WithIncremental ? 3 : 2),
             NLs::Finished,
         });
 
@@ -11990,9 +11993,10 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
             }
         }
 
+        // Incremental backup directory contains Table1, DirA, and __ydb_backup_meta
        TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName)), {
             NLs::PathExist,
-            NLs::ChildrenCount(2),
+            NLs::ChildrenCount(3),
             NLs::Finished,
         });
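For reference, the suffix-matching rule that `TSchemeShard::FindTargetTablePath` relies on when mapping a backup-relative table path back to a collection entry can be exercised in isolation. A small self-contained sketch (a hypothetical standalone check, not part of the patch; it only mirrors the comparison used in the diff):

```cpp
#include <util/generic/string.h>
#include <util/system/yassert.h>

// Mirrors the comparison in TSchemeShard::FindTargetTablePath: accept an exact
// match or a suffix preceded by a path separator, so bare substrings never match.
bool MatchesEntry(const TString& itemPath, const TString& relativeTablePath) {
    return itemPath == relativeTablePath
        || itemPath.EndsWith(TString("/") + relativeTablePath);
}

int main() {
    Y_ABORT_UNLESS(MatchesEntry("/Root/db/table1", "table1")); // suffix after '/'
    Y_ABORT_UNLESS(MatchesEntry("table1", "table1"));          // exact match
    Y_ABORT_UNLESS(!MatchesEntry("/Root/FooBar", "Bar"));      // substring alone must not match
    return 0;
}
```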