@@ -28,6 +28,7 @@ import (
2828 "github.com/onflow/flow-go/consensus"
2929 "github.com/onflow/flow-go/consensus/hotstuff"
3030 "github.com/onflow/flow-go/consensus/hotstuff/committees"
31+ "github.com/onflow/flow-go/consensus/hotstuff/notifications"
3132 consensuspubsub "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
3233 "github.com/onflow/flow-go/consensus/hotstuff/signature"
3334 hotstuffvalidator "github.com/onflow/flow-go/consensus/hotstuff/validator"
@@ -39,17 +40,15 @@ import (
3940 "github.com/onflow/flow-go/engine/access/rpc"
4041 "github.com/onflow/flow-go/engine/access/rpc/backend"
4142 "github.com/onflow/flow-go/engine/access/state_stream"
42- "github.com/onflow/flow-go/engine/common/follower"
4343 followereng "github.com/onflow/flow-go/engine/common/follower"
4444 "github.com/onflow/flow-go/engine/common/requester"
4545 synceng "github.com/onflow/flow-go/engine/common/synchronization"
4646 "github.com/onflow/flow-go/model/flow"
4747 "github.com/onflow/flow-go/model/flow/filter"
4848 "github.com/onflow/flow-go/module"
4949 "github.com/onflow/flow-go/module/blobs"
50- "github.com/onflow/flow-go/module/buffer"
5150 "github.com/onflow/flow-go/module/chainsync"
52- "github.com/onflow/flow-go/module/compliance"
51+ modulecompliance "github.com/onflow/flow-go/module/compliance"
5352 "github.com/onflow/flow-go/module/executiondatasync/execution_data"
5453 finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
5554 "github.com/onflow/flow-go/module/id"
@@ -69,6 +68,7 @@ import (
6968 "github.com/onflow/flow-go/network/p2p/dht"
7069 "github.com/onflow/flow-go/network/p2p/middleware"
7170 "github.com/onflow/flow-go/network/p2p/p2pbuilder"
71+ "github.com/onflow/flow-go/network/p2p/p2pbuilder/inspector"
7272 "github.com/onflow/flow-go/network/p2p/subscription"
7373 "github.com/onflow/flow-go/network/p2p/tracer"
7474 "github.com/onflow/flow-go/network/p2p/translator"
@@ -223,7 +223,7 @@ type FlowAccessNodeBuilder struct {
223223 // engines
224224 IngestEng * ingestion.Engine
225225 RequestEng * requester.Engine
226- FollowerEng * followereng.Engine
226+ FollowerEng * followereng.ComplianceEngine
227227 SyncEng * synceng.Engine
228228 StateStreamEng * state_stream.Engine
229229}
@@ -237,7 +237,15 @@ func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilde
237237 return fmt .Errorf ("only implementations of type badger.State are currently supported but read-only state has type %T" , node .State )
238238 }
239239
240- followerState , err := badgerState .NewFollowerState (state , node .Storage .Index , node .Storage .Payloads , node .Tracer , node .ProtocolEvents , blocktimer .DefaultBlockTimer )
240+ followerState , err := badgerState .NewFollowerState (
241+ node .Logger ,
242+ node .Tracer ,
243+ node .ProtocolEvents ,
244+ state ,
245+ node .Storage .Index ,
246+ node .Storage .Payloads ,
247+ blocktimer .DefaultBlockTimer ,
248+ )
241249 builder .FollowerState = followerState
242250
243251 return err
@@ -319,31 +327,39 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
319327
320328func (builder * FlowAccessNodeBuilder ) buildFollowerEngine () * FlowAccessNodeBuilder {
321329 builder .Component ("follower engine" , func (node * cmd.NodeConfig ) (module.ReadyDoneAware , error ) {
322- // initialize cleaner for DB
323- cleaner := bstorage .NewCleaner (node .Logger , node .DB , builder .Metrics .CleanCollector , flow .DefaultValueLogGCFrequency )
324- conCache := buffer .NewPendingBlocks ()
330+ var heroCacheCollector module.HeroCacheMetrics = metrics .NewNoopCollector ()
331+ if node .HeroCacheMetricsEnable {
332+ heroCacheCollector = metrics .FollowerCacheMetrics (node .MetricsRegisterer )
333+ }
325334
326- followerEng , err := follower . New (
335+ core , err := followereng . NewComplianceCore (
327336 node .Logger ,
328- node .Network ,
329- node .Me ,
330- node .Metrics .Engine ,
331337 node .Metrics .Mempool ,
332- cleaner ,
333- node .Storage .Headers ,
334- node .Storage .Payloads ,
338+ heroCacheCollector ,
339+ builder .FinalizationDistributor ,
335340 builder .FollowerState ,
336- conCache ,
337341 builder .FollowerCore ,
338342 builder .Validator ,
339343 builder .SyncCore ,
340344 node .Tracer ,
341- follower .WithComplianceOptions (compliance .WithSkipNewProposalsThreshold (builder .ComplianceConfig .SkipNewProposalsThreshold )),
345+ )
346+ if err != nil {
347+ return nil , fmt .Errorf ("could not create follower core: %w" , err )
348+ }
349+
350+ builder .FollowerEng , err = followereng .NewComplianceLayer (
351+ node .Logger ,
352+ node .Network ,
353+ node .Me ,
354+ node .Metrics .Engine ,
355+ node .Storage .Headers ,
356+ builder .Finalized ,
357+ core ,
358+ followereng .WithComplianceConfigOpt (modulecompliance .WithSkipNewProposalsThreshold (node .ComplianceConfig .SkipNewProposalsThreshold )),
342359 )
343360 if err != nil {
344361 return nil , fmt .Errorf ("could not create follower engine: %w" , err )
345362 }
346- builder .FollowerEng = followerEng
347363
348364 return builder .FollowerEng , nil
349365 })
@@ -560,10 +576,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
560576}
561577
562578func FlowAccessNode (nodeBuilder * cmd.FlowNodeBuilder ) * FlowAccessNodeBuilder {
579+ dist := consensuspubsub .NewFinalizationDistributor ()
580+ dist .AddConsumer (notifications .NewSlashingViolationsConsumer (nodeBuilder .Logger ))
563581 return & FlowAccessNodeBuilder {
564582 AccessNodeConfig : DefaultAccessNodeConfig (),
565583 FlowNodeBuilder : nodeBuilder ,
566- FinalizationDistributor : consensuspubsub . NewFinalizationDistributor () ,
584+ FinalizationDistributor : dist ,
567585 }
568586}
569587
@@ -1011,7 +1029,7 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
10111029 }).
10121030 Component ("public libp2p node" , func (node * cmd.NodeConfig ) (module.ReadyDoneAware , error ) {
10131031
1014- libP2PFactory := builder .initLibP2PFactory (builder .NodeConfig .NetworkKey , builder .PublicNetworkConfig .BindAddress , builder .PublicNetworkConfig .Metrics )
1032+ libP2PFactory := builder .initPublicLibP2PFactory (builder .NodeConfig .NetworkKey , builder .PublicNetworkConfig .BindAddress , builder .PublicNetworkConfig .Metrics )
10151033
10161034 var err error
10171035 libp2pNode , err = libP2PFactory ()
@@ -1057,15 +1075,15 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
10571075 })
10581076}
10591077
1060- // initLibP2PFactory creates the LibP2P factory function for the given node ID and network key.
1078+ // initPublicLibP2PFactory creates the LibP2P factory function for the given node ID and network key.
10611079// The factory function is later passed into the initMiddleware function to eventually instantiate the p2p.LibP2PNode instance
10621080// The LibP2P host is created with the following options:
10631081// - DHT as server
10641082// - The address from the node config or the specified bind address as the listen address
10651083// - The passed in private key as the libp2p key
10661084// - No connection gater
10671085// - Default Flow libp2p pubsub options
1068- func (builder * FlowAccessNodeBuilder ) initLibP2PFactory (networkKey crypto.PrivateKey , bindAddress string , networkMetrics module.LibP2PMetrics ) p2p.LibP2PFactoryFunc {
1086+ func (builder * FlowAccessNodeBuilder ) initPublicLibP2PFactory (networkKey crypto.PrivateKey , bindAddress string , networkMetrics module.LibP2PMetrics ) p2p.LibP2PFactoryFunc {
10691087 return func () (p2p.LibP2PNode , error ) {
10701088 connManager , err := connection .NewConnManager (builder .Logger , networkMetrics , builder .ConnectionManagerConfig )
10711089 if err != nil {
@@ -1078,6 +1096,16 @@ func (builder *FlowAccessNodeBuilder) initLibP2PFactory(networkKey crypto.Privat
10781096 builder .IdentityProvider ,
10791097 builder .GossipSubConfig .LocalMeshLogInterval )
10801098
1099+ // setup RPC inspectors
1100+ rpcInspectorBuilder := inspector .NewGossipSubInspectorBuilder (builder .Logger , builder .SporkID , builder .GossipSubRPCInspectorsConfig , builder .GossipSubInspectorNotifDistributor )
1101+ rpcInspectors , err := rpcInspectorBuilder .
1102+ SetPublicNetwork (p2p .PublicNetworkEnabled ).
1103+ SetMetrics (builder .Metrics .Network , builder .MetricsRegisterer ).
1104+ SetMetricsEnabled (builder .MetricsEnabled ).Build ()
1105+ if err != nil {
1106+ return nil , fmt .Errorf ("failed to create gossipsub rpc inspectors: %w" , err )
1107+ }
1108+
10811109 libp2pNode , err := p2pbuilder .NewNodeBuilder (
10821110 builder .Logger ,
10831111 networkMetrics ,
@@ -1107,6 +1135,7 @@ func (builder *FlowAccessNodeBuilder) initLibP2PFactory(networkKey crypto.Privat
11071135 SetStreamCreationRetryInterval (builder .UnicastCreateStreamRetryDelay ).
11081136 SetGossipSubTracer (meshTracer ).
11091137 SetGossipSubScoreTracerInterval (builder .GossipSubConfig .ScoreTracerInterval ).
1138+ SetGossipSubRPCInspectors (rpcInspectors ... ).
11101139 Build ()
11111140
11121141 if err != nil {
0 commit comments