Skip to content

Commit 43930a7

Browse files
committed
Updated follower engine to drop blocks too far ahed of local state
1 parent f681cd4 commit 43930a7

File tree

8 files changed

+29
-23
lines changed

8 files changed

+29
-23
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
333333
builder.Validator,
334334
builder.SyncCore,
335335
node.Tracer,
336-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
337336
)
338337
if err != nil {
339338
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -347,6 +346,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
347346
node.Storage.Headers,
348347
builder.Finalized,
349348
core,
349+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
350350
)
351351
if err != nil {
352352
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/collection/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ func main() {
317317
validator,
318318
mainChainSyncCore,
319319
node.Tracer,
320-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
321320
)
322321
if err != nil {
323322
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -331,6 +330,7 @@ func main() {
331330
node.Storage.Headers,
332331
finalizedHeader.Get(),
333332
core,
333+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
334334
)
335335
if err != nil {
336336
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/execution_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,6 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
899899
validator,
900900
exeNode.syncCore,
901901
node.Tracer,
902-
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
903902
)
904903
if err != nil {
905904
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -913,6 +912,7 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
913912
node.Storage.Headers,
914913
exeNode.finalizedHeader.Get(),
915914
core,
915+
followereng.WithComplianceConfigOpt(compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
916916
)
917917
if err != nil {
918918
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/observer/node_builder/observer_builder.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/onflow/flow-go/engine/access/rpc"
3636
"github.com/onflow/flow-go/engine/access/rpc/backend"
3737
"github.com/onflow/flow-go/engine/common/follower"
38-
followereng "github.com/onflow/flow-go/engine/common/follower"
3938
synceng "github.com/onflow/flow-go/engine/common/synchronization"
4039
"github.com/onflow/flow-go/engine/protocol"
4140
"github.com/onflow/flow-go/model/encodable"
@@ -184,7 +183,7 @@ type ObserverServiceBuilder struct {
184183
SyncEngineParticipantsProviderFactory func() module.IdentifierProvider
185184

186185
// engines
187-
FollowerEng *followereng.Engine
186+
FollowerEng *follower.Engine
188187
SyncEng *synceng.Engine
189188

190189
// Public network
@@ -355,7 +354,7 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
355354
if node.HeroCacheMetricsEnable {
356355
heroCacheCollector = metrics.FollowerCacheMetrics(node.MetricsRegisterer)
357356
}
358-
core, err := followereng.NewCore(
357+
core, err := follower.NewCore(
359358
node.Logger,
360359
node.Metrics.Mempool,
361360
heroCacheCollector,
@@ -365,20 +364,20 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
365364
builder.Validator,
366365
builder.SyncCore,
367366
node.Tracer,
368-
compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold),
369367
)
370368
if err != nil {
371369
return nil, fmt.Errorf("could not create follower core: %w", err)
372370
}
373371

374-
builder.FollowerEng, err = followereng.New(
372+
builder.FollowerEng, err = follower.New(
375373
node.Logger,
376374
node.Network,
377375
node.Me,
378376
node.Metrics.Engine,
379377
node.Storage.Headers,
380378
builder.Finalized,
381379
core,
380+
follower.WithComplianceConfigOpt(compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold)),
382381
follower.WithChannel(channels.PublicReceiveBlocks),
383382
)
384383
if err != nil {

cmd/verification_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,6 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
381381
validator,
382382
syncCore,
383383
node.Tracer,
384-
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
385384
)
386385
if err != nil {
387386
return nil, fmt.Errorf("could not create follower core: %w", err)
@@ -395,6 +394,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
395394
node.Storage.Headers,
396395
finalizedHeader.Get(),
397396
core,
397+
followereng.WithComplianceConfigOpt(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
398398
)
399399
if err != nil {
400400
return nil, fmt.Errorf("could not create follower engine: %w", err)

engine/common/follower/core.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/onflow/flow-go/engine/common/follower/pending_tree"
1515
"github.com/onflow/flow-go/model/flow"
1616
"github.com/onflow/flow-go/module"
17-
"github.com/onflow/flow-go/module/compliance"
1817
"github.com/onflow/flow-go/module/component"
1918
"github.com/onflow/flow-go/module/irrecoverable"
2019
"github.com/onflow/flow-go/module/trace"
@@ -42,7 +41,6 @@ type Core struct {
4241
*component.ComponentManager
4342
log zerolog.Logger
4443
mempoolMetrics module.MempoolMetrics
45-
config compliance.Config
4644
tracer module.Tracer
4745
pendingCache *cache.Cache
4846
pendingTree *pending_tree.PendingTree
@@ -67,17 +65,11 @@ func NewCore(log zerolog.Logger,
6765
validator hotstuff.Validator,
6866
sync module.BlockRequester,
6967
tracer module.Tracer,
70-
opts ...compliance.Opt,
7168
) (*Core, error) {
7269
onEquivocation := func(block, otherBlock *flow.Block) {
7370
finalizationConsumer.OnDoubleProposeDetected(model.BlockFromFlow(block.Header), model.BlockFromFlow(otherBlock.Header))
7471
}
7572

76-
config := compliance.DefaultConfig()
77-
for _, apply := range opts {
78-
apply(&config)
79-
}
80-
8173
finalizedBlock, err := state.Final().Head()
8274
if err != nil {
8375
return nil, fmt.Errorf("could not query finalized block: %w", err)
@@ -93,7 +85,6 @@ func NewCore(log zerolog.Logger,
9385
validator: validator,
9486
sync: sync,
9587
tracer: tracer,
96-
config: config,
9788
certifiedRangesChan: make(chan CertifiedBlocks, defaultCertifiedRangeChannelCapacity),
9889
finalizedBlocksChan: make(chan *flow.Header, defaultFinalizedBlocksChannelCapacity),
9990
}

engine/common/follower/engine.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/model/messages"
1616
"github.com/onflow/flow-go/module"
17+
"github.com/onflow/flow-go/module/compliance"
1718
"github.com/onflow/flow-go/module/component"
1819
"github.com/onflow/flow-go/module/irrecoverable"
1920
"github.com/onflow/flow-go/module/metrics"
@@ -31,6 +32,13 @@ func WithChannel(channel channels.Channel) EngineOption {
3132
}
3233
}
3334

35+
// WithComplianceConfigOpt applies compliance config opt to internal config
36+
func WithComplianceConfigOpt(opt compliance.Opt) EngineOption {
37+
return func(e *Engine) {
38+
opt(&e.config)
39+
}
40+
}
41+
3442
// defaultBatchProcessingWorkers number of concurrent workers that process incoming blocks.
3543
const defaultBatchProcessingWorkers = 4
3644

@@ -57,6 +65,7 @@ type Engine struct {
5765
me module.Local
5866
engMetrics module.EngineMetrics
5967
con network.Conduit
68+
config compliance.Config
6069
channel channels.Channel
6170
headers storage.Headers
6271
pendingBlocks *fifoqueue.FifoQueue // queue for processing inbound batches of blocks
@@ -90,6 +99,7 @@ func New(
9099
log: log.With().Str("engine", "follower").Logger(),
91100
me: me,
92101
engMetrics: engMetrics,
102+
config: compliance.DefaultConfig(),
93103
channel: channels.ReceiveBlocks,
94104
pendingBlocks: pendingBlocks,
95105
pendingBlocksNotifier: engine.NewNotifier(),
@@ -276,6 +286,13 @@ func (e *Engine) submitConnectedBatch(log zerolog.Logger, latestFinalizedView ui
276286
log.Debug().Msgf("dropping range [%d, %d] below finalized view %d", blocks[0].Header.View, lastBlock.View, latestFinalizedView)
277287
return
278288
}
289+
if lastBlock.View > latestFinalizedView+e.config.SkipNewProposalsThreshold {
290+
log.Debug().
291+
Uint64("skip_new_proposals_threshold", e.config.SkipNewProposalsThreshold).
292+
Msgf("dropping range [%d, %d] too far ahead of locally finalized view %d",
293+
blocks[0].Header.View, lastBlock.View, latestFinalizedView)
294+
return
295+
}
279296
log.Debug().Msgf("submitting sub-range with views [%d, %d] for further processing", blocks[0].Header.View, lastBlock.View)
280297

281298
select {

follower/follower_builder.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
2626
"github.com/onflow/flow-go/crypto"
2727
"github.com/onflow/flow-go/engine/common/follower"
28-
followereng "github.com/onflow/flow-go/engine/common/follower"
2928
synceng "github.com/onflow/flow-go/engine/common/synchronization"
3029
"github.com/onflow/flow-go/model/encodable"
3130
"github.com/onflow/flow-go/model/flow"
@@ -121,7 +120,7 @@ type FollowerServiceBuilder struct {
121120
SyncEngineParticipantsProviderFactory func() module.IdentifierProvider
122121

123122
// engines
124-
FollowerEng *followereng.Engine
123+
FollowerEng *follower.Engine
125124
SyncEng *synceng.Engine
126125

127126
peerID peer.ID
@@ -232,7 +231,7 @@ func (builder *FollowerServiceBuilder) buildFollowerEngine() *FollowerServiceBui
232231
heroCacheCollector = metrics.FollowerCacheMetrics(node.MetricsRegisterer)
233232
}
234233

235-
core, err := followereng.NewCore(
234+
core, err := follower.NewCore(
236235
node.Logger,
237236
node.Metrics.Mempool,
238237
heroCacheCollector,
@@ -242,13 +241,12 @@ func (builder *FollowerServiceBuilder) buildFollowerEngine() *FollowerServiceBui
242241
builder.Validator,
243242
builder.SyncCore,
244243
node.Tracer,
245-
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
246244
)
247245
if err != nil {
248246
return nil, fmt.Errorf("could not create follower core: %w", err)
249247
}
250248

251-
builder.FollowerEng, err = followereng.New(
249+
builder.FollowerEng, err = follower.New(
252250
node.Logger,
253251
node.Network,
254252
node.Me,
@@ -257,6 +255,7 @@ func (builder *FollowerServiceBuilder) buildFollowerEngine() *FollowerServiceBui
257255
builder.Finalized,
258256
core,
259257
follower.WithChannel(channels.PublicReceiveBlocks),
258+
follower.WithComplianceConfigOpt(compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
260259
)
261260
if err != nil {
262261
return nil, fmt.Errorf("could not create follower engine: %w", err)

0 commit comments

Comments
 (0)