Skip to content

Commit c844a6d

Browse files
authored
Merge branch 'master' into janez/public-port-7125-master
2 parents 8137ddf + 280df4e commit c844a6d

File tree

69 files changed

+1142
-616
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1142
-616
lines changed

cmd/access/node_builder/access_node_builder.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -502,12 +502,12 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
502502
node.Storage.Headers,
503503
builder.Finalized,
504504
core,
505+
builder.FollowerDistributor,
505506
node.ComplianceConfig,
506507
)
507508
if err != nil {
508509
return nil, fmt.Errorf("could not create follower engine: %w", err)
509510
}
510-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.FollowerEng.OnFinalizedBlock)
511511

512512
return builder.FollowerEng, nil
513513
})
@@ -532,12 +532,12 @@ func (builder *FlowAccessNodeBuilder) buildSyncEngine() *FlowAccessNodeBuilder {
532532
builder.SyncCore,
533533
builder.SyncEngineParticipantsProviderFactory(),
534534
spamConfig,
535+
builder.FollowerDistributor,
535536
)
536537
if err != nil {
537538
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
538539
}
539540
builder.SyncEng = sync
540-
builder.FollowerDistributor.AddFinalizationConsumer(sync)
541541

542542
return builder.SyncEng, nil
543543
})
@@ -755,14 +755,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
755755
builder.Storage.Headers,
756756
builder.executionDataConfig,
757757
execDataDistributor,
758+
builder.FollowerDistributor,
758759
)
759760
if err != nil {
760761
return nil, fmt.Errorf("failed to create execution data requester: %w", err)
761762
}
762763
builder.ExecutionDataRequester = r
763764

764-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)
765-
766765
// add requester into ReadyDoneAware dependency passed to indexer. This allows the indexer
767766
// to wait for the requester to be ready before starting.
768767
requesterDependable.Init(builder.ExecutionDataRequester)
@@ -2178,6 +2177,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
21782177
notNil(builder.stateStreamBackend),
21792178
builder.stateStreamConf,
21802179
indexReporter,
2180+
builder.FollowerDistributor,
21812181
)
21822182
if err != nil {
21832183
return nil, err
@@ -2190,7 +2190,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
21902190
if err != nil {
21912191
return nil, err
21922192
}
2193-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)
21942193

21952194
return builder.RpcEng, nil
21962195
}).
@@ -2278,14 +2277,14 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
22782277
notNil(builder.CollectionIndexer),
22792278
notNil(builder.collectionExecutedMetric),
22802279
notNil(builder.TxResultErrorMessagesCore),
2280+
builder.FollowerDistributor,
22812281
)
22822282
if err != nil {
22832283
return nil, err
22842284
}
22852285
builder.IngestEng = ingestEng
22862286

22872287
ingestionDependable.Init(builder.IngestEng)
2288-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.IngestEng.OnFinalizedBlock)
22892288

22902289
return builder.IngestEng, nil
22912290
})
@@ -2322,11 +2321,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
23222321
node.Storage.Headers,
23232322
processedTxErrorMessagesBlockHeight,
23242323
builder.TxResultErrorMessagesCore,
2324+
builder.FollowerDistributor,
23252325
)
23262326
if err != nil {
23272327
return nil, err
23282328
}
2329-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)
23302329

23312330
return engine, nil
23322331
})
@@ -2342,11 +2341,11 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
23422341
node.State,
23432342
node.Storage.Blocks,
23442343
builder.SyncCore,
2344+
builder.FollowerDistributor,
23452345
)
23462346
if err != nil {
23472347
return nil, fmt.Errorf("could not create public sync request handler: %w", err)
23482348
}
2349-
builder.FollowerDistributor.AddFinalizationConsumer(syncRequestHandler)
23502349

23512350
return syncRequestHandler, nil
23522351
})

cmd/collection/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,12 +435,12 @@ func main() {
435435
node.Storage.Headers,
436436
node.LastFinalizedHeader,
437437
core,
438+
followerDistributor,
438439
node.ComplianceConfig,
439440
)
440441
if err != nil {
441442
return nil, fmt.Errorf("could not create follower engine: %w", err)
442443
}
443-
followerDistributor.AddOnBlockFinalizedConsumer(followerEng.OnFinalizedBlock)
444444

445445
return followerEng, nil
446446
}).
@@ -462,11 +462,11 @@ func main() {
462462
mainChainSyncCore,
463463
node.SyncEngineIdentifierProvider,
464464
spamConfig,
465+
followerDistributor,
465466
)
466467
if err != nil {
467468
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
468469
}
469-
followerDistributor.AddFinalizationConsumer(sync)
470470

471471
return sync, nil
472472
}).

cmd/consensus/main.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -478,15 +478,12 @@ func main() {
478478
chunkAssigner,
479479
seals,
480480
getSealingConfigs,
481+
followerDistributor,
481482
)
482483
if err != nil {
483484
return nil, fmt.Errorf("could not initialize sealing engine: %w", err)
484485
}
485486

486-
// subscribe for finalization events from hotstuff
487-
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
488-
followerDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)
489-
490487
return e, err
491488
}).
492489
Component("matching engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
@@ -532,15 +529,14 @@ func main() {
532529
node.Storage.Receipts,
533530
node.Storage.Index,
534531
core,
532+
followerDistributor,
535533
)
536534
if err != nil {
537535
return nil, err
538536
}
539537

540538
// subscribe engine to inputs from other node-internal components
541539
receiptRequester.WithHandle(e.HandleReceipt)
542-
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
543-
followerDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)
544540

545541
return e, err
546542
}).
@@ -606,7 +602,6 @@ func main() {
606602
// create consensus logger
607603
logger := createLogger(node.Logger, node.RootChainID)
608604

609-
telemetryConsumer := notifications.NewTelemetryConsumer(logger)
610605
slashingViolationConsumer := notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger)
611606
followerDistributor.AddProposalViolationConsumer(slashingViolationConsumer)
612607

@@ -616,9 +611,11 @@ func main() {
616611
mainMetrics,
617612
)
618613

614+
telemetryConsumer := notifications.NewTelemetryConsumer(logger, notifier)
615+
616+
// TODO(leo): move these to NewTelemetryConsumer
619617
notifier.AddParticipantConsumer(telemetryConsumer)
620618
notifier.AddCommunicatorConsumer(telemetryConsumer)
621-
notifier.AddFinalizationConsumer(telemetryConsumer)
622619
notifier.AddFollowerConsumer(followerDistributor)
623620

624621
// initialize the persister
@@ -711,12 +708,11 @@ func main() {
711708
if err != nil {
712709
return nil, fmt.Errorf("could not load liveness data: %w", err)
713710
}
714-
ctl, err := cruisectl.NewBlockTimeController(node.Logger, metrics.NewCruiseCtlMetrics(), cruiseCtlConfig, node.State, livenessData.CurrentView)
711+
ctl, err := cruisectl.NewBlockTimeController(node.Logger, metrics.NewCruiseCtlMetrics(), cruiseCtlConfig, node.State, livenessData.CurrentView, hotstuffModules.Notifier)
715712
if err != nil {
716713
return nil, err
717714
}
718715
proposalDurProvider = ctl
719-
hotstuffModules.Notifier.AddOnBlockIncorporatedConsumer(ctl.OnBlockIncorporated)
720716
node.ProtocolEvents.AddConsumer(ctl)
721717

722718
// set up admin commands for dynamically updating configs
@@ -840,11 +836,11 @@ func main() {
840836
logger,
841837
node.Me,
842838
complianceCore,
839+
followerDistributor,
843840
)
844841
if err != nil {
845842
return nil, fmt.Errorf("could not initialize compliance engine: %w", err)
846843
}
847-
followerDistributor.AddOnBlockFinalizedConsumer(comp.OnFinalizedBlock)
848844

849845
return comp, nil
850846
}).
@@ -884,11 +880,11 @@ func main() {
884880
syncCore,
885881
node.SyncEngineIdentifierProvider,
886882
spamConfig,
883+
followerDistributor,
887884
)
888885
if err != nil {
889886
return nil, fmt.Errorf("could not initialize synchronization engine: %w", err)
890887
}
891-
followerDistributor.AddFinalizationConsumer(sync)
892888

893889
return sync, nil
894890
}).

cmd/execution_builder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,12 +1267,12 @@ func (exeNode *ExecutionNode) LoadFollowerEngine(
12671267
node.Storage.Headers,
12681268
node.LastFinalizedHeader,
12691269
core,
1270+
exeNode.followerDistributor,
12701271
node.ComplianceConfig,
12711272
)
12721273
if err != nil {
12731274
return nil, fmt.Errorf("could not create follower engine: %w", err)
12741275
}
1275-
exeNode.followerDistributor.AddOnBlockFinalizedConsumer(exeNode.followerEng.OnFinalizedBlock)
12761276

12771277
return exeNode.followerEng, nil
12781278
}
@@ -1350,11 +1350,11 @@ func (exeNode *ExecutionNode) LoadSynchronizationEngine(
13501350
exeNode.syncCore,
13511351
node.SyncEngineIdentifierProvider,
13521352
spamConfig,
1353+
exeNode.followerDistributor,
13531354
)
13541355
if err != nil {
13551356
return nil, fmt.Errorf("could not initialize synchronization engine: %w", err)
13561357
}
1357-
exeNode.followerDistributor.AddFinalizationConsumer(exeNode.syncEngine)
13581358

13591359
return exeNode.syncEngine, nil
13601360
}

cmd/observer/node_builder/observer_builder.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -520,14 +520,13 @@ func (builder *ObserverServiceBuilder) buildFollowerEngine() *ObserverServiceBui
520520
node.Storage.Headers,
521521
builder.Finalized,
522522
core,
523+
builder.FollowerDistributor,
523524
builder.ComplianceConfig,
524525
follower.WithChannel(channels.PublicReceiveBlocks),
525526
)
526527
if err != nil {
527528
return nil, fmt.Errorf("could not create follower engine: %w", err)
528529
}
529-
builder.FollowerDistributor.
530-
AddOnBlockFinalizedConsumer(builder.FollowerEng.OnFinalizedBlock)
531530

532531
return builder.FollowerEng, nil
533532
})
@@ -553,12 +552,12 @@ func (builder *ObserverServiceBuilder) buildSyncEngine() *ObserverServiceBuilder
553552
builder.SyncCore,
554553
builder.SyncEngineParticipantsProviderFactory(),
555554
spamConfig,
555+
builder.FollowerDistributor,
556556
)
557557
if err != nil {
558558
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
559559
}
560560
builder.SyncEng = sync
561-
builder.FollowerDistributor.AddFinalizationConsumer(sync)
562561

563562
return builder.SyncEng, nil
564563
})
@@ -1298,14 +1297,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
12981297
builder.Storage.Headers,
12991298
builder.executionDataConfig,
13001299
execDataDistributor,
1300+
builder.FollowerDistributor,
13011301
)
13021302
if err != nil {
13031303
return nil, fmt.Errorf("failed to create execution data requester: %w", err)
13041304
}
13051305
builder.ExecutionDataRequester = r
13061306

1307-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)
1308-
13091307
// add requester into ReadyDoneAware dependency passed to indexer. This allows the indexer
13101308
// to wait for the requester to be ready before starting.
13111309
requesterDependable.Init(builder.ExecutionDataRequester)
@@ -2064,6 +2062,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
20642062
builder.stateStreamBackend,
20652063
builder.stateStreamConf,
20662064
indexReporter,
2065+
builder.FollowerDistributor,
20672066
)
20682067
if err != nil {
20692068
return nil, err
@@ -2091,7 +2090,6 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
20912090
if err != nil {
20922091
return nil, err
20932092
}
2094-
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)
20952093
return builder.RpcEng, nil
20962094
})
20972095

cmd/verification_builder.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
322322
node.Storage.Blocks,
323323
node.State,
324324
assignerEngine,
325-
v.verConf.blockWorkers)
325+
v.verConf.blockWorkers,
326+
followerDistributor)
326327

327328
if err != nil {
328329
return nil, fmt.Errorf("could not initialize block consumer: %w", err)
@@ -359,8 +360,6 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
359360
return nil, fmt.Errorf("could not find latest finalized block and pending blocks to recover consensus follower: %w", err)
360361
}
361362

362-
followerDistributor.AddOnBlockFinalizedConsumer(blockConsumer.OnFinalizedBlock)
363-
364363
// creates a consensus follower with ingestEngine as the notifier
365364
// so that it gets notified upon each new finalized block
366365
followerCore, err = flowconsensus.NewFollower(
@@ -414,12 +413,12 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
414413
node.Storage.Headers,
415414
node.LastFinalizedHeader,
416415
core,
416+
followerDistributor,
417417
node.ComplianceConfig,
418418
)
419419
if err != nil {
420420
return nil, fmt.Errorf("could not create follower engine: %w", err)
421421
}
422-
followerDistributor.AddOnBlockFinalizedConsumer(followerEng.OnFinalizedBlock)
423422

424423
return followerEng, nil
425424
}).
@@ -440,11 +439,11 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
440439
syncCore,
441440
node.SyncEngineIdentifierProvider,
442441
spamConfig,
442+
followerDistributor,
443443
)
444444
if err != nil {
445445
return nil, fmt.Errorf("could not create synchronization engine: %w", err)
446446
}
447-
followerDistributor.AddFinalizationConsumer(sync)
448447

449448
return sync, nil
450449
})

consensus/hotstuff/cruisectl/block_time_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ var _ protocol.Consumer = (*BlockTimeController)(nil)
114114
var _ component.Component = (*BlockTimeController)(nil)
115115

116116
// NewBlockTimeController returns a new BlockTimeController.
117-
func NewBlockTimeController(log zerolog.Logger, metrics module.CruiseCtlMetrics, config *Config, state protocol.State, curView uint64) (*BlockTimeController, error) {
117+
func NewBlockTimeController(log zerolog.Logger, metrics module.CruiseCtlMetrics, config *Config, state protocol.State, curView uint64, registrar hotstuff.FinalizationRegistrar) (*BlockTimeController, error) {
118118
// Initial error must be 0 unless we are making assumptions of the prior history of the proportional error `e[v]`
119119
initProptlErr, initItgErr, initDrivErr := .0, .0, .0
120120
proportionalErr, err := NewEwma(config.alpha(), initProptlErr)
@@ -156,6 +156,8 @@ func NewBlockTimeController(log zerolog.Logger, metrics module.CruiseCtlMetrics,
156156
ctl.metrics.ControllerOutput(0)
157157
ctl.metrics.TargetProposalDuration(0)
158158

159+
registrar.AddOnBlockIncorporatedConsumer(ctl.OnBlockIncorporated)
160+
159161
return ctl, nil
160162
}
161163

consensus/hotstuff/cruisectl/block_time_controller_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.uber.org/atomic"
1515

1616
"github.com/onflow/flow-go/consensus/hotstuff/model"
17+
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
1718
"github.com/onflow/flow-go/model/flow"
1819
"github.com/onflow/flow-go/module/irrecoverable"
1920
mockmodule "github.com/onflow/flow-go/module/mock"
@@ -112,7 +113,8 @@ func setupMocks(bs *BlockTimeControllerSuite) {
112113
// CreateAndStartController creates and starts the BlockTimeController.
113114
// Should be called only once per test case.
114115
func (bs *BlockTimeControllerSuite) CreateAndStartController() {
115-
ctl, err := NewBlockTimeController(unittest.Logger(), &bs.metrics, bs.config, &bs.state, bs.initialView)
116+
followerDistributor := pubsub.NewFollowerDistributor()
117+
ctl, err := NewBlockTimeController(unittest.Logger(), &bs.metrics, bs.config, &bs.state, bs.initialView, followerDistributor)
116118
require.NoError(bs.T(), err)
117119
bs.ctl = ctl
118120
bs.ctl.Start(bs.ctx)

0 commit comments

Comments
 (0)