Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ linters:
- nolintlint
- varnamelen
- wsl
- wsl_v5
- funcorder
settings:
govet:
disable:
- composites
misspell:
locale: US
revive:
rules:
- name: var-naming
disabled: true
exclusions:
generated: lax
presets:
Expand Down
5 changes: 3 additions & 2 deletions config/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func UseTargetClientCompressors() []string {
allowCompressors := []string{"zstd", "zlib", "snappy"}

rv := make([]string, 0, min(len(s), len(allowCompressors)))
for _, a := range strings.Split(s, ",") {
for a := range strings.SplitSeq(s, ",") {
a = strings.TrimSpace(a)
if slices.Contains(allowCompressors, a) && !slices.Contains(rv, a) {
rv = append(rv, a)
Expand All @@ -90,7 +90,8 @@ func UseTargetClientCompressors() []string {
// DefaultMongoDBCliOperationTimeout is used.
func OperationMongoDBCliTimeout() time.Duration {
if v := strings.TrimSpace(os.Getenv("PCSM_MONGODB_CLI_OPERATION_TIMEOUT")); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
d, err := time.ParseDuration(v)
if err == nil && d > 0 {
return d
}
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,8 @@ func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
State: status.State,
}

if err := status.Error; err != nil {
err := status.Error
if err != nil {
res.Err = err.Error()
}

Expand Down
2 changes: 1 addition & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func AddCopyReadDocumentCount(v int) {
copyReadDocumentTotal.Add(float64(v))
}

// AddCopyInsertDocumentCount increments the total count of the inserted documents.
func AddCopyInsertDocumentCount(v int) {
	copyInsertDocumentTotal.Add(float64(v))
}
Expand Down
204 changes: 193 additions & 11 deletions pcsm/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/percona/percona-clustersync-mongodb/errors"
"github.com/percona/percona-clustersync-mongodb/log"
"github.com/percona/percona-clustersync-mongodb/topo"
)

Expand Down Expand Up @@ -64,20 +65,94 @@ func (o *clientBulkWrite) Empty() bool {
}

func (o *clientBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, error) {
err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
_, err := m.BulkWrite(ctx, o.writes, clientBulkOptions)
totalSize := len(o.writes)

return errors.Wrap(err, "bulk write")
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
err := o.doWithRetry(ctx, m, o.writes)
if err != nil {
return 0, err // nolint:wrapcheck
}

size := len(o.writes)
clear(o.writes)
o.writes = o.writes[:0]

return size, nil
return totalSize, nil
}

// doWithRetry executes bulk write operations with retry logic for duplicate key errors.
// In ordered mode, when an error occurs at index N, operations 0..N-1 are applied,
// operation N fails, and N+1..end are never executed. This function handles operation N
// and retries the remaining operations recursively.
//
// Recursion depth is bounded by the number of duplicate-key failures on ReplaceOne
// models in bulkWrites: each recursive call operates on a strictly shorter suffix.
func (o *clientBulkWrite) doWithRetry(
	ctx context.Context,
	m *mongo.Client,
	bulkWrites []mongo.ClientBulkWrite,
) error {
	// Base case for the recursion: nothing left to write.
	if len(bulkWrites) == 0 {
		return nil
	}

	// bulkErr captures the raw driver error from the last attempt. RunWithRetry
	// only returns the wrapped error, and extractDuplicateKeyReplacement needs
	// the unwrapped driver error to errors.As into ClientBulkWriteException.
	var bulkErr error

	err := topo.RunWithRetry(ctx, func(ctx context.Context) error {
		_, err := m.BulkWrite(ctx, bulkWrites, clientBulkOptions)
		bulkErr = err

		return errors.Wrap(err, "bulk write")
	}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
	if err == nil {
		return nil
	}

	// Try to handle duplicate key error with fallback
	idx, replacement := o.extractDuplicateKeyReplacement(bulkErr, bulkWrites)
	if replacement == nil {
		// Not a duplicate-key-on-ReplaceOne failure; propagate as-is.
		return err //nolint:wrapcheck
	}

	// Client-level bulk writes carry their target namespace per operation.
	write := bulkWrites[idx]
	coll := m.Database(write.Database).Collection(write.Collection)

	err = handleDuplicateKeyError(ctx, coll, replacement)
	if err != nil {
		return err
	}

	// Retry remaining operations (from index+1 onwards)
	// These operations were never executed due to ordered semantics
	return o.doWithRetry(ctx, m, bulkWrites[idx+1:])
}

// extractDuplicateKeyReplacement inspects bulkErr and, when it represents a
// duplicate key error raised by a ReplaceOne model, reports the index of the
// failed operation together with its replacement document. For any other
// error shape it reports (-1, nil).
func (o *clientBulkWrite) extractDuplicateKeyReplacement(
	bulkErr error,
	writes []mongo.ClientBulkWrite,
) (int, any) {
	var bwe mongo.ClientBulkWriteException

	matched := errors.As(bulkErr, &bwe)
	if !matched || len(bwe.WriteErrors) == 0 {
		return -1, nil
	}

	// WriteErrors is a map keyed by operation index. With ordered writes a
	// single entry is expected, but scan for the smallest key to be safe.
	first := -1
	for i := range bwe.WriteErrors {
		if first < 0 || i < first {
			first = i
		}
	}

	if first < 0 || first >= len(writes) {
		return -1, nil
	}

	if !mongo.IsDuplicateKeyError(bwe.WriteErrors[first]) {
		return -1, nil
	}

	// The fallback only applies to ReplaceOne models: their full replacement
	// document lets the caller re-create the target via delete+insert.
	model, isReplace := writes[first].Model.(*mongo.ClientReplaceOneModel)
	if !isReplace {
		return -1, nil
	}

	return first, model.Replacement
}

func (o *clientBulkWrite) Insert(ns Namespace, event *InsertEvent) {
Expand Down Expand Up @@ -194,11 +269,7 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
grp.Go(func() error {
mcoll := m.Database(namespace.Database).Collection(namespace.Collection)

err := topo.RunWithRetry(ctx, func(_ context.Context) error {
_, err := mcoll.BulkWrite(grpCtx, ops, collectionBulkOptions)

return errors.Wrapf(err, "bulk write %q", namespace)
}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
err := o.doWithRetry(grpCtx, mcoll, namespace, ops)
if err != nil {
return err // nolint:wrapcheck
}
Expand All @@ -220,6 +291,72 @@ func (o *collectionBulkWrite) Do(ctx context.Context, m *mongo.Client) (int, err
return int(total.Load()), nil
}

// doWithRetry executes bulk write operations for a single namespace with retry logic for duplicate key errors.
// In ordered mode, when an error occurs at index N, operations 0..N-1 are applied,
// operation N fails, and N+1..end are never executed. This function handles operation N
// and retries the remaining operations recursively.
//
// Recursion depth is bounded by the number of duplicate-key failures on ReplaceOne
// models in bulkWrites: each recursive call operates on a strictly shorter suffix.
func (o *collectionBulkWrite) doWithRetry(
	ctx context.Context,
	coll *mongo.Collection,
	namespace Namespace,
	bulkWrites []mongo.WriteModel,
) error {
	// Base case for the recursion: nothing left to write.
	if len(bulkWrites) == 0 {
		return nil
	}

	// bulkErr captures the raw driver error from the last attempt. RunWithRetry
	// only returns the wrapped error, and extractDuplicateKeyReplacement needs
	// the unwrapped driver error to errors.As into BulkWriteException.
	var bulkErr error

	err := topo.RunWithRetry(ctx, func(_ context.Context) error {
		_, err := coll.BulkWrite(ctx, bulkWrites, collectionBulkOptions)
		bulkErr = err

		return errors.Wrapf(err, "bulk write %q", namespace)
	}, topo.DefaultRetryInterval, topo.DefaultMaxRetries)
	if err == nil {
		return nil
	}

	// Try to handle duplicate key error with fallback
	idx, replacement := o.extractDuplicateKeyReplacement(bulkErr, bulkWrites)
	if replacement == nil {
		// Not a duplicate-key-on-ReplaceOne failure; propagate as-is.
		return err //nolint:wrapcheck
	}

	err = handleDuplicateKeyError(ctx, coll, replacement)
	if err != nil {
		return err
	}

	// Retry remaining operations (from index+1 onwards)
	// These operations were never executed due to ordered semantics
	return o.doWithRetry(ctx, coll, namespace, bulkWrites[idx+1:])
}

// extractDuplicateKeyReplacement inspects bulkErr and, when it represents a
// duplicate key error raised by a ReplaceOne model, reports the index of the
// failed operation together with its replacement document. For any other
// error shape it reports (-1, nil).
func (o *collectionBulkWrite) extractDuplicateKeyReplacement(
	bulkErr error,
	ops []mongo.WriteModel,
) (int, any) {
	var bwe mongo.BulkWriteException

	matched := errors.As(bulkErr, &bwe)
	if !matched || len(bwe.WriteErrors) == 0 {
		return -1, nil
	}

	// With ordered writes at most one write error is reported, so only the
	// first entry matters.
	we := bwe.WriteErrors[0]

	if we.Index < 0 || we.Index >= len(ops) {
		return -1, nil
	}

	if !mongo.IsDuplicateKeyError(we) {
		return -1, nil
	}

	// The fallback only applies to ReplaceOne models: their full replacement
	// document lets the caller re-create the target via delete+insert.
	model, isReplace := ops[we.Index].(*mongo.ReplaceOneModel)
	if !isReplace {
		return -1, nil
	}

	return we.Index, model.Replacement
}

func (o *collectionBulkWrite) Insert(ns Namespace, event *InsertEvent) {
missingShardKeys := bson.D{}

Expand Down Expand Up @@ -296,6 +433,51 @@ func (o *collectionBulkWrite) Delete(ns Namespace, event *DeleteEvent) {
o.count++
}

// handleDuplicateKeyError handles a duplicate key error on ReplaceOne by performing delete+insert.
//
// The replacement document is round-tripped through BSON to locate its _id,
// then the conflicting document is deleted and the replacement inserted.
// NOTE(review): delete+insert is not atomic — a concurrent reader may observe
// the document as missing between the two operations.
func handleDuplicateKeyError(ctx context.Context, coll *mongo.Collection, replacement any) error {
	// Extract _id from the replacement document
	var doc bson.D

	// Marshal+Unmarshal converts whatever concrete type `replacement` is
	// (struct, map, raw document) into a bson.D we can scan for the _id key.
	data, err := bson.Marshal(replacement)
	if err != nil {
		return errors.Wrap(err, "marshal replacement document")
	}

	err = bson.Unmarshal(data, &doc)
	if err != nil {
		return errors.Wrap(err, "unmarshal replacement document")
	}

	// Find _id in document
	var docID any
	for _, elem := range doc {
		if elem.Key == "_id" {
			docID = elem.Value

			break
		}
	}

	if docID == nil {
		return errors.New("no _id found in replacement document")
	}

	log.Ctx(ctx).With(log.NS(coll.Database().Name(), coll.Name())).
		Infof("Retrying with delete+insert fallback for _id: %v", docID)

	// NOTE(review): DeleteOne reports a zero DeletedCount rather than
	// mongo.ErrNoDocuments when nothing matches, so this guard is likely
	// dead code — kept for safety; confirm against the driver version in use.
	_, err = coll.DeleteOne(ctx, bson.D{{"_id", docID}})
	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return errors.Wrap(err, "delete before insert")
	}

	_, err = coll.InsertOne(ctx, replacement)
	if err != nil {
		return errors.Wrap(err, "insert after delete")
	}

	return nil
}

func collectUpdateOps(event *UpdateEvent) any {
for _, trunc := range event.UpdateDescription.TruncatedArrays {
for _, update := range event.UpdateDescription.UpdatedFields {
Expand Down
5 changes: 1 addition & 4 deletions pcsm/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,7 @@ func (c *Clone) createIndexes(ctx context.Context, ns Namespace) error {
return nil
}

builtIndexesCap := len(indexes) - len(unfinishedBuilds)
if builtIndexesCap < 0 {
builtIndexesCap = 0
}
builtIndexesCap := max(len(indexes)-len(unfinishedBuilds), 0)

builtIndexes := make([]*topo.IndexSpecification, 0, builtIndexesCap)

Expand Down
3 changes: 2 additions & 1 deletion pcsm/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func BenchmarkRead(b *testing.B) {
}
}

if err = cur.Err(); err != nil {
err = cur.Err()
if err != nil {
b.Fatal(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pcsm/pcsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (ml *PCSM) Resume(ctx context.Context, options ResumeOptions) error {
ml.lock.Lock()
defer ml.lock.Unlock()

if ml.state != StatePaused && !(ml.state == StateFailed && options.ResumeFromFailure) {
if ml.state != StatePaused && (ml.state != StateFailed || !options.ResumeFromFailure) {
return errors.New("cannot resume: not paused or not resuming from failure")
}

Expand Down
6 changes: 4 additions & 2 deletions pcsm/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ func (r *Repl) watchChangeEvents(
txn0 = change // process the new transaction
}

if err := cur.Err(); err != nil || cur.ID() == 0 {
err := cur.Err()
if err != nil || cur.ID() == 0 {
return errors.Wrap(err, "cursor")
}

Expand All @@ -428,7 +429,8 @@ func (r *Repl) watchChangeEvents(
}
}

if err := cur.Err(); err != nil || cur.ID() == 0 {
err = cur.Err()
if err != nil || cur.ID() == 0 {
return errors.Wrap(err, "cursor")
}

Expand Down
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading