Skip to content

Commit 1aa0e51

Browse files
committed
Added integration tests for follower. Removed some temporary logic. Updated tests
1 parent b4b91c4 commit 1aa0e51

File tree

3 files changed

+188
-3
lines changed

3 files changed

+188
-3
lines changed

engine/common/follower/core_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ func (s *CoreSuite) TestDetectingProposalEquivocation() {
245245
// submitted for further processing to Hotstuff layer.
246246
func (s *CoreSuite) TestConcurrentAdd() {
247247
workers := 5
248-
batchesPerWorker := 1
249-
blocksPerBatch := 1
248+
batchesPerWorker := 10
249+
blocksPerBatch := 10
250250
blocksPerWorker := blocksPerBatch * batchesPerWorker
251251
blocks := unittest.ChainFixtureFrom(workers*blocksPerWorker, s.finalizedBlock)
252252
targetSubmittedBlockID := blocks[len(blocks)-2].ID()
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package follower
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/dgraph-io/badger/v2"
10+
"github.com/stretchr/testify/mock"
11+
"github.com/stretchr/testify/require"
12+
"go.uber.org/atomic"
13+
14+
"github.com/onflow/flow-go/consensus"
15+
"github.com/onflow/flow-go/consensus/hotstuff"
16+
"github.com/onflow/flow-go/consensus/hotstuff/follower"
17+
"github.com/onflow/flow-go/consensus/hotstuff/mocks"
18+
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
19+
"github.com/onflow/flow-go/model/flow"
20+
"github.com/onflow/flow-go/model/messages"
21+
moduleconsensus "github.com/onflow/flow-go/module/finalizer/consensus"
22+
"github.com/onflow/flow-go/module/irrecoverable"
23+
"github.com/onflow/flow-go/module/metrics"
24+
module "github.com/onflow/flow-go/module/mock"
25+
"github.com/onflow/flow-go/module/trace"
26+
moduleutil "github.com/onflow/flow-go/module/util"
27+
"github.com/onflow/flow-go/network/mocknetwork"
28+
pbadger "github.com/onflow/flow-go/state/protocol/badger"
29+
"github.com/onflow/flow-go/state/protocol/events"
30+
"github.com/onflow/flow-go/state/protocol/util"
31+
"github.com/onflow/flow-go/storage/badger/operation"
32+
storageutil "github.com/onflow/flow-go/storage/util"
33+
"github.com/onflow/flow-go/utils/unittest"
34+
)
35+
36+
// TestFollowerHappyPath tests Engine integrated with real modules, mocked modules are used only for functionality which is static
37+
// or implemented by our test case. Tests that syncing batches of blocks from other participants results in extending protocol state.
38+
// After processing all available blocks we check if chain has correct height and finalized block.
39+
// We use next setup:
40+
// Number of workers - workers
41+
// Number of batches submitted by worker - batchesPerWorker
42+
// Number of blocks in each batch submitted by worker - blocksPerBatch
43+
// Each worker submits batchesPerWorker*blocksPerBatch blocks
44+
// In total we will submit workers*batchesPerWorker*blocksPerBatch
45+
func TestFollowerHappyPath(t *testing.T) {
46+
allIdentities := unittest.CompleteIdentitySet()
47+
rootSnapshot := unittest.RootSnapshotFixture(allIdentities)
48+
unittest.RunWithBadgerDB(t, func(db *badger.DB) {
49+
metrics := metrics.NewNoopCollector()
50+
tracer := trace.NewNoopTracer()
51+
consumer := events.NewNoop()
52+
headers, _, seals, index, payloads, blocks, qcs, setups, commits, statuses, results := storageutil.StorageLayer(t, db)
53+
54+
// bootstrap root snapshot
55+
state, err := pbadger.Bootstrap(metrics, db, headers, seals, results, blocks, qcs, setups, commits, statuses, rootSnapshot)
56+
require.NoError(t, err)
57+
mockTimer := util.MockBlockTimer()
58+
59+
// create follower state
60+
followerState, err := pbadger.NewFollowerState(state, index, payloads, tracer, consumer, mockTimer)
61+
require.NoError(t, err)
62+
finalizer := moduleconsensus.NewFinalizer(db, headers, followerState, tracer)
63+
rootHeader, err := rootSnapshot.Head()
64+
require.NoError(t, err)
65+
rootQC, err := rootSnapshot.QuorumCertificate()
66+
require.NoError(t, err)
67+
68+
// Hack EECC.
69+
// Since root snapshot is created with 1000 views for first epoch, we will forcefully enter EECC to avoid errors
70+
// related to epoch transitions.
71+
db.NewTransaction(true)
72+
err = db.Update(func(txn *badger.Txn) error {
73+
return operation.SetEpochEmergencyFallbackTriggered(rootHeader.ID())(txn)
74+
})
75+
require.NoError(t, err)
76+
77+
consensusConsumer := pubsub.NewFinalizationDistributor()
78+
// use real consensus modules
79+
forks, err := consensus.NewForks(rootHeader, headers, finalizer, consensusConsumer, rootHeader, rootQC)
80+
require.NoError(t, err)
81+
82+
// assume all proposals are valid
83+
validator := mocks.NewValidator(t)
84+
validator.On("ValidateProposal", mock.Anything).Return(nil)
85+
86+
// initialize the follower followerHotstuffLogic
87+
followerHotstuffLogic, err := follower.New(unittest.Logger(), validator, forks)
88+
require.NoError(t, err)
89+
90+
// initialize the follower loop
91+
followerLoop, err := hotstuff.NewFollowerLoop(unittest.Logger(), followerHotstuffLogic)
92+
require.NoError(t, err)
93+
94+
syncCore := module.NewBlockRequester(t)
95+
followerCore, err := NewCore(
96+
unittest.Logger(),
97+
metrics,
98+
metrics,
99+
consensusConsumer,
100+
followerState,
101+
followerLoop,
102+
validator,
103+
syncCore,
104+
tracer,
105+
)
106+
require.NoError(t, err)
107+
108+
me := module.NewLocal(t)
109+
nodeID := unittest.IdentifierFixture()
110+
me.On("NodeID").Return(nodeID).Maybe()
111+
112+
net := mocknetwork.NewNetwork(t)
113+
con := mocknetwork.NewConduit(t)
114+
net.On("Register", mock.Anything, mock.Anything).Return(con, nil)
115+
116+
// use real engine
117+
engine, err := New(unittest.Logger(), net, me, metrics, headers, rootHeader, followerCore)
118+
require.NoError(t, err)
119+
// don't forget to subscribe for finalization notifications
120+
consensusConsumer.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
121+
122+
// start hotstuff logic and follower engine
123+
ctx, cancel, errs := irrecoverable.WithSignallerAndCancel(context.Background())
124+
followerLoop.Start(ctx)
125+
engine.Start(ctx)
126+
unittest.RequireCloseBefore(t, moduleutil.AllReady(engine, followerLoop), time.Second, "engine failed to start")
127+
128+
// prepare chain of blocks, we will use a continuous chain assuming it was generated on happy path.
129+
workers := 5
130+
batchesPerWorker := 10
131+
blocksPerBatch := 100
132+
blocksPerWorker := blocksPerBatch * batchesPerWorker
133+
flowBlocks := unittest.ChainFixtureFrom(workers*blocksPerWorker, rootHeader)
134+
require.Greaterf(t, len(flowBlocks), defaultPendingBlocksCacheCapacity, "this test assumes that we operate with more blocks than cache's upper limit")
135+
136+
// fix block views, so we generate blocks as it's a happy path
137+
for i, block := range flowBlocks {
138+
block.Header.View = block.Header.Height
139+
if i > 0 {
140+
block.Header.ParentView = flowBlocks[i-1].Header.View
141+
block.Header.ParentID = flowBlocks[i-1].Header.ID()
142+
}
143+
}
144+
pendingBlocks := flowBlocksToBlockProposals(flowBlocks...)
145+
146+
// this block should be finalized based on 2-chain finalization rule
147+
targetBlockHeight := pendingBlocks[len(pendingBlocks)-4].Block.Header.Height
148+
149+
// emulate syncing logic, where we push same blocks over and over.
150+
originID := unittest.IdentifierFixture()
151+
submittingBlocks := atomic.NewBool(true)
152+
var wg sync.WaitGroup
153+
wg.Add(workers)
154+
for i := 0; i < workers; i++ {
155+
go func(blocks []*messages.BlockProposal) {
156+
defer wg.Done()
157+
for submittingBlocks.Load() {
158+
for batch := 0; batch < batchesPerWorker; batch++ {
159+
engine.OnSyncedBlocks(flow.Slashable[[]*messages.BlockProposal]{
160+
OriginID: originID,
161+
Message: blocks[batch*blocksPerBatch : (batch+1)*blocksPerBatch],
162+
})
163+
}
164+
}
165+
}(pendingBlocks[i*blocksPerWorker : (i+1)*blocksPerWorker])
166+
}
167+
168+
// wait for target block to become finalized, this might take a while.
169+
require.Eventually(t, func() bool {
170+
final, err := followerState.Final().Head()
171+
require.NoError(t, err)
172+
return final.Height == targetBlockHeight
173+
}, time.Minute, time.Second, "expect to process all blocks before timeout")
174+
175+
// shutdown and cleanup test
176+
submittingBlocks.Store(false)
177+
unittest.RequireReturnsBefore(t, wg.Wait, time.Second, "expect workers to stop producing")
178+
cancel()
179+
unittest.RequireCloseBefore(t, moduleutil.AllDone(engine, followerLoop), time.Second, "engine failed to stop")
180+
select {
181+
case err := <-errs:
182+
require.NoError(t, err)
183+
default:
184+
}
185+
})
186+
}

state/protocol/badger/mutator.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ func (m *FollowerState) ExtendCertified(ctx context.Context, candidate *flow.Blo
116116
span, ctx := m.tracer.StartSpanFromContext(ctx, trace.ProtoStateMutatorHeaderExtend)
117117
defer span.End()
118118

119-
// TODO: this is a temporary if statement since follower engine doesn't deliver QCs yet. Once the implementation is complete
120119
// there are no cases where certifyingQC can be nil.
121120
if certifyingQC != nil {
122121
blockID := candidate.ID()

0 commit comments

Comments
 (0)