Skip to content

Commit 018512f

Browse files
Merge #4118
4118: [Follower] Skipping proposals too far ahead of locally finalized view r=durkmurder a=durkmurder ### Context This PR removes `compliance.Config` from `follower.Core` and moves it to `follower.Engine` which uses `SkipNewProposalsThreshold` to drop proposals that are too far ahead in future. Co-authored-by: Yurii Oleksyshyn <[email protected]>
2 parents c539928 + dd4c28f commit 018512f

File tree

12 files changed

+36
-30
lines changed

12 files changed

+36
-30
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
334334
builder.Validator,
335335
builder.SyncCore,
336336
node.Tracer,
337-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
338337
)
339338
if err != nil {
340339
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -348,6 +347,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
348347
node.Storage.Headers,
349348
builder.Finalized,
350349
core,
350+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
351351
)
352352
if err != nil {
353353
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/collection/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ func main() {
317317
validator,
318318
mainChainSyncCore,
319319
node.Tracer,
320-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
321320
)
322321
if err != nil {
323322
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -331,6 +330,7 @@ func main() {
331330
node.Storage.Headers,
332331
finalizedHeader.Get(),
333332
core,
333+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
334334
)
335335
if err != nil {
336336
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/execution_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,6 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
899899
validator,
900900
exeNode.syncCore,
901901
node.Tracer,
902-
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
903902
)
904903
if err != nil {
905904
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -913,6 +912,7 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
913912
node.Storage.Headers,
914913
exeNode.finalizedHeader.Get(),
915914
core,
915+
followereng.WithComplianceConfigOpt(compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
916916
)
917917
if err != nil {
918918
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/observer/node_builder/observer_builder.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/onflow/flow-go/engine/access/rpc"
3636
"github.com/onflow/flow-go/engine/access/rpc/backend"
3737
"github.com/onflow/flow-go/engine/common/follower"
38-
followereng "github.com/onflow/flow-go/engine/common/follower"
3938
synceng "github.com/onflow/flow-go/engine/common/synchronization"
4039
"github.com/onflow/flow-go/engine/protocol"
4140
"github.com/onflow/flow-go/model/encodable"
@@ -185,7 +184,7 @@ type ObserverServiceBuilder struct {
185184
SyncEngineParticipantsProviderFactory func() module.IdentifierProvider
186185

187186
// engines
188-
FollowerEng *followereng.ComplianceEngine
187+
FollowerEng *follower.ComplianceEngine
189188
SyncEng *synceng.Engine
190189

191190
// Public network
@@ -356,7 +355,7 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
356355
if node.HeroCacheMetricsEnable {
357356
heroCacheCollector = metrics.FollowerCacheMetrics(node.MetricsRegisterer)
358357
}
359-
core, err := followereng.NewComplianceCore(
358+
core, err := follower.NewComplianceCore(
360359
node.Logger,
361360
node.Metrics.Mempool,
362361
heroCacheCollector,
@@ -366,20 +365,20 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
366365
builder.Validator,
367366
builder.SyncCore,
368367
node.Tracer,
369-
compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold),
370368
)
371369
if err != nil {
372370
return nil, fmt.Errorf("could not create follower core: %w", err)
373371
}
374372

375-
builder.FollowerEng, err = followereng.NewComplianceLayer(
373+
builder.FollowerEng, err = follower.NewComplianceLayer(
376374
node.Logger,
377375
node.Network,
378376
node.Me,
379377
node.Metrics.Engine,
380378
node.Storage.Headers,
381379
builder.Finalized,
382380
core,
381+
follower.WithComplianceConfigOpt(compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold)),
383382
follower.WithChannel(channels.PublicReceiveBlocks),
384383
)
385384
if err != nil {

cmd/verification_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,6 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
381381
validator,
382382
syncCore,
383383
node.Tracer,
384-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
385384
)
386385
if err != nil {
387386
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -395,6 +394,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
395394
node.Storage.Headers,
396395
finalizedHeader.Get(),
397396
core,
397+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
398398
)
399399
if err != nil {
400400
return nil, fmt.Errorf("could not create follower engine: %w", err)

engine/collection/compliance/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Clus
150150
// ignore proposals which are too far ahead of our local finalized state
151151
// instead, rely on sync engine to catch up finalization more effectively, and avoid
152152
// large subtree of blocks to be cached.
153-
if header.Height > finalHeight+c.config.SkipNewProposalsThreshold {
153+
if header.View > finalView+c.config.SkipNewProposalsThreshold {
154154
log.Debug().
155155
Uint64("skip_new_proposals_threshold", c.config.SkipNewProposalsThreshold).
156-
Msg("dropping block too far ahead of locally finalized height")
156+
Msg("dropping block too far ahead of locally finalized view")
157157
return nil
158158
}
159159

engine/common/follower/compliance_core.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/onflow/flow-go/engine/common/follower/pending_tree"
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/module"
16-
"github.com/onflow/flow-go/module/compliance"
1716
"github.com/onflow/flow-go/module/component"
1817
"github.com/onflow/flow-go/module/irrecoverable"
1918
"github.com/onflow/flow-go/module/trace"
@@ -41,7 +40,6 @@ type ComplianceCore struct {
4140
*component.ComponentManager
4241
log zerolog.Logger
4342
mempoolMetrics module.MempoolMetrics
44-
config compliance.Config
4543
tracer module.Tracer
4644
pendingCache *cache.Cache
4745
pendingTree *pending_tree.PendingTree
@@ -66,17 +64,11 @@ func NewComplianceCore(log zerolog.Logger,
6664
validator hotstuff.Validator,
6765
sync module.BlockRequester,
6866
tracer module.Tracer,
69-
opts ...compliance.Opt,
7067
) (*ComplianceCore, error) {
7168
onEquivocation := func(block, otherBlock *flow.Block) {
7269
finalizationConsumer.OnDoubleProposeDetected(model.BlockFromFlow(block.Header), model.BlockFromFlow(otherBlock.Header))
7370
}
7471

75-
config := compliance.DefaultConfig()
76-
for _, apply := range opts {
77-
apply(&config)
78-
}
79-
8072
finalizedBlock, err := state.Final().Head()
8173
if err != nil {
8274
return nil, fmt.Errorf("could not query finalized block: %w", err)
@@ -92,7 +84,6 @@ func NewComplianceCore(log zerolog.Logger,
9284
validator: validator,
9385
sync: sync,
9486
tracer: tracer,
95-
config: config,
9687
certifiedRangesChan: make(chan CertifiedBlocks, defaultCertifiedRangeChannelCapacity),
9788
finalizedBlocksChan: make(chan *flow.Header, defaultFinalizedBlocksChannelCapacity),
9889
}

engine/common/follower/compliance_engine.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/onflow/flow-go/model/flow"
1414
"github.com/onflow/flow-go/model/messages"
1515
"github.com/onflow/flow-go/module"
16+
"github.com/onflow/flow-go/module/compliance"
1617
"github.com/onflow/flow-go/module/component"
1718
"github.com/onflow/flow-go/module/irrecoverable"
1819
"github.com/onflow/flow-go/module/metrics"
@@ -30,6 +31,13 @@ func WithChannel(channel channels.Channel) EngineOption {
3031
}
3132
}
3233

34+
// WithComplianceConfigOpt applies compliance config opt to internal config
35+
func WithComplianceConfigOpt(opt compliance.Opt) EngineOption {
36+
return func(e *ComplianceEngine) {
37+
opt(&e.config)
38+
}
39+
}
40+
3341
// defaultBatchProcessingWorkers number of concurrent workers that process incoming blocks.
3442
const defaultBatchProcessingWorkers = 4
3543

@@ -65,6 +73,7 @@ type ComplianceEngine struct {
6573
me module.Local
6674
engMetrics module.EngineMetrics
6775
con network.Conduit
76+
config compliance.Config
6877
channel channels.Channel
6978
headers storage.Headers
7079
pendingProposals *fifoqueue.FifoQueue // queue for fresh proposals
@@ -106,6 +115,7 @@ func NewComplianceLayer(
106115
log: log.With().Str("engine", "follower").Logger(),
107116
me: me,
108117
engMetrics: engMetrics,
118+
config: compliance.DefaultConfig(),
109119
channel: channels.ReceiveBlocks,
110120
pendingProposals: pendingBlocks,
111121
syncedBlocks: syncedBlocks,
@@ -327,6 +337,13 @@ func (e *ComplianceEngine) submitConnectedBatch(log zerolog.Logger, latestFinali
327337
log.Debug().Msgf("dropping range [%d, %d] below finalized view %d", blocks[0].Header.View, lastBlock.View, latestFinalizedView)
328338
return
329339
}
340+
if lastBlock.View > latestFinalizedView+e.config.SkipNewProposalsThreshold {
341+
log.Debug().
342+
Uint64("skip_new_proposals_threshold", e.config.SkipNewProposalsThreshold).
343+
Msgf("dropping range [%d, %d] too far ahead of locally finalized view %d",
344+
blocks[0].Header.View, lastBlock.View, latestFinalizedView)
345+
return
346+
}
330347
log.Debug().Msgf("submitting sub-range with views [%d, %d] for further processing", blocks[0].Header.View, lastBlock.View)
331348

332349
select {

engine/consensus/compliance/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,10 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Bloc
158158
// ignore proposals which are too far ahead of our local finalized state
159159
// instead, rely on sync engine to catch up finalization more effectively, and avoid
160160
// large subtree of blocks to be cached.
161-
if header.Height > finalHeight+c.config.SkipNewProposalsThreshold {
161+
if header.View > finalView+c.config.SkipNewProposalsThreshold {
162162
log.Debug().
163163
Uint64("skip_new_proposals_threshold", c.config.SkipNewProposalsThreshold).
164-
Msg("dropping block too far ahead of locally finalized height")
164+
Msg("dropping block too far ahead of locally finalized view")
165165
return nil
166166
}
167167

engine/consensus/compliance/core_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ func (cs *CoreSuite) TestOnBlockProposalSkipProposalThreshold() {
321321
// create a proposal which is far enough ahead to be dropped
322322
originID := cs.participants[1].NodeID
323323
block := unittest.BlockFixture()
324-
block.Header.Height = cs.head.Height + compliance.DefaultConfig().SkipNewProposalsThreshold + 1
324+
block.Header.View = cs.head.View + compliance.DefaultConfig().SkipNewProposalsThreshold + 1
325325
proposal := unittest.ProposalFromBlock(&block)
326326

327327
err := cs.core.OnBlockProposal(originID, proposal)

0 commit comments

Comments
 (0)