@@ -4,33 +4,37 @@ import (
44 "context"
55 "runtime"
66 "sync"
7+ "time"
78
89 "github.com/hellofresh/goengine"
910 "github.com/pkg/errors"
1011)
1112
12- // Ensure the projectionNotificationProcessor.Queue is a ProjectionTrigger
13- var _ ProjectionTrigger = (& projectionNotificationProcessor {}).Queue
14-
1513type (
16- // projectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
17- projectionNotificationProcessor struct {
14+ // ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
15+ ProjectionNotificationProcessor struct {
1816 done chan struct {}
19- queue chan * ProjectionNotification
2017 queueProcessors int
21- queueBuffer int
2218
2319 logger goengine.Logger
2420 metrics Metrics
21+
22+ notificationQueue NotificationQueuer
2523 }
2624
2725 // ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
2826 // the original notification can trigger other notifications
2927 ProcessHandler func (context.Context , * ProjectionNotification , ProjectionTrigger ) error
3028)
3129
32- // newBackgroundProcessor create a new projectionNotificationProcessor
33- func newBackgroundProcessor (queueProcessors , queueBuffer int , logger goengine.Logger , metrics Metrics ) (* projectionNotificationProcessor , error ) {
30+ // NewBackgroundProcessor create a new projectionNotificationProcessor
31+ func NewBackgroundProcessor (
32+ queueProcessors ,
33+ queueBuffer int ,
34+ logger goengine.Logger ,
35+ metrics Metrics ,
36+ notificationQueue NotificationQueuer ,
37+ ) (* ProjectionNotificationProcessor , error ) {
3438 if queueProcessors <= 0 {
3539 return nil , errors .New ("queueProcessors must be greater then zero" )
3640 }
@@ -43,17 +47,20 @@ func newBackgroundProcessor(queueProcessors, queueBuffer int, logger goengine.Lo
4347 if metrics == nil {
4448 metrics = NopMetrics
4549 }
50+ if notificationQueue == nil {
51+ notificationQueue = newNotificationQueue (queueBuffer , 0 , metrics )
52+ }
4653
47- return & projectionNotificationProcessor {
48- queueProcessors : queueProcessors ,
49- queueBuffer : queueBuffer ,
50- logger : logger ,
51- metrics : metrics ,
54+ return & ProjectionNotificationProcessor {
55+ queueProcessors : queueProcessors ,
56+ logger : logger ,
57+ metrics : metrics ,
58+ notificationQueue : notificationQueue ,
5259 }, nil
5360}
5461
5562// Execute starts the background worker and wait for the notification to be executed
56- func (b * projectionNotificationProcessor ) Execute (ctx context.Context , handler ProcessHandler , notification * ProjectionNotification ) error {
63+ func (b * ProjectionNotificationProcessor ) Execute (ctx context.Context , handler ProcessHandler , notification * ProjectionNotification ) error {
5764 // Wrap the processNotification in order to know that the first trigger finished
5865 handler , handlerDone := b .wrapProcessHandlerForSingleRun (handler )
5966
@@ -62,7 +69,7 @@ func (b *projectionNotificationProcessor) Execute(ctx context.Context, handler P
6269 defer stopExecutor ()
6370
6471 // Execute a run of the internal.
65- if err := b .Queue (ctx , nil ); err != nil {
72+ if err := b .notificationQueue . Queue (ctx , nil ); err != nil {
6673 return err
6774 }
6875
@@ -76,9 +83,8 @@ func (b *projectionNotificationProcessor) Execute(ctx context.Context, handler P
7683}
7784
7885// Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec
79- func (b * projectionNotificationProcessor ) Start (ctx context.Context , handler ProcessHandler ) func () {
80- b .done = make (chan struct {})
81- b .queue = make (chan * ProjectionNotification , b .queueBuffer )
86+ func (b * ProjectionNotificationProcessor ) Start (ctx context.Context , handler ProcessHandler ) func () {
87+ b .done = b .notificationQueue .Open ()
8288
8389 var wg sync.WaitGroup
8490 wg .Add (b .queueProcessors )
@@ -95,37 +101,39 @@ func (b *projectionNotificationProcessor) Start(ctx context.Context, handler Pro
95101 return func () {
96102 close (b .done )
97103 wg .Wait ()
98- close ( b . queue )
104+ b . notificationQueue . Close ( )
99105 }
100106}
101107
102108// Queue puts the notification on the queue to be processed
103- func (b * projectionNotificationProcessor ) Queue (ctx context.Context , notification * ProjectionNotification ) error {
104- select {
105- default :
106- case <- ctx .Done ():
107- return context .Canceled
108- case <- b .done :
109- return errors .New ("goengine: unable to queue notification because the processor was stopped" )
110- }
111-
112- b .metrics .QueueNotification (notification )
113-
114- b .queue <- notification
115- return nil
109+ func (b * ProjectionNotificationProcessor ) Queue (ctx context.Context , notification * ProjectionNotification ) error {
110+ return b .notificationQueue .Queue (ctx , notification )
116111}
117112
118- func (b * projectionNotificationProcessor ) startProcessor (ctx context.Context , handler ProcessHandler ) {
113+ func (b * ProjectionNotificationProcessor ) startProcessor (ctx context.Context , handler ProcessHandler ) {
114+ ProcessorLoop:
119115 for {
120116 select {
121117 case <- b .done :
122118 return
123119 case <- ctx .Done ():
124120 return
125- case notification := <- b .queue :
121+ case notification := <- b .notificationQueue .Channel ():
122+ var queueFunc ProjectionTrigger
123+ if notification == nil {
124+ queueFunc = b .notificationQueue .Queue
125+ } else {
126+ queueFunc = b .notificationQueue .ReQueue
127+
128+ if notification .ValidAfter .After (time .Now ()) {
129+ b .notificationQueue .PutBack (notification )
130+ continue ProcessorLoop
131+ }
132+ }
133+
126134 // Execute the notification
127135 b .metrics .StartNotificationProcessing (notification )
128- if err := handler (ctx , notification , b . Queue ); err != nil {
136+ if err := handler (ctx , notification , queueFunc ); err != nil {
129137 b .logger .Error ("the ProcessHandler produced an error" , func (e goengine.LoggerEntry ) {
130138 e .Error (err )
131139 e .Any ("notification" , notification )
@@ -142,7 +150,7 @@ func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, ha
142150
143151// wrapProcessHandlerForSingleRun returns a wrapped ProcessHandler with a done channel that is closed after the
144152// provided ProcessHandler it's first call and related messages are finished or when the context is done.
145- func (b * projectionNotificationProcessor ) wrapProcessHandlerForSingleRun (handler ProcessHandler ) (ProcessHandler , chan struct {}) {
153+ func (b * ProjectionNotificationProcessor ) wrapProcessHandlerForSingleRun (handler ProcessHandler ) (ProcessHandler , chan struct {}) {
146154 done := make (chan struct {})
147155
148156 var m sync.Mutex
@@ -169,7 +177,7 @@ func (b *projectionNotificationProcessor) wrapProcessHandlerForSingleRun(handler
169177 close (done )
170178 default :
171179 // No more queued messages to close the run
172- if len ( b . queue ) == 0 {
180+ if b . notificationQueue . Empty () {
173181 close (done )
174182 }
175183 }
0 commit comments