Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
351c94c
wip
tdq45gj Nov 22, 2024
0f645f1
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 22, 2024
53404bc
refactor
tdq45gj Nov 22, 2024
24d3881
Update change_stream_test.go
tdq45gj Nov 22, 2024
d66f25e
Update change_stream_test.go
tdq45gj Nov 22, 2024
9a081cc
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
733220c
Update migration_verifier.go
tdq45gj Nov 25, 2024
2ae4abc
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
39af4e7
fix
tdq45gj Nov 25, 2024
4ff9863
Update integration_test_suite.go
tdq45gj Nov 25, 2024
16224b1
fix tests
tdq45gj Nov 25, 2024
2d95686
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 25, 2024
a9cb0c1
Update change_stream.go
tdq45gj Nov 25, 2024
5bf64f6
Felipe's review
tdq45gj Nov 27, 2024
668e649
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Nov 27, 2024
743dd6a
Update change_stream_test.go
tdq45gj Nov 27, 2024
cc34be4
initialize nsmap
tdq45gj Nov 27, 2024
4ba0eda
fix
tdq45gj Nov 27, 2024
b9d5f37
fix generational rechecking test
tdq45gj Nov 27, 2024
9eed454
Add end-to-end test
tdq45gj Nov 27, 2024
281afc0
wait for change event handler
tdq45gj Nov 27, 2024
c96a495
fix race condition in tests
tdq45gj Nov 27, 2024
e89d175
Update change_stream.go
tdq45gj Nov 27, 2024
f14962f
Update check.go
tdq45gj Nov 27, 2024
be93d05
wait for recheck docs creation
tdq45gj Nov 29, 2024
c1b7ead
fix
tdq45gj Nov 29, 2024
c119cb0
Update change_stream_test.go
tdq45gj Dec 3, 2024
e302d9d
Update change_stream.go
tdq45gj Dec 3, 2024
d15b020
refactors
tdq45gj Dec 3, 2024
ff0c7e9
Merge branch 'main' into REP-5317-add-dst-change-stream
tdq45gj Dec 3, 2024
e56dcbc
rename clusterType to whichCluster
tdq45gj Dec 3, 2024
c91eb0d
reorder controls in TestChangesOnDstBeforeSrc
tdq45gj Dec 3, 2024
faa2e24
Update migration_verifier_test.go
tdq45gj Dec 3, 2024
c71e9ef
Update change_stream_test.go
tdq45gj Dec 3, 2024
18a4cb3
reduce log level
tdq45gj Dec 3, 2024
3961b2d
Update change_stream_test.go
tdq45gj Dec 3, 2024
c2da8c4
change to private fields
tdq45gj Dec 3, 2024
e3e901c
Update migration_verifier_test.go
tdq45gj Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 189 additions & 76 deletions internal/verifier/change_stream.go

Large diffs are not rendered by default.

136 changes: 101 additions & 35 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package verifier
import (
"context"
"strings"
"testing"
"time"

"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mslices"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand All @@ -20,18 +18,17 @@ import (
"go.mongodb.org/mongo-driver/mongo/readconcern"
)

func TestChangeStreamFilter(t *testing.T) {
verifier := Verifier{}
verifier.SetMetaDBName("metadb")
assert.Contains(t,
verifier.GetChangeStreamFilter(),
func (suite *IntegrationTestSuite) TestChangeStreamFilter() {
verifier := suite.BuildVerifier()
suite.Assert().Contains(
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
bson.D{
{"$match", bson.D{{"ns.db", bson.D{{"$ne", "metadb"}}}}},
{"$match", bson.D{{"ns.db", bson.D{{"$ne", metaDBName}}}}},
},
)
verifier.srcNamespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
assert.Contains(t,
verifier.GetChangeStreamFilter(),
verifier.srcChangeStreamReader.namespaces = []string{"foo.bar", "foo.baz", "test.car", "test.chaz"}
suite.Assert().Contains(
verifier.srcChangeStreamReader.GetChangeStreamFilter(),
bson.D{{"$match", bson.D{
{"$or", []bson.D{
{{"ns", bson.D{{"db", "foo"}, {"coll", "bar"}}}},
Expand All @@ -43,6 +40,14 @@ func TestChangeStreamFilter(t *testing.T) {
)
}

func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx context.Context, verifier *Verifier) {
err := verifier.srcChangeStreamReader.StartChangeStream(ctx)
suite.Require().NoError(err)
go func() {
suite.Require().NoError(verifier.StartChangeEventHandler(ctx, verifier.srcChangeStreamReader))
}()
}

// TestChangeStreamResumability creates a verifier, starts its change stream,
// terminates that verifier, updates the source cluster, starts a new
// verifier with change stream, and confirms that things look as they should.
Expand All @@ -57,8 +62,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
verifier1 := suite.BuildVerifier()
ctx, cancel := context.WithCancel(suite.Context())
defer cancel()
err := verifier1.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier1)
}()

ctx, cancel := context.WithCancel(suite.Context())
Expand All @@ -82,13 +86,12 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {

newTime := suite.getClusterTime(ctx, suite.srcMongoClient)

err = verifier2.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)

suite.Require().NotNil(verifier2.srcStartAtTs)
suite.Require().NotNil(verifier2.srcChangeStreamReader.startAtTs)

suite.Assert().False(
verifier2.srcStartAtTs.After(newTime),
verifier2.srcChangeStreamReader.startAtTs.After(newTime),
"verifier2's change stream should be no later than this new session",
)

Expand Down Expand Up @@ -156,12 +159,11 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
verifier.changeStreamWritesOffTsChan <- *origStartTs
<-verifier.changeStreamDoneChan
suite.Require().Equal(verifier.srcStartAtTs, origStartTs)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
verifier.srcChangeStreamReader.WritesOffTsChan <- *origStartTs
<-verifier.srcChangeStreamReader.DoneChan
suite.Require().Equal(verifier.srcChangeStreamReader.startAtTs, origStartTs)
}

func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
Expand All @@ -176,13 +178,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {

origSessionTime := sess.OperationTime()
suite.Require().NotNil(origSessionTime)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

// srcStartAtTs derives from the change stream’s resume token, which can
// postdate our session time but should not precede it.
suite.Require().False(
verifier.srcStartAtTs.Before(*origSessionTime),
verifier.srcChangeStreamReader.startAtTs.Before(*origSessionTime),
"srcStartAtTs should be >= the insert’s optime",
)

Expand All @@ -206,12 +207,12 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
"session time after events should exceed the original",
)

verifier.changeStreamWritesOffTsChan <- *postEventsSessionTime
<-verifier.changeStreamDoneChan
verifier.srcChangeStreamReader.WritesOffTsChan <- *postEventsSessionTime
<-verifier.srcChangeStreamReader.DoneChan

suite.Assert().Equal(
*postEventsSessionTime,
*verifier.srcStartAtTs,
*verifier.srcChangeStreamReader.startAtTs,
"verifier.srcStartAtTs should now be our session timestamp",
)
}
Expand All @@ -227,10 +228,9 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
suite.Require().NoError(err)
origStartTs := sess.OperationTime()
suite.Require().NotNil(origStartTs)
err = verifier.StartChangeStream(ctx)
suite.Require().NoError(err)
suite.Require().NotNil(verifier.srcStartAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcStartAtTs), 0)
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
suite.Require().NotNil(verifier.srcChangeStreamReader.startAtTs)
suite.Require().LessOrEqual(origStartTs.Compare(*verifier.srcChangeStreamReader.startAtTs), 0)
}

func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
Expand All @@ -246,7 +246,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {

verifier := suite.BuildVerifier()

suite.Require().NoError(verifier.StartChangeStream(ctx))
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)

_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)
Expand All @@ -267,7 +267,6 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
500*time.Millisecond,
"the verifier should flush a recheck doc after a batch",
)

}

func (suite *IntegrationTestSuite) TestCursorKilledResilience() {
Expand Down Expand Up @@ -451,6 +450,73 @@ func (suite *IntegrationTestSuite) TestCreateForbidden() {
suite.Assert().Equal("create", eventErr.Event.OpType)
}

func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
ctx := suite.Context()

srcDBName := suite.DBNameForTest("src")
dstDBName := suite.DBNameForTest("dst")

db := suite.dstMongoClient.Database(dstDBName)
coll1 := db.Collection("dstColl1")
coll2 := db.Collection("dstColl2")

for _, coll := range mslices.Of(coll1, coll2) {
suite.Require().NoError(db.CreateCollection(ctx, coll.Name()))
}

verifier := suite.BuildVerifier()
verifier.SetSrcNamespaces([]string{srcDBName + ".srcColl1", srcDBName + ".srcColl2"})
verifier.SetDstNamespaces([]string{dstDBName + ".dstColl1", dstDBName + ".dstColl2"})
verifier.SetNamespaceMap()

suite.Require().NoError(verifier.dstChangeStreamReader.StartChangeStream(ctx))
go func() {
suite.Require().NoError(verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader))
}()

_, err := coll1.InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)
_, err = coll1.InsertOne(ctx, bson.D{{"_id", 2}})
suite.Require().NoError(err)

_, err = coll2.InsertOne(ctx, bson.D{{"_id", 1}})
suite.Require().NoError(err)

var rechecks []RecheckDoc
require.Eventually(
suite.T(),
func() bool {
recheckColl := verifier.verificationDatabase().Collection(recheckQueue)
cursor, err := recheckColl.Find(ctx, bson.D{})
if errors.Is(err, mongo.ErrNoDocuments) {
return false
}

suite.Require().NoError(err)
suite.Require().NoError(cursor.All(ctx, &rechecks))
return len(rechecks) == 3
},
time.Minute,
500*time.Millisecond,
"the verifier should flush a recheck doc after a batch",
)

coll1RecheckCount, coll2RecheckCount := 0, 0
for _, recheck := range rechecks {
suite.Require().Equal(srcDBName, recheck.PrimaryKey.SrcDatabaseName)
switch recheck.PrimaryKey.SrcCollectionName {
case "srcColl1":
coll1RecheckCount++
case "srcColl2":
coll2RecheckCount++
default:
suite.T().Fatalf("unknown collection name: %v", recheck.PrimaryKey.SrcCollectionName)
}
}
suite.Require().Equal(2, coll1RecheckCount)
suite.Require().Equal(1, coll2RecheckCount)
}

func (suite *IntegrationTestSuite) TestLargeEvents() {
ctx := suite.Context()

Expand Down
51 changes: 31 additions & 20 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ func (verifier *Verifier) Check(ctx context.Context, filter map[string]any) {
verifier.MaybeStartPeriodicHeapProfileCollection(ctx)
}

func (verifier *Verifier) waitForChangeStream(ctx context.Context) error {
func (verifier *Verifier) waitForChangeStream(ctx context.Context, csr *ChangeStreamReader) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-verifier.changeStreamErrChan:
case err := <-csr.ErrChan:
verifier.logger.Warn().Err(err).
Msg("Received error from change stream.")
Msgf("Received error from %s.", csr)
return err
case <-verifier.changeStreamDoneChan:
case <-csr.DoneChan:
verifier.logger.Debug().
Msg("Received completion signal from change stream.")
Msgf("Received completion signal from %s.", csr)
break
}

Expand Down Expand Up @@ -82,8 +82,10 @@ func (verifier *Verifier) CheckWorker(ctxIn context.Context) error {
// If the change stream fails, everything should stop.
eg.Go(func() error {
select {
case err := <-verifier.changeStreamErrChan:
return errors.Wrap(err, "change stream failed")
case err := <-verifier.srcChangeStreamReader.ErrChan:
return errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)
case err := <-verifier.dstChangeStreamReader.ErrChan:
return errors.Wrapf(err, "%s failed", verifier.dstChangeStreamReader)
case <-ctx.Done():
return nil
}
Expand Down Expand Up @@ -168,6 +170,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
}
verifier.running = true
verifier.globalFilter = filter
verifier.initializeChangeStreamReaders()
verifier.mux.Unlock()
defer func() {
verifier.mux.Lock()
Expand Down Expand Up @@ -204,17 +207,20 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
verifier.phase = Idle
}()

verifier.mux.RLock()
csRunning := verifier.changeStreamRunning
verifier.mux.RUnlock()
if csRunning {
verifier.logger.Debug().Msg("Check: Change stream already running.")
} else {
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
ceHandlerGroup, groupCtx := errgroup.WithContext(ctx)
for _, csReader := range []*ChangeStreamReader{verifier.srcChangeStreamReader, verifier.dstChangeStreamReader} {
if csReader.changeStreamRunning {
verifier.logger.Debug().Msgf("Check: %s already running.", csReader)
} else {
verifier.logger.Debug().Msgf("%s not running; starting change stream", csReader)

err = verifier.StartChangeStream(ctx)
if err != nil {
return errors.Wrap(err, "failed to start change stream on source")
err = csReader.StartChangeStream(ctx)
if err != nil {
return errors.Wrapf(err, "failed to start %s", csReader)
}
ceHandlerGroup.Go(func() error {
return verifier.StartChangeEventHandler(groupCtx, csReader)
})
}
}

Expand Down Expand Up @@ -279,13 +285,18 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter map[string]any
// caught again on the next iteration.
if verifier.writesOff {
verifier.logger.Debug().
Msg("Waiting for change stream to end.")
Msg("Waiting for change streams to end.")

// It's necessary to wait for the change stream to finish before incrementing the
// generation number, or the last changes will not be checked.
verifier.mux.Unlock()
err := verifier.waitForChangeStream(ctx)
if err != nil {
if err = verifier.waitForChangeStream(ctx, verifier.srcChangeStreamReader); err != nil {
return err
}
if err = verifier.waitForChangeStream(ctx, verifier.dstChangeStreamReader); err != nil {
return err
}
if err = ceHandlerGroup.Wait(); err != nil {
return err
}
verifier.mux.Lock()
Expand Down
4 changes: 2 additions & 2 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
ctx,
verifier.srcClientCollection(task),
verifier.srcClusterInfo,
verifier.srcStartAtTs,
verifier.srcChangeStreamReader.startAtTs,
task,
)

Expand All @@ -325,7 +325,7 @@ func (verifier *Verifier) getFetcherChannelsAndCallbacks(
ctx,
verifier.dstClientCollection(task),
verifier.dstClusterInfo,
nil, //startAtTs
verifier.dstChangeStreamReader.startAtTs,
task,
)

Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/integration_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func (suite *IntegrationTestSuite) TearDownTest() {

suite.contextCanceller(errors.Errorf("tearing down test %#q", suite.T().Name()))
suite.testContext, suite.contextCanceller = nil, nil

ctx := context.Background()
for _, client := range []*mongo.Client{suite.srcMongoClient, suite.dstMongoClient} {
dbNames, err := client.ListDatabaseNames(ctx, bson.D{})
Expand Down Expand Up @@ -184,6 +183,7 @@ func (suite *IntegrationTestSuite) BuildVerifier() *Verifier {
"should set metadata connection string",
)
verifier.SetMetaDBName(metaDBName)
verifier.initializeChangeStreamReaders()

suite.Require().NoError(verifier.srcClientCollection(&task).Drop(ctx))
suite.Require().NoError(verifier.dstClientCollection(&task).Drop(ctx))
Expand Down
Loading
Loading