Skip to content

Commit e714589

Browse files
authored
PLM-173: Improve locking in catalog (#116)
1 parent f49bfe2 commit e714589

File tree

3 files changed

+40
-40
lines changed

3 files changed

+40
-40
lines changed

CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
* @inelpandzic
1+
* @inelpandzic @boris-ilijic

plm/catalog.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,6 @@ func (c *Catalog) CreateCollection(
160160
coll string,
161161
opts *CreateCollectionOptions,
162162
) error {
163-
c.lock.Lock()
164-
defer c.lock.Unlock()
165-
166163
if opts.ViewOn != "" {
167164
if strings.HasPrefix(opts.ViewOn, TimeseriesPrefix) {
168165
return errors.New("timeseries is not supported: " + db + "." + coll)
@@ -208,9 +205,11 @@ func (c *Catalog) doCreateCollection(
208205
if opts.Validator != nil {
209206
cmd = append(cmd, bson.E{"validator", opts.Validator})
210207
}
208+
211209
if opts.ValidationLevel != nil {
212210
cmd = append(cmd, bson.E{"validationLevel", opts.ValidationLevel})
213211
}
212+
214213
if opts.ValidationAction != nil {
215214
cmd = append(cmd, bson.E{"validationAction", opts.ValidationAction})
216215
}
@@ -229,9 +228,12 @@ func (c *Catalog) doCreateCollection(
229228
if err != nil {
230229
return errors.Wrap(err, "create collection")
231230
}
231+
232232
log.Ctx(ctx).Debugf("Created collection %s.%s", db, coll)
233233

234+
c.lock.Lock()
234235
c.addCollectionToCatalog(ctx, db, coll)
236+
c.lock.Unlock()
235237

236238
return nil
237239
}
@@ -267,27 +269,24 @@ func (c *Catalog) doCreateView(
267269

268270
// DropCollection drops a collection in the target MongoDB.
269271
func (c *Catalog) DropCollection(ctx context.Context, db, coll string) error {
270-
c.lock.Lock()
271-
defer c.lock.Unlock()
272-
273272
err := runWithRetry(ctx, func(ctx context.Context) error {
274273
return c.target.Database(db).Collection(coll).Drop(ctx)
275274
})
276275
if err != nil {
277276
return err //nolint:wrapcheck
278277
}
278+
279279
log.Ctx(ctx).Debugf("Dropped collection %s.%s", db, coll)
280280

281+
c.lock.Lock()
281282
c.deleteCollectionFromCatalog(ctx, db, coll)
283+
c.lock.Unlock()
282284

283285
return nil
284286
}
285287

286288
// DropDatabase drops a database in the target MongoDB.
287289
func (c *Catalog) DropDatabase(ctx context.Context, db string) error {
288-
c.lock.Lock()
289-
defer c.lock.Unlock()
290-
291290
lg := log.Ctx(ctx)
292291

293292
colls, err := topo.ListCollectionNames(ctx, c.target, db)
@@ -326,9 +325,6 @@ func (c *Catalog) CreateIndexes(
326325
coll string,
327326
indexes []*topo.IndexSpecification,
328327
) error {
329-
c.lock.Lock()
330-
defer c.lock.Unlock()
331-
332328
lg := log.Ctx(ctx)
333329

334330
if len(indexes) == 0 {
@@ -404,10 +400,12 @@ func (c *Catalog) CreateIndexes(
404400
successfulIdxNames := make([]string, 0, len(processedIdxs))
405401

406402
failedIdxs := make([]*topo.IndexSpecification, 0, len(processedIdxs))
403+
407404
var idxErrors []error
408405

409406
for _, idx := range indexes {
410-
if err := processedIdxs[idx.Name]; err != nil {
407+
err := processedIdxs[idx.Name]
408+
if err != nil {
411409
failedIdxs = append(failedIdxs, idx)
412410
idxErrors = append(idxErrors, errors.Wrap(err, "create index: "+idx.Name))
413411

@@ -419,7 +417,10 @@ func (c *Catalog) CreateIndexes(
419417
}
420418

421419
lg.Debugf("Created indexes on %s.%s: %s", db, coll, strings.Join(successfulIdxNames, ", "))
420+
421+
c.lock.Lock()
422422
c.addIndexesToCatalog(ctx, db, coll, successfulIdxs)
423+
c.lock.Unlock()
423424

424425
if len(idxErrors) > 0 {
425426
c.AddFailedIndexes(ctx, db, coll, failedIdxs)
@@ -439,9 +440,6 @@ func (c *Catalog) AddIncompleteIndexes(
439440
coll string,
440441
indexes []*topo.IndexSpecification,
441442
) {
442-
c.lock.Lock()
443-
defer c.lock.Unlock()
444-
445443
lg := log.Ctx(ctx)
446444

447445
if len(indexes) == 0 {
@@ -460,7 +458,9 @@ func (c *Catalog) AddIncompleteIndexes(
460458
lg.Tracef("Added incomplete index %q for %s.%s to catalog", index.Name, db, coll)
461459
}
462460

461+
c.lock.Lock()
463462
c.addIndexesToCatalog(ctx, db, coll, indexEntries)
463+
c.lock.Unlock()
464464
}
465465

466466
// AddFailedIndexes adds indexes in the catalog that failed to create on the target cluster.
@@ -489,7 +489,9 @@ func (c *Catalog) AddFailedIndexes(
489489
lg.Tracef("Added failed index %q for %s.%s to catalog", index.Name, db, coll)
490490
}
491491

492+
c.lock.Lock()
492493
c.addIndexesToCatalog(ctx, db, coll, indexEntries)
494+
c.lock.Unlock()
493495
}
494496

495497
// ModifyCappedCollection modifies a capped collection in the target MongoDB.
@@ -500,9 +502,6 @@ func (c *Catalog) ModifyCappedCollection(
500502
sizeBytes *int64,
501503
maxDocs *int64,
502504
) error {
503-
c.lock.Lock()
504-
defer c.lock.Unlock()
505-
506505
cmd := bson.D{{"collMod", coll}}
507506
if sizeBytes != nil {
508507
cmd = append(cmd, bson.E{"cappedSize", sizeBytes})
@@ -519,9 +518,6 @@ func (c *Catalog) ModifyCappedCollection(
519518

520519
// ModifyView modifies a view in the target MongoDB.
521520
func (c *Catalog) ModifyView(ctx context.Context, db, view, viewOn string, pipeline any) error {
522-
c.lock.Lock()
523-
defer c.lock.Unlock()
524-
525521
cmd := bson.D{
526522
{"collMod", view},
527523
{"viewOn", viewOn},
@@ -539,9 +535,6 @@ func (c *Catalog) ModifyChangeStreamPreAndPostImages(
539535
coll string,
540536
enabled bool,
541537
) error {
542-
c.lock.Lock()
543-
defer c.lock.Unlock()
544-
545538
cmd := bson.D{
546539
{"collMod", coll},
547540
{"changeStreamPreAndPostImages", bson.D{{"enabled", enabled}}},
@@ -561,16 +554,15 @@ func (c *Catalog) ModifyValidation(
561554
validationLevel *string,
562555
validationAction *string,
563556
) error {
564-
c.lock.Lock()
565-
defer c.lock.Unlock()
566-
567557
cmd := bson.D{{"collMod", coll}}
568558
if validator != nil {
569559
cmd = append(cmd, bson.E{"validator", validator})
570560
}
561+
571562
if validationLevel != nil {
572563
cmd = append(cmd, bson.E{"validationLevel", validationLevel})
573564
}
565+
574566
if validationAction != nil {
575567
cmd = append(cmd, bson.E{"validationAction", validationAction})
576568
}
@@ -582,9 +574,6 @@ func (c *Catalog) ModifyValidation(
582574

583575
// ModifyIndex modifies an index in the target MongoDB.
584576
func (c *Catalog) ModifyIndex(ctx context.Context, db, coll string, mods *ModifyIndexOption) error {
585-
c.lock.Lock()
586-
defer c.lock.Unlock()
587-
588577
if mods.ExpireAfterSeconds != nil {
589578
cmd := bson.D{
590579
{"collMod", coll},
@@ -631,9 +620,6 @@ func (c *Catalog) ModifyIndex(ctx context.Context, db, coll string, mods *Modify
631620
}
632621

633622
func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl string) error {
634-
c.lock.Lock()
635-
defer c.lock.Unlock()
636-
637623
lg := log.Ctx(ctx)
638624

639625
opts := bson.D{
@@ -654,6 +640,7 @@ func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl str
654640

655641
return errors.Wrap(err, "rename collection")
656642
}
643+
657644
lg.Debugf("Renamed collection %s.%s to %s.%s", db, coll, targetDB, targetColl)
658645

659646
c.renameCollectionInCatalog(ctx, db, coll, targetDB, targetColl)
@@ -663,9 +650,6 @@ func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl str
663650

664651
// DropIndex drops an index in the target MongoDB.
665652
func (c *Catalog) DropIndex(ctx context.Context, db, coll, index string) error {
666-
c.lock.Lock()
667-
defer c.lock.Unlock()
668-
669653
lg := log.Ctx(ctx)
670654

671655
err := runWithRetry(ctx, func(ctx context.Context) error {
@@ -917,6 +901,9 @@ func (c *Catalog) doModifyIndexOption(
917901

918902
// getIndexFromCatalog gets an index spec from the catalog.
919903
func (c *Catalog) getIndexFromCatalog(db, coll, index string) *topo.IndexSpecification {
904+
c.lock.RLock()
905+
defer c.lock.RUnlock()
906+
920907
dbCat, ok := c.Databases[db]
921908
if !ok || len(dbCat.Collections) == 0 {
922909
return nil
@@ -967,6 +954,7 @@ func (c *Catalog) addIndexesToCatalog(
967954
}
968955

969956
idxNames := make([]string, 0, len(collCat.Indexes))
957+
970958
for _, index := range indexes {
971959
found := false
972960

@@ -992,6 +980,9 @@ func (c *Catalog) addIndexesToCatalog(
992980

993981
// removeIndexFromCatalog removes an index from the catalog.
994982
func (c *Catalog) removeIndexFromCatalog(ctx context.Context, db, coll, index string) {
983+
c.lock.Lock()
984+
defer c.lock.Unlock()
985+
995986
lg := log.Ctx(ctx)
996987

997988
databaseEntry, ok := c.Databases[db]
@@ -1068,13 +1059,18 @@ func (c *Catalog) deleteCollectionFromCatalog(ctx context.Context, db, coll stri
10681059
}
10691060

10701061
delete(databaseEntry.Collections, coll)
1062+
10711063
if len(databaseEntry.Collections) == 0 {
10721064
delete(c.Databases, db)
10731065
}
1066+
10741067
log.Ctx(ctx).Debugf("Collection deleted from catalog %s.%s", db, coll)
10751068
}
10761069

10771070
func (c *Catalog) deleteDatabaseFromCatalog(ctx context.Context, db string) {
1071+
c.lock.Lock()
1072+
defer c.lock.Unlock()
1073+
10781074
delete(c.Databases, db)
10791075
log.Ctx(ctx).Debugf("Database deleted from catalog %s", db)
10801076
}
@@ -1086,6 +1082,9 @@ func (c *Catalog) renameCollectionInCatalog(
10861082
targetDB string,
10871083
targetColl string,
10881084
) {
1085+
c.lock.Lock()
1086+
defer c.lock.Unlock()
1087+
10891088
lg := log.Ctx(ctx)
10901089

10911090
databaseEntry, ok := c.Databases[db]
@@ -1105,6 +1104,7 @@ func (c *Catalog) renameCollectionInCatalog(
11051104
c.addCollectionToCatalog(ctx, targetDB, targetColl)
11061105
c.Databases[targetDB].Collections[targetColl] = collectionEntry
11071106
c.deleteCollectionFromCatalog(ctx, db, coll)
1107+
11081108
lg.Debugf("Collection renamed in catalog %s.%s to %s.%s", db, coll, targetDB, targetColl)
11091109
}
11101110

tests/test_collections.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def test_create_view_with_collation(t: Testing, phase: Runner.Phase):
127127
t.compare_all()
128128

129129

130-
@pytest.mark.timeout(20)
130+
@pytest.mark.timeout(40)
131131
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
132132
def test_create_timeseries_ignored(t: Testing, phase: Runner.Phase):
133133
with t.run(phase):
@@ -329,7 +329,7 @@ def test_modify_view(t: Testing, phase: Runner.Phase):
329329
t.compare_all()
330330

331331

332-
@pytest.mark.timeout(20)
332+
@pytest.mark.timeout(40)
333333
@pytest.mark.parametrize("phase", [Runner.Phase.APPLY, Runner.Phase.CLONE])
334334
def test_modify_timeseries_options_ignored(t: Testing, phase: Runner.Phase):
335335
t.source["db_1"].create_collection(

0 commit comments

Comments
 (0)