Skip to content

Commit 7abfbe9

Browse files
committed
Added a separate queue for handling incoming proposals that are not received from sync engine. Added test
1 parent 874d3ed commit 7abfbe9

File tree

2 files changed

+56
-19
lines changed

2 files changed

+56
-19
lines changed

engine/common/follower/engine.go

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,11 @@ func WithChannel(channel channels.Channel) EngineOption {
3434
// defaultBatchProcessingWorkers number of concurrent workers that process incoming blocks.
3535
const defaultBatchProcessingWorkers = 4
3636

37-
// defaultBlockQueueCapacity maximum capacity of inbound queue for batches of BlocksBatch.
38-
const defaultBlockQueueCapacity = 100
37+
// defaultPendingBlockQueueCapacity maximum capacity of inbound queue for blocks directly received from other nodes.
38+
const defaultPendingBlockQueueCapacity = 10
39+
40+
// defaultSyncedBlockQueueCapacity maximum capacity of inbound queue for batches of synced blocks.
41+
const defaultSyncedBlockQueueCapacity = 100
3942

4043
// defaultPendingConnectedBlocksChanCapacity capacity of buffered channel that is used to receive pending blocks that form a sequence.
4144
const defaultPendingConnectedBlocksChanCapacity = 100
@@ -59,8 +62,9 @@ type Engine struct {
5962
con network.Conduit
6063
channel channels.Channel
6164
headers storage.Headers
62-
pendingBlocks *fifoqueue.FifoQueue // queue for processing inbound batches of blocks
63-
pendingBlocksNotifier engine.Notifier // notifies that new batches are ready to be processed
65+
pendingBlocks *fifoqueue.FifoQueue // queue for processing inbound blocks
66+
syncedBlocks *fifoqueue.FifoQueue // queue for processing inbound batches of blocks
67+
blocksAvailableNotifier engine.Notifier // notifies that new blocks are ready to be processed
6468
finalizedBlockTracker *tracker.NewestBlockTracker // tracks the latest finalization block
6569
finalizedBlockNotifier engine.Notifier // notifies when the latest finalized block changes
6670
pendingConnectedBlocksChan chan flow.Slashable[[]*flow.Block]
@@ -80,8 +84,13 @@ func New(
8084
core common.FollowerCore,
8185
opts ...EngineOption,
8286
) (*Engine, error) {
83-
// FIFO queue for block proposals
84-
pendingBlocks, err := fifoqueue.NewFifoQueue(defaultBlockQueueCapacity)
87+
// FIFO queue for inbound block proposals
88+
pendingBlocks, err := fifoqueue.NewFifoQueue(defaultPendingBlockQueueCapacity)
89+
if err != nil {
90+
return nil, fmt.Errorf("failed to create queue for inbound blocks: %w", err)
91+
}
92+
// FIFO queue for synced blocks
93+
syncedBlocks, err := fifoqueue.NewFifoQueue(defaultSyncedBlockQueueCapacity)
8594
if err != nil {
8695
return nil, fmt.Errorf("failed to create queue for inbound blocks: %w", err)
8796
}
@@ -92,7 +101,8 @@ func New(
92101
engMetrics: engMetrics,
93102
channel: channels.ReceiveBlocks,
94103
pendingBlocks: pendingBlocks,
95-
pendingBlocksNotifier: engine.NewNotifier(),
104+
syncedBlocks: syncedBlocks,
105+
blocksAvailableNotifier: engine.NewNotifier(),
96106
pendingConnectedBlocksChan: make(chan flow.Slashable[[]*flow.Block], defaultPendingConnectedBlocksChanCapacity),
97107
finalizedBlockTracker: tracker.NewNewestBlockTracker(),
98108
finalizedBlockNotifier: engine.NewNotifier(),
@@ -141,13 +151,9 @@ func New(
141151
// OnBlockProposal performs processing of incoming block by pushing into queue and notifying worker.
142152
func (e *Engine) OnBlockProposal(proposal flow.Slashable[*messages.BlockProposal]) {
143153
e.engMetrics.MessageReceived(metrics.EngineFollower, metrics.MessageBlockProposal)
144-
proposalAsList := flow.Slashable[[]*messages.BlockProposal]{
145-
OriginID: proposal.OriginID,
146-
Message: []*messages.BlockProposal{proposal.Message},
147-
}
148154
// queue proposal
149-
if e.pendingBlocks.Push(proposalAsList) {
150-
e.pendingBlocksNotifier.Notify()
155+
if e.pendingBlocks.Push(proposal) {
156+
e.blocksAvailableNotifier.Notify()
151157
}
152158
}
153159

@@ -157,8 +163,8 @@ func (e *Engine) OnSyncedBlocks(blocks flow.Slashable[[]*messages.BlockProposal]
157163
// The synchronization engine feeds the follower with batches of blocks. The field `Slashable.OriginID`
158164
// states which node forwarded the batch to us. Each block contains its proposer and signature.
159165

160-
if e.pendingBlocks.Push(blocks) {
161-
e.pendingBlocksNotifier.Notify()
166+
if e.syncedBlocks.Push(blocks) {
167+
e.blocksAvailableNotifier.Notify()
162168
}
163169
}
164170

@@ -194,7 +200,7 @@ func (e *Engine) processBlocksLoop(ctx irrecoverable.SignalerContext, ready comp
194200
ready()
195201

196202
doneSignal := ctx.Done()
197-
newPendingBlockSignal := e.pendingBlocksNotifier.Channel()
203+
newPendingBlockSignal := e.blocksAvailableNotifier.Channel()
198204
for {
199205
select {
200206
case <-doneSignal:
@@ -221,8 +227,23 @@ func (e *Engine) processQueuedBlocks(doneSignal <-chan struct{}) error {
221227
}
222228

223229
msg, ok := e.pendingBlocks.Pop()
230+
if ok {
231+
blockMsg := msg.(flow.Slashable[*messages.BlockProposal])
232+
block := blockMsg.Message.Block.ToInternal()
233+
log := e.log.With().
234+
Hex("origin_id", blockMsg.OriginID[:]).
235+
Str("chain_id", block.Header.ChainID.String()).
236+
Uint64("view", block.Header.View).
237+
Uint64("height", block.Header.Height).
238+
Logger()
239+
latestFinalizedView := e.finalizedBlockTracker.NewestBlock().View
240+
e.submitConnectedBatch(log, latestFinalizedView, blockMsg.OriginID, []*flow.Block{block})
241+
continue
242+
}
243+
244+
msg, ok = e.syncedBlocks.Pop()
224245
if !ok {
225-
// when there are no more messages in the queue, back to the processBlocksLoop to wait
246+
// when there are no more messages in the queue, back to the processQueuedBlocks to wait
226247
// for the next incoming message to arrive.
227248
return nil
228249
}
@@ -263,8 +284,6 @@ func (e *Engine) processQueuedBlocks(doneSignal <-chan struct{}) error {
263284
e.submitConnectedBatch(log, latestFinalizedView, batch.OriginID, blocks[indexOfLastConnected:])
264285

265286
e.engMetrics.MessageHandled(metrics.EngineFollower, metrics.MessageBlockProposal)
266-
continue
267-
268287
}
269288
}
270289

engine/common/follower/engine_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,24 @@ func (s *EngineSuite) TestProcessGossipedBlock() {
124124
unittest.AssertClosesBefore(s.T(), done, time.Second)
125125
}
126126

127+
// TestProcessBlockFromComplianceInterface check that processing single gossiped block using compliance interface results in call to FollowerCore.
128+
func (s *EngineSuite) TestProcessBlockFromComplianceInterface() {
129+
block := unittest.BlockWithParentFixture(s.finalized)
130+
131+
originID := unittest.IdentifierFixture()
132+
done := make(chan struct{})
133+
s.core.On("OnBlockRange", originID, []*flow.Block{block}).Return(nil).Run(func(_ mock.Arguments) {
134+
close(done)
135+
}).Once()
136+
137+
s.engine.OnBlockProposal(flow.Slashable[*messages.BlockProposal]{
138+
OriginID: originID,
139+
Message: messages.NewBlockProposal(block),
140+
})
141+
142+
unittest.AssertClosesBefore(s.T(), done, time.Second)
143+
}
144+
127145
// TestProcessBatchOfDisconnectedBlocks tests that processing a batch that consists of one connected range and individual blocks
128146
// results in submitting all of them.
129147
func (s *EngineSuite) TestProcessBatchOfDisconnectedBlocks() {

0 commit comments

Comments
 (0)