Skip to content

Commit f5d9d2a

Browse files
PCSM-221. Add test to reproduce pcsm duplicate key error when handling shard key updates (#144)
Co-authored-by: Inel Pandzic <[email protected]>
1 parent 4edbbf8 commit f5d9d2a

File tree

13 files changed

+268
-33
lines changed

13 files changed

+268
-33
lines changed

.golangci.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@ linters:
1212
- nolintlint
1313
- varnamelen
1414
- wsl
15+
- wsl_v5
1516
- funcorder
1617
settings:
1718
govet:
1819
disable:
1920
- composites
2021
misspell:
2122
locale: US
23+
revive:
24+
rules:
25+
- name: var-naming
26+
disabled: true
2227
exclusions:
2328
generated: lax
2429
presets:

config/values.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func UseTargetClientCompressors() []string {
7474
allowCompressors := []string{"zstd", "zlib", "snappy"}
7575

7676
rv := make([]string, 0, min(len(s), len(allowCompressors)))
77-
for _, a := range strings.Split(s, ",") {
77+
for a := range strings.SplitSeq(s, ",") {
7878
a = strings.TrimSpace(a)
7979
if slices.Contains(allowCompressors, a) && !slices.Contains(rv, a) {
8080
rv = append(rv, a)
@@ -90,7 +90,8 @@ func UseTargetClientCompressors() []string {
9090
// DefaultMongoDBCliOperationTimeout is used.
9191
func OperationMongoDBCliTimeout() time.Duration {
9292
if v := strings.TrimSpace(os.Getenv("PCSM_MONGODB_CLI_OPERATION_TIMEOUT")); v != "" {
93-
if d, err := time.ParseDuration(v); err == nil && d > 0 {
93+
d, err := time.ParseDuration(v)
94+
if err == nil && d > 0 {
9495
return d
9596
}
9697
}

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,8 @@ func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
679679
State: status.State,
680680
}
681681

682-
if err := status.Error; err != nil {
682+
err := status.Error
683+
if err != nil {
683684
res.Err = err.Error()
684685
}
685686

metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func AddCopyReadDocumentCount(v int) {
125125
copyReadDocumentTotal.Add(float64(v))
126126
}
127127

128-
// AddCopyReadDocumentCount increments the total count of the inserted documents.
128+
// AddCopyInsertDocumentCount increments the total count of the inserted documents.
129129
func AddCopyInsertDocumentCount(v int) {
130130
copyInsertDocumentTotal.Add(float64(v))
131131
}

pcsm/bulk.go

Lines changed: 193 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"golang.org/x/sync/errgroup"
1414

1515
"github.com/percona/percona-clustersync-mongodb/errors"
16+
"github.com/percona/percona-clustersync-mongodb/log"
1617
"github.com/percona/percona-clustersync-mongodb/topo"
1718
)
1819

@@ -64,20 +65,94 @@ func (o *clientBulkWrite) Empty() bool {
6465
}
6566

6667
func (o *clientBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, error) {
67-
err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
68-
_, err := m.BulkWrite(ctx, o.writes, clientBulkOptions)
68+
totalSize := len(o.writes)
6969

70-
return errors.Wrap(err, "bulk write")
71-
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
70+
err := o.doWithRetry(ctx, m, o.writes)
7271
if err != nil {
7372
return 0, err // nolint:wrapcheck
7473
}
7574

76-
size := len(o.writes)
7775
clear(o.writes)
7876
o.writes = o.writes[:0]
7977

80-
return size, nil
78+
return totalSize, nil
79+
}
80+
81+
// doWithRetry submits bulkWrites through m.BulkWrite under topo.RunWithRetry and, if that
// still fails, falls back to delete+insert for a ReplaceOne rejected with a duplicate key error.
82+
// In ordered mode, when an error occurs at index N, operations 0..N-1 are applied,
83+
// operation N fails, and N+1..end are never executed. This function handles operation N
84+
// through handleDuplicateKeyError and then recurses on bulkWrites[N+1:].
//
// An empty bulkWrites returns nil without calling BulkWrite. A failure that
// extractDuplicateKeyReplacement does not recognize is returned as the RunWithRetry error.
//
// NOTE(review): the ordered-mode reasoning depends on clientBulkOptions, which is defined
// outside this hunk — confirm it requests ordered execution.
85+
func (o *clientBulkWrite) doWithRetry(
86+
ctx context.Context,
87+
m *mongo.Client,
88+
bulkWrites []mongo.ClientBulkWrite,
89+
) error {
90+
if len(bulkWrites) == 0 {
91+
return nil
92+
}
93+
94+
// bulkErr keeps the unwrapped error from the most recent BulkWrite attempt; the closure
// hands the errors.Wrap'ed form back to RunWithRetry.
var bulkErr error
95+
96+
err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
97+
_, err := m.BulkWrite(ctx, bulkWrites, clientBulkOptions)
98+
bulkErr = err
99+
100+
return errors.Wrap(err, "bulk write")
101+
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
102+
if err == nil {
103+
return nil
104+
}
105+
106+
// Try to handle duplicate key error with fallback
107+
idx, replacement := o.extractDuplicateKeyReplacement(bulkErr, bulkWrites)
108+
// A nil replacement means the failure is not one the fallback applies to.
if replacement == nil {
109+
return err //nolint:wrapcheck
110+
}
111+
112+
// The failing write names its own database and collection; resolve the target from it.
write := bulkWrites[idx]
113+
coll := m.Database(write.Database).Collection(write.Collection)
114+
115+
err = handleDuplicateKeyError(ctx, coll, replacement)
116+
if err != nil {
117+
return err
118+
}
119+
120+
// Retry remaining operations (from index+1 onwards)
121+
// These operations were never executed due to ordered semantics
// NOTE(review): every fallback adds one level of recursion on a strictly shorter slice,
// so the depth is bounded by len(bulkWrites).
122+
return o.doWithRetry(ctx, m, bulkWrites[idx+1:])
123+
}
124+
125+
// extractDuplicateKeyReplacement reports whether bulkErr is a mongo.ClientBulkWriteException
// whose lowest-index write error is a duplicate key error on a *mongo.ClientReplaceOneModel.
126+
// On a match it returns that index into writes and the model's Replacement value. It returns
// -1, nil in every other case: a different error type, no write errors, a non-duplicate-key
// error, an index outside writes, or a model that is not a ReplaceOne.
// The receiver o is not used.
127+
func (o *clientBulkWrite) extractDuplicateKeyReplacement(
128+
bulkErr error,
129+
writes []mongo.ClientBulkWrite,
130+
) (int, any) {
131+
var bwe mongo.ClientBulkWriteException
132+
if !errors.As(bulkErr, &bwe) || len(bwe.WriteErrors) == 0 {
133+
return -1, nil
134+
}
135+
136+
// Find the minimum index in the WriteErrors map
137+
// (in ordered mode, there should only be one error)
138+
minIdx := -1
139+
for idx := range bwe.WriteErrors {
140+
if minIdx == -1 || idx < minIdx {
141+
minIdx = idx
142+
}
143+
}
144+
145+
firstErr := bwe.WriteErrors[minIdx]
146+
// The range check guards the writes[minIdx] access below.
if !mongo.IsDuplicateKeyError(firstErr) || minIdx < 0 || minIdx >= len(writes) {
147+
return -1, nil
148+
}
149+
150+
// Only ReplaceOne models carry a replacement document for the fallback to use.
replaceModel, ok := writes[minIdx].Model.(*mongo.ClientReplaceOneModel)
151+
if !ok {
152+
return -1, nil
153+
}
154+
155+
return minIdx, replaceModel.Replacement
81156
}
82157

83158
func (o *clientBulkWrite) Insert(ns Namespace, event *InsertEvent) {
@@ -194,11 +269,7 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
194269
grp.Go(func() error {
195270
mcoll := m.Database(namespace.Database).Collection(namespace.Collection)
196271

197-
err := topo.RunWithRetry(ctx, func(_ context.Context) error {
198-
_, err := mcoll.BulkWrite(grpCtx, ops, collectionBulkOptions)
199-
200-
return errors.Wrapf(err, "bulk write %q", namespace)
201-
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
272+
err := o.doWithRetry(grpCtx, mcoll, namespace, ops)
202273
if err != nil {
203274
return err // nolint:wrapcheck
204275
}
@@ -220,6 +291,72 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
220291
return int(total.Load()), nil
221292
}
222293

294+
// doWithRetry submits bulkWrites for a single namespace through coll.BulkWrite under
// topo.RunWithRetry and, if that still fails, falls back to delete+insert for a ReplaceOne
// rejected with a duplicate key error.
295+
// In ordered mode, when an error occurs at index N, operations 0..N-1 are applied,
296+
// operation N fails, and N+1..end are never executed. This function handles operation N
297+
// through handleDuplicateKeyError and then recurses on bulkWrites[N+1:].
//
// An empty bulkWrites returns nil without calling BulkWrite. A failure that
// extractDuplicateKeyReplacement does not recognize is returned as the RunWithRetry error.
// namespace is used only in the wrapped error message.
//
// NOTE(review): the ordered-mode reasoning depends on collectionBulkOptions, which is defined
// outside this hunk — confirm it requests ordered execution.
298+
func (o *collectionBulkWrite) doWithRetry(
299+
ctx context.Context,
300+
coll *mongo.Collection,
301+
namespace Namespace,
302+
bulkWrites []mongo.WriteModel,
303+
) error {
304+
if len(bulkWrites) == 0 {
305+
return nil
306+
}
307+
308+
// bulkErr keeps the unwrapped error from the most recent BulkWrite attempt; the closure
// hands the errors.Wrapf'ed form back to RunWithRetry.
var bulkErr error
309+
310+
// The closure discards the context RunWithRetry passes in and uses the outer ctx for BulkWrite.
err := topo.RunWithRetry(ctx, func(_ context.Context) error {
311+
_, err := coll.BulkWrite(ctx, bulkWrites, collectionBulkOptions)
312+
bulkErr = err
313+
314+
return errors.Wrapf(err, "bulk write %q", namespace)
315+
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
316+
if err == nil {
317+
return nil
318+
}
319+
320+
// Try to handle duplicate key error with fallback
321+
idx, replacement := o.extractDuplicateKeyReplacement(bulkErr, bulkWrites)
322+
// A nil replacement means the failure is not one the fallback applies to.
if replacement == nil {
323+
return err //nolint:wrapcheck
324+
}
325+
326+
err = handleDuplicateKeyError(ctx, coll, replacement)
327+
if err != nil {
328+
return err
329+
}
330+
331+
// Retry remaining operations (from index+1 onwards)
332+
// These operations were never executed due to ordered semantics
// NOTE(review): every fallback adds one level of recursion on a strictly shorter slice,
// so the depth is bounded by len(bulkWrites).
333+
return o.doWithRetry(ctx, coll, namespace, bulkWrites[idx+1:])
334+
}
335+
336+
// extractDuplicateKeyReplacement reports whether bulkErr is a mongo.BulkWriteException whose
// first WriteErrors entry is a duplicate key error on a *mongo.ReplaceOneModel.
337+
// On a match it returns that entry's Index (a position in ops) and the model's Replacement
// value. It returns -1, nil in every other case: a different error type, no write errors, a
// non-duplicate-key error, an Index outside ops, or a model that is not a ReplaceOne.
// Only WriteErrors[0] is examined. The receiver o is not used.
338+
func (o *collectionBulkWrite) extractDuplicateKeyReplacement(
339+
bulkErr error,
340+
ops []mongo.WriteModel,
341+
) (int, any) {
342+
var bwe mongo.BulkWriteException
343+
if !errors.As(bulkErr, &bwe) || len(bwe.WriteErrors) == 0 {
344+
return -1, nil
345+
}
346+
347+
firstErr := bwe.WriteErrors[0]
348+
// The range check guards the ops[firstErr.Index] access below.
if !mongo.IsDuplicateKeyError(firstErr) || firstErr.Index < 0 || firstErr.Index >= len(ops) {
349+
return -1, nil
350+
}
351+
352+
// Only ReplaceOne models carry a replacement document for the fallback to use.
replaceModel, ok := ops[firstErr.Index].(*mongo.ReplaceOneModel)
353+
if !ok {
354+
return -1, nil
355+
}
356+
357+
return firstErr.Index, replaceModel.Replacement
358+
}
359+
223360
func (o *collectionBulkWrite) Insert(ns Namespace, event *InsertEvent) {
224361
missingShardKeys := bson.D{}
225362

@@ -296,6 +433,51 @@ func (o *collectionBulkWrite) Delete(ns Namespace, event *DeleteEvent) {
296433
o.count++
297434
}
298435

436+
// handleDuplicateKeyError handles a duplicate key error on ReplaceOne by performing delete+insert:
// it deletes the document in coll whose _id equals the replacement's _id, then inserts replacement.
//
// The _id is found by marshalling replacement to BSON, decoding it into a bson.D, and taking
// the first top-level "_id" element. A wrapped error is returned if marshalling or decoding
// fails, if no usable _id is found, if the delete fails, or if the insert fails.
//
// NOTE(review): DeleteOne and InsertOne are two separate calls with no transaction visible in
// this function, and the deleted document is not restored if InsertOne fails — confirm that
// this is acceptable for callers.
437+
func handleDuplicateKeyError(ctx context.Context, coll *mongo.Collection, replacement any) error {
438+
// Extract _id from the replacement document
439+
var doc bson.D
440+
441+
data, err := bson.Marshal(replacement)
442+
if err != nil {
443+
return errors.Wrap(err, "marshal replacement document")
444+
}
445+
446+
err = bson.Unmarshal(data, &doc)
447+
if err != nil {
448+
return errors.Wrap(err, "unmarshal replacement document")
449+
}
450+
451+
// Find _id in document
452+
var docID any
453+
for _, elem := range doc {
454+
if elem.Key == "_id" {
455+
docID = elem.Value
456+
457+
break
458+
}
459+
}
460+
461+
// Also taken when an _id element exists but its decoded value is nil.
if docID == nil {
462+
return errors.New("no _id found in replacement document")
463+
}
464+
465+
log.Ctx(ctx).With(log.NS(coll.Database().Name(), coll.Name())).
466+
Infof("Retrying with delete+insert fallback for _id: %v", docID)
467+
468+
_, err = coll.DeleteOne(ctx, bson.D{{"_id", docID}})
469+
// mongo.ErrNoDocuments is tolerated here and treated as "nothing to delete".
// NOTE(review): check against the driver docs whether DeleteOne can return ErrNoDocuments at all.
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
470+
return errors.Wrap(err, "delete before insert")
471+
}
472+
473+
_, err = coll.InsertOne(ctx, replacement)
474+
if err != nil {
475+
return errors.Wrap(err, "insert after delete")
476+
}
477+
478+
return nil
479+
}
480+
299481
func collectUpdateOps(event *UpdateEvent) any {
300482
for _, trunc := range event.UpdateDescription.TruncatedArrays {
301483
for _, update := range event.UpdateDescription.UpdatedFields {

pcsm/clone.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -760,10 +760,7 @@ func (c *Clone) createIndexes(ctx context.Context, ns Namespace) error {
760760
return nil
761761
}
762762

763-
builtIndexesCap := len(indexes) - len(unfinishedBuilds)
764-
if builtIndexesCap < 0 {
765-
builtIndexesCap = 0
766-
}
763+
builtIndexesCap := max(len(indexes)-len(unfinishedBuilds), 0)
767764

768765
builtIndexes := make([]*topo.IndexSpecification, 0, builtIndexesCap)
769766

pcsm/copy_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func BenchmarkRead(b *testing.B) {
132132
}
133133
}
134134

135-
if err = cur.Err(); err != nil {
135+
err = cur.Err()
136+
if err != nil {
136137
b.Fatal(err)
137138
}
138139

pcsm/pcsm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ func (ml *PCSM) Resume(ctx context.Context, options ResumeOptions) error {
555555
ml.lock.Lock()
556556
defer ml.lock.Unlock()
557557

558-
if ml.state != StatePaused && !(ml.state == StateFailed && options.ResumeFromFailure) {
558+
if ml.state != StatePaused && (ml.state != StateFailed || !options.ResumeFromFailure) {
559559
return errors.New("cannot resume: not paused or not resuming from failure")
560560
}
561561

pcsm/repl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,8 @@ func (r *Repl) watchChangeEvents(
409409
txn0 = change // process the new transaction
410410
}
411411

412-
if err := cur.Err(); err != nil || cur.ID() == 0 {
412+
err := cur.Err()
413+
if err != nil || cur.ID() == 0 {
413414
return errors.Wrap(err, "cursor")
414415
}
415416

@@ -428,7 +429,8 @@ func (r *Repl) watchChangeEvents(
428429
}
429430
}
430431

431-
if err := cur.Err(); err != nil || cur.ID() == 0 {
432+
err = cur.Err()
433+
if err != nil || cur.ID() == 0 {
432434
return errors.Wrap(err, "cursor")
433435
}
434436

poetry.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)