-
Notifications
You must be signed in to change notification settings - Fork 16
REP-5358 Fix hangs when reading change stream channels. #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 25 commits
Commits
Show all changes
33 commits
Select commit
Hold shift + click to select a range
90f7502
recreate
FGasper aea6160
revamp retryer
FGasper 9c0cf51
note success too
FGasper 5dfafab
real clones
FGasper 1652778
a bit more debugging
FGasper b8ed20d
wording tweaks, and a return that was missing
FGasper ac170b4
s/failed/discrepancies
FGasper e424812
Merge branch 'felipe_log_lag_redux' into REP-5358-add-retry-description
FGasper 5f9733c
fix/beef-up
FGasper 90aa035
add more detail to retries
FGasper 6bc2e10
tweak
FGasper 7dda460
Convert change stream err and writesOffTs channels to Eventuals.
FGasper 78c609f
add test for Eventual
FGasper 9c17faf
trace for test that flapped
FGasper fa63ff7
godoc
FGasper ab10df3
Merge branch 'main' into REP-5358-retry-and-channels
FGasper 668e194
Merge branch 'main' into REP-5358-add-retry-description
FGasper 50708b7
Merge branch 'REP-5358-add-retry-description' into REP-5358-retry-and…
FGasper 28b8525
fix case where context closes docs reader
FGasper 490b2d6
add test
FGasper c199009
revert retryer changes
FGasper b2a8c6d
Merge branch 'main' into REP-5358-retry-and-channels
FGasper bab60a5
docs tweak
FGasper 250a659
add test
FGasper be2638e
change stream test
FGasper 41229d1
Fix flapping TestStartAtTimeNoChanges.
FGasper b4dad36
cluster_time.go
FGasper e602c9e
check err
FGasper 3b2ff9f
only log once
FGasper d11cf4a
pruning
FGasper 42376f5
Make Eventual Get() panic if the value is unset.
FGasper 5b5e871
use DBNameForTest
FGasper f26d63d
beef up test description … why flapping?
FGasper File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| package util | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "github.com/10gen/migration-verifier/option" | ||
| ) | ||
|
|
||
| // Eventual solves the “one writer, many readers” problem: a value gets | ||
| // written once, then the readers will see that the value is `Ready()` and | ||
| // can then `Get()` it. | ||
| // | ||
| // It’s like how `context.Context`’s `Done()` and `Err()` methods work, but | ||
| // generalized to any data type. | ||
| type Eventual[T any] struct { | ||
| ready chan struct{} | ||
| val option.Option[T] | ||
| mux sync.RWMutex | ||
| } | ||
|
|
||
| // NewEventual creates an Eventual and returns a pointer | ||
| // to it. | ||
| func NewEventual[T any]() *Eventual[T] { | ||
| return &Eventual[T]{ | ||
| ready: make(chan struct{}), | ||
| } | ||
| } | ||
|
|
||
| // Ready returns a channel that closes once the Eventual’s value is ready. | ||
| func (e *Eventual[T]) Ready() <-chan struct{} { | ||
| return e.ready | ||
| } | ||
|
|
||
| // Get returns an option that contains the Eventual’s value, or | ||
| // empty if the value isn’t ready yet. | ||
| func (e *Eventual[T]) Get() option.Option[T] { | ||
| e.mux.RLock() | ||
| defer e.mux.RUnlock() | ||
|
|
||
| return e.val | ||
| } | ||
|
|
||
| // Set sets the Eventual’s value. It may be called only once; | ||
| // if called again it will panic. | ||
| func (e *Eventual[T]) Set(val T) { | ||
| e.mux.Lock() | ||
| defer e.mux.Unlock() | ||
|
|
||
| if e.val.IsSome() { | ||
| panic("Tried to set an eventual twice!") | ||
| } | ||
|
|
||
| // NB: This *must* happen before the close(), or else a fast reader may | ||
| // not see this value. | ||
| e.val = option.Some(val) | ||
|
|
||
| close(e.ready) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package util | ||
|
|
||
| import ( | ||
| "time" | ||
|
|
||
| "github.com/10gen/migration-verifier/option" | ||
| ) | ||
|
|
||
| func (s *UnitTestSuite) TestEventual() { | ||
| eventual := NewEventual[int]() | ||
|
|
||
| s.Assert().Equal( | ||
| option.None[int](), | ||
| eventual.Get(), | ||
| "Get() should return empty", | ||
| ) | ||
|
|
||
| select { | ||
| case <-eventual.Ready(): | ||
| s.Require().Fail("should not be ready") | ||
| case <-time.NewTimer(time.Second).C: | ||
| } | ||
|
|
||
| eventual.Set(123) | ||
|
|
||
| select { | ||
| case <-eventual.Ready(): | ||
| case <-time.NewTimer(time.Second).C: | ||
| s.Require().Fail("should be ready") | ||
| } | ||
|
|
||
| s.Assert().Equal( | ||
| option.Some(123), | ||
| eventual.Get(), | ||
| "Get() should return the value", | ||
| ) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any case that we want to call
Getbefore it's ready? Looks like we're callingGet().MustGet()everywhere, can we makeGetreturnTor fail if it's not ready?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about that. I was/am a bit averse to proliferating panics, but you’re right that there’s little point in calling Get() before the value is ready. (It can be checked anyway by doing a select on Ready() with a default.)
I’ll change it.