Skip to content

Commit 6d991e5

Browse files
committed
WIP
1 parent 3175e50 commit 6d991e5

7 files changed

+471
-101
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ message TTableDescription {
346346
optional bool AllowUnderSameOperation = 44 [default = false];
347347
// Create only as-well. Used for CopyTable to create table in desired state instead of default
348348
optional EPathState PathState = 46;
349+
// Skip automatic index/impl table copying - indexes will be handled separately
350+
optional bool OmitIndexes = 47 [default = false];
349351
}
350352

351353
message TDictionaryEncodingSettings {
@@ -1282,6 +1284,11 @@ message TCopyTableConfig { //TTableDescription implemets copying a table in orig
12821284
optional bool AllowUnderSameOperation = 7 [default = false];
12831285

12841286
optional NKikimrSchemeOp.EPathState TargetPathTargetState = 8;
1287+
1288+
// Map from index impl table name to CDC stream config for incremental backups
1289+
// Key: index impl table name (e.g., "indexImplTable")
1290+
// Value: CDC stream configuration to create on that index impl table
1291+
map<string, TCreateCdcStream> IndexImplTableCdcStreams = 9;
12851292
}
12861293

12871294
message TConsistentTableCopyingConfig {

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,127 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
508508
"{ items { uint32_value: 3 } items { uint32_value: 30 } }");
509509
}
510510

511+
Y_UNIT_TEST(SimpleBackupRestoreWithIndex) {
512+
TPortManager portManager;
513+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
514+
.SetUseRealThreads(false)
515+
.SetDomainName("Root")
516+
.SetEnableBackupService(true)
517+
.SetEnableChangefeedInitialScan(true)
518+
);
519+
520+
auto& runtime = *server->GetRuntime();
521+
const auto edgeActor = runtime.AllocateEdgeActor();
522+
523+
SetupLogging(runtime);
524+
InitRoot(server, edgeActor);
525+
526+
// Create table with a global index
527+
CreateShardedTable(server, edgeActor, "/Root", "TableWithIndex",
528+
TShardedTableOptions()
529+
.Columns({
530+
{"key", "Uint32", true, false},
531+
{"value", "Uint32", false, false},
532+
{"indexed", "Uint32", false, false}
533+
})
534+
.Indexes({
535+
{"idx", {"indexed"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
536+
}));
537+
538+
// Insert test data
539+
ExecSQL(server, edgeActor, R"(
540+
UPSERT INTO `/Root/TableWithIndex` (key, value, indexed) VALUES
541+
(1, 10, 100),
542+
(2, 20, 200),
543+
(3, 30, 300);
544+
)");
545+
546+
// Verify index works before backup
547+
auto beforeBackup = KqpSimpleExecSuccess(runtime, R"(
548+
SELECT key FROM `/Root/TableWithIndex` VIEW idx WHERE indexed = 200
549+
)");
550+
UNIT_ASSERT_C(beforeBackup.find("uint32_value: 2") != TString::npos,
551+
"Index should work before backup: " << beforeBackup);
552+
553+
// Create backup collection
554+
ExecSQL(server, edgeActor, R"(
555+
CREATE BACKUP COLLECTION `TestCollection`
556+
( TABLE `/Root/TableWithIndex` )
557+
WITH
558+
( STORAGE = 'cluster'
559+
, INCREMENTAL_BACKUP_ENABLED = 'true'
560+
);
561+
)", false);
562+
563+
// Perform full backup
564+
ExecSQL(server, edgeActor, R"(BACKUP `TestCollection`;)", false);
565+
566+
// Wait longer for CDC streams to be fully created (AtTable phase completes async)
567+
// The CDC AtTable phase increments schema versions and syncs indexes
568+
SimulateSleep(server, TDuration::Seconds(5));
569+
570+
// Add version diagnostics after backup
571+
{
572+
Cerr << "========== VERSION DIAGNOSTICS AFTER BACKUP ==========" << Endl;
573+
574+
auto describeTable = Ls(runtime, edgeActor, "/Root/TableWithIndex");
575+
if (!describeTable->ResultSet.empty() && describeTable->ResultSet[0].TableId) {
576+
Cerr << "Main table SchemaVersion: " << describeTable->ResultSet[0].TableId.SchemaVersion << Endl;
577+
if (describeTable->ResultSet[0].Self) {
578+
Cerr << "Main table PathVersion: " << describeTable->ResultSet[0].Self->Info.GetPathVersion() << Endl;
579+
}
580+
}
581+
582+
auto describeIndex = Ls(runtime, edgeActor, "/Root/TableWithIndex/idx");
583+
if (!describeIndex->ResultSet.empty() && describeIndex->ResultSet[0].Self) {
584+
Cerr << "Index PathVersion: " << describeIndex->ResultSet[0].Self->Info.GetPathVersion() << Endl;
585+
}
586+
587+
auto describeImplTable = Ls(runtime, edgeActor, "/Root/TableWithIndex/idx/indexImplTable");
588+
if (!describeImplTable->ResultSet.empty() && describeImplTable->ResultSet[0].TableId) {
589+
Cerr << "Index impl table SchemaVersion: " << describeImplTable->ResultSet[0].TableId.SchemaVersion << Endl;
590+
if (describeImplTable->ResultSet[0].Self) {
591+
Cerr << "Index impl table PathVersion: " << describeImplTable->ResultSet[0].Self->Info.GetPathVersion() << Endl;
592+
}
593+
}
594+
595+
Cerr << "======================================================" << Endl;
596+
}
597+
598+
// Capture expected data
599+
auto expectedData = KqpSimpleExecSuccess(runtime, R"(
600+
SELECT key, value, indexed FROM `/Root/TableWithIndex` ORDER BY key
601+
)");
602+
603+
// Drop table
604+
ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/TableWithIndex`;)", false);
605+
runtime.SimulateSleep(TDuration::Seconds(1));
606+
607+
// Restore from backup
608+
ExecSQL(server, edgeActor, R"(RESTORE `TestCollection`;)", false);
609+
runtime.SimulateSleep(TDuration::Seconds(5));
610+
611+
// Verify data is restored
612+
auto actualData = KqpSimpleExecSuccess(runtime, R"(
613+
SELECT key, value, indexed FROM `/Root/TableWithIndex` ORDER BY key
614+
)");
615+
UNIT_ASSERT_VALUES_EQUAL(expectedData, actualData);
616+
617+
// Verify index still exists and works after restore
618+
auto afterRestore = KqpSimpleExecSuccess(runtime, R"(
619+
SELECT key FROM `/Root/TableWithIndex` VIEW idx WHERE indexed = 200
620+
)");
621+
UNIT_ASSERT_C(afterRestore.find("uint32_value: 2") != TString::npos,
622+
"Index should work after restore: " << afterRestore);
623+
624+
// Verify index implementation table exists and has data
625+
auto indexImplData = KqpSimpleExecSuccess(runtime, R"(
626+
SELECT COUNT(*) FROM `/Root/TableWithIndex/idx/indexImplTable`
627+
)");
628+
UNIT_ASSERT_C(indexImplData.find("uint64_value: 3") != TString::npos,
629+
"Index impl table should have 3 rows: " << indexImplData);
630+
}
631+
511632
Y_UNIT_TEST(MultiBackup) {
512633
TPortManager portManager;
513634
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)