-
Notifications
You must be signed in to change notification settings - Fork 16
REP-5358 Fix hangs when reading change stream channels. #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
tdq45gj
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me in general. I wonder if there's any point calling Get before Eventual is ready.
internal/util/eventual.go
Outdated
|
|
||
| // Get returns an option that contains the Eventual’s value, or | ||
| // empty if the value isn’t ready yet. | ||
| func (e *Eventual[T]) Get() option.Option[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any case that we want to call Get before it's ready? Looks like we're calling Get().MustGet() everywhere, can we make Get return T or fail if it's not ready?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that. I was/am a bit averse to proliferating panics, but you’re right that there’s little point in calling Get() before the value is ready. (It can be checked anyway by doing a select on Ready() with a default.)
I’ll change it.
| sess, err := suite.srcMongoClient.StartSession() | ||
| suite.Require().NoError(err) | ||
| sctx := mongo.NewSessionContext(ctx, sess) | ||
| _, err = suite.srcMongoClient.Database("testDb").Collection("testColl").InsertOne( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use suite.DBNameForTest()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
tdq45gj
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This fixes hangs from the change stream and document reader when contexts are canceled.
StartChangeEventHandler doesn’t need to send to the error channel. It now just returns its error.
The other channels—writes-off, error, and done—now allow infinite (non-blocking) reads once populated. This works via a new struct, Eventual, that supplies the same semantics as Context’s Done() and Err() methods. The change stream struct’s writes-off and error channels are written to be Eventual. The doneChan remains a channel but now is just closed rather than written to.
This also refactors a couple functions for general reuse and fixes TestStartAtTimeNoChanges, which has flapped occasionally.