File tree Expand file tree Collapse file tree 3 files changed +36
-6
lines changed
Expand file tree Collapse file tree 3 files changed +36
-6
lines changed Original file line number Diff line number Diff line change @@ -17,7 +17,7 @@ type Option func(*config)
1717
1818// WithIdleTimeout configures the the amount of time that the worker pool must
1919// be idle before a worker is automatically stopped. If zero or unset the value
20- // defaults to DefaultIdleTimeout.
20+ // defaults to DefaultIdleTimeout. A negative value disables the idle timeout.
2121func WithIdleTimeout (timeout time.Duration ) Option {
2222 return func (c * config ) {
2323 if timeout != 0 {
Original file line number Diff line number Diff line change @@ -186,11 +186,18 @@ func (p *WorkerPool) Pause(ctx context.Context) {
186186// dispatch sends the next queued task to an available worker.
187187func (p * WorkerPool ) dispatch (idleTimeout time.Duration ) {
188188 defer close (p .stoppedChan )
189- timeout := time .NewTimer (idleTimeout )
190- var workerCount int
191189 var idle bool
190+ var idleTO <- chan time.Time
191+ var workerCount int
192192 var wg sync.WaitGroup
193193
194+ var timeout * time.Timer
195+ if idleTimeout > 0 {
196+ timeout = time .NewTimer (idleTimeout )
197+ defer timeout .Stop ()
198+ idleTO = timeout .C
199+ }
200+
194201Loop:
195202 for {
196203 // As long as tasks are in the waiting queue, incoming tasks are put
@@ -225,7 +232,7 @@ Loop:
225232 }
226233 }
227234 idle = false
228- case <- timeout . C :
235+ case <- idleTO :
229236 // Timed out waiting for work to arrive. Kill a ready worker if
230237 // pool has been idle for a whole timeout.
231238 if idle && workerCount > 0 {
@@ -249,8 +256,6 @@ Loop:
249256 workerCount --
250257 }
251258 wg .Wait ()
252-
253- timeout .Stop ()
254259}
255260
256261// worker executes tasks and stops when it receives a nil task.
Original file line number Diff line number Diff line change @@ -152,6 +152,31 @@ func TestWorkerTimeout(t *testing.T) {
152152 })
153153}
154154
155+ func TestWorkerNoTimeout (t * testing.T ) {
156+ synctest .Test (t , func (t * testing.T ) {
157+ wp := New (max , WithIdleTimeout (- 1 ))
158+ defer wp .Stop ()
159+
160+ // Start workers, and have them all wait on ctx before completing.
161+ ctx , cancel := context .WithCancel (context .Background ())
162+ wp .Pause (ctx )
163+ cancel ()
164+
165+ // Check that a worker timed out.
166+ time .Sleep (time .Hour )
167+ synctest .Wait ()
168+ if countReady (wp ) != max {
169+ t .Fatal ("no workers should have timed out" )
170+ }
171+ // Check again.
172+ time .Sleep (time .Hour )
173+ synctest .Wait ()
174+ if countReady (wp ) != max {
175+ t .Fatal ("no workers should have timed out" )
176+ }
177+ })
178+ }
179+
155180func TestStop (t * testing.T ) {
156181 synctest .Test (t , func (t * testing.T ) {
157182 wp := New (max )
You can’t perform that action at this time.
0 commit comments