@@ -34,9 +34,13 @@ func WithChannel(channel channels.Channel) EngineOption {
 const defaultBatchProcessingWorkers = 4

 // defaultPendingBlockQueueCapacity maximum capacity of inbound queue for blocks directly received from other nodes.
+// Small capacity is suitable here, as there will be hardly any pending blocks during normal operations. If the node
+// is so overloaded that it can't keep up with the newest blocks within 10 seconds (processing them with priority),
+// it is probably better to fall back on synchronization anyway.
 const defaultPendingBlockQueueCapacity = 10

 // defaultSyncedBlockQueueCapacity maximum capacity of inbound queue for batches of synced blocks.
+// While catching up, we want to be able to buffer a somewhat larger amount of work.
 const defaultSyncedBlockQueueCapacity = 100

 // defaultPendingConnectedBlocksChanCapacity capacity of buffered channel that is used to receive pending blocks that form a sequence.
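
Illustrative aside (not part of this diff): the capacity comments above rely on drop-on-full queue semantics. The sketch below is a simplified, single-threaded stand-in for that behaviour; the type, fields, and API are assumptions for illustration and do not mirror the actual `fifoqueue.FifoQueue` implementation.

package main

import "fmt"

// boundedQueue is a simplified stand-in for the engine's inbound FIFO queues
// (assumed drop-on-full semantics; not concurrency-safe, unlike the real queue).
type boundedQueue struct {
    items    []interface{}
    capacity int
}

// Push appends the item if there is room and reports whether it was accepted.
// A node that overflows the small pending-proposal queue simply drops the
// message and is expected to catch up via synchronization instead.
func (q *boundedQueue) Push(item interface{}) bool {
    if len(q.items) >= q.capacity {
        return false
    }
    q.items = append(q.items, item)
    return true
}

// Pop removes and returns the oldest item, if any.
func (q *boundedQueue) Pop() (interface{}, bool) {
    if len(q.items) == 0 {
        return nil, false
    }
    head := q.items[0]
    q.items = q.items[1:]
    return head, true
}

func main() {
    pending := &boundedQueue{capacity: 10} // fresh proposals: rarely more than a handful pending
    synced := &boundedQueue{capacity: 100} // synced batches: buffer more work while catching up
    fmt.Println(pending.Push("proposal"), synced.Push("batch"))
}
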
@@ -61,8 +65,8 @@ type ComplianceEngine struct {
 	con                        network.Conduit
 	channel                    channels.Channel
 	headers                    storage.Headers
-	pendingBlocks              *fifoqueue.FifoQueue        // queue for processing inbound blocks
-	syncedBlocks               *fifoqueue.FifoQueue        // queue for processing inbound batches of blocks
+	pendingProposals           *fifoqueue.FifoQueue        // queue for fresh proposals
+	syncedBlocks               *fifoqueue.FifoQueue        // queue for processing inbound batches of synced blocks
 	blocksAvailableNotifier    engine.Notifier             // notifies that new blocks are ready to be processed
 	finalizedBlockTracker      *tracker.NewestBlockTracker // tracks the latest finalization block
 	finalizedBlockNotifier     engine.Notifier             // notifies when the latest finalized block changes
@@ -99,7 +103,7 @@ func NewComplianceLayer(
 		me:                         me,
 		engMetrics:                 engMetrics,
 		channel:                    channels.ReceiveBlocks,
-		pendingBlocks:              pendingBlocks,
+		pendingProposals:           pendingBlocks,
 		syncedBlocks:               syncedBlocks,
 		blocksAvailableNotifier:    engine.NewNotifier(),
 		pendingConnectedBlocksChan: make(chan flow.Slashable[[]*flow.Block], defaultPendingConnectedBlocksChanCapacity),
@@ -147,16 +151,21 @@ func NewComplianceLayer(
 	return e, nil
 }

-// OnBlockProposal performs processing of incoming block by pushing into queue and notifying worker.
+// OnBlockProposal queues *untrusted* proposals for further processing and notifies the Engine's
+// internal workers. This method is intended for fresh proposals received directly from leaders.
+// It can also ingest synced blocks, but is less performant than the dedicated method `OnSyncedBlocks`.
 func (e *ComplianceEngine) OnBlockProposal(proposal flow.Slashable[*messages.BlockProposal]) {
 	e.engMetrics.MessageReceived(metrics.EngineFollower, metrics.MessageBlockProposal)
 	// queue proposal
-	if e.pendingBlocks.Push(proposal) {
+	if e.pendingProposals.Push(proposal) {
 		e.blocksAvailableNotifier.Notify()
 	}
 }

-// OnSyncedBlocks consumes incoming blocks by pushing into queue and notifying worker.
+// OnSyncedBlocks is an optimized consumer for *untrusted* synced blocks. It is particularly
+// efficient for batches of continuous, connected blocks (honest nodes supply finalized blocks
+// in suitable sequences where possible). Nevertheless, the method tolerates blocks in arbitrary
+// order (less efficiently), making it robust against Byzantine nodes.
 func (e *ComplianceEngine) OnSyncedBlocks(blocks flow.Slashable[[]*messages.BlockProposal]) {
 	e.engMetrics.MessageReceived(metrics.EngineFollower, metrics.MessageSyncedBlocks)
 	// The synchronization engine feeds the follower with batches of blocks. The field `Slashable.OriginID`
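
Illustrative aside (not part of this diff): both `OnBlockProposal` and `OnSyncedBlocks` follow the same push-then-notify pattern, i.e. enqueue the untrusted input, then wake the worker loop. The coalescing-notifier idea can be sketched as below; this illustrates the concept only and is not the engine's actual `engine.Notifier` code.

package main

import "fmt"

// notifier coalesces an arbitrary number of Notify calls into at most one
// pending signal (illustrative sketch under assumed semantics).
type notifier struct {
    ch chan struct{}
}

func newNotifier() notifier {
    return notifier{ch: make(chan struct{}, 1)} // buffer of one: extra notifications are merged
}

// Notify never blocks: if a signal is already pending, the new one is dropped.
func (n notifier) Notify() {
    select {
    case n.ch <- struct{}{}:
    default:
    }
}

// Channel is what worker loops select on.
func (n notifier) Channel() <-chan struct{} {
    return n.ch
}

func main() {
    n := newNotifier()
    n.Notify()
    n.Notify() // coalesced with the first signal
    <-n.Channel()
    fmt.Println("worker woke up once and now drains the queues until they are empty")
}
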
@@ -167,11 +176,13 @@ func (e *ComplianceEngine) OnSyncedBlocks(blocks flow.Slashable[[]*messages.Bloc
 	}
 }

-// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
-// It informs follower.ComplianceCore about finalization of the respective block.
+// OnFinalizedBlock informs the compliance layer about finalization of a new block. It does not block
+// and asynchronously executes the internal pruning logic. We accept inputs out of order, and only act
+// on inputs with strictly monotonically increasing views.
 //
+// Implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`.
 // CAUTION: the input to this callback is treated as trusted; precautions should be taken that messages
-// from external nodes cannot be considered as inputs to this function
+// from external nodes cannot be considered as inputs to this function.
 func (e *ComplianceEngine) OnFinalizedBlock(block *model.Block) {
 	if e.finalizedBlockTracker.Track(block) {
 		e.finalizedBlockNotifier.Notify()
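
Illustrative aside (not part of this diff): the "strictly monotonically increasing views" behaviour can be pictured with the small sketch below. It is a simplified analogue under assumed semantics, not the actual `tracker.NewestBlockTracker`.

package main

import (
    "fmt"
    "sync/atomic"
)

// newestViewTracker is an illustrative, simplified analogue of a newest-block
// tracker: Track returns true only when the supplied view strictly exceeds
// every view seen so far.
type newestViewTracker struct {
    newestView atomic.Uint64
}

func (t *newestViewTracker) Track(view uint64) bool {
    for {
        current := t.newestView.Load()
        if view <= current {
            return false // out-of-order or duplicate input: accepted but ignored
        }
        if t.newestView.CompareAndSwap(current, view) {
            return true // strictly newer view: caller should notify the pruning worker
        }
    }
}

func main() {
    var t newestViewTracker
    fmt.Println(t.Track(7)) // true  - first finalized view
    fmt.Println(t.Track(5)) // false - older input, no notification
    fmt.Println(t.Track(9)) // true  - strictly newer, triggers a notification
}
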
@@ -180,6 +191,8 @@ func (e *ComplianceEngine) OnFinalizedBlock(block *model.Block) {

 // Process processes the given event from the node with the given origin ID in
 // a blocking manner. It returns the potential processing error when done.
+// This method is intended to be used as a callback by the networking layer,
+// notifying us about fresh proposals directly from the consensus leaders.
 func (e *ComplianceEngine) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error {
 	switch msg := message.(type) {
 	case *messages.BlockProposal:
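
Illustrative aside (not part of this diff): the hunk above cuts off before the handler body. As a generic illustration of the networking-callback dispatch pattern (a type switch that forwards known messages and drops unexpected ones), the sketch below uses placeholder types throughout and does not reproduce the engine's actual `Process` logic.

package main

import "fmt"

// blockProposal is a placeholder type for this illustration only.
type blockProposal struct{ id string }

type engineSketch struct{}

func (e *engineSketch) onBlockProposal(origin string, p *blockProposal) {
    fmt.Printf("queued proposal %s received from %s\n", p.id, origin)
}

// process mirrors the dispatch pattern of a networking-layer callback:
// switch on the concrete message type and forward known messages to the
// matching handler.
func (e *engineSketch) process(origin string, message interface{}) error {
    switch msg := message.(type) {
    case *blockProposal:
        e.onBlockProposal(origin, msg)
    default:
        // One common choice: unexpected message types from untrusted peers are ignored, not fatal.
        fmt.Printf("ignoring unexpected message type %T\n", msg)
    }
    return nil
}

func main() {
    e := &engineSketch{}
    _ = e.process("node-A", &blockProposal{id: "0xabc"})
    _ = e.process("node-B", 42) // silently ignored
}
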
@@ -215,6 +228,17 @@ func (e *ComplianceEngine) processBlocksLoop(ctx irrecoverable.SignalerContext,

 // processQueuedBlocks processes any available messages until the message queue is empty.
 // Only returns when all inbound queues are empty (or the engine is terminated).
+// Prioritization: In a nutshell, we prioritize the resilience of the happy path over
+// performance gains on the recovery path. Details:
+//   - We prioritize new proposals. Thereby, it becomes much harder for a malicious node
+//     to overwhelm another node through synchronization messages and drown out new blocks
+//     for a node that is up-to-date.
+//   - On the flip side, new proposals are relatively infrequent compared to the load that
+//     synchronization produces for a node that is catching up. In other words, prioritizing
+//     the few new proposals first is probably not going to be much of a distraction.
+// Proposals too far in the future are dropped (see parameter `SkipNewProposalsThreshold`
+// in `compliance.Config`), to prevent memory overflow.
+//
 // No errors are expected during normal operation. All returned exceptions are potential
 // symptoms of internal state corruption and should be fatal.
 func (e *ComplianceEngine) processQueuedBlocks(doneSignal <-chan struct{}) error {
@@ -225,7 +249,8 @@ func (e *ComplianceEngine) processQueuedBlocks(doneSignal <-chan struct{}) error
 		default:
 		}

-		msg, ok := e.pendingBlocks.Pop()
+		// Priority 1: ingest fresh proposals
+		msg, ok := e.pendingProposals.Pop()
 		if ok {
 			blockMsg := msg.(flow.Slashable[*messages.BlockProposal])
 			block := blockMsg.Message.Block.ToInternal()
@@ -241,6 +266,7 @@ func (e *ComplianceEngine) processQueuedBlocks(doneSignal <-chan struct{}) error
 			continue
 		}

+		// Priority 2: ingest synced blocks
 		msg, ok = e.syncedBlocks.Pop()
 		if !ok {
 			// when there are no more messages in the queue, back to the processQueuedBlocks to wait
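
Illustrative aside (not part of this diff): distilling the prioritization comment and the `Priority 1` / `Priority 2` pops above, a simplified draining loop could look as follows. The queue representation, names, and handler are placeholders, not the engine's actual code.

package main

import "fmt"

// drainQueues illustrates the two-level prioritization: fresh proposals are
// always consumed first; synced batches are only touched when no fresh
// proposal is pending; the loop exits once both queues are empty or the done
// signal fires.
func drainQueues(done <-chan struct{}, fresh, synced *[]string, handle func(string)) {
    for {
        select {
        case <-done:
            return
        default:
        }

        // Priority 1: fresh proposals.
        if len(*fresh) > 0 {
            item := (*fresh)[0]
            *fresh = (*fresh)[1:]
            handle(item)
            continue // re-check fresh proposals before looking at synced work
        }

        // Priority 2: synced blocks.
        if len(*synced) > 0 {
            item := (*synced)[0]
            *synced = (*synced)[1:]
            handle(item)
            continue
        }

        // Both queues empty: return and wait for the next notification.
        return
    }
}

func main() {
    fresh := []string{"proposal-42"}
    synced := []string{"batch-1", "batch-2"}
    done := make(chan struct{})
    drainQueues(done, &fresh, &synced, func(s string) { fmt.Println("processing", s) })
}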