Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
351c94c
wip
tdq45gj Nov 22, 2024
0f645f1
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 22, 2024
53404bc
refactor
tdq45gj Nov 22, 2024
24d3881
Update change_stream_test.go
tdq45gj Nov 22, 2024
d66f25e
Update change_stream_test.go
tdq45gj Nov 22, 2024
9a081cc
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
733220c
Update migration_verifier.go
tdq45gj Nov 25, 2024
2ae4abc
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
39af4e7
fix
tdq45gj Nov 25, 2024
4ff9863
Update integration_test_suite.go
tdq45gj Nov 25, 2024
16224b1
fix tests
tdq45gj Nov 25, 2024
2d95686
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
a9cb0c1
Update change_stream.go
tdq45gj Nov 25, 2024
5bf64f6
Felipe's review
tdq45gj Nov 27, 2024
668e649
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 27, 2024
743dd6a
Update change_stream_test.go
tdq45gj Nov 27, 2024
cc34be4
initialize nsmap
tdq45gj Nov 27, 2024
4ba0eda
fix
tdq45gj Nov 27, 2024
b9d5f37
fix generational rechecking test
tdq45gj Nov 27, 2024
9eed454
Add end-to-end test
tdq45gj Nov 27, 2024
281afc0
wait for change event handler
tdq45gj Nov 27, 2024
c96a495
fix race condition in tests
tdq45gj Nov 27, 2024
e89d175
Update change_stream.go
tdq45gj Nov 27, 2024
f14962f
Update check.go
tdq45gj Nov 27, 2024
be93d05
wait for recheck docs creation
tdq45gj Nov 29, 2024
c1b7ead
fix
tdq45gj Nov 29, 2024
c119cb0
Update change_stream_test.go
tdq45gj Dec 3, 2024
e302d9d
Update change_stream.go
tdq45gj Dec 3, 2024
d15b020
refactors
tdq45gj Dec 3, 2024
ff0c7e9
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Dec 3, 2024
e56dcbc
rename clusterType to whichCluster
tdq45gj Dec 3, 2024
c91eb0d
reorder controls in TestChangesOnDstBeforeSrc
tdq45gj Dec 3, 2024
faa2e24
Update migration_verifier_test.go
tdq45gj Dec 3, 2024
c71e9ef
Update change_stream_test.go
tdq45gj Dec 3, 2024
18a4cb3
reduce log level
tdq45gj Dec 3, 2024
3961b2d
Update change_stream_test.go
tdq45gj Dec 3, 2024
c2da8c4
change to private fields
tdq45gj Dec 3, 2024
e3e901c
Update migration_verifier_test.go
tdq45gj Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 192 additions & 76 deletions internal/verifier/change_stream.go

Large diffs are not rendered by default.

134 changes: 99 additions & 35 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,33 @@ package verifier
import (
"context"
"strings"
"testing"
"time"

"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mslices"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"golang.org/x/sync/errgroup"
)

func TestChangeStreamFilter(t *testing.T) {
verifier := Verifier{}
verifier.SetMetaDBName("metadb")
assert.Contains(t,
verifier.GetChangeStreamFilter(),
func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
verifier := suite.BuildVerifier()
suite.Assert().Contains(
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
bson.D{
{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}},
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
},
)
verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
assert.Contains(t,
verifier.GetChangeStreamFilter(),
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
suite.Assert().Contains(
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
bson.D{{"$match", bson.D{
{"$or", []bson.D{
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
Expand All @@ -43,6 +41,12 @@ func TestChangeStreamFilter(t *testing.T) {
)
}

// startSrcChangeStreamReaderAndHandler is a test helper that starts the
// verifier's source change stream reader and then starts the change event
// handler for that reader. The test fails immediately if the change stream
// cannot be started.
//
// The handler is registered on a throwaway errgroup, so nothing in this
// helper waits on it or observes an error it returns.
func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) {
	suite.Require().NoError(verifier.srcChangeStreamReader.StartChangeStream(ctx))

	handlerGroup := &errgroup.Group{}
	verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader, handlerGroup)
}

// TestChangeStreamResumability creates a verifier, starts its change stream,
// terminates that verifier, updates the source cluster, starts a new
// verifier with change stream, and confirms that things look as they should.
Expand All @@ -57,8 +61,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
verifier1 := suite.BuildVerifier()
ctx, cancel := context.WithCancel(suite.Context())
defer cancel()
err := verifier1.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier1)
}()

ctx, cancel := context.WithCancel(suite.Context())
Expand All @@ -82,13 +85,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {

newTime := suite.getClusterTime(ctx, suite.srcMongoClient)

err = verifier2.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)

suite.Require().NotNil(verifier2.srcStartAtTs)
suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs)

suite.Assert().False(
verifier2.srcStartAtTs.After(newTime),
verifier2.srcChangeStreamReader.startAtTs.After(newTime),
"verifier2's change stream should be no later than this new session",
)

Expand Down Expand Up @@ -156,12 +158,11 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
verifier.changeStreamWritesOffTsChan <- *origStartTs
<-verifier.changeStreamDoneChan
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
verifier.srcChangeStreamReader.ChangeStreamWritesOffTsChan <- *origStartTs
<-verifier.srcChangeStreamReader.ChangeStreamDoneChan
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
}

func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
Expand All @@ -176,13 +177,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {

origSessionTime := sess.OperationTime()
suite.Require().NotNil(origSessionTime)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

// srcChangeStreamReader.startAtTs derives from the change stream’s resume
// token, which can postdate our session time but should not precede it.
suite.Require().False(
verifier.srcStartAtTs.Before(*origSessionTime),
verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime),
"srcStartAtTs should be >= the insert’s optime",
)

Expand All @@ -206,12 +206,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
"session time after events should exceed the original",
)

verifier.changeStreamWritesOffTsChan <- *postEventsSessionTime
<-verifier.changeStreamDoneChan
verifier.srcChangeStreamReader.ChangeStreamWritesOffTsChan <- *postEventsSessionTime
<-verifier.srcChangeStreamReader.ChangeStreamDoneChan

suite.Assert().Equal(
*postEventsSessionTime,
*verifier.srcStartAtTs,
*verifier.srcChangeStreamReader.startAtTs,
"verifier.srcStartAtTs should now be our session timestamp",
)
}
Expand All @@ -227,10 +227,9 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.Require().NotNil(verifier.srcStartAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0)
}

func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
Expand All @@ -246,7 +245,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {

verifier := suite.BuildVerifier()

suite.Require().NoError(verifier.StartChangeStream(ctx))
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)
Expand All @@ -267,7 +266,6 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
500*time.Millisecond,
"the verifier should flush a recheck doc after a batch",
)

}

func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
Expand Down Expand Up @@ -387,6 +385,7 @@ func (suite *IntegrationTestSuite) testInsertsBeforeWritesOff(docsCount int) {
lo.ToAnySlice(docs),
)
suite.Require().NoError(err)
//fmt.Println(fmt.Sprintf("src cluster time %v", suite.getClusterTime(ctx, suite.srcMongoClient)))

suite.Require().NoError(verifier.WritesOff(ctx))

Expand Down Expand Up @@ -451,6 +450,71 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
suite.Assert().Equal("create", eventErr.Event.OpType)
}

// TestRecheckDocsWithDstChangeEvents checks that change events read from the
// destination cluster produce recheck documents keyed by the corresponding
// source namespace.
//
// It creates two destination collections, configures the verifier with
// parallel source and destination namespace lists, starts only the
// destination change stream reader and its event handler, and inserts three
// documents (two into dstColl1, one into dstColl2). It then polls the recheck
// queue until exactly three recheck documents are present and asserts that
// they are attributed to srcColl1 (two) and srcColl2 (one) in the source
// database.
func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
	ctx := suite.Context()

	srcDBName := suite.DBNameForTest("src")
	dstDBName := suite.DBNameForTest("dst")

	db := suite.dstMongoClient.Database(dstDBName)
	coll1 := db.Collection("dstColl1")
	coll2 := db.Collection("dstColl2")

	for _, coll := range mslices.Of(coll1, coll2) {
		suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
	}

	verifier := suite.BuildVerifier()
	verifier.SetSrcNamespaces([]string{srcDBName + ".srcColl1", srcDBName + ".srcColl2"})
	verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
	verifier.SetNamespaceMap()

	suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx))
	verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader, &errgroup.Group{})

	_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
	suite.Require().NoError(err)
	_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
	suite.Require().NoError(err)

	_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
	suite.Require().NoError(err)

	var rechecks []RecheckDoc
	require.Eventually(
		suite.T(),
		func() bool {
			// testify runs this condition on its own goroutine, so only
			// non-fatal assertions are used here: FailNow (which
			// suite.Require() triggers) must be called from the goroutine
			// running the test, and calling it here would merely kill the
			// polling goroutine and stall Eventually until its timeout.
			recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
			cursor, err := recheckColl.Find(ctx, bson.D{})
			if errors.Is(err, mongo.ErrNoDocuments) {
				return false
			}

			// Any other error still marks the test as failed, but we
			// report it without aborting from this goroutine.
			if !suite.Assert().NoError(err) {
				return false
			}
			if !suite.Assert().NoError(cursor.All(ctx, &rechecks)) {
				return false
			}
			return len(rechecks) == 3
		},
		time.Minute,
		500*time.Millisecond,
		"the verifier should flush a recheck doc after a batch",
	)

	// Every recheck must be attributed to the source database, and the
	// per-collection counts must mirror the destination inserts above.
	coll1RecheckCount, coll2RecheckCount := 0, 0
	for _, recheck := range rechecks {
		suite.Require().Equal(srcDBName, recheck.PrimaryKey.SrcDatabaseName)
		switch recheck.PrimaryKey.SrcCollectionName {
		case "srcColl1":
			coll1RecheckCount++
		case "srcColl2":
			coll2RecheckCount++
		default:
			suite.T().Fatalf("unknown collection name: %v", recheck.PrimaryKey.SrcCollectionName)
		}
	}
	suite.Require().Equal(2, coll1RecheckCount)
	suite.Require().Equal(1, coll2RecheckCount)
}

func (suite *IntegrationTestSuite) TestLargeEvents() {
ctx := suite.Context()

Expand Down
49 changes: 29 additions & 20 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
}

func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-verifier.changeStreamErrChan:
case err := <-csr.ChangeStreamErrChan:
verifier.logger.Warn().Err(err).
Msg("Received error from change stream.")
Msgf("Received error from %s.", csr)
return err
case <-verifier.changeStreamDoneChan:
case <-csr.ChangeStreamDoneChan:
verifier.logger.Debug().
Msg("Received completion signal from change stream.")
Msgf("Received completion signal from %s.", csr)
break
}

Expand Down Expand Up @@ -82,8 +82,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
// If either change stream (source or destination) fails, everything should stop.
eg.Go(func() error {
select {
case err := <-verifier.changeStreamErrChan:
return errors.Wrap(err, "change stream failed")
case err := <-verifier.srcChangeStreamReader.ChangeStreamErrChan:
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
case err := <-verifier.dstChangeStreamReader.ChangeStreamErrChan:
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -168,6 +170,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
}
verifier.running = true
verifier.globalFilter = filter
verifier.initializeChangeStreamReaders()
verifier.mux.Unlock()
defer func() {
verifier.mux.Lock()
Expand Down Expand Up @@ -204,17 +207,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
verifier.phase = Idle
}()

verifier.mux.RLock()
csRunning := verifier.changeStreamRunning
verifier.mux.RUnlock()
if csRunning {
verifier.logger.Debug().Msg("Check: Change stream already running.")
} else {
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
ceHandlerGroup := &errgroup.Group{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not errgroup.WithContext()?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why are the event handler goroutines in an errgroup while the event reader goroutines aren’t?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, errgroup.WithContext() seems better. The only reason for having the handler goroutines in an errgroup is that we want to wait for them in the CheckDriver function. The change stream readers are currently waited on via channels.

for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} {
if csReader.changeStreamRunning {
verifier.logger.Debug().Msgf("Check: %s already running.", csReader)
} else {
verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader)

err = verifier.StartChangeStream(ctx)
if err != nil {
return errors.Wrap(err, "failed to start change stream on source")
err = csReader.StartChangeStream(ctx)
if err != nil {
return errors.Wrapf(err, "failed to start %s", csReader)
}
verifier.StartChangeEventHandler(ctx, csReader, ceHandlerGroup)
}
}

Expand Down Expand Up @@ -279,13 +283,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
// caught again on the next iteration.
if verifier.writesOff {
verifier.logger.Debug().
Msg("Waiting for change stream to end.")
Msg("Waiting for change streams to end.")

// It's necessary to wait for both change streams (and their event handlers)
// to finish before incrementing the generation number, or the last changes will not be checked.
verifier.mux.Unlock()
err := verifier.waitForChangeStream(ctx)
if err != nil {
if err = verifier.waitForChangeStream(ctx, verifier.srcChangeStreamReader); err != nil {
return err
}
if err = verifier.waitForChangeStream(ctx, verifier.dstChangeStreamReader); err != nil {
return err
}
if err = ceHandlerGroup.Wait(); err != nil {
return err
}
verifier.mux.Lock()
Expand Down
4 changes: 2 additions & 2 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (verifier *Verifier) getFetcherChannels(
ctx,
verifier.srcClientCollection(task),
verifier.srcBuildInfo,
verifier.srcStartAtTs,
verifier.srcChangeStreamReader.startAtTs,
task,
)

Expand All @@ -291,7 +291,7 @@ func (verifier *Verifier) getFetcherChannels(
ctx,
verifier.dstClientCollection(task),
verifier.dstBuildInfo,
nil, //startAtTs
verifier.dstChangeStreamReader.startAtTs,
task,
)

Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/integration_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func (suite *IntegrationTestSuite) TearDownTest() {

suite.contextCanceller(errors.Errorf("tearing down test %#q", suite.T().Name()))
suite.testContext, suite.contextCanceller = nil, nil

ctx := context.Background()
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient} {
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
Expand Down Expand Up @@ -185,6 +184,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
)
verifier.SetLogger("stderr")
verifier.SetMetaDBName(metaDBName)
verifier.initializeChangeStreamReaders()

suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx))
Expand Down
Loading