@@ -43,6 +43,7 @@ func TestDefaultWorkFlow(t *testing.T) {
43
43
assert .NoError (t , q .Queue (m ))
44
44
assert .NoError (t , q .Queue (m ))
45
45
q .Start ()
46
+ time .Sleep (200 * time .Millisecond )
46
47
q .Release ()
47
48
}
48
49
@@ -228,3 +229,109 @@ func TestGoroutineLeak(t *testing.T) {
228
229
q .Wait ()
229
230
fmt .Println ("number of goroutines:" , runtime .NumGoroutine ())
230
231
}
232
+
233
+ func TestGoroutinePanic (t * testing.T ) {
234
+ m := mockMessage {
235
+ Message : "foo" ,
236
+ }
237
+ w := NewWorker (
238
+ WithSubj ("GoroutinePanic" ),
239
+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
240
+ panic ("missing something" )
241
+ }),
242
+ )
243
+ q , err := queue .NewQueue (
244
+ queue .WithWorker (w ),
245
+ queue .WithWorkerCount (2 ),
246
+ )
247
+ assert .NoError (t , err )
248
+ q .Start ()
249
+ time .Sleep (50 * time .Millisecond )
250
+ assert .NoError (t , q .Queue (m ))
251
+ assert .NoError (t , q .Queue (m ))
252
+ time .Sleep (2 * time .Second )
253
+ q .Shutdown ()
254
+ assert .Error (t , q .Queue (m ))
255
+ q .Wait ()
256
+ }
257
+
258
+ func TestHandleTimeout (t * testing.T ) {
259
+ job := & queue.Job {
260
+ Timeout : 100 * time .Millisecond ,
261
+ Payload : []byte ("foo" ),
262
+ }
263
+ w := NewWorker (
264
+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
265
+ time .Sleep (200 * time .Millisecond )
266
+ return nil
267
+ }),
268
+ )
269
+
270
+ err := w .handle (job )
271
+ assert .Error (t , err )
272
+ assert .Equal (t , context .DeadlineExceeded , err )
273
+ assert .NoError (t , w .Shutdown ())
274
+
275
+ job = & queue.Job {
276
+ Timeout : 150 * time .Millisecond ,
277
+ Payload : []byte ("foo" ),
278
+ }
279
+
280
+ w = NewWorker (
281
+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
282
+ time .Sleep (200 * time .Millisecond )
283
+ return nil
284
+ }),
285
+ )
286
+
287
+ done := make (chan error )
288
+ go func () {
289
+ done <- w .handle (job )
290
+ }()
291
+
292
+ assert .NoError (t , w .Shutdown ())
293
+
294
+ err = <- done
295
+ assert .Error (t , err )
296
+ assert .Equal (t , context .DeadlineExceeded , err )
297
+ }
298
+
299
+ func TestJobComplete (t * testing.T ) {
300
+ job := & queue.Job {
301
+ Timeout : 100 * time .Millisecond ,
302
+ Payload : []byte ("foo" ),
303
+ }
304
+ w := NewWorker (
305
+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
306
+ return errors .New ("job completed" )
307
+ }),
308
+ )
309
+
310
+ err := w .handle (job )
311
+ assert .Error (t , err )
312
+ assert .Equal (t , errors .New ("job completed" ), err )
313
+ assert .NoError (t , w .Shutdown ())
314
+
315
+ job = & queue.Job {
316
+ Timeout : 250 * time .Millisecond ,
317
+ Payload : []byte ("foo" ),
318
+ }
319
+
320
+ w = NewWorker (
321
+ WithRunFunc (func (ctx context.Context , m core.QueuedMessage ) error {
322
+ time .Sleep (200 * time .Millisecond )
323
+ return errors .New ("job completed" )
324
+ }),
325
+ )
326
+
327
+ done := make (chan error )
328
+ go func () {
329
+ done <- w .handle (job )
330
+ }()
331
+
332
+ assert .NoError (t , w .Shutdown ())
333
+
334
+ err = <- done
335
+ assert .Error (t , err )
336
+ assert .Equal (t , errors .New ("job completed" ), err )
337
+ }
0 commit comments