44package pusher
55
66import (
7+ "context"
78 "fmt"
89
910 "github.com/rs/zerolog"
1011
1112 "github.com/onflow/flow-go/engine"
13+ "github.com/onflow/flow-go/engine/common/fifoqueue"
1214 "github.com/onflow/flow-go/model/flow"
1315 "github.com/onflow/flow-go/model/flow/filter"
14- "github.com/onflow/flow-go/model/messages"
1516 "github.com/onflow/flow-go/module"
17+ "github.com/onflow/flow-go/module/component"
18+ "github.com/onflow/flow-go/module/irrecoverable"
1619 "github.com/onflow/flow-go/module/metrics"
1720 "github.com/onflow/flow-go/network"
1821 "github.com/onflow/flow-go/network/channels"
@@ -21,30 +24,59 @@ import (
2124 "github.com/onflow/flow-go/utils/logging"
2225)
2326
24- // Engine is the collection pusher engine, which provides access to resources
25- // held by the collection node.
27+ // Engine is part of the Collection Node. It broadcasts finalized collections
28+ // ("collection guarantees") that the cluster generates to Consensus Nodes
29+ // for inclusion in blocks.
2630type Engine struct {
27- unit * engine.Unit
2831 log zerolog.Logger
2932 engMetrics module.EngineMetrics
30- colMetrics module.CollectionMetrics
3133 conduit network.Conduit
3234 me module.Local
3335 state protocol.State
3436 collections storage.Collections
3537 transactions storage.Transactions
38+
39+ notifier engine.Notifier
40+ queue * fifoqueue.FifoQueue
41+
42+ component.Component
43+ cm * component.ComponentManager
3644}
3745
38- func New (log zerolog.Logger , net network.EngineRegistry , state protocol.State , engMetrics module.EngineMetrics , colMetrics module.CollectionMetrics , me module.Local , collections storage.Collections , transactions storage.Transactions ) (* Engine , error ) {
46+ var _ network.MessageProcessor = (* Engine )(nil )
47+ var _ component.Component = (* Engine )(nil )
48+
49+ // New creates a new pusher engine.
50+ func New (
51+ log zerolog.Logger ,
52+ net network.EngineRegistry ,
53+ state protocol.State ,
54+ engMetrics module.EngineMetrics ,
55+ mempoolMetrics module.MempoolMetrics ,
56+ me module.Local ,
57+ collections storage.Collections ,
58+ transactions storage.Transactions ,
59+ ) (* Engine , error ) {
60+ queue , err := fifoqueue .NewFifoQueue (
61+ 200 , // roughly 1 minute of collections, at 3BPS
62+ fifoqueue .WithLengthObserver (func (len int ) {
63+ mempoolMetrics .MempoolEntries (metrics .ResourceSubmitCollectionGuaranteesQueue , uint (len ))
64+ }),
65+ )
66+ if err != nil {
67+ return nil , fmt .Errorf ("could not create fifoqueue: %w" , err )
68+ }
69+
3970 e := & Engine {
40- unit : engine .NewUnit (),
4171 log : log .With ().Str ("engine" , "pusher" ).Logger (),
4272 engMetrics : engMetrics ,
43- colMetrics : colMetrics ,
4473 me : me ,
4574 state : state ,
4675 collections : collections ,
4776 transactions : transactions ,
77+
78+ notifier : engine .NewNotifier (),
79+ queue : queue ,
4880 }
4981
5082 conduit , err := net .Register (channels .PushGuarantees , e )
@@ -53,88 +85,105 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
5385 }
5486 e .conduit = conduit
5587
88+ e .cm = component .NewComponentManagerBuilder ().
89+ AddWorker (e .outboundQueueWorker ).
90+ Build ()
91+ e .Component = e .cm
92+
5693 return e , nil
5794}
5895
59- // Ready returns a ready channel that is closed once the engine has fully
60- // started.
61- func (e * Engine ) Ready () <- chan struct {} {
62- return e .unit .Ready ()
96+ // outboundQueueWorker implements a component worker which broadcasts collection guarantees,
97+ // enqueued by the Finalizer upon finalization, to Consensus Nodes.
98+ func (e * Engine ) outboundQueueWorker (ctx irrecoverable.SignalerContext , ready component.ReadyFunc ) {
99+ ready ()
100+
101+ done := ctx .Done ()
102+ wake := e .notifier .Channel ()
103+ for {
104+ select {
105+ case <- done :
106+ return
107+ case <- wake :
108+ err := e .processOutboundMessages (ctx )
109+ if err != nil {
110+ ctx .Throw (err )
111+ }
112+ }
113+ }
63114}
64115
65- // Done returns a done channel that is closed once the engine has fully stopped.
66- func (e * Engine ) Done () <- chan struct {} {
67- return e .unit .Done ()
68- }
116+ // processOutboundMessages processes any available messages from the queue.
117+ // Only returns when the queue is empty (or the engine is terminated).
118+ // No errors expected during normal operations.
119+ func (e * Engine ) processOutboundMessages (ctx context.Context ) error {
120+ for {
121+ item , ok := e .queue .Pop ()
122+ if ! ok {
123+ return nil
124+ }
69125
70- // SubmitLocal submits an event originating on the local node.
71- func (e * Engine ) SubmitLocal (event interface {}) {
72- e .unit .Launch (func () {
73- err := e .process (e .me .NodeID (), event )
74- if err != nil {
75- engine .LogError (e .log , err )
126+ guarantee , ok := item .(* flow.CollectionGuarantee )
127+ if ! ok {
128+ return fmt .Errorf ("invalid type in pusher engine queue" )
76129 }
77- })
78- }
79130
80- // Submit submits the given event from the node with the given origin ID
81- // for processing in a non-blocking manner. It returns instantly and logs
82- // a potential processing error internally when done.
83- func (e * Engine ) Submit (channel channels.Channel , originID flow.Identifier , event interface {}) {
84- e .unit .Launch (func () {
85- err := e .process (originID , event )
131+ err := e .publishCollectionGuarantee (guarantee )
86132 if err != nil {
87- engine . LogError ( e . log , err )
133+ return err
88134 }
89- })
90- }
91-
92- // ProcessLocal processes an event originating on the local node.
93- func (e * Engine ) ProcessLocal (event interface {}) error {
94- return e .unit .Do (func () error {
95- return e .process (e .me .NodeID (), event )
96- })
97- }
98135
99- // Process processes the given event from the node with the given origin ID in
100- // a blocking manner. It returns the potential processing error when done.
101- func ( e * Engine ) Process ( channel channels. Channel , originID flow. Identifier , event interface {}) error {
102- return e . unit . Do ( func () error {
103- return e . process ( originID , event )
104- })
136+ select {
137+ case <- ctx . Done ():
138+ return nil
139+ default :
140+ }
141+ }
105142}
106143
107- // process processes events for the pusher engine on the collection node.
108- func (e * Engine ) process (originID flow.Identifier , event interface {}) error {
109- switch ev := event .(type ) {
110- case * messages.SubmitCollectionGuarantee :
111- e .engMetrics .MessageReceived (metrics .EngineCollectionProvider , metrics .MessageSubmitGuarantee )
112- defer e .engMetrics .MessageHandled (metrics .EngineCollectionProvider , metrics .MessageSubmitGuarantee )
113- return e .onSubmitCollectionGuarantee (originID , ev )
114- default :
115- return fmt .Errorf ("invalid event type (%T)" , event )
116- }
144+ // Process is called by the networking layer, when peers broadcast messages with this node
145+ // as one of the recipients. The protocol specifies that Collector nodes broadcast Collection
146+ // Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on
147+ // Collectors) receives a message, this message is evidence of byzantine behavior.
148+ // Byzantine inputs are internally handled by the pusher.Engine and do *not* result in
149+ // error returns. No errors expected during normal operation (including byzantine inputs).
150+ func (e * Engine ) Process (channel channels.Channel , originID flow.Identifier , message any ) error {
151+ // Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense.
152+ // Though, for generating cryptographic evidence, we need Message Forensics - see reference [1].
153+ // Much further into the future, when we are implementing slashing challenges, we'll probably implement a
154+ // dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now,
155+ // we just log this with the `KeySuspicious` to alert the node operator.
156+ // [1] Message Forensics FLIP https://github.com/onflow/flips/pull/195)
157+ errs := fmt .Errorf ("collector node's pusher.Engine was targeted by message %T on channel %v" , message , channel )
158+ e .log .Warn ().
159+ Err (errs ).
160+ Bool (logging .KeySuspicious , true ).
161+ Str ("peer_id" , originID .String ()).
162+ Msg ("potentially byzantine networking traffic detected" )
163+ return nil
117164}
118165
119- // onSubmitCollectionGuarantee handles submitting the given collection guarantee
120- // to consensus nodes.
121- func (e * Engine ) onSubmitCollectionGuarantee (originID flow.Identifier , req * messages.SubmitCollectionGuarantee ) error {
122- if originID != e .me .NodeID () {
123- return fmt .Errorf ("invalid remote request to submit collection guarantee (from=%x)" , originID )
166+ // SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
167+ // to later be published to consensus nodes.
168+ func (e * Engine ) SubmitCollectionGuarantee (guarantee * flow.CollectionGuarantee ) {
169+ if e .queue .Push (guarantee ) {
170+ e .notifier .Notify ()
171+ } else {
172+ e .engMetrics .OutboundMessageDropped (metrics .EngineCollectionProvider , metrics .MessageCollectionGuarantee )
124173 }
125-
126- return e .SubmitCollectionGuarantee (& req .Guarantee )
127174}
128175
129- // SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
130- func (e * Engine ) SubmitCollectionGuarantee (guarantee * flow.CollectionGuarantee ) error {
176+ // publishCollectionGuarantee publishes the collection guarantee to all consensus nodes.
177+ // No errors expected during normal operation.
178+ func (e * Engine ) publishCollectionGuarantee (guarantee * flow.CollectionGuarantee ) error {
131179 consensusNodes , err := e .state .Final ().Identities (filter.HasRole [flow.Identity ](flow .RoleConsensus ))
132180 if err != nil {
133- return fmt .Errorf ("could not get consensus nodes: %w" , err )
181+ return fmt .Errorf ("could not get consensus nodes' identities : %w" , err )
134182 }
135183
136- // NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that
137- // at least one collection node make a publish to all of them.
184+ // NOTE: Consensus nodes do not broadcast guarantees among themselves. So for the collection to be included,
185+ // at least one collector has to successfully broadcast the collection to consensus nodes. Otherwise, the
186+ // collection is lost, which is acceptable as long as we only lose a small fraction of collections.
138187 err = e .conduit .Publish (guarantee , consensusNodes .NodeIDs ()... )
139188 if err != nil {
140189 return fmt .Errorf ("could not submit collection guarantee: %w" , err )
0 commit comments