Skip to content

Commit aa72915

Browse files
author
Christian Wygoda
committed
Use mocks for Queue/ReQueue in unit test
1 parent d225af1 commit aa72915

File tree

3 files changed

+137
-72
lines changed

3 files changed

+137
-72
lines changed

Gopkg.lock

Lines changed: 11 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

driver/sql/projection_notification_processor.go

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import (
1111
)
1212

1313
// Ensure the projectionNotificationProcessor.Queue is a ProjectionTrigger
14-
var _ ProjectionTrigger = (&projectionNotificationProcessor{}).Queue
14+
var _ ProjectionTrigger = (&notificationQueue{}).Queue
15+
var _ ProjectionTrigger = (&notificationQueue{}).ReQueue
1516

1617
type (
1718
// projectionNotificationProcessor provides a way to Trigger a notification using a set of background processes.
@@ -24,14 +25,64 @@ type (
2425
logger goengine.Logger
2526
metrics Metrics
2627

28+
notificationQueue notificationQueueInterface
29+
}
30+
31+
notificationQueueInterface interface {
32+
Start(done chan struct{}, queue chan *ProjectionNotification)
33+
Queue(ctx context.Context, notification *ProjectionNotification) error
34+
ReQueue(ctx context.Context, notification *ProjectionNotification) error
35+
}
36+
37+
notificationQueue struct {
2738
retryDelay time.Duration
39+
metrics Metrics
40+
done chan struct{}
41+
queue chan *ProjectionNotification
2842
}
2943

3044
// ProcessHandler is a func used to trigger a notification but with the addition of providing a Trigger func so
3145
// the original notification can trigger other notifications
3246
ProcessHandler func(context.Context, *ProjectionNotification, ProjectionTrigger) error
3347
)
3448

49+
func newNotificationQueue(retryDelay time.Duration, metrics Metrics) *notificationQueue {
50+
if retryDelay == 0 {
51+
retryDelay = time.Millisecond * 50
52+
}
53+
54+
return &notificationQueue{
55+
retryDelay: retryDelay,
56+
metrics: metrics,
57+
}
58+
}
59+
60+
func (nq *notificationQueue) Start(done chan struct{}, queue chan *ProjectionNotification) {
61+
nq.done = done
62+
nq.queue = queue
63+
}
64+
65+
func (nq *notificationQueue) Queue(ctx context.Context, notification *ProjectionNotification) error {
66+
select {
67+
default:
68+
case <-ctx.Done():
69+
return context.Canceled
70+
case <-nq.done:
71+
return errors.New("goengine: unable to queue notification because the processor was stopped")
72+
}
73+
74+
nq.metrics.QueueNotification(notification)
75+
76+
nq.queue <- notification
77+
return nil
78+
}
79+
80+
func (nq *notificationQueue) ReQueue(ctx context.Context, notification *ProjectionNotification) error {
81+
notification.ValidAfter = time.Now().Add(nq.retryDelay)
82+
83+
return nq.Queue(ctx, notification)
84+
}
85+
3586
// newBackgroundProcessor create a new projectionNotificationProcessor
3687
func newBackgroundProcessor(
3788
queueProcessors,
@@ -53,16 +104,12 @@ func newBackgroundProcessor(
53104
metrics = NopMetrics
54105
}
55106

56-
if retryDelay == 0 {
57-
retryDelay = time.Millisecond * 50
58-
}
59-
60107
return &projectionNotificationProcessor{
61-
queueProcessors: queueProcessors,
62-
queueBuffer: queueBuffer,
63-
logger: logger,
64-
metrics: metrics,
65-
retryDelay: retryDelay,
108+
queueProcessors: queueProcessors,
109+
queueBuffer: queueBuffer,
110+
logger: logger,
111+
metrics: metrics,
112+
notificationQueue: newNotificationQueue(retryDelay, metrics),
66113
}, nil
67114
}
68115

@@ -76,7 +123,7 @@ func (b *projectionNotificationProcessor) Execute(ctx context.Context, handler P
76123
defer stopExecutor()
77124

78125
// Execute a run of the internal.
79-
if err := b.Queue(ctx, nil); err != nil {
126+
if err := b.notificationQueue.Queue(ctx, nil); err != nil {
80127
return err
81128
}
82129

@@ -94,6 +141,8 @@ func (b *projectionNotificationProcessor) Start(ctx context.Context, handler Pro
94141
b.done = make(chan struct{})
95142
b.queue = make(chan *ProjectionNotification, b.queueBuffer)
96143

144+
b.notificationQueue.Start(b.done, b.queue)
145+
97146
var wg sync.WaitGroup
98147
wg.Add(b.queueProcessors)
99148
for i := 0; i < b.queueProcessors; i++ {
@@ -115,25 +164,12 @@ func (b *projectionNotificationProcessor) Start(ctx context.Context, handler Pro
115164

116165
// Queue puts the notification on the queue to be processed
117166
func (b *projectionNotificationProcessor) Queue(ctx context.Context, notification *ProjectionNotification) error {
118-
select {
119-
default:
120-
case <-ctx.Done():
121-
return context.Canceled
122-
case <-b.done:
123-
return errors.New("goengine: unable to queue notification because the processor was stopped")
124-
}
125-
126-
b.metrics.QueueNotification(notification)
127-
128-
b.queue <- notification
129-
return nil
167+
return b.notificationQueue.Queue(ctx, notification)
130168
}
131169

132170
// ReQueue puts the notification again on the queue to be processed with a ValidAfter set
133171
func (b *projectionNotificationProcessor) ReQueue(ctx context.Context, notification *ProjectionNotification) error {
134-
notification.ValidAfter = time.Now().Add(b.retryDelay)
135-
136-
return b.Queue(ctx, notification)
172+
return b.notificationQueue.ReQueue(ctx, notification)
137173
}
138174

139175
func (b *projectionNotificationProcessor) startProcessor(ctx context.Context, handler ProcessHandler) {
@@ -147,9 +183,9 @@ ProcessorLoop:
147183
case notification := <-b.queue:
148184
var queueFunc ProjectionTrigger
149185
if notification == nil {
150-
queueFunc = b.Queue
186+
queueFunc = b.notificationQueue.Queue
151187
} else {
152-
queueFunc = b.ReQueue
188+
queueFunc = b.notificationQueue.ReQueue
153189

154190
if notification.ValidAfter.After(time.Now()) {
155191
b.queue <- notification

driver/sql/projection_notification_processor_test.go

Lines changed: 62 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,52 @@ package sql
22

33
import (
44
"context"
5-
"reflect"
6-
"runtime"
75
"testing"
86
"time"
97

8+
"github.com/stretchr/testify/mock"
109
"github.com/stretchr/testify/require"
1110
)
1211

12+
type notificationQueueMock struct {
13+
mock.Mock
14+
done chan struct{}
15+
queue chan *ProjectionNotification
16+
}
17+
18+
func (m *notificationQueueMock) Start(done chan struct{}, queue chan *ProjectionNotification) {
19+
m.Called(done, queue)
20+
21+
m.done = done
22+
m.queue = queue
23+
}
24+
25+
func (m *notificationQueueMock) Queue(ctx context.Context, notification *ProjectionNotification) error {
26+
args := m.Called(ctx, notification)
27+
28+
m.queue <- notification
29+
30+
return args.Error(0)
31+
}
32+
33+
func (m *notificationQueueMock) ReQueue(ctx context.Context, notification *ProjectionNotification) error {
34+
args := m.Called(ctx, notification)
35+
36+
return args.Error(0)
37+
}
38+
1339
func TestStartProcessor(t *testing.T) {
1440
testCases := []struct {
1541
title string
1642
notification func() *ProjectionNotification
17-
mockHandler func(*testing.T, *projectionNotificationProcessor, context.CancelFunc) ProcessHandler
43+
queueMethod string
1844
}{
1945
{
2046
"Handle nil notification",
2147
func() *ProjectionNotification {
2248
return nil
2349
},
24-
func(t *testing.T, processor *projectionNotificationProcessor, cancel context.CancelFunc) ProcessHandler {
25-
return func(ctx context.Context, notification *ProjectionNotification, queue ProjectionTrigger) error {
26-
defer cancel()
27-
28-
expectedFuncName := runtime.FuncForPC(reflect.ValueOf(processor.Queue).Pointer()).Name()
29-
actualFuncName := runtime.FuncForPC(reflect.ValueOf(queue).Pointer()).Name()
30-
require.Equal(t, expectedFuncName, actualFuncName)
31-
return nil
32-
}
33-
},
50+
"Queue",
3451
},
3552
{
3653
"Handle new notification",
@@ -40,53 +57,55 @@ func TestStartProcessor(t *testing.T) {
4057
AggregateID: "abc",
4158
}
4259
},
43-
func(t *testing.T, processor *projectionNotificationProcessor, cancel context.CancelFunc) ProcessHandler {
44-
return func(ctx context.Context, notification *ProjectionNotification, queue ProjectionTrigger) error {
45-
defer cancel()
46-
47-
expectedFuncName := runtime.FuncForPC(reflect.ValueOf(processor.ReQueue).Pointer()).Name()
48-
actualFuncName := runtime.FuncForPC(reflect.ValueOf(queue).Pointer()).Name()
49-
require.Equal(t, expectedFuncName, actualFuncName)
50-
return nil
51-
}
52-
},
60+
"ReQueue",
5361
},
5462
{
5563
"Handle retried notification",
5664
func() *ProjectionNotification {
57-
now := time.Now().Add(time.Millisecond * 200)
5865
return &ProjectionNotification{
5966
No: 1,
6067
AggregateID: "abc",
61-
ValidAfter: now,
62-
}
63-
},
64-
func(t *testing.T, processor *projectionNotificationProcessor, cancel context.CancelFunc) ProcessHandler {
65-
return func(ctx context.Context, notification *ProjectionNotification, queue ProjectionTrigger) error {
66-
defer cancel()
67-
68-
expectedFuncName := runtime.FuncForPC(reflect.ValueOf(processor.ReQueue).Pointer()).Name()
69-
actualFuncName := runtime.FuncForPC(reflect.ValueOf(queue).Pointer()).Name()
70-
require.Equal(t, expectedFuncName, actualFuncName)
71-
72-
require.True(t, notification.ValidAfter.Before(time.Now()))
73-
return nil
68+
ValidAfter: time.Now().Add(time.Millisecond * 200),
7469
}
7570
},
71+
"ReQueue",
7672
},
7773
}
7874

7975
for _, testCase := range testCases {
8076
t.Run(testCase.title, func(t *testing.T) {
81-
processor, err := newBackgroundProcessor(1, 1, nil, nil, 0)
82-
require.Nil(t, err)
77+
bufferSize := 1
78+
queueProcessorsCount := 1
79+
retryDelay := time.Millisecond * 0
8380

81+
queue := make(chan *ProjectionNotification, bufferSize)
82+
done := make(chan struct{})
8483
ctx, cancel := context.WithCancel(context.Background())
85-
processor.queue = make(chan *ProjectionNotification, processor.queueBuffer)
84+
notification := testCase.notification()
8685

87-
err = processor.Queue(ctx, testCase.notification())
88-
require.Nil(t, err)
89-
processor.startProcessor(ctx, testCase.mockHandler(t, processor, cancel))
86+
processor, err := newBackgroundProcessor(queueProcessorsCount, bufferSize, nil, nil, retryDelay)
87+
require.NoError(t, err)
88+
processor.done = done
89+
processor.queue = queue
90+
91+
nqMock := &notificationQueueMock{}
92+
nqMock.On("Start", mock.Anything, queue).Return()
93+
nqMock.On(testCase.queueMethod, ctx, notification).Return(nil)
94+
nqMock.Start(processor.done, processor.queue)
95+
processor.notificationQueue = nqMock
96+
97+
queue <- notification
98+
99+
handler := func(ctx context.Context, notification *ProjectionNotification, queue ProjectionTrigger) error {
100+
defer cancel()
101+
102+
err := queue(ctx, notification)
103+
require.NoError(t, err)
104+
105+
nqMock.AssertExpectations(t)
106+
return nil
107+
}
108+
processor.startProcessor(ctx, handler)
90109
})
91110
}
92111
}

0 commit comments

Comments
 (0)