@@ -3,20 +3,18 @@ package sql
33import (
44 "context"
55 "errors"
6+ "sync"
7+ "sync/atomic"
68 "time"
79)
810
9- // Ensure the NotificationQueue.Queue is a ProjectionTrigger
10- var _ ProjectionTrigger = (& NotificationQueue {}).Queue
11-
12- // Ensure the NotificationQueue.ReQueue is a ProjectionTrigger
13- var _ ProjectionTrigger = (& NotificationQueue {}).ReQueue
11+ // Ensure the NotificationQueue is a NotificationQueuer
12+ var _ NotificationQueuer = & NotificationQueue {}
1413
1514type (
1615 // NotificationQueuer describes a smart queue for projection notifications
1716 NotificationQueuer interface {
18- Open () chan struct {}
19- Close ()
17+ Open () func ()
2018
2119 Empty () bool
2220 Next (context.Context ) (* ProjectionNotification , bool )
3129 metrics Metrics
3230 done chan struct {}
3331 queue chan * ProjectionNotification
32+ queueLock sync.Mutex
3433 queueBuffer int
34+ queueCount int32
3535 }
3636)
3737
@@ -48,21 +48,27 @@ func newNotificationQueue(queueBuffer int, retryDelay time.Duration, metrics Met
4848}
4949
5050// Open enables the queue for business
51- func (nq * NotificationQueue ) Open () chan struct {} {
51+ func (nq * NotificationQueue ) Open () func () {
52+ nq .queueLock .Lock ()
53+ defer nq .queueLock .Unlock ()
54+
5255 nq .done = make (chan struct {})
5356 nq .queue = make (chan * ProjectionNotification , nq .queueBuffer )
57+ nq .queueCount = 0
5458
55- return nq .done
56- }
59+ return func () {
60+ close (nq .done )
61+
62+ nq .queueLock .Lock ()
63+ defer nq .queueLock .Unlock ()
5764
58- // Close closes the queue channel
59- func (nq * NotificationQueue ) Close () {
60- close (nq .queue )
65+ close (nq .queue )
66+ }
6167}
6268
6369// Empty returns whether the queue is empty
6470func (nq * NotificationQueue ) Empty () bool {
65- return len ( nq .queue ) == 0
71+ return atomic . LoadInt32 ( & nq .queueCount ) == 0
6672}
6773
6874// Next yields the next notification on the queue or stopped when processor has stopped
@@ -75,9 +81,11 @@ func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification,
7581 return nil , true
7682 case notification := <- nq .queue :
7783 if notification != nil && notification .ValidAfter .After (time .Now ()) {
78- nq .queue <- notification
84+ nq .queueNotification ( notification )
7985 continue
8086 }
87+
88+ atomic .AddInt32 (& nq .queueCount , - 1 )
8189 return notification , false
8290 }
8391 }
@@ -93,9 +101,12 @@ func (nq *NotificationQueue) Queue(ctx context.Context, notification *Projection
93101 return errors .New ("goengine: unable to queue notification because the processor was stopped" )
94102 }
95103
104+ atomic .AddInt32 (& nq .queueCount , 1 )
105+
96106 nq .metrics .QueueNotification (notification )
97107
98- nq .queue <- notification
108+ nq .queueNotification (notification )
109+
99110 return nil
100111}
101112
@@ -105,3 +116,15 @@ func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *Projecti
105116
106117 return nq .Queue (ctx , notification )
107118}
119+
120+ func (nq * NotificationQueue ) queueNotification (notification * ProjectionNotification ) {
121+ nq .queueLock .Lock ()
122+ defer nq .queueLock .Unlock ()
123+
124+ select {
125+ case <- nq .done :
126+ return
127+ default :
128+ nq .queue <- notification
129+ }
130+ }
0 commit comments