Skip to content

Commit e25cba7

Browse files
committed
Refactor pusher engine: error on non-local messages
1 parent b7166f3 commit e25cba7

File tree

1 file changed

+2
-14
lines changed

1 file changed

+2
-14
lines changed

engine/collection/pusher/engine.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,7 @@ func (e *Engine) SubmitLocal(event interface{}) {
139139
// for processing in a non-blocking manner. It returns instantly and logs
140140
// a potential processing error internally when done.
141141
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
142-
err := e.messageHandler.Process(originID, event)
143-
if err != nil {
144-
engine.LogError(e.log, err)
145-
}
142+
engine.LogError(e.log, fmt.Errorf("pusher engine should only receive local messages on the same node"))
146143
}
147144

148145
// ProcessLocal processes an event originating on the local node.
@@ -153,16 +150,7 @@ func (e *Engine) ProcessLocal(event interface{}) error {
153150
// Process processes the given event from the node with the given origin ID in
154151
// a blocking manner. It returns the potential processing error when done.
155152
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
156-
err := e.messageHandler.Process(originID, message)
157-
if err != nil {
158-
if errors.Is(err, engine.IncompatibleInputTypeError) {
159-
e.log.Warn().Bool(logging.KeySuspicious, true).Msgf("%v delivered unsupported message %T through %v", originID, message, channel)
160-
return nil
161-
}
162-
// TODO add comment about Process errors...
163-
return fmt.Errorf("unexpected failure to process inbound pusher message")
164-
}
165-
return nil
153+
return fmt.Errorf("pusher engine should only receive local messages on the same node")
166154
}
167155

168156
// process processes events for the pusher engine on the collection node.

0 commit comments

Comments
 (0)