@@ -5,8 +5,10 @@ package queue
55
66import (
77 "context"
8+ "slices"
89 "strconv"
910 "sync"
11+ "sync/atomic"
1012 "testing"
1113 "time"
1214
@@ -250,23 +252,34 @@ func TestWorkerPoolQueueShutdown(t *testing.T) {
250252
251253func TestWorkerPoolQueueWorkerIdleReset (t * testing.T ) {
252254 defer test .MockVariableValue (& workerIdleDuration , 10 * time .Millisecond )()
253- defer mockBackoffDuration (10 * time .Millisecond )()
255+ defer mockBackoffDuration (5 * time .Millisecond )()
254256
257+ var q * WorkerPoolQueue [int ]
258+ var handledCount atomic.Int32
259+ var hasOnlyOneWorkerRunning atomic.Bool
255260 handler := func (items ... int ) (unhandled []int ) {
256- time .Sleep (50 * time .Millisecond )
261+ handledCount .Add (int32 (len (items )))
262+ // make each work have different duration, and check the active worker number periodically
263+ var activeNums []int
264+ for i := 0 ; i < 5 - items [0 ]% 2 ; i ++ {
265+ time .Sleep (workerIdleDuration * 2 )
266+ activeNums = append (activeNums , q .GetWorkerActiveNumber ())
267+ }
268+ // When the queue never becomes empty, the existing workers should keep working
269+ // It is not 100% true at the moment because the data-race in workergroup.go is not resolved, see that TODO */
270+ // If the "active worker numbers" is like [2 2 ... 1 1], it means that an existing worker exited and the no new worker is started.
271+ if slices .Equal ([]int {1 , 1 }, activeNums [len (activeNums )- 2 :]) {
272+ hasOnlyOneWorkerRunning .Store (true )
273+ }
257274 return nil
258275 }
259-
260- q , _ := newWorkerPoolQueueForTest ("test-workpoolqueue" , setting.QueueSettings {Type : "channel" , BatchLength : 1 , MaxWorkers : 2 , Length : 100 }, handler , false )
276+ q , _ = newWorkerPoolQueueForTest ("test-workpoolqueue" , setting.QueueSettings {Type : "channel" , BatchLength : 1 , MaxWorkers : 2 , Length : 100 }, handler , false )
261277 stop := runWorkerPoolQueue (q )
262- for i := 0 ; i < 20 ; i ++ {
278+ for i := 0 ; i < 100 ; i ++ {
263279 assert .NoError (t , q .Push (i ))
264280 }
265-
266281 time .Sleep (500 * time .Millisecond )
267- assert .EqualValues (t , 2 , q .GetWorkerNumber ())
268- assert .EqualValues (t , 2 , q .GetWorkerActiveNumber ())
269- // when the queue never becomes empty, the existing workers should keep working
270- assert .EqualValues (t , 2 , q .workerStartedCounter )
282+ assert .Greater (t , int (handledCount .Load ()), 4 ) // make sure there are enough items handled during the test
283+ assert .False (t , hasOnlyOneWorkerRunning .Load (), "a slow handler should not block other workers from starting" )
271284 stop ()
272285}
0 commit comments