Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package verifier
import (
"bytes"
"context"
"time"

"github.com/10gen/migration-verifier/internal/types"
"github.com/pkg/errors"
Expand All @@ -12,6 +13,8 @@ import (
"golang.org/x/sync/errgroup"
)

const readTimeout = 10 * time.Minute

func (verifier *Verifier) FetchAndCompareDocuments(
givenCtx context.Context,
task *VerificationTask,
Expand All @@ -25,9 +28,9 @@ func (verifier *Verifier) FetchAndCompareDocuments(
// another to read from the destination, and a third one to receive the
// docs from the other 2 threads and compare them. It’s done this way,
// rather than fetch-everything-then-compare, to minimize memory usage.
errGroup, ctx := errgroup.WithContext(givenCtx)
errGroup, groupCtx := errgroup.WithContext(givenCtx)

srcChannel, dstChannel := verifier.getFetcherChannels(ctx, errGroup, task)
srcChannel, dstChannel := verifier.getFetcherChannels(groupCtx, errGroup, task)

results := []VerificationResult{}
var docCount types.DocumentCount
Expand All @@ -36,7 +39,7 @@ func (verifier *Verifier) FetchAndCompareDocuments(
errGroup.Go(func() error {
var err error
results, docCount, byteCount, err = verifier.compareDocsFromChannels(
ctx,
groupCtx,
task,
srcChannel,
dstChannel,
Expand Down Expand Up @@ -131,16 +134,29 @@ func (verifier *Verifier) compareDocsFromChannels(
}

var srcClosed, dstClosed bool
var err error

readTimer := time.NewTimer(0)
defer func() {
if !readTimer.Stop() {
<-readTimer.C
}
}()

// We always read src & dst back & forth. This ensures that, if one side
// lags the other significantly, we won’t keep caching the faster side’s
// documents and thus consume more & more memory.
for err == nil && (!srcClosed || !dstClosed) {
for !srcClosed || !dstClosed {
if !srcClosed {
simpleTimerReset(readTimer, readTimeout)

select {
case <-ctx.Done():
return nil, 0, 0, ctx.Err()
case <-readTimer.C:
return nil, 0, 0, errors.Errorf(
"failed to read from source after %s",
readTimeout,
)
case doc, alive := <-srcChannel:
if !alive {
srcClosed = true
Expand All @@ -149,10 +165,10 @@ func (verifier *Verifier) compareDocsFromChannels(
docCount++
byteCount += types.ByteCount(len(doc))

err = handleNewDoc(doc, true)
err := handleNewDoc(doc, true)

if err != nil {
err = errors.Wrapf(
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle source doc with ID %v",
doc.Lookup("_id"),
Expand All @@ -162,19 +178,26 @@ func (verifier *Verifier) compareDocsFromChannels(
}

if !dstClosed {
simpleTimerReset(readTimer, readTimeout)

select {
case <-ctx.Done():
return nil, 0, 0, ctx.Err()
case <-readTimer.C:
return nil, 0, 0, errors.Errorf(
"failed to read from destination after %s",
readTimeout,
)
case doc, alive := <-dstChannel:
if !alive {
dstClosed = true
break
}

err = handleNewDoc(doc, false)
err := handleNewDoc(doc, false)

if err != nil {
err = errors.Wrapf(
return nil, 0, 0, errors.Wrapf(
err,
"comparer thread failed to handle destination doc with ID %v",
doc.Lookup("_id"),
Expand All @@ -184,10 +207,6 @@ func (verifier *Verifier) compareDocsFromChannels(
}
}

if err != nil {
return nil, 0, 0, errors.Wrap(err, "comparer thread failed")
}

// We got here because both srcChannel and dstChannel are closed,
// which means we have processed all documents with the same mapKey
// between source & destination.
Expand Down Expand Up @@ -227,6 +246,14 @@ func (verifier *Verifier) compareDocsFromChannels(
return results, docCount, byteCount, nil
}

func simpleTimerReset(t *time.Timer, dur time.Duration) {
if !t.Stop() {
<-t.C
}

t.Reset(dur)
}

func (verifier *Verifier) getFetcherChannels(
ctx context.Context,
errGroup *errgroup.Group,
Expand Down
Loading