Skip to content

Commit 1e2100c

Browse files
committed
Add some tests for workerPool.ScheduleWithTimeout()
1 parent a4f7c86 commit 1e2100c

File tree

2 files changed

+85
-14
lines changed

2 files changed

+85
-14
lines changed

worker/pool.go

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ type DefaultWorkerPool struct {
7878
spawnWorkerCh fpgo.ChannelQueue[int]
7979
lastAliveTime time.Time
8080

81-
scheduleWaitCh fpgo.ChannelQueue[int]
82-
scheduleWaitCount int
81+
scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]
82+
scheduleWaitChPool sync.Pool
83+
scheduleWaitCount int
8384

8485
// Settings
8586
DefaultWorkerPoolSettings
@@ -93,12 +94,15 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()], settings
9394
workerPool := &DefaultWorkerPool{
9495
jobQueue: jobQueue,
9596

96-
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
97-
scheduleWaitCh: fpgo.NewChannelQueue[int](1),
97+
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
98+
scheduleWaitQueue: fpgo.NewConcurrentQueue[fpgo.ChannelQueue[int]](fpgo.NewLinkedListQueue[fpgo.ChannelQueue[int]]()),
9899

99100
// Settings
100101
DefaultWorkerPoolSettings: *settings,
101102
}
103+
workerPool.scheduleWaitChPool.New = func() interface{} {
104+
return fpgo.NewChannelQueue[int](1)
105+
}
102106
go workerPool.spawnLoop()
103107

104108
return workerPool
@@ -108,12 +112,12 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()], settings
108112
func (workerPoolSelf *DefaultWorkerPool) trySpawn() {
109113
workerPoolSelf.lock.RLock()
110114
batchSize := workerPoolSelf.workerBatchSize
111-
if batchSize < 1 {
112-
batchSize = 1
113-
}
114-
expectedWorkerCount := workerPoolSelf.jobQueue.Count() / batchSize
115-
if workerPoolSelf.jobQueue.Count()%batchSize > 0 {
116-
expectedWorkerCount++
115+
var expectedWorkerCount int
116+
if batchSize > 0 {
117+
expectedWorkerCount = workerPoolSelf.jobQueue.Count() / batchSize
118+
if workerPoolSelf.jobQueue.Count()%batchSize > 0 {
119+
expectedWorkerCount++
120+
}
117121
}
118122
if workerPoolSelf.workerSizeStandBy > expectedWorkerCount {
119123
expectedWorkerCount = workerPoolSelf.workerSizeStandBy
@@ -198,8 +202,18 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
198202
loopLabel:
199203
for {
200204
workerPoolSelf.lastAliveTime = time.Now()
205+
workerPoolSelf.lock.RLock()
201206
if workerPoolSelf.scheduleWaitCount > 0 {
202-
workerPoolSelf.scheduleWaitCh.Offer(1)
207+
ch, _ := workerPoolSelf.scheduleWaitQueue.Poll()
208+
if ch != nil {
209+
ch.Offer(1)
210+
defer workerPoolSelf.scheduleWaitChPool.Put(ch)
211+
}
212+
}
213+
workerPoolSelf.lock.RUnlock()
214+
215+
if workerPoolSelf.IsClosed() {
216+
return
203217
}
204218

205219
select {
@@ -229,6 +243,12 @@ func (workerPoolSelf *DefaultWorkerPool) SetJobQueue(jobQueue *fpgo.BufferedChan
229243
return workerPoolSelf
230244
}
231245

246+
// SetScheduleWaitQueue Set the ScheduleWaitQueue(WARNING: if the pool has started to use, doing this is not safe)
247+
func (workerPoolSelf *DefaultWorkerPool) SetScheduleWaitQueue(scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]) *DefaultWorkerPool {
248+
workerPoolSelf.scheduleWaitQueue = scheduleWaitQueue
249+
return workerPoolSelf
250+
}
251+
232252
// SetIsJobQueueClosedWhenClose Set is the JobQueue closed when the WorkerPool.Close()
233253
func (workerPoolSelf *DefaultWorkerPool) SetIsJobQueueClosedWhenClose(isJobQueueClosedWhenClose bool) *DefaultWorkerPool {
234254
workerPoolSelf.isJobQueueClosedWhenClose = isJobQueueClosedWhenClose
@@ -328,9 +348,20 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
328348
return err
329349
}
330350

351+
// if retry < 1 {
352+
// retry = 1
353+
// }
354+
// retryInterval := timeout / time.Duration(retry+1)
331355
deadline := time.Now().Add(timeout)
332356
workerPoolSelf.addScheduleWaitCount(1)
333-
go workerPoolSelf.addScheduleWaitCount(-1)
357+
defer workerPoolSelf.addScheduleWaitCount(-1)
358+
359+
scheduleWaitCh := workerPoolSelf.scheduleWaitChPool.Get().(fpgo.ChannelQueue[int])
360+
defer workerPoolSelf.scheduleWaitChPool.Put(scheduleWaitCh)
361+
err = workerPoolSelf.scheduleWaitQueue.Offer(scheduleWaitCh)
362+
if err != nil {
363+
return err
364+
}
334365

335366
for {
336367
if workerPoolSelf.IsClosed() {
@@ -342,8 +373,15 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
342373
return err
343374
}
344375

376+
// fmt.Println(retryInterval)
377+
// fmt.Println(deadline.Sub(now))
378+
// if time.Now().After(deadline) {
379+
// return ErrWorkerPoolScheduleTimeout
380+
// }
381+
// time.Sleep(retryInterval)
382+
345383
select {
346-
case <-workerPoolSelf.scheduleWaitCh:
384+
case <-scheduleWaitCh:
347385
continue
348386
case <-time.After(deadline.Sub(time.Now())):
349387
return ErrWorkerPoolScheduleTimeout

worker/pool_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,42 @@ func TestWorkerPool(t *testing.T) {
5454
}
5555
time.Sleep(1 * time.Millisecond)
5656
// BatchSize: 3, Jobs: 4 -> ceil(4/3) = 2 workers
57-
// assert.Equal(t, 2, defaultWorkerPool.workerCount)
57+
assert.GreaterOrEqual(t, defaultWorkerPool.workerCount, 2)
5858
defaultWorkerPool.SetWorkerSizeStandBy(0)
5959
time.Sleep(10 * time.Millisecond)
6060
// workerSizeStandBy: 1
6161
assert.Equal(t, 0, defaultWorkerPool.workerCount)
6262
}
63+
64+
func TestScheduleWithTimeout(t *testing.T) {
65+
var workerPool WorkerPool
66+
var err error
67+
defaultWorkerPool := NewDefaultWorkerPool(fpgo.NewBufferedChannelQueue[func()](3, 1, 3), nil).
68+
SetSpawnWorkerDuration(1 * time.Millisecond / 10).
69+
SetWorkerExpiryDuration(2 * time.Millisecond).
70+
SetWorkerSizeMaximum(3).
71+
SetWorkerSizeStandBy(3).
72+
SetWorkerBatchSize(0)
73+
// defaultWorkerPool.PreAllocWorkerSize(5)
74+
workerPool = defaultWorkerPool
75+
76+
// Test ScheduleWithTimeout
77+
// channel: 3 positions, buffered 1 => 4 positions
78+
for i := 0; i < 4; i++ {
79+
v := i
80+
err = workerPool.ScheduleWithTimeout(func() {
81+
// Nothing to do
82+
time.Sleep(3 * time.Millisecond)
83+
t.Log(v)
84+
}, 1*time.Millisecond)
85+
assert.NoError(t, err)
86+
}
87+
err = workerPool.Schedule(func() {})
88+
assert.Equal(t, ErrWorkerPoolJobQueueIsFull, err)
89+
err = workerPool.ScheduleWithTimeout(func() {}, 1*time.Millisecond)
90+
assert.Equal(t, ErrWorkerPoolScheduleTimeout, err)
91+
92+
// defaultWorkerPool.SetWorkerSizeMaximum(3)
93+
err = workerPool.ScheduleWithTimeout(func() {}, 10*time.Millisecond)
94+
assert.Equal(t, nil, err)
95+
}

0 commit comments

Comments
 (0)