Skip to content

Commit 3cbcd97

Browse files
authored
PLM-155: Run commands in catalog with retries for transient errors (#105)
1 parent d0da605 commit 3cbcd97

File tree

4 files changed

+227
-32
lines changed

4 files changed

+227
-32
lines changed

plm/catalog.go

Lines changed: 55 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,9 @@ func (c *Catalog) doCreateCollection(
222222
cmd = append(cmd, bson.E{"indexOptionDefaults", opts.IndexOptionDefaults})
223223
}
224224

225-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
225+
err := runWithRetry(ctx, func(ctx context.Context) error {
226+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
227+
})
226228
if err != nil {
227229
return errors.Wrap(err, "create collection")
228230
}
@@ -250,7 +252,9 @@ func (c *Catalog) doCreateView(
250252
cmd = append(cmd, bson.E{"collation", opts.Collation})
251253
}
252254

253-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
255+
err := runWithRetry(ctx, func(ctx context.Context) error {
256+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
257+
})
254258
if err != nil {
255259
return errors.Wrap(err, "create view")
256260
}
@@ -265,7 +269,9 @@ func (c *Catalog) DropCollection(ctx context.Context, db, coll string) error {
265269
c.lock.Lock()
266270
defer c.lock.Unlock()
267271

268-
err := c.target.Database(db).Collection(coll).Drop(ctx)
272+
err := runWithRetry(ctx, func(ctx context.Context) error {
273+
return c.target.Database(db).Collection(coll).Drop(ctx)
274+
})
269275
if err != nil {
270276
return err //nolint:wrapcheck
271277
}
@@ -292,7 +298,9 @@ func (c *Catalog) DropDatabase(ctx context.Context, db string) error {
292298

293299
for _, coll := range colls {
294300
eg.Go(func() error {
295-
err := c.target.Database(db).Collection(coll).Drop(grpCtx)
301+
err := runWithRetry(grpCtx, func(ctx context.Context) error {
302+
return c.target.Database(db).Collection(coll).Drop(ctx)
303+
})
296304
if err != nil {
297305
return errors.Wrapf(err, "drop namespace %s.%s", db, coll)
298306
}
@@ -376,12 +384,13 @@ func (c *Catalog) CreateIndexes(
376384
// NOTE: [mongo.IndexView.CreateMany] uses [mongo.IndexModel]
377385
// which does not support `prepareUnique`.
378386
for _, index := range idxs {
379-
res := c.target.Database(db).RunCommand(ctx, bson.D{
380-
{"createIndexes", coll},
381-
{"indexes", bson.A{index}},
387+
err := runWithRetry(ctx, func(ctx context.Context) error {
388+
return c.target.Database(db).RunCommand(ctx, bson.D{
389+
{"createIndexes", coll},
390+
{"indexes", bson.A{index}},
391+
}).Err()
382392
})
383-
384-
if err := res.Err(); err != nil {
393+
if err != nil {
385394
processedIdxs[index.Name] = err
386395

387396
continue
@@ -468,9 +477,9 @@ func (c *Catalog) ModifyCappedCollection(
468477
cmd = append(cmd, bson.E{"cappedMax", maxDocs})
469478
}
470479

471-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
472-
473-
return err //nolint:wrapcheck
480+
return runWithRetry(ctx, func(ctx context.Context) error {
481+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
482+
}) //nolint:wrapcheck
474483
}
475484

476485
// ModifyView modifies a view in the target MongoDB.
@@ -483,9 +492,10 @@ func (c *Catalog) ModifyView(ctx context.Context, db, view, viewOn string, pipel
483492
{"viewOn", viewOn},
484493
{"pipeline", pipeline},
485494
}
486-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
487495

488-
return err //nolint:wrapcheck
496+
return runWithRetry(ctx, func(ctx context.Context) error {
497+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
498+
}) //nolint:wrapcheck
489499
}
490500

491501
func (c *Catalog) ModifyChangeStreamPreAndPostImages(
@@ -501,9 +511,10 @@ func (c *Catalog) ModifyChangeStreamPreAndPostImages(
501511
{"collMod", coll},
502512
{"changeStreamPreAndPostImages", bson.D{{"enabled", enabled}}},
503513
}
504-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
505514

506-
return err //nolint:wrapcheck
515+
return runWithRetry(ctx, func(ctx context.Context) error {
516+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
517+
}) //nolint:wrapcheck
507518
}
508519

509520
// ModifyCappedCollection modifies a capped collection in the target MongoDB.
@@ -529,9 +540,9 @@ func (c *Catalog) ModifyValidation(
529540
cmd = append(cmd, bson.E{"validationAction", validationAction})
530541
}
531542

532-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
533-
534-
return err //nolint:wrapcheck
543+
return runWithRetry(ctx, func(ctx context.Context) error {
544+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
545+
}) //nolint:wrapcheck
535546
}
536547

537548
// ModifyIndex modifies an index in the target MongoDB.
@@ -548,7 +559,9 @@ func (c *Catalog) ModifyIndex(ctx context.Context, db, coll string, mods *Modify
548559
}},
549560
}
550561

551-
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
562+
err := runWithRetry(ctx, func(ctx context.Context) error {
563+
return c.target.Database(db).RunCommand(ctx, cmd).Err()
564+
})
552565
if err != nil {
553566
return errors.Wrap(err, "modify index: "+mods.Name)
554567
}
@@ -594,7 +607,9 @@ func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl str
594607
{"dropTarget", true},
595608
}
596609

597-
err := c.target.Database("admin").RunCommand(ctx, opts).Err()
610+
err := runWithRetry(ctx, func(ctx context.Context) error {
611+
return c.target.Database("admin").RunCommand(ctx, opts).Err()
612+
})
598613
if err != nil {
599614
if topo.IsNamespaceNotFound(err) {
600615
lg.Errorf(err, "")
@@ -618,7 +633,9 @@ func (c *Catalog) DropIndex(ctx context.Context, db, coll, index string) error {
618633

619634
lg := log.Ctx(ctx)
620635

621-
err := c.target.Database(db).Collection(coll).Indexes().DropOne(ctx, index)
636+
err := runWithRetry(ctx, func(ctx context.Context) error {
637+
return c.target.Database(db).Collection(coll).Indexes().DropOne(ctx, index)
638+
})
622639
if err != nil {
623640
if !topo.IsNamespaceNotFound(err) && !topo.IsIndexNotFound(err) {
624641
return err //nolint:wrapcheck
@@ -803,15 +820,15 @@ func (c *Catalog) doModifyIndexOption(
803820
propName string,
804821
value any,
805822
) error {
806-
res := c.target.Database(db).RunCommand(ctx, bson.D{
807-
{"collMod", coll},
808-
{"index", bson.D{
809-
{"name", index},
810-
{propName, value},
811-
}},
812-
})
813-
814-
return res.Err() //nolint:wrapcheck
823+
return runWithRetry(ctx, func(ctx context.Context) error {
824+
return c.target.Database(db).RunCommand(ctx, bson.D{
825+
{"collMod", coll},
826+
{"index", bson.D{
827+
{"name", index},
828+
{propName, value},
829+
}},
830+
}).Err()
831+
}) //nolint:wrapcheck
815832
}
816833

817834
// getIndexFromCatalog gets an index spec from the catalog.
@@ -1009,3 +1026,10 @@ func (c *Catalog) renameCollectionInCatalog(
10091026
c.deleteCollectionFromCatalog(ctx, db, coll)
10101027
lg.Debugf("Collection renamed in catalog %s.%s to %s.%s", db, coll, targetDB, targetColl)
10111028
}
1029+
1030+
func runWithRetry(
1031+
ctx context.Context,
1032+
fn func(context.Context) error,
1033+
) error {
1034+
return topo.RunWithRetry(ctx, fn, topo.DefaultRetryInterval, topo.DefaultMaxRetries) //nolint:wrapcheck
1035+
}

topo/errors.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package topo
22

33
import (
4-
"errors"
54
"strings"
65

76
"go.mongodb.org/mongo-driver/v2/mongo"
7+
8+
"github.com/percona/percona-link-mongodb/errors"
89
)
910

1011
// IsRetryableWrite checks if the error has the "RetryableWriteError" label.
@@ -72,3 +73,38 @@ func isMongoCommandError(err error, name string) bool {
7273

7374
return false
7475
}
76+
77+
// IsTransientError checks if the error is a transient error that can be retried.
78+
// It checks for specific MongoDB error codes that indicate transient issues.
79+
func IsTransientError(err error) bool {
80+
transientErrorCodes := map[int]struct{}{
81+
11602: {}, // InterruptedDueToReplStateChange
82+
91: {}, // ShutdownInProgress
83+
189: {}, // PrimarySteppedDown
84+
10107: {}, // NotWritablePrimary
85+
13435: {}, // NotPrimaryNoSecondaryOk
86+
}
87+
88+
var wEx mongo.WriteException
89+
if errors.As(err, &wEx) {
90+
for _, we := range wEx.WriteErrors {
91+
if _, ok := transientErrorCodes[we.Code]; ok {
92+
return true
93+
}
94+
}
95+
if wEx.WriteConcernError != nil {
96+
if _, ok := transientErrorCodes[wEx.WriteConcernError.Code]; ok {
97+
return true
98+
}
99+
}
100+
}
101+
102+
var cmdErr mongo.CommandError
103+
if errors.As(err, &cmdErr) {
104+
if _, ok := transientErrorCodes[int(cmdErr.Code)]; ok {
105+
return true
106+
}
107+
}
108+
109+
return false
110+
}

topo/topo.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ import (
1313
"github.com/percona/percona-link-mongodb/util"
1414
)
1515

16+
const (
17+
// DefaultRetryInterval is the default interval for retrying transient errors.
18+
DefaultRetryInterval = 5 * time.Second
19+
// DefaultMaxRetries is the default maximum number of retries for transient errors.
20+
DefaultMaxRetries = 3
21+
)
22+
1623
// errMissingClusterTime is returned when the cluster time is missing.
1724
var errMissingClusterTime = errors.New("missig clusterTime")
1825

@@ -187,3 +194,39 @@ func GetCollStats(ctx context.Context, m *mongo.Client, db, coll string) (*CollS
187194

188195
return stats, nil
189196
}
197+
198+
// RunWithRetry executes the provided function with retry logic for transient errors.
199+
// It retries the function up to maxRetries times,
200+
// with an exponential backoff starting from retryInterval.
201+
func RunWithRetry(
202+
ctx context.Context,
203+
fn func(context.Context) error,
204+
retryInterval time.Duration,
205+
maxRetries int,
206+
) error {
207+
if retryInterval <= 0 || maxRetries <= 0 {
208+
return errors.New("retryInterval and maxRetries must be greater than zero")
209+
}
210+
211+
var err error
212+
213+
currentInterval := retryInterval
214+
for attempt := 1; attempt <= maxRetries; attempt++ {
215+
err = fn(ctx)
216+
if err == nil {
217+
return nil
218+
}
219+
220+
if !IsTransientError(err) {
221+
return err //nolint:wrapcheck
222+
}
223+
224+
log.Ctx(ctx).Warnf("Transient write error: %v, retry attempt %d retrying in %s",
225+
err, attempt, currentInterval)
226+
227+
time.Sleep(currentInterval)
228+
currentInterval *= 2
229+
}
230+
231+
return err //nolint:wrapcheck
232+
}

topo/topo_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package topo //nolint:testpackage
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"go.mongodb.org/mongo-driver/v2/mongo"
10+
)
11+
12+
func TestRunWithRetry_NonTransientError(t *testing.T) {
13+
t.Parallel()
14+
15+
nonTransiantErr := errors.New("non-transient error")
16+
calls := 0
17+
18+
fn := func(_ context.Context) error {
19+
calls++
20+
21+
return nonTransiantErr
22+
}
23+
24+
err := RunWithRetry(t.Context(), fn, 10*time.Millisecond, 2)
25+
if !errors.Is(err, nonTransiantErr) {
26+
t.Errorf("expected error %v, got %v", nonTransiantErr, err)
27+
}
28+
if calls != 1 {
29+
t.Errorf("expected fn to be called once, got %d", calls)
30+
}
31+
}
32+
33+
func TestRunWithRetry_FalureOnAllRetries(t *testing.T) {
34+
t.Parallel()
35+
36+
transientErr := mongo.WriteException{
37+
WriteErrors: []mongo.WriteError{
38+
{
39+
Code: 91, // ShutdownInProgress
40+
Message: "transient error",
41+
},
42+
},
43+
}
44+
45+
calls := 0
46+
47+
fn := func(_ context.Context) error {
48+
calls++
49+
50+
return transientErr
51+
}
52+
53+
maxRetries := 3
54+
err := RunWithRetry(t.Context(), fn, 1*time.Millisecond, maxRetries)
55+
if errors.Is(err, &transientErr) {
56+
t.Errorf("expected error %v, got %v", transientErr, err)
57+
}
58+
if calls != maxRetries {
59+
t.Errorf("expected fn to be called %d times, got %d", maxRetries, calls)
60+
}
61+
}
62+
63+
func TestRunWithRetry_SuccessOnRetry(t *testing.T) {
64+
t.Parallel()
65+
66+
transientErr := mongo.WriteException{
67+
WriteErrors: []mongo.WriteError{
68+
{
69+
Code: 91, // ShutdownInProgress
70+
Message: "transient error",
71+
},
72+
},
73+
}
74+
calls := 0
75+
76+
fn := func(_ context.Context) error {
77+
calls++
78+
if calls < 2 {
79+
return transientErr
80+
}
81+
82+
return nil
83+
}
84+
85+
err := RunWithRetry(t.Context(), fn, 1*time.Millisecond, 3)
86+
if err != nil {
87+
t.Errorf("expected nil error, got %v", err)
88+
}
89+
if calls != 2 {
90+
t.Errorf("expected fn to be called 2 times, got %d", calls)
91+
}
92+
}

0 commit comments

Comments
 (0)