Conversation

Collaborator

@tdq45gj tdq45gj commented Nov 22, 2024

This PR adds a destination change stream to the MV. It also adds a ChangeStreamReader struct that reads change events from either the source or the destination. Change events are handled by change event handlers, which are goroutines started for each change stream reader.
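
For orientation, a rough sketch of the shape being described (a sketch only; field names follow the snippets quoted later in the review and may not match the final code exactly):

```go
package verifier

import (
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// ChangeStreamReader tails one cluster's change stream and hands parsed event
// batches to the verifier over channels. ParsedEvent is the package's existing
// change-event type.
type ChangeStreamReader struct {
	readerType    string // "source" or "destination"
	watcherClient *mongo.Client

	ChangeEventBatchChan chan []ParsedEvent
	WritesOffTsChan      chan primitive.Timestamp
	ErrChan              chan error
	DoneChan             chan struct{}
}

// String lets logs and error messages identify which cluster's reader this is.
func (csr *ChangeStreamReader) String() string {
	return csr.readerType + " change stream reader"
}
```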

@tdq45gj tdq45gj marked this pull request as ready for review November 22, 2024 20:59
@tdq45gj tdq45gj requested a review from FGasper November 25, 2024 22:12
Collaborator

@FGasper FGasper left a comment

In broad strokes this looks good.

I’ve noted some small things, mostly nits.

Also, can there be a more end-to-end-ish test verifying that, if the destination receives an update to a document after MV has checked it, we'll get a mismatch reported?

dstNamespaces []string
nsMap map[string]string
srcDstNsMap map[string]string
dstSrcNsMap map[string]string
Collaborator

Is this always just the inverse of srcDstNsMap? If so, I think we’d be better served by a dedicated type with an accessor.
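
For illustration, one possible shape for such a dedicated type (a sketch; the type and method names are assumptions, not code from this PR):

```go
// namespaceMap stores only the source→destination mapping as state and derives
// the reverse lookup at construction time, so the two directions can't drift apart.
type namespaceMap struct {
	srcToDst map[string]string
	dstToSrc map[string]string
}

func newNamespaceMap(srcToDst map[string]string) namespaceMap {
	dstToSrc := make(map[string]string, len(srcToDst))
	for src, dst := range srcToDst {
		dstToSrc[dst] = src
	}
	return namespaceMap{srcToDst: srcToDst, dstToSrc: dstToSrc}
}

// dstFor returns the destination namespace for a source namespace.
func (m namespaceMap) dstFor(srcNs string) (string, bool) {
	dst, ok := m.srcToDst[srcNs]
	return dst, ok
}

// srcFor is the accessor suggested above: the inverse lookup without keeping a
// second, independently maintained map on the verifier.
func (m namespaceMap) srcFor(dstNs string) (string, bool) {
	src, ok := m.dstToSrc[dstNs]
	return src, ok
}
```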

verifier.logger,
verifier.srcClient,
)
srcFinalTs, err := GetNewClusterTime(
Collaborator

In upstream the mux is locked during GetNewClusterTime. Does this need to change? If so, why?

If not, can we keep upstream’s behavior here?

Collaborator Author

I don't think GetNewClusterTime needs to be under the lock. Here I'm removing the verifier.writesOffTimestamp field and adding the above error so that writesOff can't be called twice.

Collaborator

I don't think GetNewClusterTime needs to be under the lock.

What benefit do we realize from changing this, though?

Collaborator Author

Ideally we should minimize the critical section.
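
For reference, the shape of the change being debated, as a generic sketch (the field names and the GetNewClusterTime signature here are assumptions, not the repo's actual code):

```go
// Sketch of "minimize the critical section": hold the mutex only while touching
// verifier state, and make the slow server round trip with the lock released.
func (verifier *Verifier) writesOff(ctx context.Context) error {
	verifier.mux.Lock()
	alreadyOff := verifier.writesOffCalled
	verifier.writesOffCalled = true
	verifier.mux.Unlock()

	if alreadyOff {
		return errors.New("WritesOff was already called")
	}

	// Network round trip happens outside the lock.
	srcFinalTs, err := GetNewClusterTime(ctx, verifier.logger, verifier.srcClient)
	if err != nil {
		return errors.Wrap(err, "failed to read a new cluster time from the source")
	}

	verifier.srcChangeStreamReader.WritesOffTsChan <- srcFinalTs
	return nil
}
```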

Collaborator

My concern is that this is fairly brittle stuff. We’ve made it less so, but altering concurrency-related code seems like a risk we’re better off avoiding.

I can’t see anything wrong with your reasoning, but this sort of stuff is prone to “surprises”. Will this palpably improve speed? If so, then I’m OK with it. If not, IMO we should retain the present logic.

}
} else {
verifier.mux.Unlock()
// This has to happen under the lock because the change stream
Collaborator

The comment restores my “under the lock” typo. :-P Should probably fix.

// sorting by _id will guarantee that all rechecks for a given
// namespace appear consecutively.
//
// DatabaseName and CollectionName should be on the source.
Collaborator

Instead of this comment, the struct could rename its members?
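
i.e., something along these lines (the struct and field names here are hypothetical):

```go
// Hypothetical rename: put "source" into the field names so the invariant is
// self-documenting instead of living in a comment.
type recheckPrimaryKey struct {
	SrcDatabaseName   string      `bson:"db"`
	SrcCollectionName string      `bson:"coll"`
	DocumentID        interface{} `bson:"docID"`
}
```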

// If the changeStreamEnderChan has a message, the user has indicated that
// source writes are ended. This means we should exit rather than continue
// If the ChangeStreamEnderChan has a message, the user has indicated that
// source and destination writes are ended. This means we should exit rather than continue
Collaborator

I don’t know that the “and destination” part is gainful here. It might suggest that user writes to the destination are expected during a migration, which they definitely aren’t.

Collaborator Author

I think it means that the migration tool has stopped/committed. I'll clarify in this comment.

gotwritesOffTimestamp = true

// Read all change events until the source reports no events.
// Read all change events until the source / destination reports no events.
Collaborator

This log is inaccurate upstream; we might as well fix it here.

Maybe:

Read change events until the stream reaches the writesOffTs.

case err := <-verifier.srcChangeStreamReader.ChangeStreamErrChan:
cancel()
return errors.Wrap(err, "change stream failed")
return errors.Wrapf(err, "got an error from %s", verifier.srcChangeStreamReader)
Collaborator

Do you prefer “got an error” to “failed” here? The latter seems (to me, at least) like stronger, more concise wording.

Maybe:

errors.Wrapf(err, "%s failed", verifier.srcChangeStreamReader)

Collaborator Author

Agreed.

return errors.Wrapf(err, "got an error from %s", verifier.srcChangeStreamReader)
case err := <-verifier.dstChangeStreamReader.ChangeStreamErrChan:
cancel()
return errors.Wrapf(err, "got an error from %s", verifier.dstChangeStreamReader)
Collaborator

same q re “got an error”

if err != nil {
return errors.Wrap(err, "failed to start change stream on destination")
}
verifier.StartChangeEventHandler(ctx, verifier.dstChangeStreamReader)
Collaborator

Since ChangeStreamReader has a String() method, you could just loop over the src & dst change streams rather than duplicating this logic.
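
For example (a sketch; the StartChangeStream method name is assumed from the quoted code):

```go
// Iterate over both readers instead of writing the start/handle sequence twice;
// each reader's String() method keeps the error messages cluster-specific.
for _, reader := range []*ChangeStreamReader{
	verifier.srcChangeStreamReader,
	verifier.dstChangeStreamReader,
} {
	if err := reader.StartChangeStream(ctx); err != nil {
		return errors.Wrapf(err, "failed to start %s", reader)
	}
	verifier.StartChangeEventHandler(ctx, reader)
}
```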

Comment on lines 475 to 476
verifier.SetSrcNamespaces([]string{"srcDB.srcColl1", "srcDB.srcColl2"})
verifier.SetDstNamespaces([]string{"dstDB.dstColl1", "dstDB.dstColl2"})
Collaborator

Can these use suite.DBNameForTest() rather than hard-coding DB names?

@tdq45gj tdq45gj requested a review from FGasper December 2, 2024 12:54
Collaborator

@FGasper FGasper left a comment

I’ve left a few more small comments. I think this is quite close! Please ping me when you’ve had a chance to look.

Thanks!


// StartChangeEventHandler starts a goroutine that handles change event batches from the reader.
// It needs to be started after the reader starts.
func (verifier *Verifier) StartChangeEventHandler(ctx context.Context, reader *ChangeStreamReader, errGroup *errgroup.Group) {
Collaborator

nit: It seems slightly cleaner not to pass the errGroup around, but just to let the caller call this function in errGroup.Go().

Collaborator Author

Done
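
The resulting call-site shape, roughly (RunChangeEventHandler is a hypothetical blocking variant of the handler):

```go
// The caller owns the errgroup and wraps the handler itself.
ceHandlerGroup.Go(func() error {
	return verifier.RunChangeEventHandler(ctx, verifier.srcChangeStreamReader)
})
ceHandlerGroup.Go(func() error {
	return verifier.RunChangeEventHandler(ctx, verifier.dstChangeStreamReader)
})
```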

return errors.Wrapf(err, "failed to decode change event to %T", changeEventBatch[eventsRead])
}

csr.logger.Trace().Msgf("%s received a change event: %v", csr, changeEventBatch[eventsRead])
Collaborator

Trace-level logs make sense, but is there any way to actually see them? The CLI just exposes a --debug option.

Collaborator

@FGasper FGasper Dec 3, 2024

Also, consider making the event an Interface() in the log and using Msg() instead.

Collaborator Author

Yeah, this only logs in tests. I added a comment.
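
The suggested logging shape, for reference (zerolog's Interface() attaches the event as a structured field instead of formatting it into the message):

```go
csr.logger.Trace().
	Str("reader", csr.String()).
	Interface("event", changeEventBatch[eventsRead]).
	Msg("Received a change event.")
```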

}

csr.logger.Trace().Msgf("%s received a change event: %v", csr, changeEventBatch[eventsRead])
fmt.Printf("%d %d\n", changeEventBatch[eventsRead].ClusterTime.T, changeEventBatch[eventsRead].ClusterTime.I)
Collaborator

leftover?

csr.logger.Trace().Msgf("%s received a change event: %v", csr, changeEventBatch[eventsRead])
fmt.Printf("%d %d\n", changeEventBatch[eventsRead].ClusterTime.T, changeEventBatch[eventsRead].ClusterTime.I)

if changeEventBatch[eventsRead].ClusterTime != nil &&
Collaborator

I know this is just copied logic, but is there ever a case where we’d expect a nil cluster time?

Collaborator Author

I don't think there would be a nil cluster time, although the ClusterTime field has an omitempty BSON tag.

Collaborator

I feel like if a missing clusterTime ever happened that’d be something we should at least warn about, if not fail outright. But that’d be more apropos to a separate PR.
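
A minimal version of that warning, sketched for a possible follow-up (not part of this PR):

```go
// Surface a missing clusterTime instead of silently skipping the timestamp bookkeeping.
if changeEventBatch[eventsRead].ClusterTime == nil {
	csr.logger.Warn().
		Str("reader", csr.String()).
		Interface("event", changeEventBatch[eventsRead]).
		Msg("Change event has no clusterTime.")
}
```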

return errors.Wrap(err, "failed to handle change events")
}

csr.ChangeEventBatchChan <- changeEventBatch
Collaborator

Why do the handling in a separate goroutine?

Collaborator Author

The handling needs to call the verifier's insertRecheckDocs method. Now that we're separating the change stream read logic into its own struct, it seems to me that having a separate goroutine in the verifier to handle the change events makes the most sense.

Collaborator

OK, yeah it works around the circular dependency.

Ideally I’d like us to solve that by breaking apart Verifier, but that may be a bigger change than the present “mini-project” warrants.
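
The handoff under discussion, as a sketch (the handler and insertRecheckDocsForEvents names are illustrative; the real code calls insertRecheckDocs):

```go
// The reader only reads and batches events; this verifier-side goroutine
// receives the batches and does the recheck bookkeeping, so ChangeStreamReader
// never needs to depend on Verifier.
func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case batch, ok := <-reader.ChangeEventBatchChan:
			if !ok {
				// The reader closed the channel; no more events are coming.
				return nil
			}
			if err := verifier.insertRecheckDocsForEvents(ctx, batch); err != nil {
				return errors.Wrap(err, "failed to handle change events")
			}
		}
	}
}
```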

verifier.logger.Debug().Msg("Check: Change stream already running.")
} else {
verifier.logger.Debug().Msg("Change stream not running; starting change stream")
ceHandlerGroup := &errgroup.Group{}
Collaborator

Why not errgroup.WithContext()?

Collaborator

Also, why are the event handler goroutines in an errgroup while the event reader goroutines aren’t?

Collaborator Author

Yeah, errgroup.WithContext() seems better. The only reason the handler goroutines are in an errgroup is that we want to wait for them in the CheckDriver function. The change stream readers are currently waited on through channels.
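
A sketch of the errgroup.WithContext() variant (names are illustrative): a failure in either handler cancels the shared context, and CheckDriver waits on the group.

```go
ceHandlerGroup, groupCtx := errgroup.WithContext(ctx)
ceHandlerGroup.Go(func() error {
	return verifier.RunChangeEventHandler(groupCtx, verifier.srcChangeStreamReader)
})
ceHandlerGroup.Go(func() error {
	return verifier.RunChangeEventHandler(groupCtx, verifier.dstChangeStreamReader)
})

// ...later, in CheckDriver:
if err := ceHandlerGroup.Wait(); err != nil {
	return errors.Wrap(err, "change event handler failed")
}
```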

type clusterType string

const (
srcReaderType clusterType = "source"
Collaborator

“cluster type” seems relatively non-descript: “type” often means topology, for example.

Maybe whichCluster?

Collaborator Author

I can't think of a better name. I changed it to whichCluster.
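
After the rename, the declarations read roughly as follows (the constant names are guesses):

```go
// whichCluster says which cluster a ChangeStreamReader watches; "clusterType"
// was avoided because "type" often suggests topology.
type whichCluster string

const (
	srcWhichCluster whichCluster = "source"
	dstWhichCluster whichCluster = "destination"
)
```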

verifier.logger,
verifier.srcClient,
)
srcFinalTs, err := GetNewClusterTime(
Collaborator

I don't think GetNewClusterTime needs to be under the lock.

What benefit do we realize from changing this, though?

}
} else {
verifier.mux.Unlock()
// This has to happen outside the lock because the change stream
Collaborator

There are 2 change streams … can you amend this comment accordingly?

Collaborator Author

Done

// Dry run generation 0 to make sure change stream reader is started.
suite.Require().NoError(runner.AwaitGenerationEnd())

suite.Require().NoError(runner.StartNextGeneration())
Collaborator

Should the generation start after the inserts? It seems like that’d be a bit less race-prone.

Collaborator Author

It looks like it's more likely to race if the inserts happen after the generation starts, because StartNextGeneration doesn't block until the next generation begins. I've reverted the changes for this test so that the change events occur in the previous generation and the recheck tasks should appear in the next generation.

@tdq45gj tdq45gj requested a review from FGasper December 3, 2024 18:09
Collaborator

@FGasper FGasper left a comment

LGTM % the last few small notes I’ve made.

Comment on lines 68 to 71
ChangeEventBatchChan chan []ParsedEvent
WritesOffTsChan chan primitive.Timestamp
ErrChan chan error
DoneChan chan struct{}
Collaborator

nit: Do these need to be exported?

Collaborator Author

Technically none of them need to be exported since they're all in the verifier package. Changed them to unexported.
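
i.e., roughly (a sketch of the renamed fields):

```go
type ChangeStreamReader struct {
	// The channels stay package-internal now that nothing outside the verifier
	// package needs them.
	changeEventBatchChan chan []ParsedEvent
	writesOffTsChan      chan primitive.Timestamp
	errChan              chan error
	doneChan             chan struct{}
	// ...other fields elided...
}
```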

@tdq45gj tdq45gj merged commit ad69b4d into mongodb-labs:main Dec 3, 2024
49 checks passed
FGasper added a commit to FGasper/migration-verifier that referenced this pull request Dec 4, 2024
FGasper added a commit to FGasper/migration-verifier that referenced this pull request Dec 4, 2024
This fixes an oversight from PR mongodb-labs#53. It also “upgrades” the test
for the change stream filter to focus on behavior rather than
implementation.
FGasper added a commit that referenced this pull request Dec 4, 2024
This fixes an oversight from PR #53. It also “upgrades” the test
for the change stream filter to focus on behavior rather than
implementation.