REP-5201 Persist the change stream’s resume token. #30
Changes from 29 commits
3659c0a
c4a9b72
07e2691
a13cea7
0044884
2ed30e1
7d900c0
3b18845
8a39982
df8c59d
e65dd09
346cbde
3490a04
e5ed6ba
a218e84
8ffc161
c32edc8
6851b57
3017954
6833b7c
22cbc00
917b5c0
bc76160
b321bd0
3f37acd
d1e3bb8
6fe3ba6
67230b6
1d3af02
b368006
5a54633
36a6cac
3b26420
2b0a3f8
57a155a
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,15 +2,17 @@ package verifier | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/10gen/migration-verifier/internal/keystring" | ||
| "github.com/pkg/errors" | ||
| "github.com/rs/zerolog" | ||
| "go.mongodb.org/mongo-driver/bson" | ||
| "go.mongodb.org/mongo-driver/bson/primitive" | ||
| "go.mongodb.org/mongo-driver/mongo" | ||
| "go.mongodb.org/mongo-driver/mongo/options" | ||
| "golang.org/x/exp/constraints" | ||
| ) | ||
|
|
||
| // ParsedEvent contains the fields of an event that we have parsed from 'bson.Raw'. | ||
|
|
@@ -32,6 +34,19 @@ type DocKey struct { | |
| ID interface{} `bson:"_id"` | ||
| } | ||
|
|
||
| const ( | ||
| minChangeStreamPersistInterval = time.Second * 10 | ||
| metadataChangeStreamCollectionName = "changeStream" | ||
| ) | ||
|
|
||
| type UnknownEventError struct { | ||
| Event *ParsedEvent | ||
| } | ||
|
|
||
| func (uee UnknownEventError) Error() string { | ||
| return fmt.Sprintf("Unknown event type: %#q", uee.Event.OpType) | ||
| } | ||
|
|
||
| // HandleChangeStreamEvent performs the necessary work for change stream events that occur during | ||
| // operation. | ||
| func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error { | ||
|
|
@@ -50,7 +65,7 @@ func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEve | |
| case "update": | ||
| return verifier.InsertChangeEventRecheckDoc(ctx, changeEvent) | ||
| default: | ||
| return errors.New(`Not supporting: "` + changeEvent.OpType + `" events`) | ||
| return UnknownEventError{Event: changeEvent} | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -67,121 +82,233 @@ func (verifier *Verifier) GetChangeStreamFilter() []bson.D { | |
| return []bson.D{stage} | ||
| } | ||
|
|
||
| // StartChangeStream starts the change stream. | ||
| func (verifier *Verifier) StartChangeStream(ctx context.Context, startTime *primitive.Timestamp) error { | ||
| streamReader := func(cs *mongo.ChangeStream) { | ||
| var changeEvent ParsedEvent | ||
| for { | ||
| select { | ||
| // if the context is cancelled, return immediately | ||
| case <-ctx.Done(): | ||
| return | ||
| // if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain | ||
| // the remaining changes, but when TryNext returns false, we will exit, since there should | ||
| // be no message until the user has guaranteed writes to the source have ended. | ||
| case <-verifier.changeStreamEnderChan: | ||
| for cs.TryNext(ctx) { | ||
| if err := cs.Decode(&changeEvent); err != nil { | ||
| verifier.logger.Fatal().Err(err).Msg("Failed to decode change event") | ||
| } | ||
| err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) | ||
| if err != nil { | ||
| verifier.changeStreamErrChan <- err | ||
| verifier.logger.Fatal().Err(err).Msg("Error handling change event") | ||
| } | ||
| } | ||
| verifier.mux.Lock() | ||
| verifier.changeStreamRunning = false | ||
| if verifier.lastChangeEventTime != nil { | ||
| verifier.srcStartAtTs = verifier.lastChangeEventTime | ||
| } | ||
| verifier.mux.Unlock() | ||
| // since we have started Recheck, we must signal that we have | ||
| // finished the change stream changes so that Recheck can continue. | ||
| verifier.changeStreamDoneChan <- struct{}{} | ||
| // since the changeStream is exhausted, we now return | ||
| verifier.logger.Debug().Msg("Change stream is done") | ||
| return | ||
| // the default case is that we are still in the Check phase, in the check phase we still | ||
| // use TryNext, but we do not exit if TryNext returns false. | ||
| default: | ||
| if next := cs.TryNext(ctx); !next { | ||
| continue | ||
| } | ||
| func (verifier *Verifier) iterateChangeStream(ctx context.Context, cs *mongo.ChangeStream) { | ||
| var changeEvent ParsedEvent | ||
|
|
||
| var lastPersistedTime time.Time | ||
|
|
||
| persistResumeTokenIfNeeded := func() error { | ||
| if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval { | ||
| return nil | ||
| } | ||
|
|
||
| err := verifier.persistChangeStreamResumeToken(ctx, cs) | ||
| if err == nil { | ||
| lastPersistedTime = time.Now() | ||
| } | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| // if the context is cancelled, return immediately | ||
| case <-ctx.Done(): | ||
| return | ||
| // if the changeStreamEnderChan has a message, we have moved to the Recheck phase, obtain | ||
| // the remaining changes, but when TryNext returns false, we will exit, since there should | ||
| // be no message until the user has guaranteed writes to the source have ended. | ||
| case <-verifier.changeStreamEnderChan: | ||
| for cs.TryNext(ctx) { | ||
|
||
| if err := cs.Decode(&changeEvent); err != nil { | ||
| verifier.logger.Fatal().Err(err).Msg("") | ||
| verifier.logger.Fatal().Err(err).Msg("Failed to decode change event") | ||
| } | ||
| err := verifier.HandleChangeStreamEvent(ctx, &changeEvent) | ||
| if err != nil { | ||
| verifier.changeStreamErrChan <- err | ||
| return | ||
| verifier.logger.Fatal().Err(err).Msg("Error handling change event") | ||
| } | ||
|
||
| } | ||
| verifier.mux.Lock() | ||
| verifier.changeStreamRunning = false | ||
| if verifier.lastChangeEventTime != nil { | ||
| verifier.srcStartAtTs = verifier.lastChangeEventTime | ||
| } | ||
| verifier.mux.Unlock() | ||
| // since we have started Recheck, we must signal that we have | ||
| // finished the change stream changes so that Recheck can continue. | ||
| verifier.changeStreamDoneChan <- struct{}{} | ||
| // since the changeStream is exhausted, we now return | ||
| verifier.logger.Debug().Msg("Change stream is done") | ||
| return | ||
| // the default case is that we are still in the Check phase, in the check phase we still | ||
| // use TryNext, but we do not exit if TryNext returns false. | ||
| default: | ||
| var err error | ||
|
|
||
| if next := cs.TryNext(ctx); next { | ||
|
||
| if err = cs.Decode(&changeEvent); err != nil { | ||
| err = errors.Wrapf(err, "failed to decode change event (%v)", cs.Current) | ||
| } | ||
|
|
||
| if err == nil { | ||
| err = verifier.HandleChangeStreamEvent(ctx, &changeEvent) | ||
| if err != nil { | ||
| err = errors.Wrapf(err, "failed to handle change event (%+v)", changeEvent) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if err == nil { | ||
| err = persistResumeTokenIfNeeded() | ||
| } | ||
|
|
||
| if err != nil { | ||
| verifier.changeStreamErrChan <- err | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // StartChangeStream starts the change stream. | ||
| func (verifier *Verifier) StartChangeStream(ctx context.Context) error { | ||
| pipeline := verifier.GetChangeStreamFilter() | ||
| opts := options.ChangeStream().SetMaxAwaitTime(1 * time.Second) | ||
| if startTime != nil { | ||
| opts = opts.SetStartAtOperationTime(startTime) | ||
| verifier.srcStartAtTs = startTime | ||
|
|
||
| savedResumeToken, err := verifier.loadChangeStreamResumeToken(ctx) | ||
| if err != nil { | ||
| return errors.Wrap(err, "failed to load persisted change stream resume token") | ||
| } | ||
|
|
||
| csStartLogEvent := verifier.logger.Info() | ||
|
|
||
| if savedResumeToken != nil { | ||
| logEvent := csStartLogEvent. | ||
| Stringer("resumeToken", savedResumeToken) | ||
|
Collaborator: I'd be wary of printing out the whole resume token as-is. This caused a fatal error in REP-3625, so I think we should either truncate it or omit it. Thoughts?
Author: I wouldn't expect that to happen here, since the verifier runs simpler change streams than mongosync does under ORR, but it's probably best not to chance it. Replacing it with an extracted timestamp.
Collaborator: I think REP-3625 was Evergreen being unable to parse a long log line, which Evergreen seems to have since fixed.
Author: Even so, it seems best to err on the side of caution here. We probably don't need the full resume token in the log.
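As the thread above resolves, the diff logs a wall-clock time derived from the resume token's timestamp instead of the raw token. Below is a minimal sketch of that logging pattern, assuming zerolog (which this diff already imports); the timestamp value is a stand-in rather than one extracted from a real token:

```go
package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Stand-in for a timestamp extracted from a persisted resume token.
	ts := primitive.Timestamp{T: uint32(time.Now().Unix()), I: 1}

	// Log a human-readable clock time rather than the raw token bytes.
	logger.Info().
		Time("clockTime", time.Unix(int64(ts.T), 0)).
		Msg("Starting change stream from persisted resume token.")
}
```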
||
|
|
||
| ts, err := extractTimestampFromResumeToken(savedResumeToken) | ||
| if err == nil { | ||
| logEvent = addUnixTimeToLogEvent(ts.T, logEvent) | ||
| } else { | ||
| verifier.logger.Warn(). | ||
| Err(err). | ||
| Msg("Failed to extract timestamp from persisted resume token.") | ||
| } | ||
|
|
||
| logEvent.Msg("Starting change stream from persisted resume token.") | ||
|
|
||
| opts = opts.SetStartAfter(savedResumeToken) | ||
| } else { | ||
| csStartLogEvent.Msg("Starting change stream from current source cluster time.") | ||
| } | ||
|
|
||
| sess, err := verifier.srcClient.StartSession() | ||
| if err != nil { | ||
| return err | ||
| return errors.Wrap(err, "failed to start session") | ||
| } | ||
| sctx := mongo.NewSessionContext(ctx, sess) | ||
| srcChangeStream, err := verifier.srcClient.Watch(sctx, pipeline, opts) | ||
| if err != nil { | ||
| return err | ||
| return errors.Wrap(err, "failed to open change stream") | ||
| } | ||
| if startTime == nil { | ||
| resumeToken := srcChangeStream.ResumeToken() | ||
| if resumeToken == nil { | ||
| return errors.New("Resume token is missing; cannot choose start time") | ||
| } | ||
| // Change stream token is always a V1 keystring in the _data field | ||
| resumeTokenDataValue := resumeToken.Lookup("_data") | ||
| resumeTokenData, ok := resumeTokenDataValue.StringValueOK() | ||
| if !ok { | ||
| return fmt.Errorf("Resume token _data is missing or the wrong type: %v", | ||
| resumeTokenDataValue.Type) | ||
| } | ||
| resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, resumeTokenData) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| // First element is the cluster time we want | ||
| resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp) | ||
| if !ok { | ||
| return errors.New("Resume token lacks a cluster time") | ||
| } | ||
| verifier.srcStartAtTs = &resumeTokenTime | ||
|
|
||
| // On sharded servers the resume token time can be ahead of the actual cluster time by one | ||
| // increment. In that case we must use the actual cluster time or we will get errors. | ||
| clusterTimeRaw := sess.ClusterTime() | ||
| clusterTimeInner, err := clusterTimeRaw.LookupErr("$clusterTime") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| clusterTimeTsVal, err := bson.Raw(clusterTimeInner.Value).LookupErr("clusterTime") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| var clusterTimeTs primitive.Timestamp | ||
| clusterTimeTs.T, clusterTimeTs.I, ok = clusterTimeTsVal.TimestampOK() | ||
| if !ok { | ||
| return errors.New("Cluster time is not a timestamp") | ||
| } | ||
|
|
||
| verifier.logger.Debug().Msgf("Initial cluster time is %+v", clusterTimeTs) | ||
| if clusterTimeTs.Compare(resumeTokenTime) < 0 { | ||
| verifier.srcStartAtTs = &clusterTimeTs | ||
| } | ||
| err = verifier.persistChangeStreamResumeToken(ctx, srcChangeStream) | ||
| if err != nil { | ||
| return errors.Wrap(err, "failed to persist change stream resume token") | ||
|
||
| } | ||
|
|
||
| csTimestamp, err := extractTimestampFromResumeToken(srcChangeStream.ResumeToken()) | ||
| if err != nil { | ||
| return errors.Wrap(err, "failed to extract timestamp from change stream's resume token") | ||
|
||
| } | ||
|
|
||
| clusterTime, err := getClusterTimeFromSession(sess) | ||
| if err != nil { | ||
| return errors.Wrap(err, "failed to read cluster time from session") | ||
| } | ||
|
|
||
| verifier.srcStartAtTs = &csTimestamp | ||
| if csTimestamp.After(clusterTime) { | ||
| verifier.srcStartAtTs = &clusterTime | ||
| } | ||
|
|
||
| verifier.mux.Lock() | ||
| verifier.changeStreamRunning = true | ||
| verifier.mux.Unlock() | ||
| go streamReader(srcChangeStream) | ||
|
|
||
| go verifier.iterateChangeStream(ctx, srcChangeStream) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func addUnixTimeToLogEvent[T constraints.Integer](unixTime T, event *zerolog.Event) *zerolog.Event { | ||
| return event.Time("clockTime", time.Unix(int64(unixTime), int64(0))) | ||
| } | ||
|
|
||
| func (v *Verifier) getChangeStreamMetadataCollection() *mongo.Collection { | ||
| return v.metaClient.Database(v.metaDBName).Collection(metadataChangeStreamCollectionName) | ||
| } | ||
|
|
||
| func (verifier *Verifier) loadChangeStreamResumeToken(ctx context.Context) (bson.Raw, error) { | ||
| coll := verifier.getChangeStreamMetadataCollection() | ||
|
|
||
| token, err := coll.FindOne( | ||
| ctx, | ||
| bson.D{{"_id", "resumeToken"}}, | ||
| ).Raw() | ||
|
|
||
| if errors.Is(err, mongo.ErrNoDocuments) { | ||
| return nil, nil | ||
| } | ||
|
|
||
| return token, err | ||
| } | ||
|
|
||
| func (verifier *Verifier) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error { | ||
| token := cs.ResumeToken() | ||
|
|
||
| coll := verifier.getChangeStreamMetadataCollection() | ||
| _, err := coll.ReplaceOne( | ||
| ctx, | ||
| bson.D{{"_id", "resumeToken"}}, | ||
| token, | ||
| options.Replace().SetUpsert(true), | ||
| ) | ||
|
|
||
| verifier.logger.Debug(). | ||
| Stringer("resumeToken", token). | ||
| Msg("Persisted change stream resume token.") | ||
|
|
||
| return errors.Wrapf(err, "failed to persist change stream resume token (%v)", cs.ResumeToken()) | ||
|
||
| } | ||
|
|
||
| func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) { | ||
| tokenStruct := struct { | ||
| Data string `bson:"_data"` | ||
| }{} | ||
|
|
||
| // Change stream token is always a V1 keystring in the _data field | ||
| err := bson.Unmarshal(resumeToken, &tokenStruct) | ||
| if err != nil { | ||
| return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken) | ||
| } | ||
|
|
||
| resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data) | ||
| if err != nil { | ||
| return primitive.Timestamp{}, err | ||
| } | ||
| // First element is the cluster time we want | ||
| resumeTokenTime, ok := resumeTokenBson[0].Value.(primitive.Timestamp) | ||
| if !ok { | ||
| return primitive.Timestamp{}, errors.Errorf("resume token data's (%+v) first element is of type %T, not a timestamp", resumeTokenBson, resumeTokenBson[0].Value) | ||
| } | ||
|
|
||
| return resumeTokenTime, nil | ||
| } | ||
|
|
||
| func getClusterTimeFromSession(sess mongo.Session) (primitive.Timestamp, error) { | ||
| ctStruct := struct { | ||
| ClusterTime struct { | ||
| ClusterTime primitive.Timestamp `bson:"clusterTime"` | ||
| } `bson:"$clusterTime"` | ||
| }{} | ||
|
|
||
| clusterTimeRaw := sess.ClusterTime() | ||
| err := bson.Unmarshal(sess.ClusterTime(), &ctStruct) | ||
| if err != nil { | ||
| return primitive.Timestamp{}, errors.Wrapf(err, "failed to find clusterTime in session cluster time document (%v)", clusterTimeRaw) | ||
| } | ||
|
|
||
| return ctStruct.ClusterTime.ClusterTime, nil | ||
| } | ||
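For readers skimming the diff, here is a standalone sketch of the persistence round-trip this PR introduces: upsert the change stream's current resume token into a metadata collection, and read it back on startup, treating a missing document as "no token persisted yet". The connection URI and database name below are illustrative assumptions; the collection name mirrors the metadataChangeStreamCollectionName constant in the diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// saveResumeToken upserts the token under a fixed _id, as the PR does.
func saveResumeToken(ctx context.Context, coll *mongo.Collection, token bson.Raw) error {
	_, err := coll.ReplaceOne(
		ctx,
		bson.D{{"_id", "resumeToken"}},
		token,
		options.Replace().SetUpsert(true),
	)
	return err
}

// loadResumeToken returns (nil, nil) when no token has been persisted yet,
// in which case the caller starts from the current cluster time instead.
func loadResumeToken(ctx context.Context, coll *mongo.Collection) (bson.Raw, error) {
	token, err := coll.FindOne(ctx, bson.D{{"_id", "resumeToken"}}).Raw()
	if errors.Is(err, mongo.ErrNoDocuments) {
		return nil, nil
	}
	return token, err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		panic(err)
	}
	defer client.Disconnect(ctx)

	coll := client.Database("verifier_meta").Collection("changeStream")

	cs, err := client.Watch(ctx, mongo.Pipeline{},
		options.ChangeStream().SetMaxAwaitTime(time.Second))
	if err != nil {
		panic(err)
	}
	defer cs.Close(ctx)

	// The driver typically reports a resume token as soon as the stream opens.
	if err := saveResumeToken(ctx, coll, cs.ResumeToken()); err != nil {
		panic(err)
	}

	saved, err := loadResumeToken(ctx, coll)
	if err != nil {
		panic(err)
	}
	fmt.Println("persisted resume token:", saved)
}
```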