Skip to content

Commit 30383cb

Browse files
committed
Merge branch 'main' into REP-5329-simplify-retryer
2 parents 948c454 + baea449 commit 30383cb

File tree

5 files changed

+115
-80
lines changed

5 files changed

+115
-80
lines changed

internal/reportutils/reportutils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,5 +183,6 @@ func FindBestUnit[T num16Plus](count T) DataUnit {
183183
// FmtBytes is a convenience that combines BytesToUnit with FindBestUnit.
184184
// Use it to format a single count of bytes.
185185
func FmtBytes[T num16Plus](count T) string {
186-
return BytesToUnit(count, FindBestUnit(count))
186+
unit := FindBestUnit(count)
187+
return BytesToUnit(count, unit) + " " + string(unit)
187188
}

internal/verifier/check.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
146146
err = nil
147147
}
148148

149-
if err != nil {
149+
if err == nil {
150150
verifier.logger.Debug().
151151
Int("generation", generation).
152152
Msgf("Check finished.")
@@ -432,11 +432,6 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
432432
duration := verifier.workerSleepDelayMillis * time.Millisecond
433433

434434
if duration > 0 {
435-
verifier.logger.Debug().
436-
Int("workerNum", workerNum).
437-
Stringer("duration", duration).
438-
Msg("No tasks found. Sleeping.")
439-
440435
time.Sleep(duration)
441436
}
442437

@@ -453,6 +448,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
453448
switch task.Type {
454449
case verificationTaskVerifyCollection:
455450
err := verifier.ProcessCollectionVerificationTask(ctx, workerNum, task)
451+
verifier.workerTracker.Unset(workerNum)
452+
456453
if err != nil {
457454
return err
458455
}
@@ -464,6 +461,8 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
464461
}
465462
case verificationTaskVerifyDocuments:
466463
err := verifier.ProcessVerifyTask(ctx, workerNum, task)
464+
verifier.workerTracker.Unset(workerNum)
465+
467466
if err != nil {
468467
return err
469468
}

internal/verifier/compare.go

Lines changed: 78 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ func (verifier *Verifier) compareDocsFromChannels(
6464
error,
6565
) {
6666
results := []VerificationResult{}
67-
var docCount types.DocumentCount
68-
var byteCount types.ByteCount
67+
var srcDocCount types.DocumentCount
68+
var srcByteCount types.ByteCount
6969

7070
mapKeyFieldNames := make([]string, 1+len(task.QueryFilter.ShardKeys))
7171
mapKeyFieldNames[0] = "_id"
@@ -142,67 +142,95 @@ func (verifier *Verifier) compareDocsFromChannels(
142142
}
143143
}()
144144

145-
// We always read src & dst back & forth. This ensures that, if one side
145+
// We always read src & dst together. This ensures that, if one side
146146
// lags the other significantly, we won’t keep caching the faster side’s
147147
// documents and thus consume more & more memory.
148148
for !srcClosed || !dstClosed {
149+
simpleTimerReset(readTimer, readTimeout)
150+
151+
var srcDoc, dstDoc bson.Raw
152+
153+
eg, egCtx := errgroup.WithContext(ctx)
154+
149155
if !srcClosed {
150-
simpleTimerReset(readTimer, readTimeout)
151-
152-
select {
153-
case <-ctx.Done():
154-
return nil, 0, 0, ctx.Err()
155-
case <-readTimer.C:
156-
return nil, 0, 0, errors.Errorf(
157-
"failed to read from source after %s",
158-
readTimeout,
159-
)
160-
case doc, alive := <-srcChannel:
161-
if !alive {
162-
srcClosed = true
163-
break
156+
eg.Go(func() error {
157+
var alive bool
158+
select {
159+
case <-egCtx.Done():
160+
return egCtx.Err()
161+
case <-readTimer.C:
162+
return errors.Errorf(
163+
"failed to read from source after %s",
164+
readTimeout,
165+
)
166+
case srcDoc, alive = <-srcChannel:
167+
if !alive {
168+
srcClosed = true
169+
break
170+
}
171+
172+
srcDocCount++
173+
srcByteCount += types.ByteCount(len(srcDoc))
164174
}
165-
docCount++
166-
byteCount += types.ByteCount(len(doc))
167175

168-
err := handleNewDoc(doc, true)
176+
return nil
177+
})
178+
}
169179

170-
if err != nil {
171-
return nil, 0, 0, errors.Wrapf(
172-
err,
173-
"comparer thread failed to handle source doc with ID %v",
174-
doc.Lookup("_id"),
180+
if !dstClosed {
181+
eg.Go(func() error {
182+
var alive bool
183+
select {
184+
case <-egCtx.Done():
185+
return egCtx.Err()
186+
case <-readTimer.C:
187+
return errors.Errorf(
188+
"failed to read from destination after %s",
189+
readTimeout,
175190
)
191+
case dstDoc, alive = <-dstChannel:
192+
if !alive {
193+
dstClosed = true
194+
break
195+
}
176196
}
177-
}
197+
198+
return nil
199+
})
178200
}
179201

180-
if !dstClosed {
181-
simpleTimerReset(readTimer, readTimeout)
182-
183-
select {
184-
case <-ctx.Done():
185-
return nil, 0, 0, ctx.Err()
186-
case <-readTimer.C:
187-
return nil, 0, 0, errors.Errorf(
188-
"failed to read from destination after %s",
189-
readTimeout,
202+
if err := eg.Wait(); err != nil {
203+
return nil, 0, 0, errors.Wrap(
204+
err,
205+
"failed to read documents",
206+
)
207+
}
208+
209+
if srcDoc != nil {
210+
err := handleNewDoc(srcDoc, true)
211+
212+
if err != nil {
213+
return nil, 0, 0, errors.Wrapf(
214+
err,
215+
"comparer thread failed to handle %#q's source doc (task: %s) with ID %v",
216+
namespace,
217+
task.PrimaryKey,
218+
srcDoc.Lookup("_id"),
190219
)
191-
case doc, alive := <-dstChannel:
192-
if !alive {
193-
dstClosed = true
194-
break
195-
}
220+
}
221+
}
196222

197-
err := handleNewDoc(doc, false)
223+
if dstDoc != nil {
224+
err := handleNewDoc(dstDoc, false)
198225

199-
if err != nil {
200-
return nil, 0, 0, errors.Wrapf(
201-
err,
202-
"comparer thread failed to handle destination doc with ID %v",
203-
doc.Lookup("_id"),
204-
)
205-
}
226+
if err != nil {
227+
return nil, 0, 0, errors.Wrapf(
228+
err,
229+
"comparer thread failed to handle %#q's destination doc (task: %s) with ID %v",
230+
namespace,
231+
task.PrimaryKey,
232+
dstDoc.Lookup("_id"),
233+
)
206234
}
207235
}
208236
}
@@ -243,7 +271,7 @@ func (verifier *Verifier) compareDocsFromChannels(
243271
)
244272
}
245273

246-
return results, docCount, byteCount, nil
274+
return results, srcDocCount, srcByteCount, nil
247275
}
248276

249277
func simpleTimerReset(t *time.Timer, dur time.Duration) {

internal/verifier/migration_verifier.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -503,11 +503,16 @@ func (verifier *Verifier) getDocumentsCursor(ctx context.Context, collection *mo
503503
}
504504
}
505505
findCmd := append(bson.D{{"find", collection.Name()}}, findOptions...)
506-
verifier.logger.Debug().
507-
Interface("task", task.PrimaryKey).
508-
Str("findCmd", fmt.Sprintf("%s", findCmd)).
509-
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
510-
Msg("getDocuments findCmd.")
506+
507+
// Suppress this log for recheck tasks because the list of IDs can be
508+
// quite long.
509+
if len(task.Ids) == 0 {
510+
verifier.logger.Debug().
511+
Interface("task", task.PrimaryKey).
512+
Str("findCmd", fmt.Sprintf("%s", findCmd)).
513+
Str("options", fmt.Sprintf("%v", *runCommandOptions)).
514+
Msg("getDocuments findCmd.")
515+
}
511516

512517
return collection.Database().RunCommandCursor(ctx, findCmd, runCommandOptions)
513518
}
@@ -595,16 +600,6 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
595600
Interface("task", task.PrimaryKey).
596601
Msg("Processing document comparison task.")
597602

598-
defer func() {
599-
elapsed := time.Since(start)
600-
601-
verifier.logger.Debug().
602-
Int("workerNum", workerNum).
603-
Interface("task", task.PrimaryKey).
604-
Stringer("timeElapsed", elapsed).
605-
Msg("Finished document comparison task.")
606-
}()
607-
608603
problems, docsCount, bytesCount, err := verifier.FetchAndCompareDocuments(
609604
ctx,
610605
task,
@@ -681,12 +676,24 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
681676
}
682677
}
683678

684-
return errors.Wrapf(
685-
verifier.UpdateVerificationTask(ctx, task),
686-
"failed to persist task %s's new status (%#q)",
687-
task.PrimaryKey,
688-
task.Status,
689-
)
679+
err = verifier.UpdateVerificationTask(ctx, task)
680+
681+
if err != nil {
682+
return errors.Wrapf(
683+
err,
684+
"failed to persist task %s's new status (%#q)",
685+
task.PrimaryKey,
686+
task.Status,
687+
)
688+
}
689+
690+
verifier.logger.Debug().
691+
Int("workerNum", workerNum).
692+
Interface("task", task.PrimaryKey).
693+
Stringer("timeElapsed", time.Since(start)).
694+
Msg("Finished document comparison task.")
695+
696+
return nil
690697
}
691698

692699
func (verifier *Verifier) logChunkInfo(ctx context.Context, namespaceAndUUID *uuidutil.NamespaceAndUUID) {

internal/verifier/migration_verifier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ func TestVerifierCompareDocs(t *testing.T) {
574574
{{"_id", id}, {"sharded", 123}},
575575
},
576576
compareFn: func(t *testing.T, mismatchedIds []VerificationResult) {
577-
assert.Empty(t, mismatchedIds)
577+
assert.Empty(t, mismatchedIds, "should be no problems")
578578
},
579579
},
580580
}
@@ -1491,7 +1491,7 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
14911491
status := waitForTasks()
14921492
suite.Require().Greater(status.CompletedTasks, 1)
14931493
suite.Require().Greater(status.TotalTasks, 1)
1494-
suite.Require().Equal(status.FailedTasks, 0)
1494+
suite.Require().Zero(status.FailedTasks, "there should be no failed tasks")
14951495

14961496
// Insert another document that is not in the filter.
14971497
// This should trigger a recheck despite being outside the filter.

0 commit comments

Comments
 (0)