Skip to content

Commit 9117fd8

Browse files
inelpandzic, github-actions[bot], and Copilot authored
PLM-131: Failed index reports and recreate (#111)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Copilot <[email protected]>
1 parent d30e915 commit 9117fd8

File tree

3 files changed

+101
-14
lines changed

3 files changed

+101
-14
lines changed

plm/catalog.go

Lines changed: 89 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -99,10 +99,11 @@ type collectionCatalog struct {
9999
type indexCatalogEntry struct {
100100
*topo.IndexSpecification
101101
Incomplete bool `bson:"incomplete"`
102+
Failed bool `bson:"failed"`
102103
}
103104

104-
func (i indexCatalogEntry) Ready() bool {
105-
return !i.Incomplete
105+
func (i indexCatalogEntry) Unsuccessful() bool {
106+
return i.Failed || i.Incomplete
106107
}
107108

108109
// NewCatalog creates a new Catalog.
@@ -401,10 +402,13 @@ func (c *Catalog) CreateIndexes(
401402

402403
successfulIdxs := make([]indexCatalogEntry, 0, len(processedIdxs))
403404
successfulIdxNames := make([]string, 0, len(processedIdxs))
405+
406+
failedIdxs := make([]*topo.IndexSpecification, 0, len(processedIdxs))
404407
var idxErrors []error
405408

406409
for _, idx := range indexes {
407410
if err := processedIdxs[idx.Name]; err != nil {
411+
failedIdxs = append(failedIdxs, idx)
408412
idxErrors = append(idxErrors, errors.Wrap(err, "create index: "+idx.Name))
409413

410414
continue
@@ -418,6 +422,8 @@ func (c *Catalog) CreateIndexes(
418422
c.addIndexesToCatalog(ctx, db, coll, successfulIdxs)
419423

420424
if len(idxErrors) > 0 {
425+
c.AddFailedIndexes(ctx, db, coll, failedIdxs)
426+
421427
lg.Errorf(errors.Join(idxErrors...),
422428
"One or more indexes failed to create on %s.%s", db, coll)
423429
}
@@ -457,6 +463,35 @@ func (c *Catalog) AddIncompleteIndexes(
457463
c.addIndexesToCatalog(ctx, db, coll, indexEntries)
458464
}
459465

466+
// AddFailedIndexes adds indexes in the catalog that failed to create on the target cluster.
467+
// The indexes have set [indexCatalogEntry.Failed] flag.
468+
func (c *Catalog) AddFailedIndexes(
469+
ctx context.Context,
470+
db string,
471+
coll string,
472+
indexes []*topo.IndexSpecification,
473+
) {
474+
lg := log.Ctx(ctx)
475+
476+
if len(indexes) == 0 {
477+
lg.Error(nil, "No failed indexes to add")
478+
479+
return
480+
}
481+
482+
indexEntries := make([]indexCatalogEntry, len(indexes))
483+
for i, index := range indexes {
484+
indexEntries[i] = indexCatalogEntry{
485+
IndexSpecification: index,
486+
Failed: true,
487+
}
488+
489+
lg.Tracef("Added failed index %q for %s.%s to catalog", index.Name, db, coll)
490+
}
491+
492+
c.addIndexesToCatalog(ctx, db, coll, indexEntries)
493+
}
494+
460495
// ModifyCappedCollection modifies a capped collection in the target MongoDB.
461496
func (c *Catalog) ModifyCappedCollection(
462497
ctx context.Context,
@@ -725,12 +760,13 @@ func (c *Catalog) Finalize(ctx context.Context) error {
725760

726761
var idxErrors []error
727762

763+
foundUnsuccessfulIdx := false
764+
728765
for db, colls := range c.Databases {
729766
for coll, collEntry := range colls.Collections {
730767
for _, index := range collEntry.Indexes {
731-
if !index.Ready() {
732-
lg.Warnf("Index %s on %s.%s was incomplete during replication, skipping it",
733-
index.Name, db, coll)
768+
if index.Unsuccessful() {
769+
foundUnsuccessfulIdx = true
734770

735771
continue
736772
}
@@ -804,13 +840,61 @@ func (c *Catalog) Finalize(ctx context.Context) error {
804840
}
805841
}
806842

843+
if foundUnsuccessfulIdx {
844+
c.finalizeUnsuccessfulIndexes(ctx)
845+
}
846+
807847
if len(idxErrors) > 0 {
808848
lg.Errorf(errors.Join(idxErrors...), "Finalize indexes")
809849
}
810850

811851
return nil
812852
}
813853

854+
// finalizeUnsuccessfulIndexes finalizes indexes that were unsuccessful
855+
// during replication, failed or incomplete.
856+
func (c *Catalog) finalizeUnsuccessfulIndexes(ctx context.Context) {
857+
lg := log.Ctx(ctx)
858+
lg.Info("Finalizing unsuccessful indexes")
859+
860+
for db, colls := range c.Databases {
861+
for coll, collEntry := range colls.Collections {
862+
for _, index := range collEntry.Indexes {
863+
if !index.Unsuccessful() {
864+
continue // skip successful indexes
865+
}
866+
867+
if index.Incomplete {
868+
lg.Infof("Index %s on %s.%s was incomplete during replication, trying to create it",
869+
index.Name, db, coll)
870+
}
871+
872+
if index.Failed {
873+
lg.Infof("Index %s on %s.%s failed to create during replication, trying to recreate it",
874+
index.Name, db, coll)
875+
}
876+
877+
err := runWithRetry(ctx, func(ctx context.Context) error {
878+
return c.target.Database(db).RunCommand(ctx, bson.D{
879+
{"createIndexes", coll},
880+
{"indexes", bson.A{index.IndexSpecification}},
881+
}).Err()
882+
})
883+
if err != nil {
884+
lg.Warnf("Failed to recreate unsuccessful index %s on %s.%s: %v",
885+
index.Name, db, coll, err)
886+
887+
continue
888+
}
889+
890+
lg.Infof("Recreated index %s on %s.%s", index.Name, db, coll)
891+
892+
c.addIndexesToCatalog(ctx, db, coll, []indexCatalogEntry{{IndexSpecification: index.IndexSpecification}})
893+
}
894+
}
895+
}
896+
}
897+
814898
// doModifyIndexOption modifies an index property in the target MongoDB.
815899
func (c *Catalog) doModifyIndexOption(
816900
ctx context.Context,
@@ -888,9 +972,6 @@ func (c *Catalog) addIndexesToCatalog(
888972

889973
for i, catIndex := range collCat.Indexes {
890974
if catIndex.Name == index.Name {
891-
lg.Warnf("add indexes: index %q already exists in %q namespace",
892-
index.Name, db+"."+coll)
893-
894975
collCat.Indexes[i] = index
895976
found = true
896977

plm/plm.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ func (ml *PLM) Finalize(ctx context.Context, options FinalizeOptions) error {
620620
return errors.New("initial sync is not completed")
621621
}
622622

623-
lg := log.Ctx(ctx)
623+
lg := log.New("finalize")
624624
lg.Info("Starting Finalization")
625625

626626
if status.Repl.IsRunning() {
@@ -663,7 +663,7 @@ func (ml *PLM) Finalize(ctx context.Context, options FinalizeOptions) error {
663663
go ml.onStateChanged(StateFinalized)
664664
}()
665665

666-
log.New("plm").Info("Finalizing")
666+
lg.Info("Finalizing")
667667

668668
go ml.onStateChanged(StateFinalizing)
669669

tests/test_indexes.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ def test_create_unique(t: Testing, phase: Runner.Phase):
3232
t.compare_all()
3333

3434

35+
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
def test_create_non_unique_with_the_same_fields_as_unique(t: Testing, phase: Runner.Phase):
    """Create a unique and a non-unique index on the same key, then compare source and target."""
    with t.run(phase):
        source_coll = t.source["db_1"]["coll_1"]
        source_coll.create_index({"i": 1}, unique=True, name="unique_idx")
        source_coll.create_index({"i": 1}, name="non_unique_idx")

    t.compare_all()
42+
43+
3544
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
3645
def test_create_prepare_unique(t: Testing, phase: Runner.Phase):
3746
with t.run(phase):
@@ -500,10 +509,7 @@ def test_continue_creating_indexes_if_some_fail(t: Testing, phase: Runner.Phase)
500509
name="idx_3",
501510
)
502511

503-
target_idx_count = len(t.target["db_1"]["coll_1"].index_information())
504-
source_idx_count = len(t.source["db_1"]["coll_1"].index_information())
505-
506-
assert source_idx_count - 1 == target_idx_count
512+
t.compare_all()
507513

508514

509515
def test_plm_95_drop_index_for_non_existing_namespace(t: Testing):

0 commit comments

Comments
 (0)