Skip to content

Commit d4c2109

Browse files
author
Christian Wygoda
committed
Move done channel to NotificationQueuer
1 parent a6fade4 commit d4c2109

File tree

4 files changed

+19
-12
lines changed

4 files changed

+19
-12
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ func (b *ProjectionNotificationProcessor) Execute(ctx context.Context, handler P
8484

8585
// Start starts the background processes that will call the ProcessHandler based on the notification queued by Exec
8686
func (b *ProjectionNotificationProcessor) Start(ctx context.Context, handler ProcessHandler) func() {
87-
b.done = make(chan struct{})
88-
89-
b.notificationQueue.Open(b.done)
87+
b.done = b.notificationQueue.Open()
9088

9189
var wg sync.WaitGroup
9290
wg.Add(b.queueProcessors)

driver/sql/projection_notification_processor_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,19 @@ func TestStartProcessor(t *testing.T) {
6868

6969
e.Queue(gomock.Eq(ctx), gomock.Eq(notification)).Times(queueCallCount)
7070
e.ReQueue(gomock.Eq(ctx), gomock.Eq(notification)).Times(reQueueCallCount)
71-
e.Open(gomock.Any()).AnyTimes()
71+
done := make(chan struct{})
72+
e.Open().DoAndReturn(func() chan struct{} {
73+
return done
74+
}).AnyTimes()
7275
channel := make(chan *sql.ProjectionNotification, 1)
7376
channel <- notification
7477
e.Channel().Return(channel).AnyTimes()
7578
e.PutBack(gomock.Eq(notification)).Do(func(notification *sql.ProjectionNotification) {
7679
channel <- notification
7780
}).AnyTimes()
78-
e.Close()
81+
e.Close().Do(func() {
82+
close(channel)
83+
})
7984

8085
bufferSize := 1
8186
queueProcessorsCount := 1

driver/sql/projection_notification_queue.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type (
1818
Channel() chan *ProjectionNotification
1919
Close()
2020
Empty() bool
21-
Open(chan struct{})
21+
Open() chan struct{}
2222
PutBack(*ProjectionNotification)
2323
Queue(context.Context, *ProjectionNotification) error
2424
ReQueue(context.Context, *ProjectionNotification) error
@@ -90,7 +90,9 @@ func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *Projecti
9090
}
9191

9292
// Open enables the queue for business
93-
func (nq *NotificationQueue) Open(done chan struct{}) {
94-
nq.done = done
93+
func (nq *NotificationQueue) Open() chan struct{} {
94+
nq.done = make(chan struct{})
9595
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)
96+
97+
return nq.done
9698
}

mocks/driver/sql/notification_queue.go

Lines changed: 6 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)