Skip to content

Commit 7e4258a

Browse files
authored
Merge pull request #6747 from onflow/tim/7018-pusher-engine-use-componentmanager
Refactor pusher engine part 1 replace engine.Unit with ComponentManager in Pusher Engine
2 parents 8a3055c + 2048b4a commit 7e4258a

File tree

5 files changed

+134
-59
lines changed

5 files changed

+134
-59
lines changed

cmd/collection/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ func main() {
480480
node.EngineRegistry,
481481
node.State,
482482
node.Metrics.Engine,
483-
colMetrics,
483+
node.Metrics.Mempool,
484484
node.Me,
485485
node.Storage.Collections,
486486
node.Storage.Transactions,

engine/collection/pusher/engine.go

Lines changed: 120 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
package pusher
55

66
import (
7+
"context"
78
"fmt"
89

910
"github.com/rs/zerolog"
1011

1112
"github.com/onflow/flow-go/engine"
13+
"github.com/onflow/flow-go/engine/common/fifoqueue"
1214
"github.com/onflow/flow-go/model/flow"
1315
"github.com/onflow/flow-go/model/flow/filter"
1416
"github.com/onflow/flow-go/model/messages"
1517
"github.com/onflow/flow-go/module"
18+
"github.com/onflow/flow-go/module/component"
19+
"github.com/onflow/flow-go/module/irrecoverable"
1620
"github.com/onflow/flow-go/module/metrics"
1721
"github.com/onflow/flow-go/network"
1822
"github.com/onflow/flow-go/network/channels"
@@ -21,30 +25,60 @@ import (
2125
"github.com/onflow/flow-go/utils/logging"
2226
)
2327

24-
// Engine is the collection pusher engine, which provides access to resources
25-
// held by the collection node.
28+
// Engine is part of the Collection Node. It broadcasts finalized collections
29+
// ("collection guarantees") that the cluster generates to Consensus Nodes
30+
// for inclusion in blocks.
2631
type Engine struct {
27-
unit *engine.Unit
2832
log zerolog.Logger
2933
engMetrics module.EngineMetrics
30-
colMetrics module.CollectionMetrics
3134
conduit network.Conduit
3235
me module.Local
3336
state protocol.State
3437
collections storage.Collections
3538
transactions storage.Transactions
39+
40+
notifier engine.Notifier
41+
queue *fifoqueue.FifoQueue
42+
43+
component.Component
44+
cm *component.ComponentManager
3645
}
3746

38-
func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
47+
// TODO convert to network.MessageProcessor
48+
var _ network.Engine = (*Engine)(nil)
49+
var _ component.Component = (*Engine)(nil)
50+
51+
// New creates a new pusher engine.
52+
func New(
53+
log zerolog.Logger,
54+
net network.EngineRegistry,
55+
state protocol.State,
56+
engMetrics module.EngineMetrics,
57+
mempoolMetrics module.MempoolMetrics,
58+
me module.Local,
59+
collections storage.Collections,
60+
transactions storage.Transactions,
61+
) (*Engine, error) {
62+
queue, err := fifoqueue.NewFifoQueue(
63+
200, // roughly 1 minute of collections, at 3BPS
64+
fifoqueue.WithLengthObserver(func(len int) {
65+
mempoolMetrics.MempoolEntries(metrics.ResourceSubmitCollectionGuaranteesQueue, uint(len))
66+
}),
67+
)
68+
if err != nil {
69+
return nil, fmt.Errorf("could not create fifoqueue: %w", err)
70+
}
71+
3972
e := &Engine{
40-
unit: engine.NewUnit(),
4173
log: log.With().Str("engine", "pusher").Logger(),
4274
engMetrics: engMetrics,
43-
colMetrics: colMetrics,
4475
me: me,
4576
state: state,
4677
collections: collections,
4778
transactions: transactions,
79+
80+
notifier: engine.NewNotifier(),
81+
queue: queue,
4882
}
4983

5084
conduit, err := net.Register(channels.PushGuarantees, e)
@@ -53,88 +87,117 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
5387
}
5488
e.conduit = conduit
5589

90+
e.cm = component.NewComponentManagerBuilder().
91+
AddWorker(e.outboundQueueWorker).
92+
Build()
93+
e.Component = e.cm
94+
5695
return e, nil
5796
}
5897

59-
// Ready returns a ready channel that is closed once the engine has fully
60-
// started.
61-
func (e *Engine) Ready() <-chan struct{} {
62-
return e.unit.Ready()
98+
// outboundQueueWorker implements a component worker which broadcasts collection guarantees,
99+
// enqueued by the Finalizer upon finalization, to Consensus Nodes.
100+
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
101+
ready()
102+
103+
done := ctx.Done()
104+
wake := e.notifier.Channel()
105+
for {
106+
select {
107+
case <-done:
108+
return
109+
case <-wake:
110+
err := e.processOutboundMessages(ctx)
111+
if err != nil {
112+
ctx.Throw(err)
113+
}
114+
}
115+
}
63116
}
64117

65-
// Done returns a done channel that is closed once the engine has fully stopped.
66-
func (e *Engine) Done() <-chan struct{} {
67-
return e.unit.Done()
118+
// processOutboundMessages processes any available messages from the queue.
119+
// Only returns when the queue is empty (or the engine is terminated).
120+
// No errors expected during normal operations.
121+
func (e *Engine) processOutboundMessages(ctx context.Context) error {
122+
for {
123+
nextMessage, ok := e.queue.Pop()
124+
if !ok {
125+
return nil
126+
}
127+
128+
asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee)
129+
if !ok {
130+
return fmt.Errorf("invalid message type in pusher engine queue")
131+
}
132+
133+
err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee)
134+
if err != nil {
135+
return err
136+
}
137+
138+
select {
139+
case <-ctx.Done():
140+
return nil
141+
default:
142+
}
143+
}
68144
}
69145

70146
// SubmitLocal submits an event originating on the local node.
71147
func (e *Engine) SubmitLocal(event interface{}) {
72-
e.unit.Launch(func() {
73-
err := e.process(e.me.NodeID(), event)
74-
if err != nil {
75-
engine.LogError(e.log, err)
76-
}
77-
})
148+
ev, ok := event.(*messages.SubmitCollectionGuarantee)
149+
if ok {
150+
e.SubmitCollectionGuarantee(ev)
151+
} else {
152+
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine"))
153+
}
78154
}
79155

80156
// Submit submits the given event from the node with the given origin ID
81157
// for processing in a non-blocking manner. It returns instantly and logs
82158
// a potential processing error internally when done.
83159
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
84-
e.unit.Launch(func() {
85-
err := e.process(originID, event)
86-
if err != nil {
87-
engine.LogError(e.log, err)
88-
}
89-
})
160+
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node"))
90161
}
91162

92163
// ProcessLocal processes an event originating on the local node.
93164
func (e *Engine) ProcessLocal(event interface{}) error {
94-
return e.unit.Do(func() error {
95-
return e.process(e.me.NodeID(), event)
96-
})
165+
ev, ok := event.(*messages.SubmitCollectionGuarantee)
166+
if ok {
167+
e.SubmitCollectionGuarantee(ev)
168+
return nil
169+
} else {
170+
return fmt.Errorf("invalid message argument to pusher engine")
171+
}
97172
}
98173

99174
// Process processes the given event from the node with the given origin ID in
100175
// a blocking manner. It returns the potential processing error when done.
101-
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
102-
return e.unit.Do(func() error {
103-
return e.process(originID, event)
104-
})
176+
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
177+
return fmt.Errorf("pusher engine should only receive local messages on the same node")
105178
}
106179

107-
// process processes events for the pusher engine on the collection node.
108-
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
109-
switch ev := event.(type) {
110-
case *messages.SubmitCollectionGuarantee:
111-
e.engMetrics.MessageReceived(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
112-
defer e.engMetrics.MessageHandled(metrics.EngineCollectionProvider, metrics.MessageSubmitGuarantee)
113-
return e.onSubmitCollectionGuarantee(originID, ev)
114-
default:
115-
return fmt.Errorf("invalid event type (%T)", event)
180+
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
181+
// to later be published to consensus nodes.
182+
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
183+
if e.queue.Push(msg) {
184+
e.notifier.Notify()
185+
} else {
186+
e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee)
116187
}
117188
}
118189

119-
// onSubmitCollectionGuarantee handles submitting the given collection guarantee
120-
// to consensus nodes.
121-
func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *messages.SubmitCollectionGuarantee) error {
122-
if originID != e.me.NodeID() {
123-
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
124-
}
125-
126-
return e.SubmitCollectionGuarantee(&req.Guarantee)
127-
}
128-
129-
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
130-
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
190+
// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes.
191+
// No errors expected during normal operation.
192+
func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
131193
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
132194
if err != nil {
133-
return fmt.Errorf("could not get consensus nodes: %w", err)
195+
return fmt.Errorf("could not get consensus nodes' identities: %w", err)
134196
}
135197

136-
// NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that
137-
// at least one collection node make a publish to all of them.
198+
// NOTE: Consensus nodes do not broadcast guarantees among themselves. So for the collection to be included,
199+
// at least one collector has to successfully broadcast the collection to consensus nodes. Otherwise, the
200+
// collection is lost, which is acceptable as long as we only lose a small fraction of collections.
138201
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
139202
if err != nil {
140203
return fmt.Errorf("could not submit collection guarantee: %w", err)

engine/collection/pusher/engine_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package pusher_test
22

33
import (
4+
"context"
45
"io"
56
"testing"
7+
"time"
68

79
"github.com/rs/zerolog"
810
"github.com/stretchr/testify/mock"
@@ -12,6 +14,7 @@ import (
1214
"github.com/onflow/flow-go/model/flow"
1315
"github.com/onflow/flow-go/model/flow/filter"
1416
"github.com/onflow/flow-go/model/messages"
17+
"github.com/onflow/flow-go/module/irrecoverable"
1518
"github.com/onflow/flow-go/module/metrics"
1619
module "github.com/onflow/flow-go/module/mock"
1720
"github.com/onflow/flow-go/network/channels"
@@ -82,19 +85,26 @@ func TestPusherEngine(t *testing.T) {
8285

8386
// should be able to submit collection guarantees to consensus nodes
8487
func (suite *Suite) TestSubmitCollectionGuarantee() {
88+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background())
89+
suite.engine.Start(ctx)
90+
defer cancel()
91+
done := make(chan struct{})
8592

8693
guarantee := unittest.CollectionGuaranteeFixture()
8794

8895
// should submit the collection to consensus nodes
8996
consensus := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleConsensus))
90-
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil)
97+
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).
98+
Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once()
9199

92100
msg := &messages.SubmitCollectionGuarantee{
93101
Guarantee: *guarantee,
94102
}
95103
err := suite.engine.ProcessLocal(msg)
96104
suite.Require().Nil(err)
97105

106+
unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent")
107+
98108
suite.conduit.AssertExpectations(suite.T())
99109
}
100110

engine/testutil/mock/nodes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func (n CollectionNode) Start(t *testing.T) {
140140
n.IngestionEngine.Start(n.Ctx)
141141
n.EpochManagerEngine.Start(n.Ctx)
142142
n.ProviderEngine.Start(n.Ctx)
143+
n.PusherEngine.Start(n.Ctx)
143144
}
144145

145146
func (n CollectionNode) Ready() <-chan struct{} {

module/metrics/labels.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ const (
113113
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
114114
ResourceClusterBlockProposalQueue = "cluster_compliance_proposal_queue" // collection node, compliance engine
115115
ResourceTransactionIngestQueue = "ingest_transaction_queue" // collection node, ingest engine
116+
ResourceSubmitCollectionGuaranteesQueue = "pusher_col_guarantee_queue" // collection node, pusher engine
116117
ResourceBeaconKey = "beacon-key" // consensus node, DKG engine
117118
ResourceDKGMessage = "dkg_private_message" // consensus, DKG messaging engine
118119
ResourceApprovalQueue = "sealing_approval_queue" // consensus node, sealing engine

0 commit comments

Comments
 (0)