Skip to content

Commit 19975b9

Browse files
Added retry logic for bulk writes
1 parent 0a86628 commit 19975b9

File tree

1 file changed

+42
-15
lines changed

1 file changed

+42
-15
lines changed

internal/cloner/copy.go

Lines changed: 42 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -390,7 +390,7 @@ func (cm *CopyManager) runLinearScan(ctx context.Context, sourceColl, targetColl
390390
// It avoids memory sorts and "mixed type" index failures entirely.
391391
readOpts := options.Find().
392392
SetBatchSize(int32(config.Cfg.Cloner.ReadBatchSize))
393-
// NOTE: NO SORT HERE. This is the key fix for Linear Mode.
393+
// NOTE: NO SORT HERE. This is the fix for Linear Mode.
394394

395395
cursor, err := sourceColl.Find(errCtx, bson.D{}, readOpts)
396396
if err != nil {
@@ -635,7 +635,7 @@ func (cm *CopyManager) readWorker(
635635
}
636636
}
637637

638-
// insertWorker runs BulkWrite with ReplaceOneModels
638+
// insertWorker runs BulkWrite with ReplaceOneModels and Retry Logic
639639
func (cm *CopyManager) insertWorker(
640640
ctx context.Context,
641641
cancel context.CancelFunc,
@@ -656,23 +656,50 @@ func (cm *CopyManager) insertWorker(
656656
if len(batch.models) == 0 {
657657
continue
658658
}
659-
if ctx.Err() != nil {
660-
return
661-
}
662-
663-
start := time.Now()
664-
res, err := coll.BulkWrite(ctx, batch.models, bulkWriteOpts)
665659

660+
var res *mongo.BulkWriteResult
661+
var err error
666662
var docCount int64
667-
if res != nil {
668-
docCount = res.InsertedCount + res.UpsertedCount + res.MatchedCount
663+
664+
// --- RETRY LOOP ---
665+
maxRetries := 5
666+
for i := 1; i <= maxRetries; i++ {
667+
if ctx.Err() != nil {
668+
return
669+
}
670+
671+
start := time.Now()
672+
res, err = coll.BulkWrite(ctx, batch.models, bulkWriteOpts)
673+
674+
// Capture count even on partial success/failure
675+
docCount = 0
676+
if res != nil {
677+
docCount = res.InsertedCount + res.UpsertedCount + res.MatchedCount
678+
}
679+
680+
// Log the attempt duration
681+
logging.LogFullLoadBatchOp(start, ns, docCount, batch.size, err)
682+
683+
if err == nil {
684+
break // Success!
685+
}
686+
687+
// If it's a retryable error, wait and try again
688+
if shouldRetry(err) {
689+
logging.PrintWarning(fmt.Sprintf("[%s] BulkWrite error (attempt %d/%d): %v. Retrying...", ns, i, maxRetries, err), 0)
690+
time.Sleep(time.Second * time.Duration(i)) // Linear backoff: waits i seconds after attempt i (1s, 2s, 3s, ...)
691+
continue
692+
}
693+
694+
// If it's not retryable (e.g. auth error, bad query), break immediately
695+
break
669696
}
670-
logging.LogFullLoadBatchOp(start, ns, docCount, batch.size, err)
671697

672698
if err != nil {
673699
if ctx.Err() != nil {
674700
return
675701
}
702+
// Fatal error after retries
676703
logging.PrintError(fmt.Sprintf("[%s] BulkWrite failed: %v", ns, err), 4)
677704
cancel()
678705
return
@@ -684,14 +711,14 @@ func (cm *CopyManager) insertWorker(
684711
statusMgr.AddClonedBytes(batch.size)
685712
}
686713

687-
elapsed := time.Since(start)
688-
714+
// Success Log (Debug level shows range)
715+
elapsed := time.Duration(0) // Placeholder: duration is logged per attempt inside the retry loop, so the debug message below always reports 0s
689716
if config.Cfg.Logging.Level == "debug" {
690717
logging.PrintStep(fmt.Sprintf("[%s] Processed batch: %d inserted, %d replaced. (%s) in %s. Range: [%v - %v]",
691718
ns, res.InsertedCount+res.UpsertedCount, res.ModifiedCount, humanize.Bytes(uint64(batch.size)), elapsed, batch.first, batch.last), 4)
692719
} else {
693-
logging.PrintStep(fmt.Sprintf("[%s] Processed batch: %d inserted, %d replaced. (%s) in %s",
694-
ns, res.InsertedCount+res.UpsertedCount, res.ModifiedCount, humanize.Bytes(uint64(batch.size)), elapsed), 4)
720+
logging.PrintStep(fmt.Sprintf("[%s] Processed batch: %d inserted, %d replaced. (%s)",
721+
ns, res.InsertedCount+res.UpsertedCount, res.ModifiedCount, humanize.Bytes(uint64(batch.size))), 4)
695722
}
696723
}
697724
}

0 commit comments

Comments
 (0)