Skip to content

Commit 0ee39f6

Browse files
committed
first complete version of concurrency-safe vertex iterators for levelled forest (incl. detailed documentation and test coverage).
1 parent 542eea6 commit 0ee39f6

File tree

6 files changed

+382
-20
lines changed

6 files changed

+382
-20
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package forest
2+
3+
import (
4+
"sync"
5+
)
6+
7+
/* ATTENTION: LevelledForest and its derived objects, such as the VertexIterator, are NOT Concurrency Safe. The
8+
* LevelledForest is a low-level library geared for performance. As locking is not needed in some application
9+
* scenarios (most notably the consensus EventHandler, which by design is single-threaded), concurrency handling
10+
* is delegated to the higher-level business logic using the LevelledForest.
11+
*
12+
* Here, we provide helper structs for higher-level business logic, to simplify their concurrency handling.
13+
*/
14+
15+
// VertexIteratorConcurrencySafe wraps the Vertex Iterator to make it concurrency safe. Effectively,
16+
// the behaviour is like iterating on a snapshot at the time of iterator construction.
17+
// Under concurrent recalls, the iterator delivers each item once across all concurrent callers.
18+
// Items are delivered in order and `NextVertex` establishes a 'synchronized before' relation as
19+
// defined in the go memory model https://go.dev/ref/mem.
20+
type VertexIteratorConcurrencySafe struct {
21+
unsafeIter VertexIterator
22+
mu sync.RWMutex
23+
}
24+
25+
func NewVertexIteratorConcurrencySafe(iter VertexIterator) *VertexIteratorConcurrencySafe {
26+
return &VertexIteratorConcurrencySafe{unsafeIter: iter}
27+
}
28+
29+
// NextVertex returns the next Vertex or nil if there is none. A caller receiving a non-nil value
30+
// are 'synchronized before' (see https://go.dev/ref/mem) the receiver of the subsequent non-nil
31+
// value. NextVertex() delivers each item once, following a fully sequential deterministic order,
32+
// with results being distributed in order across all competing threads.
33+
func (i *VertexIteratorConcurrencySafe) NextVertex() Vertex {
34+
i.mu.Lock() // must acquire write lock here, as wrapped `VertexIterator` changes its internal state
35+
defer i.mu.Unlock()
36+
return i.unsafeIter.NextVertex()
37+
}
38+
39+
// HasNext returns true if and only if there is a next Vertex
40+
func (i *VertexIteratorConcurrencySafe) HasNext() bool {
41+
i.mu.RLock()
42+
defer i.mu.RUnlock()
43+
return i.unsafeIter.HasNext()
44+
}
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package forest
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
12+
"github.com/onflow/flow-go/model/flow"
13+
"github.com/onflow/flow-go/utils/unittest"
14+
)
15+
16+
// Test_SlicePrimitives demonstrates that we can use slices, including `VertexList`
17+
// as concurrency-safe snapshots.
18+
func Test_SlicePrimitives(t *testing.T) {
19+
// Conceptually, we always proceed along the following pattern:
20+
// • We assume there is a LevelledForest instance, protected for concurrent access by higher-level
21+
// business logic (not represented in this test).
22+
// • The higher-level business logic instantiates a `VertexIterator` (not represented in this test) by calling
23+
// `GetChildren` or `GetVerticesAtLevel` for example. Under the hood, the `VertexIterator` receives a `VertexList`
24+
// as it's sole input. The slice `VertexList` golang internally represents as the tripel
25+
// [pointer to array, slice length, slice capacity] (see https://go.dev/blog/slices-intro for details). The slice
26+
// is passed by value, i.e. `VertexIterator` maintains its own copy of these values.
27+
// • Here, we emulate interleaving writes by the forest to the shared slice `VertexList`.
28+
29+
v := NewVertexMock("v", 3, "C", 2)
30+
vContainer := &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: v}
31+
32+
t.Run(fmt.Sprintf("nil slice"), func(t *testing.T) {
33+
// Prepare vertex list that, representing the slice of children held by the
34+
var vertexList VertexList // nil zero value
35+
36+
// vertex iterator maintains a snapshot of a nil slice
37+
iterator := newVertexIterator(vertexList)
38+
39+
// Emulating concurrent access, where new data is added by the forest:
40+
// we expect that vertexList was expanded, but the iterator's notion should be unchanged
41+
vertexList = append(vertexList, vContainer)
42+
assert.Nil(t, iterator.data)
43+
assert.Equal(t, len(vertexList), len(iterator.data)+1)
44+
})
45+
46+
t.Run(fmt.Sprintf("empty slice of zero capacity"), func(t *testing.T) {
47+
// Prepare vertex list that, representing the slice of children held by the
48+
var vertexList VertexList = []*vertexContainer{}
49+
50+
// vertex iterator maintains a snapshot of the non-nil slice, with zero capacity
51+
iterator := newVertexIterator(vertexList)
52+
53+
// Emulating concurrent access, where new data is added by the forest:
54+
// we expect that vertexList was expanded, but the iterator's notion should be unchanged
55+
vertexList = append(vertexList, vContainer)
56+
assert.NotNil(t, iterator.data)
57+
assert.Zero(t, len(iterator.data))
58+
assert.Equal(t, len(vertexList), len(iterator.data)+1)
59+
})
60+
61+
t.Run(fmt.Sprintf("empty slice of zero capacity (len = 0, cap = 2)"), func(t *testing.T) {
62+
// Prepare vertex list that, representing the slice of children held by the
63+
var vertexList VertexList = make(VertexList, 0, 2)
64+
65+
// vertex iterator maintains a snapshot of a slice with length zero but capacity 2
66+
iterator := newVertexIterator(vertexList)
67+
68+
// Emulating concurrent access, where new data is added by the forest:
69+
// we expect that vertexList was expanded, but the iterator's notion should be unchanged
70+
vertexList = append(vertexList, vContainer)
71+
assert.NotNil(t, iterator.data)
72+
assert.Zero(t, len(iterator.data))
73+
assert.Equal(t, 2, cap(iterator.data))
74+
assert.Equal(t, len(vertexList), len(iterator.data)+1)
75+
})
76+
77+
t.Run(fmt.Sprintf("empty slice of zero capacity (len = 1, cap = 2)"), func(t *testing.T) {
78+
// Prepare vertex list that, representing the slice of children held by the
79+
var vertexList VertexList = make(VertexList, 1, 2)
80+
_v := NewVertexMock("v", 3, "C", 2)
81+
vertexList[0] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v}
82+
83+
// vertex iterator maintains a snapshot of a slice with length 1 but capacity 2
84+
iterator := newVertexIterator(vertexList)
85+
86+
// Emulating concurrent access, where new data is added by the forest:
87+
// we expect that vertexList was expanded, but the iterator's notion should be unchanged
88+
vertexList = append(vertexList, vContainer)
89+
assert.NotNil(t, iterator.data)
90+
assert.Equal(t, 1, len(iterator.data))
91+
assert.Equal(t, 2, cap(iterator.data))
92+
assert.Equal(t, len(vertexList), len(iterator.data)+1)
93+
})
94+
95+
t.Run(fmt.Sprintf("empty slice of zero capacity (len = 10, cap = 10)"), func(t *testing.T) {
96+
// Prepare vertex list that, representing the slice of children held by the
97+
var vertexList VertexList = make(VertexList, 10, 10)
98+
for i := 0; i < cap(vertexList); i++ {
99+
_v := NewVertexMock(fmt.Sprintf("v%d", i), 3, "C", 2)
100+
vertexList[i] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v}
101+
}
102+
103+
// vertex iterator maintains a snapshot of the slice, where it is filled with 10 elements
104+
iterator := newVertexIterator(vertexList)
105+
106+
// Emulating concurrent access, where new data is added by the forest
107+
vertexList = append(vertexList, vContainer)
108+
109+
// we expect that vertexList was expanded, but the iterator's notion should be unchanged
110+
assert.NotNil(t, iterator.data)
111+
assert.Equal(t, 10, len(iterator.data))
112+
assert.Equal(t, 10, cap(iterator.data))
113+
assert.Equal(t, len(vertexList), len(iterator.data)+1)
114+
})
115+
}
116+
117+
// Test_VertexIteratorConcurrencySafe verifies concurrent iteration
118+
// We start with a forest (populated by `populateNewForest`) containing the following vertices:
119+
//
120+
// ↙-- [A]
121+
// ··-[C] ←-- [D]
122+
//
123+
// Then vertices v0, v1, v2, etc are added concurrently here in the test
124+
//
125+
// ↙-- [A]
126+
// ··-[C] ←-- [D]
127+
// ↖-- [v0]
128+
// ↖-- [v1]
129+
// ⋮
130+
//
131+
// Before each addition, we create a vertex operator. Wile more and more vertices are added
132+
// the constructed VertexIterators are checked to confirm they are unaffected, like they
133+
// are operating on a snapshot taken at the time of their construction.
134+
func Test_VertexIteratorConcurrencySafe(t *testing.T) {
135+
forest := newConcurrencySafeForestWrapper(populateNewForest(t))
136+
137+
start := make(chan struct{})
138+
done1, done2 := make(chan struct{}), make(chan struct{})
139+
140+
go func() { // Go Routine 1
141+
<-start
142+
for i := 0; i < 1000; i++ {
143+
// add additional child vertex of [C]
144+
var v Vertex = NewVertexMock(fmt.Sprintf("v%03d", i), 3, "C", 2)
145+
forest.VerifyAndAddVertex(&v)
146+
time.Sleep(500 * time.Microsecond) // sleep 0.5ms -> in total 0.5s
147+
}
148+
close(done1)
149+
}()
150+
151+
go func() { // Go Routine 2
152+
<-start
153+
var vertexIteratorCheckers []*vertexIteratorChecker
154+
155+
for {
156+
select {
157+
case <-done1:
158+
close(done2)
159+
return
160+
default: // fallthrough
161+
}
162+
163+
// the other thread is concurrently adding [C]. At all times, there should be at least
164+
iteratorChecker := forest.GetChildren(TestVertices["C"].VertexID())
165+
vertexIteratorCheckers = append(vertexIteratorCheckers, iteratorChecker)
166+
for _, checker := range vertexIteratorCheckers {
167+
checker.Check(t)
168+
}
169+
// sleep randomly up to 2ms, average 1ms, so we create only about half as much
170+
// iterators as new vertices are added.
171+
time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond)
172+
}
173+
}()
174+
175+
// start, and then wait for all go routines to finish. Routine 1 finishes after it added 1000
176+
// new vertices [v000], [v001], [v999] to the forest. Routine 2 will run until routine 1 has
177+
// finished. While routine 2 is running, it verifies that vertex additions to the forests
178+
// leve the iterators unchanged.
179+
close(start)
180+
181+
// Wait up to 2 seconds, checking every 100 milliseconds
182+
bothDone := func() bool {
183+
select {
184+
case <-done1:
185+
select {
186+
case <-done2:
187+
return true
188+
default:
189+
return false
190+
}
191+
default:
192+
return false
193+
}
194+
}
195+
assert.Eventually(t, bothDone, 2*time.Second, 100*time.Millisecond, "Condition never became true")
196+
197+
}
198+
199+
// For testing only!
200+
type concurrencySafeForestWrapper struct {
201+
forest *LevelledForest
202+
mu sync.RWMutex
203+
}
204+
205+
func newConcurrencySafeForestWrapper(f *LevelledForest) *concurrencySafeForestWrapper {
206+
return &concurrencySafeForestWrapper{forest: f}
207+
}
208+
209+
func (w *concurrencySafeForestWrapper) VerifyAndAddVertex(vertex *Vertex) error {
210+
w.mu.Lock()
211+
defer w.mu.Unlock()
212+
err := w.forest.VerifyVertex(*vertex)
213+
if err != nil {
214+
return err
215+
}
216+
w.forest.AddVertex(*vertex)
217+
return nil
218+
}
219+
220+
// GetChildren returns an iterator the children of the specified vertex.
221+
func (w *concurrencySafeForestWrapper) GetChildren(id flow.Identifier) *vertexIteratorChecker {
222+
w.mu.RLock()
223+
defer w.mu.RUnlock()
224+
225+
// creating non-concurrency safe iterator and memorizing its snapshot information for later testing
226+
unsafeIter := w.forest.GetChildren(id)
227+
numberChildren := w.forest.GetNumberOfChildren(id)
228+
sliceCapacity := cap(unsafeIter.data)
229+
230+
// create wapper `VertexIteratorConcurrencySafe` and a check for verifying it
231+
safeIter := NewVertexIteratorConcurrencySafe(unsafeIter)
232+
return newVertexIteratorChecker(safeIter, numberChildren, sliceCapacity)
233+
}
234+
235+
// For testing only!
236+
type vertexIteratorChecker struct {
237+
safeIterator *VertexIteratorConcurrencySafe
238+
expectedLength int
239+
expectedCapacity int
240+
}
241+
242+
func newVertexIteratorChecker(iter *VertexIteratorConcurrencySafe, expectedLength int, expectedCapacity int) *vertexIteratorChecker {
243+
return &vertexIteratorChecker{
244+
safeIterator: iter,
245+
expectedLength: expectedLength,
246+
expectedCapacity: expectedCapacity,
247+
}
248+
}
249+
250+
func (c *vertexIteratorChecker) Check(t *testing.T) {
251+
// We are directly accessing the slice here backing the unsafe iterator without any concurrency
252+
// protection. This is expected to be fine, because the `data` slice is append only.
253+
unsafeIter := c.safeIterator.unsafeIter
254+
assert.NotNil(t, unsafeIter.data)
255+
assert.Equal(t, c.expectedLength, len(unsafeIter.data))
256+
assert.Equal(t, c.expectedCapacity, cap(unsafeIter.data))
257+
}

module/forest/leveled_forest.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
// LevelledForest is NOT safe for concurrent use by multiple goroutines.
2424
type LevelledForest struct {
2525
vertices VertexSet
26-
verticesAtLevel map[uint64]VertexList
26+
verticesAtLevel map[uint64]VertexList // by convention, `VertexList`s are append-only (and eventually garbage collected, upon pruning)
2727
size uint64
2828
LowestLevel uint64
2929
}
@@ -41,7 +41,7 @@ type VertexSet map[flow.Identifier]*vertexContainer
4141
type vertexContainer struct {
4242
id flow.Identifier
4343
level uint64
44-
children VertexList
44+
children VertexList // by convention, append only
4545

4646
// the following are only set if the block is actually known
4747
vertex Vertex
@@ -136,6 +136,7 @@ func (f *LevelledForest) GetSize() uint64 {
136136
func (f *LevelledForest) GetChildren(id flow.Identifier) VertexIterator {
137137
// if vertex does not exist, container will be nil
138138
if container, ok := f.vertices[id]; ok {
139+
// by design, the list of children is append-only.
139140
return newVertexIterator(container.children)
140141
}
141142
return newVertexIterator(nil) // VertexIterator gracefully handles nil slices

module/forest/leveled_forest_test.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,18 @@ func NewVertexMock(vertexId string, vertexLevel uint64, parentId string, parentL
3535

3636
// FOREST:
3737
//
38-
// ↙-- [A]
39-
// (Genesis) ← [B] ← [C] ←-- [D]
40-
// ⋮ ⋮ ⋮ ⋮
41-
// ⋮ ⋮ ⋮ (Missing1) ←---- [W]
42-
// ⋮ ⋮ ⋮ ⋮ (Missing2) ← [X] ← [Y]
43-
// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ↖ [Z]
44-
// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ⋮
38+
// ↙-- [A]
39+
// (Genesis) ← [B] ← [C] ←-- [D]
40+
// ⋮ ⋮ ⋮ ⋮
41+
// ⋮ ⋮ ⋮ (Missing1) ←---- [W]
42+
// ⋮ ⋮ ⋮ ⋮ (Missing2) ← [X] ← [Y]
43+
// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ↖ [Z]
44+
// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ⋮
45+
// 0 1 2 3 4 5 6 Level
4546
//
46-
// LEVEL: 0 1 2 3 4 5 6
4747
// Nomenclature:
48-
//
49-
// [B] Vertex B (internally represented as a full vertex container)
50-
// (M) referenced vertex that has not been added (internally represented as empty vertex container)
48+
// [B] Vertex B (internally represented as a full vertex container)
49+
// (M) referenced vertex that has not been added (internally represented as empty vertex container)
5150
var TestVertices = map[string]*mock.Vertex{
5251
"A": NewVertexMock("A", 3, "Genesis", 0),
5352
"B": NewVertexMock("B", 1, "Genesis", 0),

0 commit comments

Comments
 (0)