Skip to content

Commit e3cc189

Browse files
committed
WIP-35-5-0
1 parent 2948095 commit e3cc189

File tree

2 files changed

+213
-119
lines changed

2 files changed

+213
-119
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3325,7 +3325,8 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
33253325

33263326
// Create full backup
33273327
ExecSQL(server, edgeActor, R"(BACKUP `MultiIndexCollection`;)", false);
3328-
SimulateSleep(server, TDuration::Seconds(1));
3328+
// Wait for CDC streams to be fully created and schema versions to stabilize
3329+
SimulateSleep(server, TDuration::Seconds(5));
33293330

33303331
// Modify data
33313332
ExecSQL(server, edgeActor, R"(

ydb/core/tx/schemeshard/schemeshard__operation_incremental_restore_finalize.cpp

Lines changed: 211 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class TIncrementalRestoreFinalizeOp: public TSubOperationWithContext {
1313
TTxState::ETxState NextState(TTxState::ETxState state) const override {
1414
switch(state) {
1515
case TTxState::Waiting:
16+
return TTxState::ConfigureParts;
17+
case TTxState::ConfigureParts:
1618
return TTxState::Propose;
1719
case TTxState::Propose:
1820
return TTxState::Done;
@@ -24,6 +26,8 @@ class TIncrementalRestoreFinalizeOp: public TSubOperationWithContext {
2426
TSubOperationState::TPtr SelectStateFunc(TTxState::ETxState state) override {
2527
switch(state) {
2628
case TTxState::Waiting:
29+
case TTxState::ConfigureParts:
30+
return MakeHolder<TConfigureParts>(OperationId, Transaction);
2731
case TTxState::Propose:
2832
return MakeHolder<TFinalizationPropose>(OperationId, Transaction);
2933
case TTxState::Done:
@@ -33,6 +37,171 @@ class TIncrementalRestoreFinalizeOp: public TSubOperationWithContext {
3337
}
3438
}
3539

40+
class TConfigureParts: public TSubOperationState {
41+
private:
42+
TOperationId OperationId;
43+
TTxTransaction Transaction;
44+
45+
TString DebugHint() const override {
46+
return TStringBuilder()
47+
<< "TIncrementalRestoreFinalize TConfigureParts"
48+
<< " operationId: " << OperationId;
49+
}
50+
51+
public:
52+
TConfigureParts(TOperationId id, const TTxTransaction& tx)
53+
: OperationId(id), Transaction(tx)
54+
{
55+
IgnoreMessages(DebugHint(), {TEvHive::TEvCreateTabletReply::EventType});
56+
}
57+
58+
bool HandleReply(TEvDataShard::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override {
59+
TTabletId ssId = context.SS->SelfTabletId();
60+
61+
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
62+
DebugHint() << " HandleReply TEvProposeTransactionResult"
63+
<< ", at schemeshard: " << ssId
64+
<< ", message: " << ev->Get()->Record.ShortDebugString());
65+
66+
return NTableState::CollectProposeTransactionResults(OperationId, ev, context);
67+
}
68+
69+
bool ProgressState(TOperationContext& context) override {
70+
TTabletId ssId = context.SS->SelfTabletId();
71+
72+
LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
73+
DebugHint() << " ProgressState"
74+
<< ", at schemeshard: " << ssId);
75+
76+
TTxState* txState = context.SS->FindTx(OperationId);
77+
Y_ABORT_UNLESS(txState);
78+
79+
const auto& finalize = Transaction.GetIncrementalRestoreFinalize();
80+
81+
// Collect all index impl tables that need schema version updates
82+
THashSet<TPathId> implTablesToUpdate;
83+
CollectIndexImplTables(finalize, context, implTablesToUpdate);
84+
85+
if (implTablesToUpdate.empty()) {
86+
LOG_I(DebugHint() << " No index impl tables to update, skipping ConfigureParts");
87+
return true;
88+
}
89+
90+
// Prepare AlterData for each table and add shards to txState
91+
NIceDb::TNiceDb db(context.GetDB());
92+
txState->ClearShardsInProgress();
93+
94+
for (const auto& tablePathId : implTablesToUpdate) {
95+
if (!context.SS->Tables.contains(tablePathId)) {
96+
LOG_W(DebugHint() << " Table not found: " << tablePathId);
97+
continue;
98+
}
99+
100+
auto table = context.SS->Tables.at(tablePathId);
101+
102+
// Create AlterData if it doesn't exist
103+
if (!table->AlterData) {
104+
// Create minimal AlterData just to bump schema version
105+
auto alterData = MakeIntrusive<TTableInfo::TAlterTableInfo>();
106+
alterData->AlterVersion = table->AlterVersion + 1;
107+
alterData->NextColumnId = table->NextColumnId;
108+
alterData->Columns = table->Columns;
109+
alterData->KeyColumnIds = table->KeyColumnIds;
110+
alterData->IsBackup = table->IsBackup;
111+
alterData->IsRestore = table->IsRestore;
112+
alterData->TableDescriptionFull = table->TableDescription;
113+
114+
table->PrepareAlter(alterData);
115+
} else {
116+
// Increment AlterVersion if AlterData already exists
117+
table->AlterData->AlterVersion = table->AlterVersion + 1;
118+
}
119+
120+
LOG_I(DebugHint() << " Preparing ALTER for table " << tablePathId
121+
<< " version: " << table->AlterVersion << " -> " << table->AlterData->AlterVersion);
122+
123+
// Add all shards of this table to txState
124+
for (const auto& shard : table->GetPartitions()) {
125+
auto shardIdx = shard.ShardIdx;
126+
if (!txState->ShardsInProgress.contains(shardIdx)) {
127+
txState->Shards.emplace_back(shardIdx, ETabletType::DataShard, TTxState::ConfigureParts);
128+
txState->ShardsInProgress.insert(shardIdx);
129+
130+
LOG_I(DebugHint() << " Added shard " << shardIdx
131+
<< " (tablet: " << context.SS->ShardInfos[shardIdx].TabletID << ") to txState");
132+
}
133+
}
134+
}
135+
136+
context.SS->PersistTxState(db, OperationId);
137+
138+
// Send ALTER TABLE transactions to all datashards
139+
for (const auto& shard : txState->Shards) {
140+
auto shardIdx = shard.Idx;
141+
auto datashardId = context.SS->ShardInfos[shardIdx].TabletID;
142+
143+
LOG_I(DebugHint() << " Propose ALTER to datashard " << datashardId
144+
<< " shardIdx: " << shardIdx << " txid: " << OperationId);
145+
146+
const auto seqNo = context.SS->StartRound(*txState);
147+
148+
// Find which table this shard belongs to
149+
TPathId tablePathId;
150+
for (const auto& pathId : implTablesToUpdate) {
151+
auto table = context.SS->Tables.at(pathId);
152+
for (const auto& partition : table->GetPartitions()) {
153+
if (partition.ShardIdx == shardIdx) {
154+
tablePathId = pathId;
155+
break;
156+
}
157+
}
158+
if (tablePathId) break;
159+
}
160+
161+
if (!tablePathId) {
162+
LOG_W(DebugHint() << " Could not find table for shard " << shardIdx);
163+
continue;
164+
}
165+
166+
const auto txBody = context.SS->FillAlterTableTxBody(tablePathId, shardIdx, seqNo);
167+
auto event = context.SS->MakeDataShardProposal(tablePathId, OperationId, txBody, context.Ctx);
168+
context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release());
169+
}
170+
171+
txState->UpdateShardsInProgress();
172+
return false;
173+
}
174+
175+
private:
176+
void CollectIndexImplTables(const NKikimrSchemeOp::TIncrementalRestoreFinalize& finalize,
177+
TOperationContext& context,
178+
THashSet<TPathId>& implTables) {
179+
for (const auto& tablePath : finalize.GetTargetTablePaths()) {
180+
// Check if this path looks like an index implementation table
181+
if (!tablePath.Contains("/indexImplTable")) {
182+
continue;
183+
}
184+
185+
TPath path = TPath::Resolve(tablePath, context.SS);
186+
if (!path.IsResolved()) {
187+
LOG_W("CollectIndexImplTables: Table not resolved: " << tablePath);
188+
continue;
189+
}
190+
191+
if (path.Base()->PathType != NKikimrSchemeOp::EPathType::EPathTypeTable) {
192+
continue;
193+
}
194+
195+
TPathId implTablePathId = path.Base()->PathId;
196+
if (context.SS->Tables.contains(implTablePathId)) {
197+
implTables.insert(implTablePathId);
198+
LOG_I("CollectIndexImplTables: Found index impl table: " << tablePath
199+
<< " pathId: " << implTablePathId);
200+
}
201+
}
202+
}
203+
};
204+
36205
class TFinalizationPropose: public TSubOperationState {
37206
private:
38207
TOperationId OperationId;
@@ -104,145 +273,69 @@ class TIncrementalRestoreFinalizeOp: public TSubOperationWithContext {
104273
LOG_I("SyncIndexSchemaVersions: Starting schema version sync for restored indexes");
105274
LOG_I("SyncIndexSchemaVersions: Processing " << finalize.GetTargetTablePaths().size() << " target table paths");
106275

107-
// Iterate through all target table paths and sync their indexes
276+
NIceDb::TNiceDb db(context.GetDB());
277+
278+
// Iterate through all target table paths and finalize their alters
108279
for (const auto& tablePath : finalize.GetTargetTablePaths()) {
109-
LOG_I("SyncIndexSchemaVersions: Processing table path: " << tablePath);
110-
280+
// Check if this path looks like an index implementation table
281+
if (!tablePath.Contains("/indexImplTable")) {
282+
continue;
283+
}
284+
111285
TPath path = TPath::Resolve(tablePath, context.SS);
112286
if (!path.IsResolved()) {
113287
LOG_W("SyncIndexSchemaVersions: Table not resolved: " << tablePath);
114288
continue;
115289
}
116-
117-
LOG_I("SyncIndexSchemaVersions: Path resolved, PathType=" << path.Base()->PathType);
118-
119-
// Check if this is a regular table (not an index impl table)
290+
120291
if (path.Base()->PathType != NKikimrSchemeOp::EPathType::EPathTypeTable) {
121-
LOG_I("SyncIndexSchemaVersions: Skipping non-table path: " << tablePath);
122292
continue;
123293
}
124-
125-
// Check if this path looks like an index implementation table
126-
if (tablePath.Contains("/indexImplTable")) {
127-
LOG_I("SyncIndexSchemaVersions: Found index impl table: " << tablePath);
128-
// For index impl tables, we need to sync the parent index
129-
SyncIndexImplTable(path, context);
294+
295+
TPathId implTablePathId = path.Base()->PathId;
296+
if (!context.SS->Tables.contains(implTablePathId)) {
297+
LOG_W("SyncIndexSchemaVersions: Table not found: " << implTablePathId);
298+
continue;
299+
}
300+
301+
auto table = context.SS->Tables.at(implTablePathId);
302+
if (!table->AlterData) {
303+
LOG_W("SyncIndexSchemaVersions: No AlterData for table: " << implTablePathId);
130304
continue;
131305
}
306+
307+
// Finalize the alter - this commits AlterData to the main table state
308+
LOG_I("SyncIndexSchemaVersions: Finalizing ALTER for table " << implTablePathId
309+
<< " version: " << table->AlterVersion << " -> " << table->AlterData->AlterVersion);
132310

133-
// For regular tables, sync all their indexes
134-
LOG_I("SyncIndexSchemaVersions: Checking table for indexes: " << tablePath
135-
<< ", children count: " << path.Base()->GetChildren().size());
311+
table->FinishAlter();
312+
context.SS->PersistTableAltered(db, implTablePathId, table);
136313

137-
for (const auto& [childName, childPathId] : path.Base()->GetChildren()) {
138-
LOG_I("SyncIndexSchemaVersions: Checking child: " << childName
139-
<< " pathId: " << childPathId
140-
<< " type: " << context.SS->PathsById.at(childPathId)->PathType);
141-
142-
auto childPath = context.SS->PathsById.at(childPathId);
143-
144-
if (childPath->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
145-
LOG_I("SyncIndexSchemaVersions: Child is not an index, skipping");
146-
continue;
147-
}
148-
149-
if (childPath->Dropped()) {
150-
LOG_I("SyncIndexSchemaVersions: Index is dropped, skipping");
151-
continue;
152-
}
153-
154-
auto indexInfo = context.SS->Indexes.at(childPathId);
155-
LOG_I("SyncIndexSchemaVersions: Index type: " << indexInfo->Type);
156-
157-
if (indexInfo->Type != NKikimrSchemeOp::EIndexTypeGlobal) {
158-
LOG_I("SyncIndexSchemaVersions: Index is not global, skipping");
159-
continue;
160-
}
161-
162-
LOG_I("SyncIndexSchemaVersions: Syncing schema for global index: " << childName);
163-
164-
// Update the index state to trigger schema sync
165-
NIceDb::TNiceDb db(context.GetDB());
166-
context.SS->Indexes[childPathId]->AlterVersion += 1;
167-
context.SS->PersistTableIndexAlterVersion(db, childPathId, context.SS->Indexes[childPathId]);
168-
169-
LOG_I("SyncIndexSchemaVersions: Index AlterVersion incremented to "
170-
<< context.SS->Indexes[childPathId]->AlterVersion);
171-
172-
// Update the index implementation table version
173-
auto indexPath = TPath::Init(childPathId, context.SS);
174-
Y_ABORT_UNLESS(indexPath.Base()->GetChildren().size() == 1);
175-
auto [implTableName, implTablePathId] = *indexPath.Base()->GetChildren().begin();
176-
177-
LOG_I("SyncIndexSchemaVersions: Index impl table: " << implTableName << " pathId: " << implTablePathId);
178-
179-
if (context.SS->Tables.contains(implTablePathId)) {
180-
auto table = context.SS->Tables.at(implTablePathId);
181-
auto oldVersion = table->AlterVersion;
182-
table->AlterVersion += 1;
183-
context.SS->PersistTable(db, implTablePathId);
314+
// Clear describe path caches and publish to scheme board
315+
context.SS->ClearDescribePathCaches(path.Base());
316+
context.OnComplete.PublishToSchemeBoard(OperationId, implTablePathId);
317+
318+
LOG_I("SyncIndexSchemaVersions: Finalized schema version for: " << tablePath);
319+
320+
// Also update the parent index version
321+
TPath indexPath = path.Parent();
322+
if (indexPath.IsResolved() && indexPath.Base()->PathType == NKikimrSchemeOp::EPathTypeTableIndex) {
323+
TPathId indexPathId = indexPath.Base()->PathId;
324+
if (context.SS->Indexes.contains(indexPathId)) {
325+
auto oldVersion = context.SS->Indexes[indexPathId]->AlterVersion;
326+
context.SS->Indexes[indexPathId]->AlterVersion += 1;
327+
context.SS->PersistTableIndexAlterVersion(db, indexPathId, context.SS->Indexes[indexPathId]);
184328

185-
LOG_I("SyncIndexSchemaVersions: Table AlterVersion incremented from "
186-
<< oldVersion << " to " << table->AlterVersion);
329+
LOG_I("SyncIndexSchemaVersions: Index AlterVersion incremented from "
330+
<< oldVersion << " to " << context.SS->Indexes[indexPathId]->AlterVersion);
187331

188-
// Publish to scheme board to notify datashards
189-
context.OnComplete.PublishToSchemeBoard(OperationId, implTablePathId);
190-
LOG_I("SyncIndexSchemaVersions: Published to SchemeBoard for table: " << implTableName);
191-
} else {
192-
LOG_W("SyncIndexSchemaVersions: Table not found in Tables map: " << implTablePathId);
332+
context.OnComplete.PublishToSchemeBoard(OperationId, indexPathId);
193333
}
194334
}
195335
}
196336

197337
LOG_I("SyncIndexSchemaVersions: Finished schema version sync");
198338
}
199-
200-
void SyncIndexImplTable(const TPath& implTablePath, TOperationContext& context) {
201-
LOG_I("SyncIndexImplTable: Processing impl table: " << implTablePath.PathString());
202-
203-
// For index impl tables, find the parent index and sync it
204-
TPath indexPath = implTablePath.Parent();
205-
if (!indexPath.IsResolved() || indexPath.Base()->PathType != NKikimrSchemeOp::EPathTypeTableIndex) {
206-
LOG_W("SyncIndexImplTable: Parent is not an index: " << indexPath.PathString());
207-
return;
208-
}
209-
210-
TPathId indexPathId = indexPath.Base()->PathId;
211-
TPathId implTablePathId = implTablePath.Base()->PathId;
212-
213-
LOG_I("SyncIndexImplTable: Syncing index: " << indexPath.PathString()
214-
<< " indexPathId: " << indexPathId);
215-
216-
NIceDb::TNiceDb db(context.GetDB());
217-
218-
// Update index version
219-
if (context.SS->Indexes.contains(indexPathId)) {
220-
auto oldVersion = context.SS->Indexes[indexPathId]->AlterVersion;
221-
context.SS->Indexes[indexPathId]->AlterVersion += 1;
222-
context.SS->PersistTableIndexAlterVersion(db, indexPathId, context.SS->Indexes[indexPathId]);
223-
LOG_I("SyncIndexImplTable: Index AlterVersion incremented from "
224-
<< oldVersion << " to " << context.SS->Indexes[indexPathId]->AlterVersion);
225-
} else {
226-
LOG_W("SyncIndexImplTable: Index not found in Indexes map: " << indexPathId);
227-
}
228-
229-
// Update table version and notify shards
230-
if (context.SS->Tables.contains(implTablePathId)) {
231-
auto table = context.SS->Tables.at(implTablePathId);
232-
auto oldVersion = table->AlterVersion;
233-
table->AlterVersion += 1;
234-
context.SS->PersistTable(db, implTablePathId);
235-
236-
LOG_I("SyncIndexImplTable: Table AlterVersion incremented from "
237-
<< oldVersion << " to " << table->AlterVersion);
238-
239-
// Publish to scheme board to notify datashards
240-
context.OnComplete.PublishToSchemeBoard(OperationId, implTablePathId);
241-
LOG_I("SyncIndexImplTable: Published to SchemeBoard for table: " << implTablePath.PathString());
242-
} else {
243-
LOG_W("SyncIndexImplTable: Table not found in Tables map: " << implTablePathId);
244-
}
245-
}
246339

247340
void CollectPathsToNormalize(const NKikimrSchemeOp::TIncrementalRestoreFinalize& finalize,
248341
TOperationContext& context,

0 commit comments

Comments
 (0)