diff --git a/module/forest/concurrency_helpers.go b/module/forest/concurrency_helpers.go new file mode 100644 index 00000000000..ef091f6a445 --- /dev/null +++ b/module/forest/concurrency_helpers.go @@ -0,0 +1,44 @@ +package forest + +import ( + "sync" +) + +/* ATTENTION: LevelledForest and its derived objects, such as the VertexIterator, are NOT Concurrency Safe. The + * LevelledForest is a low-level library geared for performance. As locking is not needed in some application + * scenarios (most notably the consensus EventHandler, which by design is single-threaded), concurrency handling + * is delegated to the higher-level business logic using the LevelledForest. + * + * Here, we provide helper structs for higher-level business logic, to simplify their concurrency handling. + */ + +// VertexIteratorConcurrencySafe wraps the Vertex Iterator to make it concurrency safe. Effectively, +// the behaviour is like iterating on a SNAPSHOT at the time of iterator construction. +// Under concurrent recalls, the iterator delivers each item once across all concurrent callers. +// Items are delivered in order and `NextVertex` establishes a 'synchronized before' relation as +// defined in the go memory model https://go.dev/ref/mem. +type VertexIteratorConcurrencySafe struct { + unsafeIter VertexIterator + mu sync.RWMutex +} + +func NewVertexIteratorConcurrencySafe(iter VertexIterator) *VertexIteratorConcurrencySafe { + return &VertexIteratorConcurrencySafe{unsafeIter: iter} +} + +// NextVertex returns the next Vertex or nil if there is none. A caller receiving a non-nil value +// are 'synchronized before' (see https://go.dev/ref/mem) the receiver of the subsequent non-nil +// value. NextVertex() delivers each item once, following a fully sequential deterministic order, +// with results being distributed in order across all competing threads. +func (i *VertexIteratorConcurrencySafe) NextVertex() Vertex { + i.mu.Lock() // must acquire write lock here, as wrapped `VertexIterator` changes its internal state + defer i.mu.Unlock() + return i.unsafeIter.NextVertex() +} + +// HasNext returns true if and only if there is a next Vertex +func (i *VertexIteratorConcurrencySafe) HasNext() bool { + i.mu.RLock() + defer i.mu.RUnlock() + return i.unsafeIter.HasNext() +} diff --git a/module/forest/concurrency_helpers_test.go b/module/forest/concurrency_helpers_test.go new file mode 100644 index 00000000000..fb73b7ac1d0 --- /dev/null +++ b/module/forest/concurrency_helpers_test.go @@ -0,0 +1,258 @@ +package forest + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// Test_SlicePrimitives demonstrates that we can use slices, including `VertexList` +// as concurrency-safe snapshots. +func Test_SlicePrimitives(t *testing.T) { + // Conceptually, we always proceed along the following pattern: + // • We assume there is a LevelledForest instance, protected for concurrent access by higher-level + // business logic (not represented in this test). + // • The higher-level business logic instantiates a `VertexIterator` (not represented in this test) by calling + // `GetChildren` or `GetVerticesAtLevel` for example. Under the hood, the `VertexIterator` receives a `VertexList` + // as it's sole input. The slice `VertexList` golang internally represents as the tripel + // [pointer to array, slice length, slice capacity] (see https://go.dev/blog/slices-intro for details). The slice + // is passed by value, i.e. `VertexIterator` maintains its own copy of these values. + // • Here, we emulate interleaving writes by the forest to the shared slice `VertexList`. + + v := NewVertexMock("v", 3, "C", 2) + vContainer := &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: v} + + t.Run("nil slice", func(t *testing.T) { + // Prepare vertex list that, representing the slice of children held by the + var vertexList VertexList // nil zero value + + // vertex iterator maintains a snapshot of a nil slice + iterator := newVertexIterator(vertexList) + + // Emulating concurrent access, where new data is added by the forest: + // we expect that vertexList was expanded, but the iterator's notion should be unchanged + vertexList = append(vertexList, vContainer) + assert.Nil(t, iterator.data) + assert.Equal(t, len(vertexList), len(iterator.data)+1) + }) + + t.Run("empty slice of zero capacity", func(t *testing.T) { + // Prepare vertex list that, representing the slice of children held by the + var vertexList VertexList = []*vertexContainer{} + + // vertex iterator maintains a snapshot of the non-nil slice, with zero capacity + iterator := newVertexIterator(vertexList) + + // Emulating concurrent access, where new data is added by the forest: + // we expect that vertexList was expanded, but the iterator's notion should be unchanged + vertexList = append(vertexList, vContainer) + assert.NotNil(t, iterator.data) + assert.Zero(t, len(iterator.data)) + assert.Equal(t, len(vertexList), len(iterator.data)+1) + }) + + t.Run("empty slice of with capacity 2 (len = 0, cap = 2)", func(t *testing.T) { + // Prepare vertex list that, representing the slice of children held by the + var vertexList VertexList = make(VertexList, 0, 2) + + // vertex iterator maintains a snapshot of a slice with length zero but capacity 2 + iterator := newVertexIterator(vertexList) + + // Emulating concurrent access, where new data is added by the forest: + // we expect that vertexList was expanded, but the iterator's notion should be unchanged + vertexList = append(vertexList, vContainer) + assert.NotNil(t, iterator.data) + assert.Zero(t, len(iterator.data)) + assert.Equal(t, 2, cap(iterator.data)) + assert.Equal(t, len(vertexList), len(iterator.data)+1) + }) + + t.Run("non-empty slice with larger capacity (len = 1, cap = 2)", func(t *testing.T) { + // Prepare vertex list that, representing the slice of children held by the + var vertexList VertexList = make(VertexList, 1, 2) + _v := NewVertexMock("v", 3, "C", 2) + vertexList[0] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v} + + // vertex iterator maintains a snapshot of a slice with length 1 but capacity 2 + iterator := newVertexIterator(vertexList) + + // Emulating concurrent access, where new data is added by the forest: + // we expect that vertexList was expanded, but the iterator's notion should be unchanged + vertexList = append(vertexList, vContainer) + assert.NotNil(t, iterator.data) + assert.Equal(t, 1, len(iterator.data)) + assert.Equal(t, 2, cap(iterator.data)) + assert.Equal(t, len(vertexList), len(iterator.data)+1) + }) + + t.Run("fully filled non-empty slice (len = 10, cap = 10)", func(t *testing.T) { + // Prepare vertex list that, representing the slice of children held by the + var vertexList VertexList = make(VertexList, 10) + for i := 0; i < cap(vertexList); i++ { + _v := NewVertexMock(fmt.Sprintf("v%d", i), 3, "C", 2) + vertexList[i] = &vertexContainer{id: unittest.IdentifierFixture(), level: 3, vertex: _v} + } + + // vertex iterator maintains a snapshot of the slice, where it is filled with 10 elements + iterator := newVertexIterator(vertexList) + + // Emulating concurrent access, where new data is added by the forest + vertexList = append(vertexList, vContainer) + + // we expect that vertexList was expanded, but the iterator's notion should be unchanged + assert.NotNil(t, iterator.data) + assert.Equal(t, 10, len(iterator.data)) + assert.Equal(t, 10, cap(iterator.data)) + assert.Equal(t, len(vertexList), len(iterator.data)+1) + }) +} + +// Test_VertexIteratorConcurrencySafe verifies concurrent iteration +// We start with a forest (populated by `populateNewForest`) containing the following vertices: +// +// ↙-- [A] +// ··-[C] ←-- [D] +// +// Then vertices v0, v1, v2, etc are added concurrently here in the test +// +// ↙-- [A] +// ··-[C] ←-- [D] +// ↖-- [v0] +// ↖-- [v1] +// ⋮ +// +// Before each addition, we create a vertex operator. Wile more and more vertices are added +// the constructed VertexIterators are checked to confirm they are unaffected, like they +// are operating on a snapshot taken at the time of their construction. +func Test_VertexIteratorConcurrencySafe(t *testing.T) { + forest := newConcurrencySafeForestWrapper(populateNewForest(t)) + + start := make(chan struct{}) + done1, done2 := make(chan struct{}), make(chan struct{}) + + go func() { // Go Routine 1 + <-start + for i := 0; i < 1000; i++ { + // add additional child vertex of [C] + var v Vertex = NewVertexMock(fmt.Sprintf("v%03d", i), 3, "C", 2) + err := forest.VerifyAndAddVertex(&v) + assert.NoError(t, err) + time.Sleep(500 * time.Microsecond) // sleep 0.5ms -> in total 0.5s + } + close(done1) + }() + + go func() { // Go Routine 2 + <-start + var vertexIteratorCheckers []*vertexIteratorChecker + + for { + select { + case <-done1: + close(done2) + return + default: // fallthrough + } + + // the other thread is concurrently adding [C]. At all times, there should be at least + iteratorChecker := forest.GetChildren(TestVertices["C"].VertexID()) + vertexIteratorCheckers = append(vertexIteratorCheckers, iteratorChecker) + for _, checker := range vertexIteratorCheckers { + checker.Check(t) + } + // sleep randomly up to 2ms, average 1ms, so we create only about half as much + // iterators as new vertices are added. + time.Sleep(time.Duration(rand.Intn(2000)) * time.Microsecond) + } + }() + + // start, and then wait for all go routines to finish. Routine 1 finishes after it added 1000 + // new vertices [v000], [v001], [v999] to the forest. Routine 2 will run until routine 1 has + // finished. While routine 2 is running, it verifies that vertex additions to the forests + // leve the iterators unchanged. + close(start) + + // Wait up to 2 seconds, checking every 100 milliseconds + bothDone := func() bool { + select { + case <-done1: + select { + case <-done2: + return true + default: + return false + } + default: + return false + } + } + assert.Eventually(t, bothDone, 2*time.Second, 100*time.Millisecond, "Condition never became true") + +} + +// For testing only! +type concurrencySafeForestWrapper struct { + forest *LevelledForest + mu sync.RWMutex +} + +func newConcurrencySafeForestWrapper(f *LevelledForest) *concurrencySafeForestWrapper { + return &concurrencySafeForestWrapper{forest: f} +} + +func (w *concurrencySafeForestWrapper) VerifyAndAddVertex(vertex *Vertex) error { + w.mu.Lock() + defer w.mu.Unlock() + err := w.forest.VerifyVertex(*vertex) + if err != nil { + return err + } + w.forest.AddVertex(*vertex) + return nil +} + +// GetChildren returns an iterator the children of the specified vertex. +func (w *concurrencySafeForestWrapper) GetChildren(id flow.Identifier) *vertexIteratorChecker { + w.mu.RLock() + defer w.mu.RUnlock() + + // creating non-concurrency safe iterator and memorizing its snapshot information for later testing + unsafeIter := w.forest.GetChildren(id) + numberChildren := w.forest.GetNumberOfChildren(id) + sliceCapacity := cap(unsafeIter.data) + + // create wapper `VertexIteratorConcurrencySafe` and a check for verifying it + safeIter := NewVertexIteratorConcurrencySafe(unsafeIter) + return newVertexIteratorChecker(safeIter, numberChildren, sliceCapacity) +} + +// For testing only! +type vertexIteratorChecker struct { + safeIterator *VertexIteratorConcurrencySafe + expectedLength int + expectedCapacity int +} + +func newVertexIteratorChecker(iter *VertexIteratorConcurrencySafe, expectedLength int, expectedCapacity int) *vertexIteratorChecker { + return &vertexIteratorChecker{ + safeIterator: iter, + expectedLength: expectedLength, + expectedCapacity: expectedCapacity, + } +} + +func (c *vertexIteratorChecker) Check(t *testing.T) { + // We are directly accessing the slice here backing the unsafe iterator without any concurrency + // protection. This is expected to be fine, because the `data` slice is append only. + unsafeIter := c.safeIterator.unsafeIter + assert.NotNil(t, unsafeIter.data) + assert.Equal(t, c.expectedLength, len(unsafeIter.data)) + assert.Equal(t, c.expectedCapacity, cap(unsafeIter.data)) +} diff --git a/module/forest/leveled_forest.go b/module/forest/leveled_forest.go index e0967f052f5..e988f84872f 100644 --- a/module/forest/leveled_forest.go +++ b/module/forest/leveled_forest.go @@ -23,7 +23,7 @@ import ( // LevelledForest is NOT safe for concurrent use by multiple goroutines. type LevelledForest struct { vertices VertexSet - verticesAtLevel map[uint64]VertexList + verticesAtLevel map[uint64]VertexList // by convention, `VertexList`s are append-only (and eventually garbage collected, upon pruning) size uint64 LowestLevel uint64 } @@ -41,7 +41,7 @@ type VertexSet map[flow.Identifier]*vertexContainer type vertexContainer struct { id flow.Identifier level uint64 - children VertexList + children VertexList // by convention, append only // the following are only set if the block is actually known vertex Vertex @@ -136,6 +136,7 @@ func (f *LevelledForest) GetSize() uint64 { func (f *LevelledForest) GetChildren(id flow.Identifier) VertexIterator { // if vertex does not exist, container will be nil if container, ok := f.vertices[id]; ok { + // by design, the list of children is append-only. return newVertexIterator(container.children) } return newVertexIterator(nil) // VertexIterator gracefully handles nil slices diff --git a/module/forest/leveled_forest_test.go b/module/forest/leveled_forest_test.go index e63b2aba5ed..7da7cad1ba3 100644 --- a/module/forest/leveled_forest_test.go +++ b/module/forest/leveled_forest_test.go @@ -35,19 +35,18 @@ func NewVertexMock(vertexId string, vertexLevel uint64, parentId string, parentL // FOREST: // -// ↙-- [A] -// (Genesis) ← [B] ← [C] ←-- [D] -// ⋮ ⋮ ⋮ ⋮ -// ⋮ ⋮ ⋮ (Missing1) ←---- [W] -// ⋮ ⋮ ⋮ ⋮ (Missing2) ← [X] ← [Y] -// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ↖ [Z] -// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ +// ↙-- [A] +// (Genesis) ← [B] ← [C] ←-- [D] +// ⋮ ⋮ ⋮ ⋮ +// ⋮ ⋮ ⋮ (Missing1) ←---- [W] +// ⋮ ⋮ ⋮ ⋮ (Missing2) ← [X] ← [Y] +// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ↖ [Z] +// ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ ⋮ +// 0 1 2 3 4 5 6 Level // -// LEVEL: 0 1 2 3 4 5 6 // Nomenclature: -// -// [B] Vertex B (internally represented as a full vertex container) -// (M) referenced vertex that has not been added (internally represented as empty vertex container) +// [B] Vertex B (internally represented as a full vertex container) +// (M) referenced vertex that has not been added (internally represented as empty vertex container) var TestVertices = map[string]*mock.Vertex{ "A": NewVertexMock("A", 3, "Genesis", 0), "B": NewVertexMock("B", 1, "Genesis", 0), diff --git a/module/forest/vertex.go b/module/forest/vertex.go index bcb090516c4..c24325a9b0d 100644 --- a/module/forest/vertex.go +++ b/module/forest/vertex.go @@ -23,10 +23,36 @@ func VertexToString(v Vertex) string { } // VertexIterator is a stateful iterator for VertexList. -// Internally operates directly on the Vertex Containers +// Internally, it operates directly on the Vertex Containers. // It has one-element look ahead for skipping empty vertex containers. +// +// ATTENTION: Not Concurrency Safe! +// +// This vertex iterator does NOT COPY the provided list of vertices for +// efficiency reasons. For APPEND_ONLY `VertexList`s, the `VertexIterator` +// can be wrapped into a VertexIteratorConcurrencySafe to make it concurrency +// safe. By design, the LevelledForest guarantees this. Hence, construction +// of these vertex iterators is private to the `forest` package. type VertexIterator struct { - data VertexList + // CAUTION: to support concurrency-safe iterators, the `VertexIterator` *must* maintain its own slice descriptor. + // This is the default in Golang, as slices are typically passed by value, since only the slice descriptor (see + // https://go.dev/blog/slices-intro for details) is copied, but not the backing array. While very uncommon in go, + // we emphasize that a hypothetical change to `data *VertexList` (using pointer to slice) would break our wrapper + // `VertexIteratorConcurrencySafe`. + // Context: + // • `VertexIterator`s are instantiated by calling LevelledForest.GetChildren or .GetVerticesAtLevel` for + // example. In both cases, the provided `VertexList` is append-only in the levelled forest. So we assume a + // LevelledForest instance which is synchronized for concurrent access by higher-level business logic. Then, + // a `VertexIterator` many be iterated on, while concurrently elements are added to the forest. + // • Note that `data` is intrinsically safe for concurrent access, as long as no elements are modified inplace. + // In other words, append-only usage patterns are intrinsically safe for concurrent access. Eventually, the forest + // may exceed the current slice's capacity, at which point a new array is allocated by forest, while we maintain + // a reference to the older array here. Essentially, we maintain a snapshot of the slice at the point we received + // it, since our `data` field below also contains a local copy of the slice's length at the point we received it. + data VertexList // tldr; assumed safe for concurrent access, as forest operates append-only + + // not protected for concurrent access: + idx int next Vertex } @@ -55,6 +81,17 @@ func (it *VertexIterator) HasNext() bool { return it.next != nil } +// newVertexIterator instantiates an iterator. Essentially it operates on a snapshot of the slice. +// Even if the Levelled Forest makes additions to the input slice, we maintain our own notion of +// length and backing slice. +// CAUTION: +// - we do NOT COPY the list's containers for efficiency. +// - Package-private, as usage must be limited to APPEND-ONLY `VertexList` +// Without append-only guarantees, we would break the `VertexIteratorConcurrencySafe` +// and generally a lot of conceptual challenges arise for iteration in concurrent +// environments. We easily avoid the complexity by restricting the usage to the +// levelled forest, which by design operates append-only (and eventually garbage collected +// on pruning. func newVertexIterator(vertexList VertexList) VertexIterator { it := VertexIterator{ data: vertexList, @@ -75,11 +112,14 @@ func (err InvalidVertexError) Error() string { return fmt.Sprintf("invalid vertex %s: %s", VertexToString(err.Vertex), err.msg) } +// IsInvalidVertexError returns ture if and only if the input error is a (wrapped) InvalidVertexError. func IsInvalidVertexError(err error) bool { var target InvalidVertexError return errors.As(err, &target) } +// NewInvalidVertexErrorf instantiates an [InvalidVertexError]. The +// inputs `msg` and `args` follow the pattern of [fmt.Errorf]. func NewInvalidVertexErrorf(vertex Vertex, msg string, args ...interface{}) InvalidVertexError { return InvalidVertexError{ Vertex: vertex, diff --git a/module/irrecoverable/exception_test.go b/module/irrecoverable/exception_test.go index eb3fcf8e5e6..e847575d781 100644 --- a/module/irrecoverable/exception_test.go +++ b/module/irrecoverable/exception_test.go @@ -8,6 +8,27 @@ import ( "github.com/stretchr/testify/assert" ) +// IsIrrecoverableException returns true if and only of the provided error is +// (a wrapped) irrecoverable exception. By protocol convention, irrecoverable +// errors conceptually handled the same way as other unexpected errors: any +// occurrence means that the software has left the pre-defined path of _safe_ +// operations. Continuing despite an unexpected error or irrecoverable exception +// is impossible, because protocol-compliant operation of a node can no longer +// be expected. In the worst case the node might be slashed or the protocol as a +// hole compromised. +// For the reason mentioned above, protocol business logic should generally only +// check for sentinel errors expected exactly in the situation the business logic +// is in. Any error that does not match the sentinels known to be benign in that +// situation should be treated as a critical failure and the node must crash. +// Hence, PROTOCOL BUSINESS LOGIC should NEVER CHECK whether an error is an +// IRRECOVERABLE EXCEPTION. This function is for TESTING ONLY. +func IsIrrecoverableException(err error) bool { + // The Go build system specifically handles `_test.go` files, treating them as part of + // the test suite and not including them in the final production binary using go build. + var e = new(exception) + return errors.As(err, &e) +} + var sentinelVar = errors.New("sentinelVar") type sentinelType struct{}