@@ -10,87 +10,32 @@ import (
1010 "github.com/pkg/errors"
1111)
1212
13- // Ensure the projectionNotificationProcessor.Queue is a ProjectionTrigger
14- var _ ProjectionTrigger = (& notificationQueue {}).Queue
15- var _ ProjectionTrigger = (& notificationQueue {}).ReQueue
16-
1713type (
18- // projectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
19- projectionNotificationProcessor struct {
14+ // ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
15+ ProjectionNotificationProcessor struct {
2016 done chan struct {}
21- queue chan * ProjectionNotification
2217 queueProcessors int
23- queueBuffer int
2418
2519 logger goengine.Logger
2620 metrics Metrics
2721
28- notificationQueue notificationQueueInterface
29- }
30-
31- notificationQueueInterface interface {
32- Start (done chan struct {}, queue chan * ProjectionNotification )
33- Queue (ctx context.Context , notification * ProjectionNotification ) error
34- ReQueue (ctx context.Context , notification * ProjectionNotification ) error
35- }
36-
37- notificationQueue struct {
38- retryDelay time.Duration
39- metrics Metrics
40- done chan struct {}
41- queue chan * ProjectionNotification
22+ notificationQueue NotificationQueueInterface
4223 }
4324
4425 // ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
4526 // the original notification can trigger other notifications
4627 ProcessHandler func (context.Context , * ProjectionNotification , ProjectionTrigger ) error
4728)
4829
49- func newNotificationQueue (retryDelay time.Duration , metrics Metrics ) * notificationQueue {
50- if retryDelay == 0 {
51- retryDelay = time .Millisecond * 50
52- }
53-
54- return & notificationQueue {
55- retryDelay : retryDelay ,
56- metrics : metrics ,
57- }
58- }
59-
60- func (nq * notificationQueue ) Start (done chan struct {}, queue chan * ProjectionNotification ) {
61- nq .done = done
62- nq .queue = queue
63- }
64-
65- func (nq * notificationQueue ) Queue (ctx context.Context , notification * ProjectionNotification ) error {
66- select {
67- default :
68- case <- ctx .Done ():
69- return context .Canceled
70- case <- nq .done :
71- return errors .New ("goengine: unable to queue notification because the processor was stopped" )
72- }
73-
74- nq .metrics .QueueNotification (notification )
75-
76- nq .queue <- notification
77- return nil
78- }
79-
80- func (nq * notificationQueue ) ReQueue (ctx context.Context , notification * ProjectionNotification ) error {
81- notification .ValidAfter = time .Now ().Add (nq .retryDelay )
82-
83- return nq .Queue (ctx , notification )
84- }
85-
86- // newBackgroundProcessor create a new projectionNotificationProcessor
87- func newBackgroundProcessor (
30+ // NewBackgroundProcessor create a new projectionNotificationProcessor
31+ func NewBackgroundProcessor (
8832 queueProcessors ,
8933 queueBuffer int ,
9034 logger goengine.Logger ,
9135 metrics Metrics ,
9236 retryDelay time.Duration ,
93- ) (* projectionNotificationProcessor , error ) {
37+ notificationQueue NotificationQueueInterface ,
38+ ) (* ProjectionNotificationProcessor , error ) {
9439 if queueProcessors <= 0 {
9540 return nil , errors .New ("queueProcessors must be greater then zero" )
9641 }
@@ -103,18 +48,20 @@ func newBackgroundProcessor(
10348 if metrics == nil {
10449 metrics = NopMetrics
10550 }
51+ if notificationQueue == nil {
52+ notificationQueue = newNotificationQueue (queueBuffer , retryDelay , metrics )
53+ }
10654
107- return & projectionNotificationProcessor {
55+ return & ProjectionNotificationProcessor {
10856 queueProcessors : queueProcessors ,
109- queueBuffer : queueBuffer ,
11057 logger : logger ,
11158 metrics : metrics ,
112- notificationQueue : newNotificationQueue ( retryDelay , metrics ) ,
59+ notificationQueue : notificationQueue ,
11360 }, nil
11461}
11562
11663// Execute starts the background worker and wait for the notification to be executed
117- func (b * projectionNotificationProcessor ) Execute (ctx context.Context , handler ProcessHandler , notification * ProjectionNotification ) error {
64+ func (b * ProjectionNotificationProcessor ) Execute (ctx context.Context , handler ProcessHandler , notification * ProjectionNotification ) error {
11865 // Wrap the processNotification in order to know that the first trigger finished
11966 handler , handlerDone := b .wrapProcessHandlerForSingleRun (handler )
12067
@@ -137,11 +84,10 @@ func (b *projectionNotificationProcessor) Execute(ctx context.Context, handler P
13784}
13885
13986// Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec
140- func (b * projectionNotificationProcessor ) Start (ctx context.Context , handler ProcessHandler ) func () {
87+ func (b * ProjectionNotificationProcessor ) Start (ctx context.Context , handler ProcessHandler ) func () {
14188 b .done = make (chan struct {})
142- b .queue = make (chan * ProjectionNotification , b .queueBuffer )
14389
144- b .notificationQueue .Start (b .done , b . queue )
90+ b .notificationQueue .Open (b .done )
14591
14692 var wg sync.WaitGroup
14793 wg .Add (b .queueProcessors )
@@ -158,37 +104,32 @@ func (b *projectionNotificationProcessor) Start(ctx context.Context, handler Pro
158104 return func () {
159105 close (b .done )
160106 wg .Wait ()
161- close ( b . queue )
107+ b . notificationQueue . Close ( )
162108 }
163109}
164110
165111// Queue puts the notification on the queue to be processed
166- func (b * projectionNotificationProcessor ) Queue (ctx context.Context , notification * ProjectionNotification ) error {
112+ func (b * ProjectionNotificationProcessor ) Queue (ctx context.Context , notification * ProjectionNotification ) error {
167113 return b .notificationQueue .Queue (ctx , notification )
168114}
169115
170- // ReQueue puts the notification again on the queue to be processed with a ValidAfter set
171- func (b * projectionNotificationProcessor ) ReQueue (ctx context.Context , notification * ProjectionNotification ) error {
172- return b .notificationQueue .ReQueue (ctx , notification )
173- }
174-
175- func (b * projectionNotificationProcessor ) startProcessor (ctx context.Context , handler ProcessHandler ) {
116+ func (b * ProjectionNotificationProcessor ) startProcessor (ctx context.Context , handler ProcessHandler ) {
176117ProcessorLoop:
177118 for {
178119 select {
179120 case <- b .done :
180121 return
181122 case <- ctx .Done ():
182123 return
183- case notification := <- b .queue :
124+ case notification := <- b .notificationQueue . Channel () :
184125 var queueFunc ProjectionTrigger
185126 if notification == nil {
186127 queueFunc = b .notificationQueue .Queue
187128 } else {
188129 queueFunc = b .notificationQueue .ReQueue
189130
190131 if notification .ValidAfter .After (time .Now ()) {
191- b .queue <- notification
132+ b .notificationQueue . PutBack ( notification )
192133 continue ProcessorLoop
193134 }
194135 }
@@ -212,7 +153,7 @@ ProcessorLoop:
212153
213154// wrapProcessHandlerForSingleRun returns a wrapped ProcessHandler with a done channel that is closed after the
214155// provided ProcessHandler it's first call and related messages are finished or when the context is done.
215- func (b * projectionNotificationProcessor ) wrapProcessHandlerForSingleRun (handler ProcessHandler ) (ProcessHandler , chan struct {}) {
156+ func (b * ProjectionNotificationProcessor ) wrapProcessHandlerForSingleRun (handler ProcessHandler ) (ProcessHandler , chan struct {}) {
216157 done := make (chan struct {})
217158
218159 var m sync.Mutex
@@ -239,7 +180,7 @@ func (b *projectionNotificationProcessor) wrapProcessHandlerForSingleRun(handler
239180 close (done )
240181 default :
241182 // No more queued messages to close the run
242- if len ( b . queue ) == 0 {
183+ if b . notificationQueue . Empty () {
243184 close (done )
244185 }
245186 }
0 commit comments