Skip to content

Commit f66ba69

Browse files
committed
Refactor pusher engine
- Partially implement suggestion #6747 (comment) - Make `SubmitCollectionGuarantee` non-exported and rename to `publishCollectionGuarantee` - Add new `SubmitCollectionGuarantee` exported function that just adds to the queue - Remove `messageHandler` field, instead directly add to queue from review: #6747 (comment) - `OriginID`s no longer included in messages in the queue, and therefore not checked by the worker - if necessary they should be checked when Submitting
1 parent f2f53a8 commit f66ba69

File tree

1 file changed

+35
-22
lines changed

1 file changed

+35
-22
lines changed

engine/collection/pusher/engine.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package pusher
55

66
import (
77
"context"
8-
"errors"
98
"fmt"
109

1110
"github.com/rs/zerolog"
@@ -38,9 +37,8 @@ type Engine struct {
3837
collections storage.Collections
3938
transactions storage.Transactions
4039

41-
messageHandler *engine.MessageHandler
42-
notifier engine.Notifier
43-
inbound *fifoqueue.FifoQueue
40+
notifier engine.Notifier
41+
inbound *fifoqueue.FifoQueue
4442

4543
component.Component
4644
cm *component.ComponentManager
@@ -58,10 +56,6 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
5856
}
5957

6058
notifier := engine.NewNotifier()
61-
messageHandler := engine.NewMessageHandler(log, notifier, engine.Pattern{
62-
Match: engine.MatchType[*messages.SubmitCollectionGuarantee],
63-
Store: &engine.FifoMessageStore{FifoQueue: inbound},
64-
})
6559

6660
e := &Engine{
6761
log: log.With().Str("engine", "pusher").Logger(),
@@ -72,9 +66,8 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
7266
collections: collections,
7367
transactions: transactions,
7468

75-
messageHandler: messageHandler,
76-
notifier: notifier,
77-
inbound: inbound,
69+
notifier: notifier,
70+
inbound: inbound,
7871
}
7972

8073
conduit, err := net.Register(channels.PushGuarantees, e)
@@ -119,11 +112,12 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error {
119112
return nil
120113
}
121114

122-
asEngineWrapper := nextMessage.(*engine.Message)
123-
asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee)
124-
originID := asEngineWrapper.OriginID
115+
asSCGMsg, ok := nextMessage.(*messages.SubmitCollectionGuarantee)
116+
if !ok {
117+
return fmt.Errorf("invalid message type in pusher engine queue")
118+
}
125119

126-
err := e.process(originID, asSCGMsg)
120+
err := e.publishCollectionGuarantee(&asSCGMsg.Guarantee)
127121
if err != nil {
128122
return err
129123
}
@@ -138,9 +132,11 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error {
138132

139133
// SubmitLocal submits an event originating on the local node.
140134
func (e *Engine) SubmitLocal(event interface{}) {
141-
err := e.messageHandler.Process(e.me.NodeID(), event)
142-
if err != nil {
143-
engine.LogError(e.log, err)
135+
ev, ok := event.(*messages.SubmitCollectionGuarantee)
136+
if ok {
137+
e.SubmitCollectionGuarantee(ev)
138+
} else {
139+
engine.LogError(e.log, fmt.Errorf("invalid message argument to pusher engine"))
144140
}
145141
}
146142

@@ -153,7 +149,13 @@ func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, even
153149

154150
// ProcessLocal processes an event originating on the local node.
155151
func (e *Engine) ProcessLocal(event interface{}) error {
156-
return e.messageHandler.Process(e.me.NodeID(), event)
152+
ev, ok := event.(*messages.SubmitCollectionGuarantee)
153+
if ok {
154+
e.SubmitCollectionGuarantee(ev)
155+
return nil
156+
} else {
157+
return fmt.Errorf("invalid message argument to pusher engine")
158+
}
157159
}
158160

159161
// Process processes the given event from the node with the given origin ID in
@@ -162,6 +164,17 @@ func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, mes
162164
return fmt.Errorf("pusher engine should only receive local messages on the same node")
163165
}
164166

167+
// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
168+
// to later be published to consensus nodes.
169+
func (e *Engine) SubmitCollectionGuarantee(msg *messages.SubmitCollectionGuarantee) {
170+
ok := e.inbound.Push(msg)
171+
if !ok {
172+
e.log.Err(fmt.Errorf("failed to store collection guarantee in queue"))
173+
return
174+
}
175+
e.notifier.Notify()
176+
}
177+
165178
// process processes events for the pusher engine on the collection node.
166179
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
167180
switch ev := event.(type) {
@@ -171,14 +184,14 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error {
171184
if originID != e.me.NodeID() {
172185
return fmt.Errorf("invalid remote request to submit collection guarantee (from=%x)", originID)
173186
}
174-
return e.SubmitCollectionGuarantee(&ev.Guarantee)
187+
return e.publishCollectionGuarantee(&ev.Guarantee)
175188
default:
176189
return fmt.Errorf("invalid event type (%T)", event)
177190
}
178191
}
179192

180-
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
181-
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
193+
// publishCollectionGuarantee publishes the collection guarantee to all consensus nodes.
194+
func (e *Engine) publishCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {
182195
consensusNodes, err := e.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleConsensus))
183196
if err != nil {
184197
return fmt.Errorf("could not get consensus nodes: %w", err)

0 commit comments

Comments
 (0)