Skip to content

Commit 874d3ed

Browse files
committed
Apply suggestions from PR review
1 parent a176e5f commit 874d3ed

File tree

4 files changed

+59
-69
lines changed

4 files changed

+59
-69
lines changed

engine/common/follower/cache/cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (s *CacheSuite) TestPruneUpToView() {
177177
}
178178

179179
// TestConcurrentAdd simulates multiple workers adding batches of blocks out of order.
180-
// We use next setup:
180+
// We use the following setup:
181181
// Number of workers - workers
182182
// Number of batches submitted by worker - batchesPerWorker
183183
// Number of blocks in each batch submitted by worker - blocksPerBatch

engine/common/follower/core_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ func (s *CoreSuite) TestAddFinalizedBlock() {
128128
// 2. added to the pending cache
129129
// 3. added to the pending tree
130130
// 4. added to the protocol state
131-
// Finally, the certified blocks should be forwarded to the HotStuff follower.
131+
//
132+
// Finally, the certified blocks should be forwarded to the HotStuff follower.
132133
func (s *CoreSuite) TestProcessingRangeHappyPath() {
133134
blocks := unittest.ChainFixtureFrom(10, s.finalizedBlock)
134135

@@ -202,9 +203,11 @@ func (s *CoreSuite) TestProcessingConnectedRangesOutOfOrder() {
202203

203204
var wg sync.WaitGroup
204205
wg.Add(len(blocks) - 1)
205-
s.follower.On("SubmitProposal", mock.Anything).Return().Run(func(args mock.Arguments) {
206-
wg.Done()
207-
}).Times(len(blocks) - 1)
206+
for _, block := range blocks[:len(blocks)-1] {
207+
s.follower.On("SubmitProposal", model.ProposalFromFlow(block.Header)).Return().Run(func(args mock.Arguments) {
208+
wg.Done()
209+
}).Once()
210+
}
208211

209212
lastSubmittedBlockID := flow.ZeroID
210213
s.state.On("ExtendCertified", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
@@ -241,12 +244,13 @@ func (s *CoreSuite) TestDetectingProposalEquivocation() {
241244
}
242245

243246
// TestConcurrentAdd simulates multiple workers adding batches of connected blocks out of order.
244-
// We use next setup:
247+
// We use the following setup:
245248
// Number of workers - workers
246249
// - Number of workers - workers
247250
// - Number of batches submitted by worker - batchesPerWorker
248251
// - Number of blocks in each batch submitted by worker - blocksPerBatch
249252
// - Each worker submits batchesPerWorker*blocksPerBatch blocks
253+
//
250254
// In total we will submit workers*batchesPerWorker*blocksPerBatch
251255
// After submitting all blocks we expect that chain of blocks except last one will be added to the protocol state and
252256
// submitted for further processing to Hotstuff layer.

engine/common/follower/engine.go

Lines changed: 48 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,17 @@ func New(
138138
return e, nil
139139
}
140140

141-
// OnBlockProposal logs an error and drops the proposal. This is because the follower ingests new
142-
// blocks directly from the networking layer (channel `channels.ReceiveBlocks` by default), which
143-
// delivers its messages by calling the generic `Process` method. Receiving block proposal as
144-
// from another internal component is likely an implementation bug.
145-
func (e *Engine) OnBlockProposal(_ flow.Slashable[*messages.BlockProposal]) {
146-
e.log.Error().Msg("received unexpected block proposal via internal method")
141+
// OnBlockProposal performs processing of incoming block by pushing into queue and notifying worker.
142+
func (e *Engine) OnBlockProposal(proposal flow.Slashable[*messages.BlockProposal]) {
143+
e.engMetrics.MessageReceived(metrics.EngineFollower, metrics.MessageBlockProposal)
144+
proposalAsList := flow.Slashable[[]*messages.BlockProposal]{
145+
OriginID: proposal.OriginID,
146+
Message: []*messages.BlockProposal{proposal.Message},
147+
}
148+
// queue proposal
149+
if e.pendingBlocks.Push(proposalAsList) {
150+
e.pendingBlocksNotifier.Notify()
151+
}
147152
}
148153

149154
// OnSyncedBlocks consumes incoming blocks by pushing into queue and notifying worker.
@@ -173,7 +178,7 @@ func (e *Engine) OnFinalizedBlock(block *model.Block) {
173178
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error {
174179
switch msg := message.(type) {
175180
case *messages.BlockProposal:
176-
e.onBlockProposal(flow.Slashable[*messages.BlockProposal]{
181+
e.OnBlockProposal(flow.Slashable[*messages.BlockProposal]{
177182
OriginID: originID,
178183
Message: msg,
179184
})
@@ -221,52 +226,45 @@ func (e *Engine) processQueuedBlocks(doneSignal <-chan struct{}) error {
221226
// for the next incoming message to arrive.
222227
return nil
223228
}
224-
batch := msg.(flow.Slashable[[]*messages.BlockProposal])
225-
if len(batch.Message) < 1 {
226-
continue
227-
}
228-
blocks := make([]*flow.Block, 0, len(batch.Message))
229-
for _, block := range batch.Message {
230-
blocks = append(blocks, block.Block.ToInternal())
231-
}
232229

233-
firstBlock := blocks[0].Header
234-
lastBlock := blocks[len(blocks)-1].Header
235-
log := e.log.With().
236-
Hex("origin_id", batch.OriginID[:]).
237-
Str("chain_id", lastBlock.ChainID.String()).
238-
Uint64("first_block_height", firstBlock.Height).
239-
Uint64("first_block_view", firstBlock.View).
240-
Uint64("last_block_height", lastBlock.Height).
241-
Uint64("last_block_view", lastBlock.View).
242-
Int("range_length", len(blocks)).
243-
Logger()
244-
245-
latestFinalizedView := e.finalizedBlockTracker.NewestBlock().View
246-
submitConnectedBatch := func(blocks []*flow.Block) {
247-
e.submitConnectedBatch(log, latestFinalizedView, batch.OriginID, blocks)
248-
}
230+
batch := msg.(flow.Slashable[[]*messages.BlockProposal])
231+
if len(batch.Message) < 1 {
232+
continue
233+
}
234+
blocks := make([]*flow.Block, 0, len(batch.Message))
235+
for _, block := range batch.Message {
236+
blocks = append(blocks, block.Block.ToInternal())
237+
}
249238

250-
// extract sequences of connected blocks and schedule them for further processing
251-
// we assume the sender has already ordered blocks into connected ranges if possible
252-
parentID := blocks[0].ID()
253-
indexOfLastConnected := 0
254-
for i := 1; i < len(blocks); i++ {
255-
if blocks[i].Header.ParentID != parentID {
256-
submitConnectedBatch(blocks[indexOfLastConnected:i])
257-
indexOfLastConnected = i
258-
}
259-
parentID = blocks[i].Header.ID()
239+
firstBlock := blocks[0].Header
240+
lastBlock := blocks[len(blocks)-1].Header
241+
log := e.log.With().
242+
Hex("origin_id", batch.OriginID[:]).
243+
Str("chain_id", lastBlock.ChainID.String()).
244+
Uint64("first_block_height", firstBlock.Height).
245+
Uint64("first_block_view", firstBlock.View).
246+
Uint64("last_block_height", lastBlock.Height).
247+
Uint64("last_block_view", lastBlock.View).
248+
Int("range_length", len(blocks)).
249+
Logger()
250+
251+
// extract sequences of connected blocks and schedule them for further processing
252+
// we assume the sender has already ordered blocks into connected ranges if possible
253+
latestFinalizedView := e.finalizedBlockTracker.NewestBlock().View
254+
parentID := blocks[0].ID()
255+
indexOfLastConnected := 0
256+
for i, block := range blocks {
257+
if block.Header.ParentID != parentID {
258+
e.submitConnectedBatch(log, latestFinalizedView, batch.OriginID, blocks[indexOfLastConnected:i])
259+
indexOfLastConnected = i
260260
}
261-
submitConnectedBatch(blocks[indexOfLastConnected:])
262-
263-
e.engMetrics.MessageHandled(metrics.EngineFollower, metrics.MessageBlockProposal)
264-
continue
261+
parentID = block.Header.ID()
265262
}
263+
e.submitConnectedBatch(log, latestFinalizedView, batch.OriginID, blocks[indexOfLastConnected:])
264+
265+
e.engMetrics.MessageHandled(metrics.EngineFollower, metrics.MessageBlockProposal)
266+
continue
266267

267-
// when there are no more messages in the queue, back to the processBlocksLoop to wait
268-
// for the next incoming message to arrive.
269-
return nil
270268
}
271269
}
272270

@@ -308,7 +306,8 @@ func (e *Engine) processConnectedBatch(ctx irrecoverable.SignalerContext, ready
308306
}
309307
}
310308

311-
// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
309+
// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events.
310+
// Implements `component.ComponentWorker` signature.
312311
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
313312
ready()
314313

@@ -328,16 +327,3 @@ func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, r
328327
}
329328
}
330329
}
331-
332-
// onBlockProposal performs processing of incoming block by pushing into queue and notifying worker.
333-
func (e *Engine) onBlockProposal(proposal flow.Slashable[*messages.BlockProposal]) {
334-
e.engMetrics.MessageReceived(metrics.EngineFollower, metrics.MessageBlockProposal)
335-
proposalAsList := flow.Slashable[[]*messages.BlockProposal]{
336-
OriginID: proposal.OriginID,
337-
Message: []*messages.BlockProposal{proposal.Message},
338-
}
339-
// queue proposal
340-
if e.pendingBlocks.Push(proposalAsList) {
341-
e.pendingBlocksNotifier.Notify()
342-
}
343-
}

engine/common/follower/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
// TestFollowerHappyPath tests Engine integrated with real modules, mocked modules are used only for functionality which is static
3737
// or implemented by our test case. Tests that syncing batches of blocks from other participants results in extending protocol state.
3838
// After processing all available blocks we check if chain has correct height and finalized block.
39-
// We use next setup:
39+
// We use the following setup:
4040
// Number of workers - workers
4141
// Number of batches submitted by worker - batchesPerWorker
4242
// Number of blocks in each batch submitted by worker - blocksPerBatch

0 commit comments

Comments
 (0)