-
Notifications
You must be signed in to change notification settings - Fork 15
REP-5201 Persist the change stream’s resume token. #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
tdq45gj
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good in general. I've left some comments and questions.
internal/verifier/change_stream.go
Outdated
| verifier.changeStreamErrChan <- err | ||
| return | ||
| verifier.logger.Fatal().Err(err).Msg("Error handling change event") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we also need to update the persisted resume token during rechecks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just moved around from preexisting code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no preexisting token to persist change stream resume tokens. If we persist resume tokens during Check phase, it sounds like we can also persist resume tokens during Recheck phase right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't persist during Recheck, then a failure during Recheck will go back to the last persisted token from Check on resume, which seems wrong. So I think it should persist here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell the change stream doesn’t distinguish check from recheck. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, no I’d missed this. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ve refactored this code to remove duplication. That effort also solves the missing cs.Err() check that @edobranov pointed out.
Co-authored-by: Jian Guan <[email protected]>
internal/verifier/change_stream.go
Outdated
| default: | ||
| var err error | ||
|
|
||
| if next := cs.TryNext(ctx); next { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tangential question: shouldn't we be checking cs.Err()? If it's not nil, then cs.TryNext() will always return false but we'll continue iterating the change stream. I think this can end up looking like a hang from the user's perspective?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point … I’ll make a separate ticket for that (since this is just copied code).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bah … it’s so simple I’ll just fix it here.
internal/verifier/change_stream.go
Outdated
| verifier.logger.Debug(). | ||
| Stringer("resumeToken", token). | ||
| Msg("Persisted change stream resume token.") | ||
|
|
||
| return errors.Wrapf(err, "failed to persist change stream resume token (%v)", token) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd swap these. I don't think we want the logs to say that the resume token got persisted, but then we see an error indicating that this didn't actually happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Fixed.
|
|
||
| if savedResumeToken != nil { | ||
| logEvent := csStartLogEvent. | ||
| Stringer("resumeToken", savedResumeToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be wary of printing out the whole resume token as-is. This caused a fatal error in REP-3625, so I think we should either truncate it or omit it. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would think that not to happen here because the verifier will run simpler change streams than mongosync does under ORR, but yeah it’s probably best not to chance it.
Replacing with an extracted timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think REP-3625 is evergreen unable to parse a long log line, which seems to have been fixed by evergreen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even so, it seems best to err here on the side of caution. We probably don’t need the full resume token in the log.
tdq45gj
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % one question about handling resume token persistence errors.
internal/verifier/change_stream.go
Outdated
| return | ||
| } | ||
|
|
||
| persistResumeTokenIfNeeded() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we care about the error from persisting resume token?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doh. Yes.
edobranov
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR #30 added persistence of the change stream’s resume token but mishandled some of the refactoring by making every change stream iteration loop read all events. This restores the intended logic: check for context cancellation or the end of the change stream after each event.
Reviewer Notes
A fair bit of change_stream.go has just been moved around from before.
This teaches migration-verifier to persist its resume token. Without this, success reports from the verifier aren’t fully reliable because during any downtime it’s possible for change events to happen that would go unverified.
This changeset institutes persistence of the resume token by persisting the token roughly every 10 seconds.
This changeset alters (and simplifies) the verifier’s method for setting its
srcStartAtTsproperty. The rationale for this follows:Before this changeset, the verifier called
GetLastOpTimeAndSyncShardClusterTime()before opening the change stream. If that function succeeded, the verifier would start the change stream at that time and set it assrcStartAtTs; if not, the verifier would ignore the failure, start the change stream, read the resume token’s timestamp as well as the cluster time, then set the earlier of those two assrcStartAtTs.If
min(tokenTs, clusterTs)is acceptable and always available, then there’s no reason to callGetLastOpTimeAndSyncShardClusterTime()(which can fail) in the first place. Thus, this changeset removes that function and always determinessrcStartAtTsfrom the new change stream’s resume token (or session cluster time).If a resume token was previously persisted, then the change stream is initialized with that resume token. Note that
srcStartAtTswill correspond to that resume token, but that shouldn’t actually affect anything because it’s OK forsrcStartAtTsto be “in the past” since it’s only use is to be afindcommand’sreadConcern.afterClusterTime.