Skip to content

Conversation

@tdq45gj
Copy link
Collaborator

@tdq45gj tdq45gj commented Nov 7, 2024

This makes the change stream reader:

  1. reads a change event until RemainingBatchSize == 0
  2. buffer all events of a batch in memory
  3. batch insert recheck docs

It also fixes the bug that the eventRecorder counts each event twice.

@tdq45gj tdq45gj requested review from FGasper and autarch November 7, 2024 21:11
Copy link
Collaborator

@FGasper FGasper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally this looks good, but I wonder if we can’t simplify it some.

Also the dupe-key handling seems like a point of potential concern. How necessary is that?

// HandleChangeStreamEvent performs the necessary work for change stream events that occur during
// operation.
func (verifier *Verifier) HandleChangeStreamEvent(ctx context.Context, changeEvent *ParsedEvent) error {
func (verifier *Verifier) HandleChangeStreamEvent(changeEvent *ParsedEvent) error {
Copy link
Collaborator

@FGasper FGasper Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what was the advantage of doing the buffering here versus closer to the read from the cursor? I was thinking we’d read all the events into a slice then pass that slice to this function (renamed HandleChangeStreamEvents).

We wouldn’t need separate handle-vs-flush methods in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I made the changes.

}

eventsRead++
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be error handling for cs.Err() here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the if statement above.


// StartChangeStream starts the change stream.
func (verifier *Verifier) StartChangeStream(ctx context.Context) error {
func (verifier *Verifier) StartChangeStream(ctx context.Context, batchSize *int32) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be a setting? Aren’t we just taking whatever batch size we get from the source?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this for a test to pass in a small batch size.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was that necessary, though?

getMore always returns after a relatively short period of inactivity with just whatever events it happens to have seen. So the batch size should be naturally limited to whichever events you’ve triggered … ?

}

dbName, collName := SplitNamespace(namespace)
if err := verifier.insertRecheckDocs(ctx, dbName, collName, ids, dataSizes); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach will cause us to block between each namespace’s events, I think.

Could we instead persist all of the events for all namespaces at once?

}

// Silence any duplicate key errors as recheck docs should have existed.
if mongo.IsDuplicateKeyError(err) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dupe key errors can exist alongside other errors.

We should probably instead look for non-simple dupe-key errors so that anything that isn’t a dupe-key will still be handled.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there shouldn't be any duplicate key error here because it's actually replace rather than insert.


pprofInterval time.Duration

changeEventRecheckBuf ChangeEventRecheckBuffer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: We shouldn’t need this if we switch to reading all events from the cursor into a slice.

"the verifier should flush a recheck doc after a batch",
)
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl1"])
suite.Require().Empty(verifier.changeEventRecheckBuf["testDB.testColl2"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid testing verifier internals here? It’d be ideal not to depend on them.

@tdq45gj tdq45gj requested a review from FGasper November 8, 2024 20:29
Copy link
Collaborator

@autarch autarch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM % some small stuff. No need for another review.

func (verifier *Verifier) insertRecheckDocsWhileLocked(
ctx context.Context,
dbName, collName string, documentIDs []interface{}, dataSizes []int) error {
dbNames []string, collNames []string, documentIDs []interface{}, dataSizes []int) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is a very weird way to format this. I think it'd more readable as one arg per line.

Comment on lines 93 to 96
verifier.mux.Lock()
defer verifier.mux.Unlock()

return verifier.insertRecheckDocsWhileLocked(ctx, dbNames, collNames, docIDs, dataSizes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd make more sense to put the locking in the insertRecheckDocsWhileLocked method. Right now every caller has to remember to use the mutex properly, which seems like a recipe for mistakes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it this way just this week to accommodate stats. If we’re going to change this I’d rather it’d be in a separate PR since the changes don’t really seem germane.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the change event stats from this function in this PR. I think it makes sense to make this change here as well.

}

if eventsRead > 0 {
verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you also want to log the 0 events case? I don't have enough context to know if that would be useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to log if there's no event. We've recommended users to run the m-v with debug log level, so I don't want too many log lines even at debug level.

}

if eventsRead > 0 {
verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
verifier.logger.Debug().Int("eventsCount", eventsRead).Msgf("Received a batch of events.")

verifier.logger.Debug().Msgf("Received a batch of %d events", eventsRead)
}

err := verifier.HandleChangeStreamEvents(ctx, changeEventBatch)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this handling logic also be under the >0 condition?

@tdq45gj tdq45gj requested a review from FGasper November 11, 2024 14:12
}

return gotEvent, errors.Wrap(cs.Err(), "change stream iteration failed")
return eventsRead > 0, errors.Wrap(cs.Err(), "change stream iteration failed")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems strange to defer handling cs.Err() until after HandleChangeStreamEvents() is called. It’d look a bit more idiomatic if the check were closer to the TryNext.

dbNames []string,
collNames []string,
documentIDs []interface{},
dataSizes []int,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we’re doing a separate slice for each category it’d seem a bit cleaner if there were 1 slice with a struct that contains these data points.

Copy link
Collaborator

@FGasper FGasper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

suite.Require().Equal(VerificationStatus{TotalTasks: 1, FailedTasks: 1}, *status)

checkContinueChan <- struct{}{}
require.NoError(suite.T(), errGroup.Wait())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find!

@tdq45gj tdq45gj merged commit 13e2a62 into mongodb-labs:main Nov 12, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants