Skip to content

Commit 656cbd5

Browse files
authored
Merge pull request #6780 from onflow/tim/6765-pusher-engine-update-interface
Refactor Pusher Engine (part 2) - updated interface
2 parents 7e4258a + f58ccaf commit 656cbd5

File tree

8 files changed

+156
-159
lines changed

8 files changed

+156
-159
lines changed

engine/collection/epochmgr/factories/builder.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
"github.com/dgraph-io/badger/v2"
77
"github.com/rs/zerolog"
88

9+
"github.com/onflow/flow-go/engine/collection"
910
"github.com/onflow/flow-go/module"
1011
builder "github.com/onflow/flow-go/module/builder/collection"
1112
finalizer "github.com/onflow/flow-go/module/finalizer/collection"
1213
"github.com/onflow/flow-go/module/mempool"
13-
"github.com/onflow/flow-go/network"
1414
clusterstate "github.com/onflow/flow-go/state/cluster"
1515
"github.com/onflow/flow-go/state/protocol"
1616
"github.com/onflow/flow-go/storage"
@@ -23,7 +23,7 @@ type BuilderFactory struct {
2323
trace module.Tracer
2424
opts []builder.Opt
2525
metrics module.CollectionMetrics
26-
pusher network.Engine // engine for pushing finalized collection to consensus committee
26+
pusher collection.GuaranteedCollectionPublisher // engine for pushing finalized collection to consensus committee
2727
log zerolog.Logger
2828
}
2929

@@ -33,7 +33,7 @@ func NewBuilderFactory(
3333
mainChainHeaders storage.Headers,
3434
trace module.Tracer,
3535
metrics module.CollectionMetrics,
36-
pusher network.Engine,
36+
pusher collection.GuaranteedCollectionPublisher,
3737
log zerolog.Logger,
3838
opts ...builder.Opt,
3939
) (*BuilderFactory, error) {
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package collection
2+
3+
import (
4+
"github.com/onflow/flow-go/model/flow"
5+
)
6+
7+
// GuaranteedCollectionPublisher defines the interface to send collection guarantees
8+
// from a collection node to consensus nodes. Collection guarantees are broadcast on a best-effort basis,
9+
// and it is acceptable to discard some guarantees (especially those that are out of date).
10+
// Implementation is non-blocking and concurrency safe.
11+
type GuaranteedCollectionPublisher interface {
12+
// SubmitCollectionGuarantee adds a guarantee to an internal queue
13+
// to be published to consensus nodes.
14+
SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
15+
}

engine/collection/mock/guaranteed_collection_publisher.go

Lines changed: 32 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/collection/pusher/engine.go

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/onflow/flow-go/engine/common/fifoqueue"
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/model/flow/filter"
16-
"github.com/onflow/flow-go/model/messages"
1716
"github.com/onflow/flow-go/module"
1817
"github.com/onflow/flow-go/module/component"
1918
"github.com/onflow/flow-go/module/irrecoverable"
@@ -44,8 +43,7 @@ type Engine struct {
4443
cm *component.ComponentManager
4544
}
4645

47-
// TODO convert to network.MessageProcessor
48-
var _ network.Engine = (*Engine)(nil)
46+
var _ network.MessageProcessor = (*Engine)(nil)
4947
var _ component.Component = (*Engine)(nil)
5048

5149
// New creates a new pusher engine.
@@ -120,17 +118,17 @@ func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready co
120118
// No errors expected during normal operations.
121119
func (e *Engine) processOutboundMessages(ctx context.Context) error {
122120
for {
123-
nextMessage, ok := e.queue.Pop()
121+
item, ok := e.queue.Pop()
124122
if !ok {
125123
return nil
126124
}
127125

128-
asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee)
126+
guarantee, ok := item.(*flow.CollectionGuarantee)
129127
if !ok {
130-
return fmt.Errorf("invalid message type in pusher engine queue")
128+
return fmt.Errorf("invalid type in pusher engine queue")
131129
}
132130

133-
err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee)
131+
err := e.publishCollectionGuarantee(guarantee)
134132
if err != nil {
135133
return err
136134
}
@@ -143,44 +141,32 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error {
143141
}
144142
}
145143

146-
// SubmitLocal submits an event originating on the local node.
147-
func (e *Engine) SubmitLocal(event interface{}) {
148-
ev, ok := event.(*messages.SubmitCollectionGuarantee)
149-
if ok {
150-
e.SubmitCollectionGuarantee(ev)
151-
} else {
152-
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine"))
153-
}
154-
}
155-
156-
// Submit submits the given event from the node with the given origin ID
157-
// for processing in a non-blocking manner. It returns instantly and logs
158-
// a potential processing error internally when done.
159-
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
160-
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node"))
161-
}
162-
163-
// ProcessLocal processes an event originating on the local node.
164-
func (e *Engine) ProcessLocal(event interface{}) error {
165-
ev, ok := event.(*messages.SubmitCollectionGuarantee)
166-
if ok {
167-
e.SubmitCollectionGuarantee(ev)
168-
return nil
169-
} else {
170-
return fmt.Errorf("invalid message argument to pusher engine")
171-
}
172-
}
173-
174-
// Process processes the given event from the node with the given origin ID in
175-
// a blocking manner. It returns the potential processing error when done.
144+
// Process is called by the networking layer, when peers broadcast messages with this node
145+
// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection
146+
// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on
147+
// Collectors) receives a message, this message is evidence of byzantine behavior.
148+
// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in
149+
// error returns. No errors expected during normal operation (including byzantine inputs).
176150
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
177-
return fmt.Errorf("pusher engine should only receive local messages on the same node")
151+
// Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense.
152+
// Though, for generating cryptographic evidence, we need Message Forensics - see reference [1].
153+
// Much further into the future, when we are implementing slashing challenges, we'll probably implement a
154+
// dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now,
155+
// we just log this with the `KeySuspicious` to alert the node operator.
156+
// [1] Message Forensics FLIP https://github.com/onflow/flips/pull/195)
157+
errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel)
158+
e.log.Warn().
159+
Err(errs).
160+
Bool(logging.KeySuspicious, true).
161+
Str("peer_id", originID.String()).
162+
Msg("potentially byzantine networking traffic detected")
163+
return nil
178164
}
179165

180166
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
181167
// to later be published to consensus nodes.
182-
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
183-
if e.queue.Push(msg) {
168+
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) {
169+
if e.queue.Push(guarantee) {
184170
e.notifier.Notify()
185171
} else {
186172
e.engMetrics.OutboundMessageDropped(metrics.EngineCollectionProvider, metrics.MessageCollectionGuarantee)

engine/collection/pusher/engine_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/onflow/flow-go/engine/collection/pusher"
1414
"github.com/onflow/flow-go/model/flow"
1515
"github.com/onflow/flow-go/model/flow/filter"
16-
"github.com/onflow/flow-go/model/messages"
1716
"github.com/onflow/flow-go/module/irrecoverable"
1817
"github.com/onflow/flow-go/module/metrics"
1918
module "github.com/onflow/flow-go/module/mock"
@@ -97,11 +96,7 @@ func (suite *Suite) TestSubmitCollectionGuarantee() {
9796
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).
9897
Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once()
9998

100-
msg := &messages.SubmitCollectionGuarantee{
101-
Guarantee: *guarantee,
102-
}
103-
err := suite.engine.ProcessLocal(msg)
104-
suite.Require().Nil(err)
99+
suite.engine.SubmitCollectionGuarantee(guarantee)
105100

106101
unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent")
107102

@@ -113,14 +108,13 @@ func (suite *Suite) TestSubmitCollectionGuaranteeNonLocal() {
113108

114109
guarantee := unittest.CollectionGuaranteeFixture()
115110

116-
// send from a non-allowed role
111+
// verify that pusher.Engine handles any (potentially byzantine) input:
112+
// A byzantine peer could target the collector node's pusher engine with messages
113+
// The pusher should discard those and explicitly not get tricked into broadcasting
114+
// collection guarantees which a byzantine peer might try to inject into the system.
117115
sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
118116

119-
msg := &messages.SubmitCollectionGuarantee{
120-
Guarantee: *guarantee,
121-
}
122-
err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg)
123-
suite.Require().Error(err)
124-
117+
err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
118+
suite.Require().NoError(err)
125119
suite.conduit.AssertNumberOfCalls(suite.T(), "Multicast", 0)
126120
}

model/messages/collection.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ import (
55
"github.com/onflow/flow-go/model/flow"
66
)
77

8-
// SubmitCollectionGuarantee is a request to submit the given collection
9-
// guarantee to consensus nodes. Only valid as a node-local message.
10-
type SubmitCollectionGuarantee struct {
11-
Guarantee flow.CollectionGuarantee
12-
}
13-
148
// CollectionRequest request all transactions from a collection with the given
159
// fingerprint.
1610
type CollectionRequest struct {

module/finalizer/collection/finalizer.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ import (
55

66
"github.com/dgraph-io/badger/v2"
77

8+
"github.com/onflow/flow-go/engine/collection"
89
"github.com/onflow/flow-go/model/cluster"
910
"github.com/onflow/flow-go/model/flow"
10-
"github.com/onflow/flow-go/model/messages"
1111
"github.com/onflow/flow-go/module"
1212
"github.com/onflow/flow-go/module/mempool"
13-
"github.com/onflow/flow-go/network"
1413
"github.com/onflow/flow-go/storage/badger/operation"
1514
"github.com/onflow/flow-go/storage/badger/procedure"
1615
)
@@ -22,21 +21,21 @@ import (
2221
type Finalizer struct {
2322
db *badger.DB
2423
transactions mempool.Transactions
25-
prov network.Engine
24+
pusher collection.GuaranteedCollectionPublisher
2625
metrics module.CollectionMetrics
2726
}
2827

2928
// NewFinalizer creates a new finalizer for collection nodes.
3029
func NewFinalizer(
3130
db *badger.DB,
3231
transactions mempool.Transactions,
33-
prov network.Engine,
32+
pusher collection.GuaranteedCollectionPublisher,
3433
metrics module.CollectionMetrics,
3534
) *Finalizer {
3635
f := &Finalizer{
3736
db: db,
3837
transactions: transactions,
39-
prov: prov,
38+
pusher: pusher,
4039
metrics: metrics,
4140
}
4241
return f
@@ -159,15 +158,13 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error {
159158
// For now, we just use the parent signers as the guarantors of this
160159
// collection.
161160

162-
// TODO add real signatures here (2711)
163-
f.prov.SubmitLocal(&messages.SubmitCollectionGuarantee{
164-
Guarantee: flow.CollectionGuarantee{
165-
CollectionID: payload.Collection.ID(),
166-
ReferenceBlockID: payload.ReferenceBlockID,
167-
ChainID: header.ChainID,
168-
SignerIndices: step.ParentVoterIndices,
169-
Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes
170-
},
161+
// TODO add real signatures here (https://github.com/onflow/flow-go-internal/issues/4569)
162+
f.pusher.SubmitCollectionGuarantee(&flow.CollectionGuarantee{
163+
CollectionID: payload.Collection.ID(),
164+
ReferenceBlockID: payload.ReferenceBlockID,
165+
ChainID: header.ChainID,
166+
SignerIndices: step.ParentVoterIndices,
167+
Signature: nil, // TODO: to remove because it's not easily verifiable by consensus nodes
171168
})
172169
}
173170

0 commit comments

Comments
 (0)