Skip to content

Commit 01f2eee

Browse files
authored
PLM-172: Retryable write operation for bulk writes in repl and clone (#120)
1 parent f041e34 commit 01f2eee

File tree

6 files changed

+93
-62
lines changed

6 files changed

+93
-62
lines changed

plm/bulk.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"golang.org/x/sync/errgroup"
1414

1515
"github.com/percona/percona-link-mongodb/errors"
16+
"github.com/percona/percona-link-mongodb/topo"
1617
)
1718

1819
//nolint:gochecknoglobals
@@ -58,9 +59,13 @@ func (o *clientBulkWrite) Empty() bool {
5859
}
5960

6061
func (o *clientBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, error) {
61-
_, err := m.BulkWrite(ctx, o.writes, clientBulkOptions)
62+
err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
63+
_, err := m.BulkWrite(ctx, o.writes, clientBulkOptions)
64+
65+
return errors.Wrap(err, "bulk write")
66+
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
6267
if err != nil {
63-
return 0, errors.Wrap(err, "bulk write")
68+
return 0, err // nolint:wrapcheck
6469
}
6570

6671
size := len(o.writes)
@@ -152,9 +157,14 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
152157
for ns, ops := range o.writes {
153158
grp.Go(func() error {
154159
mcoll := m.Database(ns.Database).Collection(ns.Collection)
155-
_, err := mcoll.BulkWrite(grpCtx, ops, collectionBulkOptions)
160+
161+
err := topo.RunWithRetry(ctx, func(_ context.Context) error {
162+
_, err := mcoll.BulkWrite(grpCtx, ops, collectionBulkOptions)
163+
164+
return errors.Wrapf(err, "bulk write %q", ns)
165+
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
156166
if err != nil {
157-
return errors.Wrapf(err, "bulkWrite %q", ns)
167+
return err // nolint:wrapcheck
158168
}
159169

160170
total.Add(int64(len(ops)))

plm/catalog.go

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,12 @@ func (c *Catalog) doCreateCollection(
223223
}
224224

225225
err := runWithRetry(ctx, func(ctx context.Context) error {
226-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
226+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
227+
228+
return errors.Wrapf(err, "create collection %s.%s", db, coll)
227229
})
228230
if err != nil {
229-
return errors.Wrap(err, "create collection")
231+
return err //nolint:wrapcheck
230232
}
231233

232234
log.Ctx(ctx).Debugf("Created collection %s.%s", db, coll)
@@ -256,10 +258,12 @@ func (c *Catalog) doCreateView(
256258
}
257259

258260
err := runWithRetry(ctx, func(ctx context.Context) error {
259-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
261+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
262+
263+
return errors.Wrapf(err, "create view %s.%s", db, view)
260264
})
261265
if err != nil {
262-
return errors.Wrap(err, "create view")
266+
return err //nolint:wrapcheck
263267
}
264268

265269
log.Ctx(ctx).Debugf("Created view %s.%s", db, view)
@@ -270,7 +274,9 @@ func (c *Catalog) doCreateView(
270274
// DropCollection drops a collection in the target MongoDB.
271275
func (c *Catalog) DropCollection(ctx context.Context, db, coll string) error {
272276
err := runWithRetry(ctx, func(ctx context.Context) error {
273-
return c.target.Database(db).Collection(coll).Drop(ctx)
277+
err := c.target.Database(db).Collection(coll).Drop(ctx)
278+
279+
return errors.Wrapf(err, "drop collection %s.%s", db, coll)
274280
})
275281
if err != nil {
276282
return err //nolint:wrapcheck
@@ -299,10 +305,12 @@ func (c *Catalog) DropDatabase(ctx context.Context, db string) error {
299305
for _, coll := range colls {
300306
eg.Go(func() error {
301307
err := runWithRetry(grpCtx, func(ctx context.Context) error {
302-
return c.target.Database(db).Collection(coll).Drop(ctx)
308+
err := c.target.Database(db).Collection(coll).Drop(ctx)
309+
310+
return errors.Wrapf(err, "drop namespace %s.%s", db, coll)
303311
})
304312
if err != nil {
305-
return errors.Wrapf(err, "drop namespace %s.%s", db, coll)
313+
return err // nolint:wrapcheck
306314
}
307315

308316
lg.Debugf("Dropped collection %s.%s", db, coll)
@@ -382,10 +390,12 @@ func (c *Catalog) CreateIndexes(
382390
// which does not support `prepareUnique`.
383391
for _, index := range idxs {
384392
err := runWithRetry(ctx, func(ctx context.Context) error {
385-
return c.target.Database(db).RunCommand(ctx, bson.D{
393+
err := c.target.Database(db).RunCommand(ctx, bson.D{
386394
{"createIndexes", coll},
387395
{"indexes", bson.A{index}},
388396
}).Err()
397+
398+
return errors.Wrapf(err, "create index %s.%s.%s", db, coll, index.Name)
389399
})
390400
if err != nil {
391401
processedIdxs[index.Name] = err
@@ -512,7 +522,9 @@ func (c *Catalog) ModifyCappedCollection(
512522
}
513523

514524
return runWithRetry(ctx, func(ctx context.Context) error {
515-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
525+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
526+
527+
return errors.Wrapf(err, "modify capped collection %s.%s", db, coll)
516528
}) //nolint:wrapcheck
517529
}
518530

@@ -525,7 +537,9 @@ func (c *Catalog) ModifyView(ctx context.Context, db, view, viewOn string, pipel
525537
}
526538

527539
return runWithRetry(ctx, func(ctx context.Context) error {
528-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
540+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
541+
542+
return errors.Wrapf(err, "modify view %s.%s", db, view)
529543
}) //nolint:wrapcheck
530544
}
531545

@@ -541,7 +555,9 @@ func (c *Catalog) ModifyChangeStreamPreAndPostImages(
541555
}
542556

543557
return runWithRetry(ctx, func(ctx context.Context) error {
544-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
558+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
559+
560+
return errors.Wrapf(err, "modify changeStreamPreAndPostImages %s.%s", db, coll)
545561
}) //nolint:wrapcheck
546562
}
547563

@@ -568,7 +584,9 @@ func (c *Catalog) ModifyValidation(
568584
}
569585

570586
return runWithRetry(ctx, func(ctx context.Context) error {
571-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
587+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
588+
589+
return errors.Wrapf(err, "modify validation %s.%s", db, coll)
572590
}) //nolint:wrapcheck
573591
}
574592

@@ -584,10 +602,12 @@ func (c *Catalog) ModifyIndex(ctx context.Context, db, coll string, mods *Modify
584602
}
585603

586604
err := runWithRetry(ctx, func(ctx context.Context) error {
587-
return c.target.Database(db).RunCommand(ctx, cmd).Err()
605+
err := c.target.Database(db).RunCommand(ctx, cmd).Err()
606+
607+
return errors.Wrapf(err, "modify index %s.%s.%s", db, coll, mods.Name)
588608
})
589609
if err != nil {
590-
return errors.Wrap(err, "modify index: "+mods.Name)
610+
return err //nolint:wrapcheck
591611
}
592612
}
593613

@@ -629,7 +649,9 @@ func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl str
629649
}
630650

631651
err := runWithRetry(ctx, func(ctx context.Context) error {
632-
return c.target.Database("admin").RunCommand(ctx, opts).Err()
652+
err := c.target.Database("admin").RunCommand(ctx, opts).Err()
653+
654+
return errors.Wrapf(err, "rename collection %s.%s to %s.%s", db, coll, targetDB, targetColl)
633655
})
634656
if err != nil {
635657
if topo.IsNamespaceNotFound(err) {
@@ -638,7 +660,7 @@ func (c *Catalog) Rename(ctx context.Context, db, coll, targetDB, targetColl str
638660
return nil
639661
}
640662

641-
return errors.Wrap(err, "rename collection")
663+
return err //nolint:wrapcheck
642664
}
643665

644666
lg.Debugf("Renamed collection %s.%s to %s.%s", db, coll, targetDB, targetColl)
@@ -653,7 +675,9 @@ func (c *Catalog) DropIndex(ctx context.Context, db, coll, index string) error {
653675
lg := log.Ctx(ctx)
654676

655677
err := runWithRetry(ctx, func(ctx context.Context) error {
656-
return c.target.Database(db).Collection(coll).Indexes().DropOne(ctx, index)
678+
err := c.target.Database(db).Collection(coll).Indexes().DropOne(ctx, index)
679+
680+
return errors.Wrapf(err, "drop index %s.%s.%s", db, coll, index)
657681
})
658682
if err != nil {
659683
if !topo.IsNamespaceNotFound(err) && !topo.IsIndexNotFound(err) {
@@ -859,10 +883,12 @@ func (c *Catalog) finalizeUnsuccessfulIndexes(ctx context.Context) {
859883
}
860884

861885
err := runWithRetry(ctx, func(ctx context.Context) error {
862-
return c.target.Database(db).RunCommand(ctx, bson.D{
886+
err := c.target.Database(db).RunCommand(ctx, bson.D{
863887
{"createIndexes", coll},
864888
{"indexes", bson.A{index.IndexSpecification}},
865889
}).Err()
890+
891+
return errors.Wrapf(err, "recreate index %s.%s.%s", db, coll, index.Name)
866892
})
867893
if err != nil {
868894
lg.Warnf("Failed to recreate unsuccessful index %s on %s.%s: %v",
@@ -889,13 +915,15 @@ func (c *Catalog) doModifyIndexOption(
889915
value any,
890916
) error {
891917
return runWithRetry(ctx, func(ctx context.Context) error {
892-
return c.target.Database(db).RunCommand(ctx, bson.D{
918+
err := c.target.Database(db).RunCommand(ctx, bson.D{
893919
{"collMod", coll},
894920
{"index", bson.D{
895921
{"name", index},
896922
{propName, value},
897923
}},
898924
}).Err()
925+
926+
return errors.Wrapf(err, "modify index %s.%s.%s: %s", db, coll, index, propName)
899927
}) //nolint:wrapcheck
900928
}
901929

plm/copy.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -491,21 +491,12 @@ func (cm *CopyManager) insertBatch(ctx context.Context, task insertBatchTask) {
491491
startedAt := time.Now()
492492

493493
collection := cm.target.Database(task.Namespace.Database).Collection(task.Namespace.Collection)
494-
_, err := collection.InsertMany(ctx, task.Documents, insertOptions)
495-
if topo.IsRetryableWrite(err) {
496-
zl.Warn().
497-
Err(err).
498-
Uint32("id", task.ID).
499-
Int("size_bytes", task.SizeBytes).
500-
Int("count", len(task.Documents)).
501-
Dur("elapsed", time.Since(startedAt).Round(time.Millisecond)).
502-
Msgf("insert batch %d [RetryableWrite]", task.ID)
503494

504-
startedAt = time.Now()
495+
err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
496+
_, err := collection.InsertMany(ctx, task.Documents, insertOptions)
505497

506-
// try one more time
507-
_, err = collection.InsertMany(ctx, task.Documents, insertOptions)
508-
}
498+
return errors.Wrapf(err, "insert batch: id %d, doc count %d", task.ID, len(task.Documents))
499+
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
509500

510501
count := len(task.Documents)
511502

topo/errors.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,14 @@
11
package topo
22

33
import (
4+
"context"
45
"strings"
56

67
"go.mongodb.org/mongo-driver/v2/mongo"
78

89
"github.com/percona/percona-link-mongodb/errors"
910
)
1011

11-
// IsRetryableWrite checks if the error has the "RetryableWriteError" label.
12-
func IsRetryableWrite(err error) bool {
13-
for err != nil {
14-
le, ok := err.(mongo.LabeledError) //nolint:errorlint
15-
if ok {
16-
return le.HasErrorLabel("RetryableWriteError")
17-
}
18-
19-
err = errors.Unwrap(err)
20-
}
21-
22-
return false
23-
}
24-
2512
// IsIndexNotFound checks if an error is an index not found error.
2613
func IsIndexNotFound(err error) bool {
2714
return isMongoCommandError(err, "IndexNotFound")
@@ -74,9 +61,18 @@ func isMongoCommandError(err error, name string) bool {
7461
return false
7562
}
7663

77-
// IsTransientError checks if the error is a transient error that can be retried.
64+
// IsTransient checks if the error is a transient error that can be retried.
7865
// It checks for specific MongoDB error codes that indicate transient issues.
79-
func IsTransientError(err error) bool {
66+
func IsTransient(err error) bool {
67+
if mongo.IsNetworkError(err) || mongo.IsTimeout(err) || errors.Is(err, context.DeadlineExceeded) {
68+
return true
69+
}
70+
71+
le, ok := err.(mongo.LabeledError) //nolint:errorlint
72+
if ok && le.HasErrorLabel("RetryableWriteError") {
73+
return true
74+
}
75+
8076
transientErrorCodes := map[int]struct{}{
8177
11602: {}, // InterruptedDueToReplStateChange
8278
91: {}, // ShutdownInProgress
@@ -88,22 +84,23 @@ func IsTransientError(err error) bool {
8884
var wEx mongo.WriteException
8985
if errors.As(err, &wEx) {
9086
for _, we := range wEx.WriteErrors {
91-
if _, ok := transientErrorCodes[we.Code]; ok {
92-
return true
93-
}
87+
_, ok := transientErrorCodes[we.Code]
88+
89+
return ok
9490
}
91+
9592
if wEx.WriteConcernError != nil {
96-
if _, ok := transientErrorCodes[wEx.WriteConcernError.Code]; ok {
97-
return true
98-
}
93+
_, ok := transientErrorCodes[wEx.WriteConcernError.Code]
94+
95+
return ok
9996
}
10097
}
10198

10299
var cmdErr mongo.CommandError
103100
if errors.As(err, &cmdErr) {
104-
if _, ok := transientErrorCodes[int(cmdErr.Code)]; ok {
105-
return true
106-
}
101+
_, ok := transientErrorCodes[int(cmdErr.Code)]
102+
103+
return ok
107104
}
108105

109106
return false

topo/topo.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,14 @@ func RunWithRetry(
211211
var err error
212212

213213
currentInterval := retryInterval
214+
214215
for attempt := 1; attempt <= maxRetries; attempt++ {
215216
err = fn(ctx)
216217
if err == nil {
217218
return nil
218219
}
219220

220-
if !IsTransientError(err) {
221+
if !IsTransient(err) {
221222
return err //nolint:wrapcheck
222223
}
223224

topo/topo_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestRunWithRetry_NonTransientError(t *testing.T) {
2525
if !errors.Is(err, nonTransiantErr) {
2626
t.Errorf("expected error %v, got %v", nonTransiantErr, err)
2727
}
28+
2829
if calls != 1 {
2930
t.Errorf("expected fn to be called once, got %d", calls)
3031
}
@@ -51,10 +52,12 @@ func TestRunWithRetry_FalureOnAllRetries(t *testing.T) {
5152
}
5253

5354
maxRetries := 3
55+
5456
err := RunWithRetry(t.Context(), fn, 1*time.Millisecond, maxRetries)
5557
if !errors.As(err, &transientErr) {
5658
t.Errorf("expected error %v, got %v", transientErr, err)
5759
}
60+
5861
if calls != maxRetries {
5962
t.Errorf("expected fn to be called %d times, got %d", maxRetries, calls)
6063
}
@@ -86,6 +89,7 @@ func TestRunWithRetry_SuccessOnRetry(t *testing.T) {
8689
if err != nil {
8790
t.Errorf("expected nil error, got %v", err)
8891
}
92+
8993
if calls != 2 {
9094
t.Errorf("expected fn to be called 2 times, got %d", calls)
9195
}

0 commit comments

Comments
 (0)