Skip to content

Commit f2f53a8

Browse files
committed
Refactor pusher engine: rename and propagate error
Rename `inboundMessageWorker` and `processInboundMessages` to `outbound` and also propagate errors to the top level of the worker where they can be thrown.
1 parent e25cba7 commit f2f53a8

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

engine/collection/pusher/engine.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,15 @@ func New(log zerolog.Logger, net network.EngineRegistry, state protocol.State, e
8484
e.conduit = conduit
8585

8686
e.cm = component.NewComponentManagerBuilder().
87-
AddWorker(e.inboundMessageWorker).
87+
AddWorker(e.outboundQueueWorker).
8888
Build()
8989
e.Component = e.cm
9090

9191
return e, nil
9292
}
9393

94-
func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
94+
// Worker to process SubmitCollectionGuarantee messages coming from the Finalizer.
95+
func (e *Engine) outboundQueueWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
9596
ready()
9697

9798
done := ctx.Done()
@@ -101,27 +102,35 @@ func (e *Engine) inboundMessageWorker(ctx irrecoverable.SignalerContext, ready c
101102
case <-done:
102103
return
103104
case <-wake:
104-
e.processInboundMessages(ctx)
105+
err := e.processOutboundMessages(ctx)
106+
if err != nil {
107+
ctx.Throw(err)
108+
}
105109
}
106110
}
107111
}
108112

109-
func (e *Engine) processInboundMessages(ctx context.Context) {
113+
// processOutboundMessages processes any available messages from the queue.
114+
// Only returns when the queue is empty (or the engine is terminated).
115+
func (e *Engine) processOutboundMessages(ctx context.Context) error {
110116
for {
111117
nextMessage, ok := e.inbound.Pop()
112118
if !ok {
113-
return
119+
return nil
114120
}
115121

116122
asEngineWrapper := nextMessage.(*engine.Message)
117123
asSCGMsg := asEngineWrapper.Payload.(*messages.SubmitCollectionGuarantee)
118124
originID := asEngineWrapper.OriginID
119125

120-
_ = e.process(originID, asSCGMsg)
126+
err := e.process(originID, asSCGMsg)
127+
if err != nil {
128+
return err
129+
}
121130

122131
select {
123132
case <-ctx.Done():
124-
return
133+
return nil
125134
default:
126135
}
127136
}

0 commit comments

Comments
 (0)