Merged

Changes from all commits (35 commits):
3659c0a  persist (FGasper, Oct 30, 2024)
c4a9b72  old method (FGasper, Oct 30, 2024)
07e2691  switch to always using resume token’s timestamp (FGasper, Oct 30, 2024)
a13cea7  fix StartChangeStream (FGasper, Oct 30, 2024)
0044884  Merge branch 'main' into REP-5201-persist-resume-token (FGasper, Oct 30, 2024)
2ed30e1  tweak timestamp extraction (FGasper, Oct 30, 2024)
7d900c0  save (FGasper, Oct 30, 2024)
3b18845  fix (FGasper, Oct 30, 2024)
8a39982  Merge branch 'main' into REP-5201-persist-resume-token (FGasper, Oct 30, 2024)
df8c59d  Merge branch 'main' into REP-5201-persist-resume-token (FGasper, Oct 30, 2024)
e65dd09  bump Go requirement to 1.20 (FGasper, Oct 30, 2024)
346cbde  string (FGasper, Oct 30, 2024)
3490a04  fix test (FGasper, Oct 30, 2024)
e5ed6ba  reverse error check (FGasper, Oct 30, 2024)
a218e84  fix most lint issues (FGasper, Oct 30, 2024)
8ffc161  fix outstanding (FGasper, Oct 30, 2024)
c32edc8  update Go requirement (FGasper, Oct 30, 2024)
6851b57  update CI Go version (FGasper, Oct 30, 2024)
3017954  Merge branch 'fix_lint' into REP-5201-persist-resume-token (FGasper, Oct 30, 2024)
6833b7c  add resumability (FGasper, Oct 30, 2024)
22cbc00  fix test (FGasper, Oct 30, 2024)
917b5c0  tests against change stream resumption (FGasper, Oct 30, 2024)
bc76160  Merge branch 'main' into fix_lint (FGasper, Oct 30, 2024)
b321bd0  Merge branch 'fix_lint' into REP-5201-persist-resume-token (FGasper, Oct 30, 2024)
3f37acd  restore test (FGasper, Oct 30, 2024)
d1e3bb8  fix ordering assertion (FGasper, Oct 30, 2024)
6fe3ba6  fix error (FGasper, Oct 30, 2024)
67230b6  update README (FGasper, Oct 30, 2024)
1d3af02  tweak wording (FGasper, Oct 30, 2024)
b368006  Update internal/verifier/change_stream.go (FGasper, Oct 31, 2024)
5a54633  Evgeni’s review. (FGasper, Nov 4, 2024)
36a6cac  strike paragraph (FGasper, Nov 4, 2024)
3b26420  refactor & solve inconsistency (FGasper, Nov 4, 2024)
2b0a3f8  remove dupe wrap (FGasper, Nov 4, 2024)
57a155a  error-check the resume token persistence (FGasper, Nov 5, 2024)
4 changes: 4 additions & 0 deletions README.md
@@ -257,6 +257,10 @@ Any collection metadata mismatches will occur in a task with the type '`verifyCo
Here, '`failed_docs`' contains all the metadata mismatches, in this case an index named '`x_1`'.
# Resumability
The migration-verifier periodically persists its change stream’s resume token so that, in the event of a catastrophic failure (e.g., memory exhaustion), the restarted verifier still receives any change events that happened while it was down.
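As an illustration of the mechanism (not the verifier’s exact code), the following is a minimal sketch of how a persisted resume token can be fed back into a new change stream with the MongoDB Go driver; the `resumeChangeStream` helper name and the empty pipeline are assumptions made for the example.

```go
// Sketch only: resume a change stream from a previously persisted token.
package example

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func resumeChangeStream(
	ctx context.Context,
	client *mongo.Client,
	savedToken bson.Raw, // nil if no token was persisted yet
) (*mongo.ChangeStream, error) {
	opts := options.ChangeStream().SetMaxAwaitTime(time.Second)

	if savedToken != nil {
		// Replay the events that happened after the persisted token,
		// i.e. while the previous process was down.
		opts = opts.SetStartAfter(savedToken)
	}

	// An empty pipeline watches all changes on the deployment.
	return client.Watch(ctx, mongo.Pipeline{}, opts)
}
```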
# Performance
The migration-verifier optimizes for the case where a migration’s initial sync is completed **and** change events are relatively infrequent. If you start verification before initial sync finishes, or if the source cluster is too busy, the verification may freeze.
331 changes: 233 additions & 98 deletions internal/verifier/change_stream.go
@@ -2,15 +2,17 @@ package verifier

import (
"context"
"errors"
"fmt"
"time"

"github.com/10gen/migration-verifier/internal/keystring"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/exp/constraints"
)

// ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'.
@@ -32,6 +34,19 @@ type DocKey struct {
ID interface{} `bson:"_id"`
}

const (
minChangeStreamPersistInterval = time.Second * 10
metadataChangeStreamCollectionName = "changeStream"
)

type UnknownEventError struct {
Event *ParsedEvent
}

func (uee UnknownEventError) Error() string {
return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType)
}

// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
// operation.
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
@@ -50,7 +65,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve
case "update":
return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent)
default:
return errors.New(`Not supporting: "` + changeEvent.OpType + `" events`)
return UnknownEventError{Event: changeEvent}
}
}

@@ -67,121 +82,241 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D {
return []bson.D{stage}
}

// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context, startTime *primitive.Timestamp) error {
streamReader := func(cs *mongo.ChangeStream) {
var changeEvent ParsedEvent
for {
select {
// if the context is cancelled, return immediately
case <-ctx.Done():
return
// if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain
// the remaining changes, but when TryNext returns false, we will exit, since there should
// be no message until the user has guaranteed writes to the source have ended.
case <-verifier.changeStreamEnderChan:
for cs.TryNext(ctx) {
if err := cs.Decode(&changeEvent); err != nil {
verifier.logger.Fatal().Err(err).Msg("Failed to decode change event")
}
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
if err != nil {
verifier.changeStreamErrChan <- err
verifier.logger.Fatal().Err(err).Msg("Error handling change event")
}
}
verifier.mux.Lock()
verifier.changeStreamRunning = false
if verifier.lastChangeEventTime != nil {
verifier.srcStartAtTs = verifier.lastChangeEventTime
}
verifier.mux.Unlock()
// since we have started Recheck, we must signal that we have
// finished the change stream changes so that Recheck can continue.
verifier.changeStreamDoneChan <- struct{}{}
// since the changeStream is exhausted, we now return
verifier.logger.Debug().Msg("Change stream is done")
return
// the default case is that we are still in the Check phase, in the check phase we still
// use TryNext, but we do not exit if TryNext returns false.
default:
if next := cs.TryNext(ctx); !next {
continue
}
if err := cs.Decode(&changeEvent); err != nil {
verifier.logger.Fatal().Err(err).Msg("")
}
err := verifier.HandleChangeStreamEvent(ctx, &changeEvent)
if err != nil {
verifier.changeStreamErrChan <- err
return
}
func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) {
var changeEvent ParsedEvent

var lastPersistedTime time.Time

persistResumeTokenIfNeeded := func() error {
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
return nil
}

err := verifier.persistChangeStreamResumeToken(ctx, cs)
if err == nil {
lastPersistedTime = time.Now()
}

return err
}

for {
var err error

for cs.TryNext(ctx) {
if err = cs.Decode(&changeEvent); err != nil {
err = errors.Wrap(err, "failed to decode change event")
break
}
err = verifier.HandleChangeStreamEvent(ctx, &changeEvent)
if err != nil {
err = errors.Wrap(err, "failed to handle change event")
break
}
}

if cs.Err() != nil {
err = errors.Wrap(
cs.Err(),
"change stream iteration failed",
)
}

if err == nil {
err = persistResumeTokenIfNeeded()
}

if err != nil {
if !errors.Is(err, context.Canceled) {
verifier.changeStreamErrChan <- err
}

return
}

select {
// If the changeStreamEnderChan has a message, the user has indicated that
// source writes are ended. This means we should exit rather than continue
// reading the change stream since there should be no more events.
case <-verifier.changeStreamEnderChan:
verifier.mux.Lock()
verifier.changeStreamRunning = false
if verifier.lastChangeEventTime != nil {
verifier.srcStartAtTs = verifier.lastChangeEventTime
}
verifier.mux.Unlock()
// since we have started Recheck, we must signal that we have
// finished the change stream changes so that Recheck can continue.
verifier.changeStreamDoneChan <- struct{}{}
// since the changeStream is exhausted, we now return
verifier.logger.Debug().Msg("Change stream is done")
return
default:
}
}
}

// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
pipeline := verifier.GetChangeStreamFilter()
opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second)
if startTime != nil {
opts = opts.SetStartAtOperationTime(startTime)
verifier.srcStartAtTs = startTime

savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx)
if err != nil {
return errors.Wrap(err, "failed to load persisted change stream resume token")
}

csStartLogEvent := verifier.logger.Info()

if savedResumeToken != nil {
logEvent := csStartLogEvent.
Stringer("resumeToken", savedResumeToken)

Review thread on this line:

Reviewer:
I'd be wary of printing out the whole resume token as-is. This caused a fatal error in REP-3625, so I think we should either truncate it or omit it. Thoughts?

FGasper (author):
I wouldn't expect that to happen here, because the verifier runs simpler change streams than mongosync does under ORR, but it's probably best not to chance it.

Replacing with an extracted timestamp.

Collaborator:
I think REP-3625 was Evergreen being unable to parse a long log line, which seems to have been fixed by Evergreen.

FGasper (author):
Even so, it seems best to err here on the side of caution. We probably don't need the full resume token in the log.

ts, err := extractTimestampFromResumeToken(savedResumeToken)
if err == nil {
logEvent = addUnixTimeToLogEvent(ts.T, logEvent)
} else {
verifier.logger.Warn().
Err(err).
Msg("Failed to extract timestamp from persisted resume token.")
}

logEvent.Msg("Starting change stream from persisted resume token.")

opts = opts.SetStartAfter(savedResumeToken)
} else {
csStartLogEvent.Msg("Starting change stream from current source cluster time.")
}

sess, err := verifier.srcClient.StartSession()
if err != nil {
return err
return errors.Wrap(err, "failed to start session")
}
sctx := mongo.NewSessionContext(ctx, sess)
srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts)
if err != nil {
return errors.Wrap(err, "failed to open change stream")
}

err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream)
if err != nil {
return err
}
if startTime == nil {
resumeToken := srcChangeStream.ResumeToken()
if resumeToken == nil {
return errors.New("Resume token is missing; cannot choose start time")
}
// Change stream token is always a V1 keystring in the _data field
resumeTokenDataValue := resumeToken.Lookup("_data")
resumeTokenData, ok := resumeTokenDataValue.StringValueOK()
if !ok {
return fmt.Errorf("Resume token _data is missing or the wrong type: %v",
resumeTokenDataValue.Type)
}
resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, resumeTokenData)
if err != nil {
return err
}
// First element is the cluster time we want
resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp)
if !ok {
return errors.New("Resume token lacks a cluster time")
}
verifier.srcStartAtTs = &resumeTokenTime

// On sharded servers the resume token time can be ahead of the actual cluster time by one
// increment. In that case we must use the actual cluster time or we will get errors.
clusterTimeRaw := sess.ClusterTime()
clusterTimeInner, err := clusterTimeRaw.LookupErr("$clusterTime")
if err != nil {
return err
}
clusterTimeTsVal, err := bson.Raw(clusterTimeInner.Value).LookupErr("clusterTime")
if err != nil {
return err
}
var clusterTimeTs primitive.Timestamp
clusterTimeTs.T, clusterTimeTs.I, ok = clusterTimeTsVal.TimestampOK()
if !ok {
return errors.New("Cluster time is not a timestamp")
}
csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken())
if err != nil {
return errors.Wrap(err, "failed to extract timestamp from change stream's resume token")
}

verifier.logger.Debug().Msgf("Initial cluster time is %+v", clusterTimeTs)
if clusterTimeTs.Compare(resumeTokenTime) < 0 {
verifier.srcStartAtTs = &clusterTimeTs
}
clusterTime, err := getClusterTimeFromSession(sess)
if err != nil {
return errors.Wrap(err, "failed to read cluster time from session")
}

verifier.srcStartAtTs = &csTimestamp
if csTimestamp.After(clusterTime) {
verifier.srcStartAtTs = &clusterTime
}

verifier.mux.Lock()
verifier.changeStreamRunning = true
verifier.mux.Unlock()
go streamReader(srcChangeStream)

go verifier.iterateChangeStream(ctx, srcChangeStream)

return nil
}

func addUnixTimeToLogEvent[T constraints.Integer](unixTime T, event *zerolog.Event) *zerolog.Event {
return event.Time("clockTime", time.Unix(int64(unixTime), int64(0)))
}

func (v *Verifier) getChangeStreamMetadataCollection() *mongo.Collection {
return v.metaClient.Database(v.metaDBName).Collection(metadataChangeStreamCollectionName)
}

func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) {
coll := verifier.getChangeStreamMetadataCollection()

token, err := coll.FindOne(
ctx,
bson.D{{"_id", "resumeToken"}},
).Raw()

if errors.Is(err, mongo.ErrNoDocuments) {
return nil, nil
}

return token, err
}

func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
token := cs.ResumeToken()

coll := verifier.getChangeStreamMetadataCollection()
_, err := coll.ReplaceOne(
ctx,
bson.D{{"_id", "resumeToken"}},
token,
options.Replace().SetUpsert(true),
)

if err == nil {
ts, err := extractTimestampFromResumeToken(token)

logEvent := verifier.logger.Debug()

if err == nil {
logEvent = addUnixTimeToLogEvent(ts.T, logEvent)
} else {
verifier.logger.Warn().Err(err).
Msg("failed to extract resume token timestamp")
}

logEvent.Msg("Persisted change stream resume token.")

return nil
}

return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token)
}

func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) {
tokenStruct := struct {
Data string `bson:"_data"`
}{}

// Change stream token is always a V1 keystring in the _data field
err := bson.Unmarshal(resumeToken, &tokenStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken)
}

resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data)
if err != nil {
return primitive.Timestamp{}, err
}
// First element is the cluster time we want
resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp)
if !ok {
return primitive.Timestamp{}, errors.Errorf("resume token data's (%+v) first element is of type %T, not a timestamp", resumeTokenBson, resumeTokenBson[0].Value)
}

return resumeTokenTime, nil
}

func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) {
ctStruct := struct {
ClusterTime struct {
ClusterTime primitive.Timestamp `bson:"clusterTime"`
} `bson:"$clusterTime"`
}{}

clusterTimeRaw := sess.ClusterTime()
err := bson.Unmarshal(sess.ClusterTime(), &ctStruct)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw)
}

return ctStruct.ClusterTime.ClusterTime, nil
}