@@ -2,6 +2,10 @@ package redisdb
2
2
3
3
import (
4
4
"context"
5
+ "errors"
6
+ "fmt"
7
+ "log"
8
+ "runtime"
5
9
"testing"
6
10
"time"
7
11
@@ -61,6 +65,199 @@ func TestNSQShutdown(t *testing.T) {
61
65
q .Wait ()
62
66
}
63
67
68
+ func TestCustomFuncAndWait (t * testing.T ) {
69
+ m := & mockMessage {
70
+ Message : "foo" ,
71
+ }
72
+ w := NewWorker (
73
+ WithAddr (host + ":6379" ),
74
+ WithChannel ("test3" ),
75
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
76
+ time .Sleep (500 * time .Millisecond )
77
+ return nil
78
+ }),
79
+ )
80
+ q , err := queue .NewQueue (
81
+ queue .WithWorker (w ),
82
+ queue .WithWorkerCount (10 ),
83
+ )
84
+ assert .NoError (t , err )
85
+ q .Start ()
86
+ time .Sleep (100 * time .Millisecond )
87
+ assert .NoError (t , q .Queue (m ))
88
+ assert .NoError (t , q .Queue (m ))
89
+ assert .NoError (t , q .Queue (m ))
90
+ assert .NoError (t , q .Queue (m ))
91
+ time .Sleep (1000 * time .Millisecond )
92
+ q .Shutdown ()
93
+ q .Wait ()
94
+ // you will see the execute time > 1000ms
95
+ }
96
+
97
+ func TestEnqueueJobAfterShutdown (t * testing.T ) {
98
+ m := mockMessage {
99
+ Message : "foo" ,
100
+ }
101
+ w := NewWorker (
102
+ WithAddr (host + ":6379" ),
103
+ )
104
+ q , err := queue .NewQueue (
105
+ queue .WithWorker (w ),
106
+ queue .WithWorkerCount (2 ),
107
+ )
108
+ assert .NoError (t , err )
109
+ q .Start ()
110
+ time .Sleep (50 * time .Millisecond )
111
+ q .Shutdown ()
112
+ // can't queue task after shutdown
113
+ err = q .Queue (m )
114
+ assert .Error (t , err )
115
+ assert .Equal (t , queue .ErrQueueShutdown , err )
116
+ q .Wait ()
117
+ }
118
+
119
+ func TestWorkerNumAfterShutdown (t * testing.T ) {
120
+ w := NewWorker (
121
+ WithAddr (host + ":6379" ),
122
+ )
123
+ q , err := queue .NewQueue (
124
+ queue .WithWorker (w ),
125
+ queue .WithWorkerCount (2 ),
126
+ )
127
+ assert .NoError (t , err )
128
+ q .Start ()
129
+ q .Start ()
130
+ time .Sleep (100 * time .Millisecond )
131
+ assert .Equal (t , 4 , q .Workers ())
132
+ q .Shutdown ()
133
+ q .Wait ()
134
+ assert .Equal (t , 0 , q .Workers ())
135
+ q .Start ()
136
+ q .Start ()
137
+ assert .Equal (t , 0 , q .Workers ())
138
+ }
139
+
140
+ func TestJobReachTimeout (t * testing.T ) {
141
+ m := mockMessage {
142
+ Message : "foo" ,
143
+ }
144
+ w := NewWorker (
145
+ WithAddr (host + ":6379" ),
146
+ WithChannel ("timeout" ),
147
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
148
+ for {
149
+ select {
150
+ case <- ctx .Done ():
151
+ log .Println ("get data:" , string (m .Bytes ()))
152
+ if errors .Is (ctx .Err (), context .Canceled ) {
153
+ log .Println ("queue has been shutdown and cancel the job" )
154
+ } else if errors .Is (ctx .Err (), context .DeadlineExceeded ) {
155
+ log .Println ("job deadline exceeded" )
156
+ }
157
+ return nil
158
+ default :
159
+ }
160
+ time .Sleep (50 * time .Millisecond )
161
+ }
162
+ }),
163
+ )
164
+ q , err := queue .NewQueue (
165
+ queue .WithWorker (w ),
166
+ queue .WithWorkerCount (2 ),
167
+ )
168
+ assert .NoError (t , err )
169
+ q .Start ()
170
+ time .Sleep (50 * time .Millisecond )
171
+ assert .NoError (t , q .QueueWithTimeout (20 * time .Millisecond , m ))
172
+ time .Sleep (2 * time .Second )
173
+ q .Shutdown ()
174
+ q .Wait ()
175
+ }
176
+
177
+ func TestCancelJobAfterShutdown (t * testing.T ) {
178
+ m := mockMessage {
179
+ Message : "test" ,
180
+ }
181
+ w := NewWorker (
182
+ WithAddr (host + ":6379" ),
183
+ WithChannel ("cancel" ),
184
+ WithLogger (queue .NewLogger ()),
185
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
186
+ for {
187
+ select {
188
+ case <- ctx .Done ():
189
+ log .Println ("get data:" , string (m .Bytes ()))
190
+ if errors .Is (ctx .Err (), context .Canceled ) {
191
+ log .Println ("queue has been shutdown and cancel the job" )
192
+ } else if errors .Is (ctx .Err (), context .DeadlineExceeded ) {
193
+ log .Println ("job deadline exceeded" )
194
+ }
195
+ return nil
196
+ default :
197
+ }
198
+ time .Sleep (50 * time .Millisecond )
199
+ }
200
+ }),
201
+ )
202
+ q , err := queue .NewQueue (
203
+ queue .WithWorker (w ),
204
+ queue .WithWorkerCount (2 ),
205
+ )
206
+ assert .NoError (t , err )
207
+ q .Start ()
208
+ time .Sleep (50 * time .Millisecond )
209
+ assert .NoError (t , q .QueueWithTimeout (3 * time .Second , m ))
210
+ time .Sleep (2 * time .Second )
211
+ q .Shutdown ()
212
+ q .Wait ()
213
+ }
214
+
215
+ func TestGoroutineLeak (t * testing.T ) {
216
+ m := mockMessage {
217
+ Message : "foo" ,
218
+ }
219
+ w := NewWorker (
220
+ WithAddr (host + ":6379" ),
221
+ WithChannel ("GoroutineLeak" ),
222
+ WithLogger (queue .NewEmptyLogger ()),
223
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
224
+ for {
225
+ select {
226
+ case <- ctx .Done ():
227
+ log .Println ("get data:" , string (m .Bytes ()))
228
+ if errors .Is (ctx .Err (), context .Canceled ) {
229
+ log .Println ("queue has been shutdown and cancel the job" )
230
+ } else if errors .Is (ctx .Err (), context .DeadlineExceeded ) {
231
+ log .Println ("job deadline exceeded" )
232
+ }
233
+ return nil
234
+ default :
235
+ log .Println ("get data:" , string (m .Bytes ()))
236
+ time .Sleep (50 * time .Millisecond )
237
+ return nil
238
+ }
239
+ }
240
+ }),
241
+ )
242
+ q , err := queue .NewQueue (
243
+ queue .WithLogger (queue .NewEmptyLogger ()),
244
+ queue .WithWorker (w ),
245
+ queue .WithWorkerCount (10 ),
246
+ )
247
+ assert .NoError (t , err )
248
+ q .Start ()
249
+ time .Sleep (50 * time .Millisecond )
250
+ for i := 0 ; i < 500 ; i ++ {
251
+ m .Message = fmt .Sprintf ("foobar: %d" , i + 1 )
252
+ assert .NoError (t , q .Queue (m ))
253
+ }
254
+ time .Sleep (2 * time .Second )
255
+ q .Shutdown ()
256
+ q .Wait ()
257
+ time .Sleep (2 * time .Second )
258
+ fmt .Println ("number of goroutines:" , runtime .NumGoroutine ())
259
+ }
260
+
64
261
func TestGoroutinePanic (t * testing.T ) {
65
262
m := mockMessage {
66
263
Message : "foo" ,
@@ -86,3 +283,82 @@ func TestGoroutinePanic(t *testing.T) {
86
283
assert .Error (t , q .Queue (m ))
87
284
q .Wait ()
88
285
}
286
+
287
+ func TestHandleTimeout (t * testing.T ) {
288
+ job := queue.Job {
289
+ Timeout : 100 * time .Millisecond ,
290
+ Body : []byte ("foo" ),
291
+ }
292
+ w := NewWorker (
293
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
294
+ time .Sleep (200 * time .Millisecond )
295
+ return nil
296
+ }),
297
+ )
298
+
299
+ err := w .handle (job )
300
+ assert .Error (t , err )
301
+ assert .Equal (t , context .DeadlineExceeded , err )
302
+
303
+ job = queue.Job {
304
+ Timeout : 150 * time .Millisecond ,
305
+ Body : []byte ("foo" ),
306
+ }
307
+
308
+ w = NewWorker (
309
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
310
+ time .Sleep (200 * time .Millisecond )
311
+ return nil
312
+ }),
313
+ )
314
+
315
+ done := make (chan error )
316
+ go func () {
317
+ done <- w .handle (job )
318
+ }()
319
+
320
+ assert .NoError (t , w .Shutdown ())
321
+
322
+ err = <- done
323
+ assert .Error (t , err )
324
+ assert .Equal (t , context .DeadlineExceeded , err )
325
+ }
326
+
327
+ func TestJobComplete (t * testing.T ) {
328
+ job := queue.Job {
329
+ Timeout : 100 * time .Millisecond ,
330
+ Body : []byte ("foo" ),
331
+ }
332
+ w := NewWorker (
333
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
334
+ return errors .New ("job completed" )
335
+ }),
336
+ )
337
+
338
+ err := w .handle (job )
339
+ assert .Error (t , err )
340
+ assert .Equal (t , errors .New ("job completed" ), err )
341
+
342
+ job = queue.Job {
343
+ Timeout : 250 * time .Millisecond ,
344
+ Body : []byte ("foo" ),
345
+ }
346
+
347
+ w = NewWorker (
348
+ WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
349
+ time .Sleep (200 * time .Millisecond )
350
+ return errors .New ("job completed" )
351
+ }),
352
+ )
353
+
354
+ done := make (chan error )
355
+ go func () {
356
+ done <- w .handle (job )
357
+ }()
358
+
359
+ assert .NoError (t , w .Shutdown ())
360
+
361
+ err = <- done
362
+ assert .Error (t , err )
363
+ assert .Equal (t , errors .New ("job completed" ), err )
364
+ }
0 commit comments