Skip to content

Commit 9ff3a8c

Browse files
Remove NotificationQueuer.Close
In order to cleanup the dual stop + close we introduce a queueLock allowing for a since queueCloser func to be returned by Open instead of a done channel which requires a Close call.
1 parent 6ac3fad commit 9ff3a8c

File tree

4 files changed

+35
-39
lines changed

4 files changed

+35
-39
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
type (
1313
// ProjectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
1414
ProjectionNotificationProcessor struct {
15-
done chan struct{}
1615
queueProcessors int
1716

1817
logger goengine.Logger
@@ -83,7 +82,7 @@ func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler P
8382

8483
// Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec
8584
func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func() {
86-
b.done = b.notificationQueue.Open()
85+
queueClose := b.notificationQueue.Open()
8786

8887
var wg sync.WaitGroup
8988
wg.Add(b.queueProcessors)
@@ -98,9 +97,8 @@ func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler Pro
9897
runtime.Gosched()
9998

10099
return func() {
101-
close(b.done)
100+
queueClose()
102101
wg.Wait()
103-
b.notificationQueue.Close()
104102
}
105103
}
106104

driver/sql/projection_notification_processor_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ func TestStartProcessor(t *testing.T) {
7373
channel <- notification
7474
called := false
7575

76-
expect.Open().DoAndReturn(func() chan struct{} {
77-
return done
76+
expect.Open().DoAndReturn(func() func() {
77+
return func() {
78+
close(done)
79+
close(channel)
80+
}
7881
}).AnyTimes()
7982

8083
expect.Next(gomock.Eq(ctx)).DoAndReturn(func(ctx context.Context) (*sql.ProjectionNotification, bool) {
@@ -85,10 +88,6 @@ func TestStartProcessor(t *testing.T) {
8588
return notification, false
8689
}).AnyTimes()
8790

88-
expect.Close().Do(func() {
89-
close(channel)
90-
})
91-
9291
processor, err := sql.NewBackgroundProcessor(queueProcessorsCount, queueBufferSize, nil, nil, notificationQueue)
9392
require.NoError(t, err)
9493

driver/sql/projection_notification_queue.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@ package sql
33
import (
44
"context"
55
"errors"
6+
"sync"
67
"time"
78
)
89

9-
// Ensure the NotificationQueue.Queue is a ProjectionTrigger
10-
var _ ProjectionTrigger = (&NotificationQueue{}).Queue
11-
12-
// Ensure the NotificationQueue.ReQueue is a ProjectionTrigger
13-
var _ ProjectionTrigger = (&NotificationQueue{}).ReQueue
10+
// Ensure the NotificationQueue is a NotificationQueuer
11+
var _ NotificationQueuer = &NotificationQueue{}
1412

1513
type (
1614
// NotificationQueuer describes a smart queue for projection notifications
1715
NotificationQueuer interface {
18-
Open() chan struct{}
19-
Close()
16+
Open() func()
2017

2118
Empty() bool
2219
Next(context.Context) (*ProjectionNotification, bool)
@@ -31,6 +28,7 @@ type (
3128
metrics Metrics
3229
done chan struct{}
3330
queue chan *ProjectionNotification
31+
queueLock sync.Mutex
3432
queueBuffer int
3533
}
3634
)
@@ -48,16 +46,21 @@ func newNotificationQueue(queueBuffer int, retryDelay time.Duration, metrics Met
4846
}
4947

5048
// Open enables the queue for business
51-
func (nq *NotificationQueue) Open() chan struct{} {
49+
func (nq *NotificationQueue) Open() func() {
50+
nq.queueLock.Lock()
51+
defer nq.queueLock.Unlock()
52+
5253
nq.done = make(chan struct{})
5354
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)
5455

55-
return nq.done
56-
}
56+
return func() {
57+
close(nq.done)
5758

58-
// Close closes the queue channel
59-
func (nq *NotificationQueue) Close() {
60-
close(nq.queue)
59+
nq.queueLock.Lock()
60+
defer nq.queueLock.Unlock()
61+
62+
close(nq.queue)
63+
}
6164
}
6265

6366
// Empty returns whether the queue is empty
@@ -75,7 +78,7 @@ func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification,
7578
return nil, true
7679
case notification := <-nq.queue:
7780
if notification != nil && notification.ValidAfter.After(time.Now()) {
78-
nq.queue <- notification
81+
nq.queueNotification(notification)
7982
continue
8083
}
8184
return notification, false
@@ -95,7 +98,8 @@ func (nq *NotificationQueue) Queue(ctx context.Context, notification *Projection
9598

9699
nq.metrics.QueueNotification(notification)
97100

98-
nq.queue <- notification
101+
nq.queueNotification(notification)
102+
99103
return nil
100104
}
101105

@@ -105,3 +109,10 @@ func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *Projecti
105109

106110
return nq.Queue(ctx, notification)
107111
}
112+
113+
func (nq *NotificationQueue) queueNotification(notification *ProjectionNotification) {
114+
nq.queueLock.Lock()
115+
defer nq.queueLock.Unlock()
116+
117+
nq.queue <- notification
118+
}

mocks/driver/sql/notification_queue.go

Lines changed: 2 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)