concurrency-safe vertex iterators for levelled forest #8202
Open

AlexHentschel wants to merge 8 commits into master from alex/LevelledForest_concurrency-safe-iterator
Changes from 2 commits

Commits (8):
0ee39f6  first complete version of concurrency-safe vertex iterators for level…  (AlexHentschel)
e011e57  linted code  (AlexHentschel)
462124e  Apply suggestions from code review  (AlexHentschel)
e4fe8d5  added minor emphasis in goDoc  (AlexHentschel)
a7a28e5  Merge branch 'alex/LevelledForest_concurrency-safe-iterator' of githu…  (AlexHentschel)
3eb367b  linting  (AlexHentschel)
e240045  Merge branch 'master' into alex/LevelledForest_concurrency-safe-iterator  (AlexHentschel)
58f56de  linting :-/  (AlexHentschel)
@@ -0,0 +1,44 @@
package forest

import (
	"sync"
)

/* ATTENTION: LevelledForest and its derived objects, such as the VertexIterator, are NOT Concurrency Safe. The
 * LevelledForest is a low-level library geared for performance. As locking is not needed in some application
 * scenarios (most notably the consensus EventHandler, which by design is single-threaded), concurrency handling
 * is delegated to the higher-level business logic using the LevelledForest.
 *
 * Here, we provide helper structs for higher-level business logic, to simplify their concurrency handling.
 */

// VertexIteratorConcurrencySafe wraps the VertexIterator to make it concurrency safe. Effectively,
// the behaviour is like iterating over a snapshot taken at the time of iterator construction.
// Under concurrent calls, the iterator delivers each item exactly once across all callers.
// Items are delivered in order, and `NextVertex` establishes a 'synchronized before' relation as
// defined in the Go memory model (https://go.dev/ref/mem).
type VertexIteratorConcurrencySafe struct {
	unsafeIter VertexIterator
	mu         sync.RWMutex
}

func NewVertexIteratorConcurrencySafe(iter VertexIterator) *VertexIteratorConcurrencySafe {
	return &VertexIteratorConcurrencySafe{unsafeIter: iter}
}

// NextVertex returns the next Vertex, or nil if there is none. A caller receiving a non-nil value
// is 'synchronized before' (see https://go.dev/ref/mem) the caller receiving the subsequent non-nil
// value. NextVertex() delivers each item exactly once, following a fully sequential, deterministic
// order, with results being distributed in order across all competing threads.
func (i *VertexIteratorConcurrencySafe) NextVertex() Vertex {
	i.mu.Lock() // must acquire write lock here, as the wrapped `VertexIterator` changes its internal state
	defer i.mu.Unlock()
	return i.unsafeIter.NextVertex()
}

// HasNext returns true if and only if there is a next Vertex.
func (i *VertexIteratorConcurrencySafe) HasNext() bool {
	i.mu.RLock()
	defer i.mu.RUnlock()
	return i.unsafeIter.HasNext()
}
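To make the intended usage concrete, here is a minimal, hypothetical sketch (not part of this PR; `processChildrenConcurrently` is an invented helper). It assumes the caller holds whatever lock protects the forest while constructing the iterator, as the test wrapper further below does, and then lets several workers drain the shared iterator:

// Hypothetical usage sketch: several workers drain one concurrency-safe iterator.
// Each vertex is delivered to exactly one worker, in the iterator's order.
func processChildrenConcurrently(forest *LevelledForest, parent flow.Identifier, workers int) {
	// assumption: the forest is not mutated concurrently while we take the snapshot
	it := NewVertexIteratorConcurrencySafe(forest.GetChildren(parent))

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Loop on the nil sentinel: under concurrency, calling HasNext first and
			// then NextVertex would be a check-then-act race, because another worker
			// could consume the last vertex in between.
			for v := it.NextVertex(); v != nil; v = it.NextVertex() {
				_ = v // process v here
			}
		}()
	}
	wg.Wait()
}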
@@ -0,0 +1,259 @@
package forest

import (
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/utils/unittest"
)

// Test_SlicePrimitives demonstrates that we can use slices, including `VertexList`,
// as concurrency-safe snapshots.
func Test_SlicePrimitives(t *testing.T) {
	// Conceptually, we always proceed along the following pattern:
	//  • We assume there is a LevelledForest instance, protected for concurrent access by higher-level
	//    business logic (not represented in this test).
	//  • The higher-level business logic instantiates a `VertexIterator` (not represented in this test) by calling
	//    `GetChildren` or `GetVerticesAtLevel`, for example. Under the hood, the `VertexIterator` receives a `VertexList`
	//    as its sole input. Go internally represents the slice `VertexList` as the triple
	//    [pointer to array, slice length, slice capacity] (see https://go.dev/blog/slices-intro for details). The slice
	//    is passed by value, i.e. the `VertexIterator` maintains its own copy of these values.
	//  • Here, we emulate interleaving writes by the forest to the shared slice `VertexList`.

	v := NewVertexMock("v", 3, "C", 2)
	vContainer := &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: v}

	t.Run("nil slice", func(t *testing.T) {
		// Prepare the vertex list representing the slice of children held by the forest.
		var vertexList VertexList // nil zero value

		// vertex iterator maintains a snapshot of a nil slice
		iterator := newVertexIterator(vertexList)

		// Emulating concurrent access, where new data is added by the forest:
		// we expect that vertexList was expanded, but the iterator's notion should be unchanged
		vertexList = append(vertexList, vContainer)
		assert.Nil(t, iterator.data)
		assert.Equal(t, len(vertexList), len(iterator.data)+1)
	})

	t.Run("empty slice of zero capacity", func(t *testing.T) {
		// Prepare the vertex list representing the slice of children held by the forest.
		var vertexList VertexList = []*vertexContainer{}

		// vertex iterator maintains a snapshot of the non-nil slice, with zero capacity
		iterator := newVertexIterator(vertexList)

		// Emulating concurrent access, where new data is added by the forest:
		// we expect that vertexList was expanded, but the iterator's notion should be unchanged
		vertexList = append(vertexList, vContainer)
		assert.NotNil(t, iterator.data)
		assert.Zero(t, len(iterator.data))
		assert.Equal(t, len(vertexList), len(iterator.data)+1)
	})

	t.Run("empty slice with capacity 2 (len = 0, cap = 2)", func(t *testing.T) {
		// Prepare the vertex list representing the slice of children held by the forest.
		var vertexList VertexList = make(VertexList, 0, 2)

		// vertex iterator maintains a snapshot of a slice with length zero but capacity 2
		iterator := newVertexIterator(vertexList)

		// Emulating concurrent access, where new data is added by the forest:
		// we expect that vertexList was expanded, but the iterator's notion should be unchanged
		vertexList = append(vertexList, vContainer)
		assert.NotNil(t, iterator.data)
		assert.Zero(t, len(iterator.data))
		assert.Equal(t, 2, cap(iterator.data))
		assert.Equal(t, len(vertexList), len(iterator.data)+1)
	})

	t.Run("non-empty slice with larger capacity (len = 1, cap = 2)", func(t *testing.T) {
		// Prepare the vertex list representing the slice of children held by the forest.
		var vertexList VertexList = make(VertexList, 1, 2)
		_v := NewVertexMock("v", 3, "C", 2)
		vertexList[0] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v}

		// vertex iterator maintains a snapshot of a slice with length 1 but capacity 2
		iterator := newVertexIterator(vertexList)

		// Emulating concurrent access, where new data is added by the forest:
		// we expect that vertexList was expanded, but the iterator's notion should be unchanged
		vertexList = append(vertexList, vContainer)
		assert.NotNil(t, iterator.data)
		assert.Equal(t, 1, len(iterator.data))
		assert.Equal(t, 2, cap(iterator.data))
		assert.Equal(t, len(vertexList), len(iterator.data)+1)
	})

	t.Run("fully filled non-empty slice (len = 10, cap = 10)", func(t *testing.T) {
		// Prepare the vertex list representing the slice of children held by the forest.
		//nolint:S1019
		var vertexList VertexList = make(VertexList, 10, 10) // we want to explicitly state the capacity here for clarity
		for i := 0; i < cap(vertexList); i++ {
			_v := NewVertexMock(fmt.Sprintf("v%d", i), 3, "C", 2)
			vertexList[i] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v}
		}

		// vertex iterator maintains a snapshot of the slice, where it is filled with 10 elements
		iterator := newVertexIterator(vertexList)

		// Emulating concurrent access, where new data is added by the forest
		vertexList = append(vertexList, vContainer)

		// we expect that vertexList was expanded, but the iterator's notion should be unchanged
		assert.NotNil(t, iterator.data)
		assert.Equal(t, 10, len(iterator.data))
		assert.Equal(t, 10, cap(iterator.data))
		assert.Equal(t, len(vertexList), len(iterator.data)+1)
	})
}
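The snapshot behaviour verified above follows directly from Go's slice representation. A minimal, self-contained sketch of the same mechanics, using plain ints instead of forest types:

// Self-contained illustration of the slice-header copy semantics the test relies on.
package main

import "fmt"

func main() {
	writer := make([]int, 1, 2) // header: [ptr, len = 1, cap = 2]
	writer[0] = 7

	snapshot := writer // copies the slice header; snapshot's len/cap are now fixed

	writer = append(writer, 8)              // fits into spare capacity of the shared backing array
	fmt.Println(len(snapshot), len(writer)) // prints "1 2": the snapshot's length is unchanged

	// Caveat: both headers still point at the same backing array, so mutating an
	// element already visible to the snapshot (e.g. writer[0] = 9) WOULD show
	// through. The pattern is only safe because the forest's slices are append-only,
	// as the comment in vertexIteratorChecker.Check below also notes.
}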
// Test_VertexIteratorConcurrencySafe verifies concurrent iteration.
// We start with a forest (populated by `populateNewForest`) containing the following vertices:
//
//	    ↙-- [A]
//	··-[C] ←-- [D]
//
// Then vertices v0, v1, v2, etc. are added concurrently here in the test:
//
//	    ↙-- [A]
//	··-[C] ←-- [D]
//	    ↖-- [v0]
//	    ↖-- [v1]
//	     ⋮
//
// Before each addition, we create a vertex iterator. While more and more vertices are added,
// the constructed VertexIterators are checked to confirm they are unaffected, as if they
// were operating on a snapshot taken at the time of their construction.
func Test_VertexIteratorConcurrencySafe(t *testing.T) {
	forest := newConcurrencySafeForestWrapper(populateNewForest(t))

	start := make(chan struct{})
	done1, done2 := make(chan struct{}), make(chan struct{})

	go func() { // Go Routine 1
		<-start
		for i := 0; i < 1000; i++ {
			// add additional child vertex of [C]
			var v Vertex = NewVertexMock(fmt.Sprintf("v%03d", i), 3, "C", 2)
			err := forest.VerifyAndAddVertex(&v)
			assert.NoError(t, err)
			time.Sleep(500 * time.Microsecond) // sleep 0.5ms -> in total 0.5s
		}
		close(done1)
	}()

	go func() { // Go Routine 2
		<-start
		var vertexIteratorCheckers []*vertexIteratorChecker

		for {
			select {
			case <-done1:
				close(done2)
				return
			default: // fallthrough
			}

			// The other goroutine is concurrently adding children of [C]. At all times, [C] should
			// have at least its two initial children [A] and [D].
			iteratorChecker := forest.GetChildren(TestVertices["C"].VertexID())
			vertexIteratorCheckers = append(vertexIteratorCheckers, iteratorChecker)
			for _, checker := range vertexIteratorCheckers {
				checker.Check(t)
			}
			// sleep randomly up to 2ms, 1ms on average, so we create only about half as many
			// iterators as new vertices are added
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond)
		}
	}()

	// Start, and then wait for all go routines to finish. Routine 1 finishes after it has added 1000
	// new vertices [v000], [v001], …, [v999] to the forest. Routine 2 will run until routine 1 has
	// finished. While routine 2 is running, it verifies that vertex additions to the forest
	// leave the iterators unchanged.
	close(start)

	// Wait up to 2 seconds, checking every 100 milliseconds
	bothDone := func() bool {
		select {
		case <-done1:
			select {
			case <-done2:
				return true
			default:
				return false
			}
		default:
			return false
		}
	}
	assert.Eventually(t, bothDone, 2*time.Second, 100*time.Millisecond, "Condition never became true")
}
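The wrapper's once-delivery guarantee could also be checked directly. Below is a hypothetical mini-test sketch (not part of this PR), reusing `populateNewForest` and `TestVertices` from this package; it assumes [C] initially has exactly the two children [A] and [D] shown in the diagram above:

// Hypothetical sketch: several workers drain one shared safe iterator; collectively
// they must have received every child of [C] exactly once.
func Test_EachVertexDeliveredOnce(t *testing.T) {
	forest := populateNewForest(t)
	it := NewVertexIteratorConcurrencySafe(forest.GetChildren(TestVertices["C"].VertexID()))

	var mu sync.Mutex
	received := make(map[flow.Identifier]int) // vertex ID -> number of deliveries
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := it.NextVertex(); v != nil; v = it.NextVertex() {
				mu.Lock()
				received[v.VertexID()]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	assert.Len(t, received, 2) // assumption: [C]'s initial children are [A] and [D]
	for _, deliveries := range received {
		assert.Equal(t, 1, deliveries) // each vertex delivered exactly once
	}
}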
// For testing only!
type concurrencySafeForestWrapper struct {
	forest *LevelledForest
	mu     sync.RWMutex
}

func newConcurrencySafeForestWrapper(f *LevelledForest) *concurrencySafeForestWrapper {
	return &concurrencySafeForestWrapper{forest: f}
}

func (w *concurrencySafeForestWrapper) VerifyAndAddVertex(vertex *Vertex) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	err := w.forest.VerifyVertex(*vertex)
	if err != nil {
		return err
	}
	w.forest.AddVertex(*vertex)
	return nil
}

// GetChildren returns an iterator over the children of the specified vertex.
func (w *concurrencySafeForestWrapper) GetChildren(id flow.Identifier) *vertexIteratorChecker {
	w.mu.RLock()
	defer w.mu.RUnlock()

	// creating the non-concurrency-safe iterator and memorizing its snapshot information for later testing
	unsafeIter := w.forest.GetChildren(id)
	numberChildren := w.forest.GetNumberOfChildren(id)
	sliceCapacity := cap(unsafeIter.data)

	// create the wrapper `VertexIteratorConcurrencySafe` and a checker for verifying it
	safeIter := NewVertexIteratorConcurrencySafe(unsafeIter)
	return newVertexIteratorChecker(safeIter, numberChildren, sliceCapacity)
}

// For testing only!
type vertexIteratorChecker struct {
	safeIterator     *VertexIteratorConcurrencySafe
	expectedLength   int
	expectedCapacity int
}

func newVertexIteratorChecker(iter *VertexIteratorConcurrencySafe, expectedLength int, expectedCapacity int) *vertexIteratorChecker {
	return &vertexIteratorChecker{
		safeIterator:     iter,
		expectedLength:   expectedLength,
		expectedCapacity: expectedCapacity,
	}
}

func (c *vertexIteratorChecker) Check(t *testing.T) {
	// We are directly accessing the slice backing the unsafe iterator here, without any concurrency
	// protection. This is expected to be fine, because the `data` slice is append-only.
	unsafeIter := c.safeIterator.unsafeIter
	assert.NotNil(t, unsafeIter.data)
	assert.Equal(t, c.expectedLength, len(unsafeIter.data))
	assert.Equal(t, c.expectedCapacity, cap(unsafeIter.data))
}