Skip to content

Commit e45f508

Browse files
authored
Merge pull request #42 from hellofresh/patch/enhance-delays
Enhance delays
2 parents 67783fa + d98e192 commit e45f508

File tree

4 files changed

+83
-91
lines changed

4 files changed

+83
-91
lines changed

driver/sql/projection_notification_processor.go

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"runtime"
66
"sync"
7-
"time"
87

98
"github.com/hellofresh/goengine"
109
"github.com/pkg/errors"
@@ -111,39 +110,31 @@ func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notificatio
111110
}
112111

113112
func (b *ProjectionNotificationProcessor) startProcessor(ctx context.Context, handler ProcessHandler) {
114-
ProcessorLoop:
115113
for {
116-
select {
117-
case <-b.done:
114+
notification, stopped := b.notificationQueue.Next(ctx)
115+
if stopped {
118116
return
119-
case <-ctx.Done():
120-
return
121-
case notification := <-b.notificationQueue.Channel():
122-
var queueFunc ProjectionTrigger
123-
if notification == nil {
124-
queueFunc = b.notificationQueue.Queue
125-
} else {
126-
queueFunc = b.notificationQueue.ReQueue
127-
128-
if notification.ValidAfter.After(time.Now()) {
129-
b.notificationQueue.PutBack(notification)
130-
continue ProcessorLoop
131-
}
132-
}
117+
}
133118

134-
// Execute the notification
135-
b.metrics.StartNotificationProcessing(notification)
136-
if err := handler(ctx, notification, queueFunc); err != nil {
137-
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
138-
e.Error(err)
139-
e.Any("notification", notification)
140-
})
119+
var queueFunc ProjectionTrigger
120+
if notification == nil {
121+
queueFunc = b.notificationQueue.Queue
122+
} else {
123+
queueFunc = b.notificationQueue.ReQueue
124+
}
141125

142-
b.metrics.FinishNotificationProcessing(notification, false)
126+
// Execute the notification
127+
b.metrics.StartNotificationProcessing(notification)
128+
if err := handler(ctx, notification, queueFunc); err != nil {
129+
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
130+
e.Error(err)
131+
e.Any("notification", notification)
132+
})
143133

144-
} else {
145-
b.metrics.FinishNotificationProcessing(notification, true)
146-
}
134+
b.metrics.FinishNotificationProcessing(notification, false)
135+
136+
} else {
137+
b.metrics.FinishNotificationProcessing(notification, true)
147138
}
148139
}
149140
}

driver/sql/projection_notification_processor_test.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,40 +51,43 @@ func TestStartProcessor(t *testing.T) {
5151
for _, testCase := range testCases {
5252
t.Run(testCase.title, func(t *testing.T) {
5353
ctrl := gomock.NewController(t)
54-
55-
nqMock := mocks.NewNotificationQueuer(ctrl)
54+
queueBufferSize := 1
55+
queueProcessorsCount := 1
5656
ctx := context.Background()
5757
notification := testCase.notification()
5858

59-
e := nqMock.EXPECT()
60-
queueCallCount := 0
61-
reQueueCallCount := 0
59+
notificationQueue := mocks.NewNotificationQueuer(ctrl)
60+
expect := notificationQueue.EXPECT()
61+
6262
switch testCase.queueFunc {
6363
case "Queue":
64-
queueCallCount++
64+
expect.Queue(gomock.Eq(ctx), gomock.Eq(notification)).Times(1)
6565
case "ReQueue":
66-
reQueueCallCount++
66+
expect.ReQueue(gomock.Eq(ctx), gomock.Eq(notification)).Times(1)
6767
}
6868

69-
e.Queue(gomock.Eq(ctx), gomock.Eq(notification)).Times(queueCallCount)
70-
e.ReQueue(gomock.Eq(ctx), gomock.Eq(notification)).Times(reQueueCallCount)
7169
done := make(chan struct{})
72-
e.Open().DoAndReturn(func() chan struct{} {
70+
channel := make(chan *sql.ProjectionNotification, queueBufferSize)
71+
channel <- notification
72+
called := false
73+
74+
expect.Open().DoAndReturn(func() chan struct{} {
7375
return done
7476
}).AnyTimes()
75-
channel := make(chan *sql.ProjectionNotification, 1)
76-
channel <- notification
77-
e.Channel().Return(channel).AnyTimes()
78-
e.PutBack(gomock.Eq(notification)).Do(func(notification *sql.ProjectionNotification) {
79-
channel <- notification
77+
78+
expect.Next(gomock.Eq(ctx)).DoAndReturn(func(ctx context.Context) (*sql.ProjectionNotification, bool) {
79+
if called {
80+
return nil, true
81+
}
82+
called = true
83+
return notification, false
8084
}).AnyTimes()
81-
e.Close().Do(func() {
85+
86+
expect.Close().Do(func() {
8287
close(channel)
8388
})
8489

85-
bufferSize := 1
86-
queueProcessorsCount := 1
87-
processor, err := sql.NewBackgroundProcessor(queueProcessorsCount, bufferSize, nil, nil, nqMock)
90+
processor, err := sql.NewBackgroundProcessor(queueProcessorsCount, queueBufferSize, nil, nil, notificationQueue)
8891
require.NoError(t, err)
8992

9093
var wg sync.WaitGroup

driver/sql/projection_notification_queue.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ var _ ProjectionTrigger = (&NotificationQueue{}).ReQueue
1515
type (
1616
// NotificationQueuer describes a smart queue for projection notifications
1717
NotificationQueuer interface {
18-
Channel() chan *ProjectionNotification
18+
Open() chan struct{}
1919
Close()
20+
2021
Empty() bool
21-
Open() chan struct{}
22-
PutBack(*ProjectionNotification)
22+
Next(context.Context) (*ProjectionNotification, bool)
23+
2324
Queue(context.Context, *ProjectionNotification) error
2425
ReQueue(context.Context, *ProjectionNotification) error
2526
}
@@ -46,9 +47,12 @@ func newNotificationQueue(queueBuffer int, retryDelay time.Duration, metrics Met
4647
}
4748
}
4849

49-
// Channel returns the queue channel
50-
func (nq *NotificationQueue) Channel() chan *ProjectionNotification {
51-
return nq.queue
50+
// Open enables the queue for business
51+
func (nq *NotificationQueue) Open() chan struct{} {
52+
nq.done = make(chan struct{})
53+
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)
54+
55+
return nq.done
5256
}
5357

5458
// Close closes the queue channel
@@ -61,9 +65,22 @@ func (nq *NotificationQueue) Empty() bool {
6165
return len(nq.queue) == 0
6266
}
6367

64-
// PutBack sends a notification to the queue channel without further ado
65-
func (nq *NotificationQueue) PutBack(notification *ProjectionNotification) {
66-
nq.queue <- notification
68+
// Next yields the next notification on the queue or stopped when processor has stopped
69+
func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool) {
70+
for {
71+
select {
72+
case <-nq.done:
73+
return nil, true
74+
case <-ctx.Done():
75+
return nil, true
76+
case notification := <-nq.queue:
77+
if notification != nil && notification.ValidAfter.After(time.Now()) {
78+
nq.queue <- notification
79+
continue
80+
}
81+
return notification, false
82+
}
83+
}
6784
}
6885

6986
// Queue sends a notification to the queue
@@ -88,11 +105,3 @@ func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *Projecti
88105

89106
return nq.Queue(ctx, notification)
90107
}
91-
92-
// Open enables the queue for business
93-
func (nq *NotificationQueue) Open() chan struct{} {
94-
nq.done = make(chan struct{})
95-
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)
96-
97-
return nq.done
98-
}

mocks/driver/sql/notification_queue.go

Lines changed: 15 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)