Skip to content

Commit 0b1de87

Browse files
authored
Merge pull request #4107 from onflow/alex/6493-follower-core_-_suggestions1
suggestions for PR 4094
2 parents 3dc0b7c + 76ebb03 commit 0b1de87

File tree

9 files changed

+58
-64
lines changed

9 files changed

+58
-64
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
333333
builder.Validator,
334334
builder.SyncCore,
335335
node.Tracer,
336-
followereng.WithComplianceOptions(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
336+
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
337337
)
338338
if err != nil {
339339
return nil, fmt.Errorf("could not create follower core: %w", err)

cmd/collection/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func main() {
317317
validator,
318318
mainChainSyncCore,
319319
node.Tracer,
320-
followereng.WithComplianceOptions(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
320+
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
321321
)
322322
if err != nil {
323323
return nil, fmt.Errorf("could not create follower core: %w", err)

cmd/execution_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
899899
validator,
900900
exeNode.syncCore,
901901
node.Tracer,
902-
followereng.WithComplianceOptions(compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
902+
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
903903
)
904904
if err != nil {
905905
return nil, fmt.Errorf("could not create follower core: %w", err)

cmd/observer/node_builder/observer_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
365365
builder.Validator,
366366
builder.SyncCore,
367367
node.Tracer,
368-
follower.WithComplianceOptions(compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold)),
368+
compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold),
369369
)
370370
if err != nil {
371371
return nil, fmt.Errorf("could not create follower core: %w", err)

cmd/verification_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
381381
validator,
382382
syncCore,
383383
node.Tracer,
384-
followereng.WithComplianceOptions(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
384+
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
385385
)
386386
if err != nil {
387387
return nil, fmt.Errorf("could not create follower core: %w", err)

engine/common/follower/core.go

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,6 @@ import (
2121
"github.com/onflow/flow-go/state/protocol"
2222
)
2323

24-
type ComplianceOption func(*Core)
25-
26-
// WithComplianceOptions sets options for the core's compliance config
27-
func WithComplianceOptions(opts ...compliance.Opt) ComplianceOption {
28-
return func(c *Core) {
29-
for _, apply := range opts {
30-
apply(&c.config)
31-
}
32-
}
33-
}
34-
3524
// CertifiedBlocks is a connected list of certified blocks, in ascending height order.
3625
type CertifiedBlocks []pending_tree.CertifiedBlock
3726

@@ -78,11 +67,17 @@ func NewCore(log zerolog.Logger,
7867
validator hotstuff.Validator,
7968
sync module.BlockRequester,
8069
tracer module.Tracer,
81-
opts ...ComplianceOption) (*Core, error) {
70+
opts ...compliance.Opt,
71+
) (*Core, error) {
8272
onEquivocation := func(block, otherBlock *flow.Block) {
8373
finalizationConsumer.OnDoubleProposeDetected(model.BlockFromFlow(block.Header), model.BlockFromFlow(otherBlock.Header))
8474
}
8575

76+
config := compliance.DefaultConfig()
77+
for _, apply := range opts {
78+
apply(&config)
79+
}
80+
8681
finalizedBlock, err := state.Final().Head()
8782
if err != nil {
8883
return nil, fmt.Errorf("could not query finalized block: %w", err)
@@ -98,15 +93,11 @@ func NewCore(log zerolog.Logger,
9893
validator: validator,
9994
sync: sync,
10095
tracer: tracer,
101-
config: compliance.DefaultConfig(),
96+
config: config,
10297
certifiedRangesChan: make(chan CertifiedBlocks, defaultCertifiedRangeChannelCapacity),
10398
finalizedBlocksChan: make(chan *flow.Header, defaultFinalizedBlocksChannelCapacity),
10499
}
105100

106-
for _, apply := range opts {
107-
apply(c)
108-
}
109-
110101
// prune cache to latest finalized view
111102
c.pendingCache.PruneUpToView(finalizedBlock.View)
112103

@@ -118,11 +109,11 @@ func NewCore(log zerolog.Logger,
118109
}
119110

120111
// OnBlockRange processes a range of connected blocks. The input list must be sequentially ordered forming a chain.
121-
// Submitting batch with invalid order results in error, such batch will be discarded and exception will be returned.
122-
// Effectively, this function validates incoming batch, adds it to cache of pending blocks and possibly schedules blocks for further
123-
// processing if they were certified.
124-
// This function is safe to use in concurrent environment.
125-
// Caution: this function might block if internally too many certified blocks are queued in the channel `certifiedRangesChan`.
112+
// Effectively, this method validates the incoming batch, adds it to cache of pending blocks and possibly schedules
113+
// blocks for further processing if they were certified. Submitting a batch with invalid causes an
114+
// `ErrDisconnectedBatch` error and the batch is dropped (no-op).
115+
// This method is safe to use in concurrent environment.
116+
// Caution: method might block if internally too many certified blocks are queued in the channel `certifiedRangesChan`.
126117
// Expected errors during normal operations:
127118
// - cache.ErrDisconnectedBatch
128119
func (c *Core) OnBlockRange(originID flow.Identifier, batch []*flow.Block) error {
@@ -148,12 +139,12 @@ func (c *Core) OnBlockRange(originID flow.Identifier, batch []*flow.Block) error
148139

149140
if c.pendingCache.Peek(hotstuffProposal.Block.BlockID) == nil {
150141
log.Debug().Msg("block not found in cache, performing validation")
151-
// Caution: we are _not_ verifying the proposal's full validity here. Instead, we need to check
142+
// Caution: we are _not_ checking the proposal's full validity here. Instead, we need to check
152143
// the following two critical properties:
153144
// 1. The block has been signed by the legitimate primary for the view. This is important in case
154145
// there are multiple blocks for the view. We need to differentiate the following byzantine cases:
155146
// (i) Some other consensus node that is _not_ primary is trying to publish a block.
156-
// This would result in the validation below failing with and `InvalidBlockError`.
147+
// This would result in the validation below failing with an `InvalidBlockError`.
157148
// (ii) The legitimate primary for the view is equivocating. In this case, the validity check
158149
// below would pass. Though, the `PendingTree` would eventually notice this, when we connect
159150
// the equivocating blocks to the latest finalized block.
@@ -208,7 +199,7 @@ func (c *Core) OnBlockRange(originID flow.Identifier, batch []*flow.Block) error
208199

209200
// processCoreSeqEvents processes events that need to be dispatched on dedicated core's goroutine.
210201
// Here we process events that need to be sequentially ordered(processing certified blocks and new finalized blocks).
211-
// Is NOT concurrency safe, has to be used by only one internal worker goroutine.
202+
// Is NOT concurrency safe: should be executed by _single dedicated_ goroutine.
212203
func (c *Core) processCoreSeqEvents(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
213204
ready()
214205

@@ -247,52 +238,48 @@ func (c *Core) OnFinalizedBlock(final *flow.Header) {
247238
}
248239
}
249240

250-
// processCertifiedBlocks process a batch of certified blocks by adding them to the tree of pending blocks.
251-
// As soon as tree returns a range of connected and certified blocks they will be added to the protocol state.
252-
// Is NOT concurrency safe, has to be used by internal goroutine.
241+
// processCertifiedBlocks processes the batch of certified blocks:
242+
// 1. We add the certified blocks to the PendingTree. This might causes the pending PendingTree to detect
243+
// additional blocks as now being connected to the latest finalized block. Specifically, the PendingTree
244+
// returns the list `connectedBlocks`, which contains the subset of `blocks` that are connect to the
245+
// finalized block plus all of their connected descendants. The list `connectedBlocks` is in 'parent first'
246+
// order, i.e. a block is listed before any of its descendants. The PendingTree guarantees that all
247+
// ancestors are listed, _unless_ the ancestor is the finalized block or the ancestor has been returned
248+
// by a previous call to `PendingTree.AddBlocks`.
249+
// 2. We extend the protocol state with the connected certified blocks from step 1.
250+
// 3. We submit the connected certified blocks from step 1 to the consensus follower.
251+
//
252+
// Is NOT concurrency safe: should be executed by _single dedicated_ goroutine.
253253
// No errors expected during normal operations.
254254
func (c *Core) processCertifiedBlocks(ctx context.Context, blocks CertifiedBlocks) error {
255255
span, ctx := c.tracer.StartSpanFromContext(ctx, trace.FollowerProcessCertifiedBlocks)
256256
defer span.End()
257257

258+
// Step 1: add blocks to our PendingTree of certified blocks
259+
pendingTreeSpan, _ := c.tracer.StartSpanFromContext(ctx, trace.FollowerExtendPendingTree)
258260
connectedBlocks, err := c.pendingTree.AddBlocks(blocks)
261+
defer pendingTreeSpan.End()
259262
if err != nil {
260263
return fmt.Errorf("could not process batch of certified blocks: %w", err)
261264
}
262-
err = c.extendCertifiedBlocks(ctx, connectedBlocks)
263-
if err != nil {
264-
return fmt.Errorf("could not extend protocol state: %w", err)
265-
}
266-
return nil
267-
}
268-
269-
// extendCertifiedBlocks processes a connected range of certified blocks by applying them to protocol state.
270-
// As result of this operation we might extend protocol state.
271-
// Is NOT concurrency safe, has to be used by internal goroutine.
272-
// No errors expected during normal operations.
273-
func (c *Core) extendCertifiedBlocks(parentCtx context.Context, connectedBlocks CertifiedBlocks) error {
274-
span, parentCtx := c.tracer.StartSpanFromContext(parentCtx, trace.FollowerExtendCertifiedBlocks)
275-
defer span.End()
276265

266+
// Step 2 & 3: extend protocol state with connected certified blocks and forward them to consensus follower
277267
for _, certifiedBlock := range connectedBlocks {
278-
span, ctx := c.tracer.StartBlockSpan(parentCtx, certifiedBlock.ID(), trace.FollowerExtendCertified)
279-
err := c.state.ExtendCertified(ctx, certifiedBlock.Block, certifiedBlock.QC)
280-
span.End()
268+
s, _ := c.tracer.StartBlockSpan(ctx, certifiedBlock.ID(), trace.FollowerExtendCertified)
269+
err = c.state.ExtendCertified(ctx, certifiedBlock.Block, certifiedBlock.QC)
270+
s.End()
281271
if err != nil {
282272
return fmt.Errorf("could not extend protocol state with certified block: %w", err)
283273
}
284274

285275
hotstuffProposal := model.ProposalFromFlow(certifiedBlock.Block.Header)
286-
// submit the model to follower for async processing
287-
c.follower.SubmitProposal(hotstuffProposal)
276+
c.follower.SubmitProposal(hotstuffProposal) // submit the model to follower for async processing
288277
}
289278
return nil
290279
}
291280

292-
// processFinalizedBlock processes new finalized block by applying to the PendingTree.
293-
// Potentially PendingTree can resolve blocks that previously were not connected. Those blocks will be applied to the
294-
// protocol state, resulting in extending length of chain.
295-
// Is NOT concurrency safe, has to be used by internal goroutine.
281+
// processFinalizedBlock informs the PendingTree about finalization of the given block.
282+
// Is NOT concurrency safe: should be executed by _single dedicated_ goroutine.
296283
// No errors expected during normal operations.
297284
func (c *Core) processFinalizedBlock(ctx context.Context, finalized *flow.Header) error {
298285
span, ctx := c.tracer.StartSpanFromContext(ctx, trace.FollowerProcessFinalizedBlock)
@@ -302,9 +289,15 @@ func (c *Core) processFinalizedBlock(ctx context.Context, finalized *flow.Header
302289
if err != nil {
303290
return fmt.Errorf("could not process finalized fork at view %d: %w", finalized.View, err)
304291
}
305-
err = c.extendCertifiedBlocks(ctx, connectedBlocks)
306-
if err != nil {
307-
return fmt.Errorf("could not extend protocol state during finalization: %w", err)
292+
// The pending tree allows to skip ahead, which makes the algorithm more general and simplifies its implementation.
293+
// However, here we are implementing the consensus follower, which cannot skip ahead. This is because the consensus
294+
// follower locally determines finality and therefore must ingest every block. In other words: ever block that is
295+
// later finalized must have been connected before. Otherwise, the block would never have been forwarded to the
296+
// HotStuff follower and no finalization notification would have been triggered.
297+
// Therefore, from the perspective of the consensus follower, receiving a _non-empty_ `connectedBlocks` is a
298+
// symptom of internal state corruption or a bug.
299+
if len(connectedBlocks) > 0 {
300+
return fmt.Errorf("finalizing block %v caused the PendingTree to connect additional blocks, which is a symptom of internal state corruption or a bug", finalized.ID())
308301
}
309302
return nil
310303
}

follower/follower_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (builder *FollowerServiceBuilder) buildFollowerEngine() *FollowerServiceBui
242242
builder.Validator,
243243
builder.SyncCore,
244244
node.Tracer,
245-
followereng.WithComplianceOptions(compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
245+
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
246246
)
247247
if err != nil {
248248
return nil, fmt.Errorf("could not create follower core: %w", err)

module/compliance/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ const MinSkipNewProposalsThreshold = 1000
55
// Config is shared config for consensus and collection compliance engines, and
66
// the consensus follower engine.
77
type Config struct {
8-
// SkipNewProposalsThreshold defines the threshold where, if we observe a new
9-
// proposal which is this far behind our local latest finalized, we drop the
10-
// proposal rather than cache it.
8+
// SkipNewProposalsThreshold defines the threshold for dropping blocks that are too far in
9+
// the future. Formally, let `H` be the height of the latest finalized block known to this
10+
// node. A new block `B` is dropped without further processing, if
11+
// B.Height > H + SkipNewProposalsThreshold
1112
SkipNewProposalsThreshold uint64
1213
}
1314

module/trace/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ const (
5555
// Follower Core
5656
FollowerProcessFinalizedBlock SpanName = "follower.processFinalizedBlock"
5757
FollowerProcessCertifiedBlocks SpanName = "follower.processCertifiedBlocks"
58-
FollowerExtendCertifiedBlocks SpanName = "follower.extendCertifiedBlocks"
58+
FollowerExtendPendingTree SpanName = "follower.extendPendingTree"
5959
FollowerExtendCertified SpanName = "follower.extendCertified"
6060

6161
// Collection Node

0 commit comments

Comments
 (0)