Skip to content

Commit 1fc3a19

Browse files
committed
Refactor pusher engine: change interface
- Remove pusher engine implementation of network.Engine - Replace with network.MessageProcessor - See: #6747 (comment) - Remove SubmitCollectionGuarantee message type - Was only used between Finalizer and Pusher engine - New interface passes and stores collection guarantees directly, instead of wrapping and then unwrapping them - See: #6747 (comment) - Add GuaranteedCollectionPublisher interface, implemented by pusher engine - Only used by the Finalizer (and intermediate constructors) - Mocks are generated for it, used in Finalizer unit tests - See: #6747 (comment)
1 parent 7e4258a commit 1fc3a19

File tree

8 files changed

+124
-131
lines changed

8 files changed

+124
-131
lines changed

engine/collection/epochmgr/factories/builder.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
"github.com/dgraph-io/badger/v2"
77
"github.com/rs/zerolog"
88

9+
"github.com/onflow/flow-go/engine/collection"
910
"github.com/onflow/flow-go/module"
1011
builder "github.com/onflow/flow-go/module/builder/collection"
1112
finalizer "github.com/onflow/flow-go/module/finalizer/collection"
1213
"github.com/onflow/flow-go/module/mempool"
13-
"github.com/onflow/flow-go/network"
1414
clusterstate "github.com/onflow/flow-go/state/cluster"
1515
"github.com/onflow/flow-go/state/protocol"
1616
"github.com/onflow/flow-go/storage"
@@ -23,7 +23,7 @@ type BuilderFactory struct {
2323
trace module.Tracer
2424
opts []builder.Opt
2525
metrics module.CollectionMetrics
26-
pusher network.Engine // engine for pushing finalized collection to consensus committee
26+
pusher collection.GuaranteedCollectionPublisher // engine for pushing finalized collection to consensus committee
2727
log zerolog.Logger
2828
}
2929

@@ -33,7 +33,7 @@ func NewBuilderFactory(
3333
mainChainHeaders storage.Headers,
3434
trace module.Tracer,
3535
metrics module.CollectionMetrics,
36-
pusher network.Engine,
36+
pusher collection.GuaranteedCollectionPublisher,
3737
log zerolog.Logger,
3838
opts ...builder.Opt,
3939
) (*BuilderFactory, error) {
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package collection
2+
3+
import (
4+
"github.com/onflow/flow-go/model/flow"
5+
)
6+
7+
// GuaranteedCollectionPublisher defines the interface to send collection guarantees
8+
// from a collection node to consensus nodes. Collection guarantees are broadcast on a best-effort basis,
9+
// and it is acceptable to discard some guarantees (especially those that are out of date).
10+
// Implementation is non-blocking and concurrency safe.
11+
type GuaranteedCollectionPublisher interface {
12+
// SubmitCollectionGuarantee adds a guarantee to an internal queue
13+
// to be published to consensus nodes.
14+
SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
15+
}

engine/collection/mock/guaranteed_collection_publisher.go

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/collection/pusher/engine.go

Lines changed: 10 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/onflow/flow-go/engine/common/fifoqueue"
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/model/flow/filter"
16-
"github.com/onflow/flow-go/model/messages"
1716
"github.com/onflow/flow-go/module"
1817
"github.com/onflow/flow-go/module/component"
1918
"github.com/onflow/flow-go/module/irrecoverable"
@@ -44,8 +43,7 @@ type Engine struct {
4443
cm *component.ComponentManager
4544
}
4645

47-
// TODO convert to network.MessageProcessor
48-
var _ network.Engine = (*Engine)(nil)
46+
var _ network.MessageProcessor = (*Engine)(nil)
4947
var _ component.Component = (*Engine)(nil)
5048

5149
// New creates a new pusher engine.
@@ -120,17 +118,17 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co
120118
// No errors expected during normal operations.
121119
func (e *Engine) processOutboundMessages(ctx context.Context) error {
122120
for {
123-
nextMessage, ok := e.queue.Pop()
121+
item, ok := e.queue.Pop()
124122
if !ok {
125123
return nil
126124
}
127125

128-
asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee)
126+
guarantee, ok := item.(*flow.CollectionGuarantee)
129127
if !ok {
130-
return fmt.Errorf("invalid message type in pusher engine queue")
128+
return fmt.Errorf("invalid type in pusher engine queue")
131129
}
132130

133-
err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee)
131+
err := e.publishCollectionGuarantee(guarantee)
134132
if err != nil {
135133
return err
136134
}
@@ -143,44 +141,18 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error {
143141
}
144142
}
145143

146-
// SubmitLocal submits an event originating on the local node.
147-
func (e *Engine) SubmitLocal(event interface{}) {
148-
ev, ok := event.(*messages.SubmitCollectionGuarantee)
149-
if ok {
150-
e.SubmitCollectionGuarantee(ev)
151-
} else {
152-
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine"))
153-
}
154-
}
155-
156-
// Submit submits the given event from the node with the given origin ID
157-
// for processing in a non-blocking manner. It returns instantly and logs
158-
// a potential processing error internally when done.
159-
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
160-
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node"))
161-
}
162-
163-
// ProcessLocal processes an event originating on the local node.
164-
func (e *Engine) ProcessLocal(event interface{}) error {
165-
ev, ok := event.(*messages.SubmitCollectionGuarantee)
166-
if ok {
167-
e.SubmitCollectionGuarantee(ev)
168-
return nil
169-
} else {
170-
return fmt.Errorf("invalid message argument to pusher engine")
171-
}
172-
}
173-
174144
// Process processes the given event from the node with the given origin ID in
175-
// a blocking manner. It returns the potential processing error when done.
145+
// a non-blocking manner. It returns the potential processing error when done.
146+
// Because the pusher engine does not accept inputs from the network,
147+
// always drop any messages and return an error.
176148
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
177149
return fmt.Errorf("pusher engine should only receive local messages on the same node")
178150
}
179151

180152
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
181153
// to later be published to consensus nodes.
182-
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
183-
if e.queue.Push(msg) {
154+
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) {
155+
if e.queue.Push(guarantee) {
184156
e.notifier.Notify()
185157
} else {
186158
e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee)

engine/collection/pusher/engine_test.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/onflow/flow-go/engine/collection/pusher"
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/model/flow/filter"
16-
"github.com/onflow/flow-go/model/messages"
1716
"github.com/onflow/flow-go/module/irrecoverable"
1817
"github.com/onflow/flow-go/module/metrics"
1918
module "github.com/onflow/flow-go/module/mock"
@@ -97,11 +96,9 @@ func (suite *Suite) TestSubmitCollectionGuarantee() {
9796
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).
9897
Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once()
9998

100-
msg := &messages.SubmitCollectionGuarantee{
101-
Guarantee: *guarantee,
102-
}
103-
err := suite.engine.ProcessLocal(msg)
104-
suite.Require().Nil(err)
99+
suite.engine.SubmitCollectionGuarantee(guarantee)
100+
// TODO signature?
101+
//suite.Require().Nil(err)
105102

106103
unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent")
107104

@@ -116,10 +113,7 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() {
116113
// send from a non-allowed role
117114
sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
118115

119-
msg := &messages.SubmitCollectionGuarantee{
120-
Guarantee: *guarantee,
121-
}
122-
err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg)
116+
err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
123117
suite.Require().Error(err)
124118

125119
suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0)

model/messages/collection.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ import (
55
"github.com/onflow/flow-go/model/flow"
66
)
77

8-
// SubmitCollectionGuarantee is a request to submit the given collection
9-
// guarantee to consensus nodes. Only valid as a node-local message.
10-
type SubmitCollectionGuarantee struct {
11-
Guarantee flow.CollectionGuarantee
12-
}
13-
148
// CollectionRequest request all transactions from a collection with the given
159
// fingerprint.
1610
type CollectionRequest struct {

module/finalizer/collection/finalizer.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ import (
55

66
"github.com/dgraph-io/badger/v2"
77

8+
"github.com/onflow/flow-go/engine/collection"
89
"github.com/onflow/flow-go/model/cluster"
910
"github.com/onflow/flow-go/model/flow"
10-
"github.com/onflow/flow-go/model/messages"
1111
"github.com/onflow/flow-go/module"
1212
"github.com/onflow/flow-go/module/mempool"
13-
"github.com/onflow/flow-go/network"
1413
"github.com/onflow/flow-go/storage/badger/operation"
1514
"github.com/onflow/flow-go/storage/badger/procedure"
1615
)
@@ -22,15 +21,15 @@ import (
2221
type Finalizer struct {
2322
db *badger.DB
2423
transactions mempool.Transactions
25-
prov network.Engine
24+
prov collection.GuaranteedCollectionPublisher
2625
metrics module.CollectionMetrics
2726
}
2827

2928
// NewFinalizer creates a new finalizer for collection nodes.
3029
func NewFinalizer(
3130
db *badger.DB,
3231
transactions mempool.Transactions,
33-
prov network.Engine,
32+
prov collection.GuaranteedCollectionPublisher,
3433
metrics module.CollectionMetrics,
3534
) *Finalizer {
3635
f := &Finalizer{
@@ -160,14 +159,12 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error {
160159
// collection.
161160

162161
// TODO add real signatures here (2711)
163-
f.prov.SubmitLocal(&messages.SubmitCollectionGuarantee{
164-
Guarantee: flow.CollectionGuarantee{
165-
CollectionID: payload.Collection.ID(),
166-
ReferenceBlockID: payload.ReferenceBlockID,
167-
ChainID: header.ChainID,
168-
SignerIndices: step.ParentVoterIndices,
169-
Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes
170-
},
162+
f.prov.SubmitCollectionGuarantee(&flow.CollectionGuarantee{
163+
CollectionID: payload.Collection.ID(),
164+
ReferenceBlockID: payload.ReferenceBlockID,
165+
ChainID: header.ChainID,
166+
SignerIndices: step.ParentVoterIndices,
167+
Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes
171168
})
172169
}
173170

0 commit comments

Comments
 (0)