Skip to content

Commit 34cfcac

Browse files
committed
Implement scheduleRetryInterval for workerPool.ScheduleWithTimeout()
1 parent 1e2100c commit 34cfcac

File tree

2 files changed

+68
-54
lines changed

2 files changed

+68
-54
lines changed

worker/pool.go

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ type DefaultWorkerPoolSettings struct {
3939

4040
// Worker
4141

42-
workerSizeStandBy int
43-
workerSizeMaximum int
44-
spawnWorkerDuration time.Duration
45-
workerExpiryDuration time.Duration
46-
workerJamDuration time.Duration
42+
workerSizeStandBy int
43+
workerSizeMaximum int
44+
spawnWorkerDuration time.Duration
45+
workerExpiryDuration time.Duration
46+
workerJamDuration time.Duration
47+
scheduleRetryInterval time.Duration
4748

4849
// Panic Handler
4950

@@ -64,6 +65,7 @@ var defaultDefaultWorkerSettings = &DefaultWorkerPoolSettings{
6465
spawnWorkerDuration: 100 * time.Millisecond,
6566
workerExpiryDuration: 5000 * time.Millisecond,
6667
workerJamDuration: 1000 * time.Millisecond,
68+
scheduleRetryInterval: 50 * time.Millisecond,
6769
panicHandler: defaultPanicHandler,
6870
}
6971

@@ -78,9 +80,9 @@ type DefaultWorkerPool struct {
7880
spawnWorkerCh fpgo.ChannelQueue[int]
7981
lastAliveTime time.Time
8082

81-
scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]
82-
scheduleWaitChPool sync.Pool
83-
scheduleWaitCount int
83+
// scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]
84+
// scheduleWaitChPool sync.Pool
85+
// scheduleWaitCount int
8486

8587
// Settings
8688
DefaultWorkerPoolSettings
@@ -94,15 +96,15 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()], settings
9496
workerPool := &DefaultWorkerPool{
9597
jobQueue: jobQueue,
9698

97-
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
98-
scheduleWaitQueue: fpgo.NewConcurrentQueue[fpgo.ChannelQueue[int]](fpgo.NewLinkedListQueue[fpgo.ChannelQueue[int]]()),
99+
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
100+
// scheduleWaitQueue: fpgo.NewConcurrentQueue[fpgo.ChannelQueue[int]](fpgo.NewLinkedListQueue[fpgo.ChannelQueue[int]]()),
99101

100102
// Settings
101103
DefaultWorkerPoolSettings: *settings,
102104
}
103-
workerPool.scheduleWaitChPool.New = func() interface{} {
104-
return fpgo.NewChannelQueue[int](1)
105-
}
105+
// workerPool.scheduleWaitChPool.New = func() interface{} {
106+
// return fpgo.NewChannelQueue[int](1)
107+
// }
106108
go workerPool.spawnLoop()
107109

108110
return workerPool
@@ -202,15 +204,15 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
202204
loopLabel:
203205
for {
204206
workerPoolSelf.lastAliveTime = time.Now()
205-
workerPoolSelf.lock.RLock()
206-
if workerPoolSelf.scheduleWaitCount > 0 {
207-
ch, _ := workerPoolSelf.scheduleWaitQueue.Poll()
208-
if ch != nil {
209-
ch.Offer(1)
210-
defer workerPoolSelf.scheduleWaitChPool.Put(ch)
211-
}
212-
}
213-
workerPoolSelf.lock.RUnlock()
207+
// workerPoolSelf.lock.RLock()
208+
// if workerPoolSelf.scheduleWaitCount > 0 {
209+
// ch, _ := workerPoolSelf.scheduleWaitQueue.Poll()
210+
// if ch != nil {
211+
// ch.Offer(1)
212+
// defer workerPoolSelf.scheduleWaitChPool.Put(ch)
213+
// }
214+
// }
215+
// workerPoolSelf.lock.RUnlock()
214216

215217
if workerPoolSelf.IsClosed() {
216218
return
@@ -243,11 +245,11 @@ func (workerPoolSelf *DefaultWorkerPool) SetJobQueue(jobQueue *fpgo.BufferedChan
243245
return workerPoolSelf
244246
}
245247

246-
// SetScheduleWaitQueue Set the ScheduleWaitQueue(WARNING: if the pool has started to use, doing this is not safe)
247-
func (workerPoolSelf *DefaultWorkerPool) SetScheduleWaitQueue(scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]) *DefaultWorkerPool {
248-
workerPoolSelf.scheduleWaitQueue = scheduleWaitQueue
249-
return workerPoolSelf
250-
}
248+
// // SetScheduleWaitQueue Set the ScheduleWaitQueue(WARNING: if the pool has started to use, doing this is not safe)
249+
// func (workerPoolSelf *DefaultWorkerPool) SetScheduleWaitQueue(scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]) *DefaultWorkerPool {
250+
// workerPoolSelf.scheduleWaitQueue = scheduleWaitQueue
251+
// return workerPoolSelf
252+
// }
251253

252254
// SetIsJobQueueClosedWhenClose Set is the JobQueue closed when the WorkerPool.Close()
253255
func (workerPoolSelf *DefaultWorkerPool) SetIsJobQueueClosedWhenClose(isJobQueueClosedWhenClose bool) *DefaultWorkerPool {
@@ -302,6 +304,13 @@ func (workerPoolSelf *DefaultWorkerPool) SetWorkerJamDuration(workerJamDuration
302304
return workerPoolSelf
303305
}
304306

307+
// SetScheduleRetryInterval Retry interval for ScheduleWithTimeout
308+
func (workerPoolSelf *DefaultWorkerPool) SetScheduleRetryInterval(scheduleRetryInterval time.Duration) *DefaultWorkerPool {
309+
workerPoolSelf.scheduleRetryInterval = scheduleRetryInterval
310+
workerPoolSelf.notifyWorkers()
311+
return workerPoolSelf
312+
}
313+
305314
// SetDefaultWorkerPoolSettings Set the defaultWorkerPoolSettings
306315
func (workerPoolSelf *DefaultWorkerPool) SetDefaultWorkerPoolSettings(defaultWorkerPoolSettings DefaultWorkerPoolSettings) *DefaultWorkerPool {
307316
workerPoolSelf.DefaultWorkerPoolSettings = defaultWorkerPoolSettings
@@ -352,16 +361,21 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
352361
// retry = 1
353362
// }
354363
// retryInterval := timeout / time.Duration(retry+1)
355-
deadline := time.Now().Add(timeout)
356-
workerPoolSelf.addScheduleWaitCount(1)
357-
defer workerPoolSelf.addScheduleWaitCount(-1)
358-
359-
scheduleWaitCh := workerPoolSelf.scheduleWaitChPool.Get().(fpgo.ChannelQueue[int])
360-
defer workerPoolSelf.scheduleWaitChPool.Put(scheduleWaitCh)
361-
err = workerPoolSelf.scheduleWaitQueue.Offer(scheduleWaitCh)
362-
if err != nil {
363-
return err
364+
retryInterval := workerPoolSelf.scheduleRetryInterval
365+
if retryInterval > timeout/3 {
366+
// retryInterval = timeout * 95 / 100 / 3
367+
retryInterval = timeout/3
364368
}
369+
deadline := time.Now().Add(timeout)
370+
// workerPoolSelf.addScheduleWaitCount(1)
371+
// defer workerPoolSelf.addScheduleWaitCount(-1)
372+
373+
// scheduleWaitCh := workerPoolSelf.scheduleWaitChPool.Get().(fpgo.ChannelQueue[int])
374+
// defer workerPoolSelf.scheduleWaitChPool.Put(scheduleWaitCh)
375+
// err = workerPoolSelf.scheduleWaitQueue.Offer(scheduleWaitCh)
376+
// if err != nil {
377+
// return err
378+
// }
365379

366380
for {
367381
if workerPoolSelf.IsClosed() {
@@ -375,26 +389,26 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
375389

376390
// fmt.Println(retryInterval)
377391
// fmt.Println(deadline.Sub(now))
378-
// if time.Now().After(deadline) {
379-
// return ErrWorkerPoolScheduleTimeout
380-
// }
381-
// time.Sleep(retryInterval)
382-
383-
select {
384-
case <-scheduleWaitCh:
385-
continue
386-
case <-time.After(deadline.Sub(time.Now())):
392+
if time.Now().After(deadline) {
387393
return ErrWorkerPoolScheduleTimeout
388394
}
395+
time.Sleep(retryInterval)
396+
397+
// select {
398+
// case <-scheduleWaitCh:
399+
// continue
400+
// case <-time.After(deadline.Sub(time.Now())):
401+
// return ErrWorkerPoolScheduleTimeout
402+
// }
389403
}
390404
return err
391405
}
392406

393-
func (workerPoolSelf *DefaultWorkerPool) addScheduleWaitCount(amount int) {
394-
workerPoolSelf.lock.Lock()
395-
workerPoolSelf.scheduleWaitCount += amount
396-
workerPoolSelf.lock.Unlock()
397-
}
407+
// func (workerPoolSelf *DefaultWorkerPool) addScheduleWaitCount(amount int) {
408+
// workerPoolSelf.lock.Lock()
409+
// workerPoolSelf.scheduleWaitCount += amount
410+
// workerPoolSelf.lock.Unlock()
411+
// }
398412

399413
// Invokable
400414

worker/pool_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ func TestScheduleWithTimeout(t *testing.T) {
6767
defaultWorkerPool := NewDefaultWorkerPool(fpgo.NewBufferedChannelQueue[func()](3, 1, 3), nil).
6868
SetSpawnWorkerDuration(1 * time.Millisecond / 10).
6969
SetWorkerExpiryDuration(2 * time.Millisecond).
70-
SetWorkerSizeMaximum(3).
71-
SetWorkerSizeStandBy(3).
70+
SetWorkerSizeMaximum(0).
71+
SetWorkerSizeStandBy(0).
7272
SetWorkerBatchSize(0)
7373
// defaultWorkerPool.PreAllocWorkerSize(5)
7474
workerPool = defaultWorkerPool
@@ -86,10 +86,10 @@ func TestScheduleWithTimeout(t *testing.T) {
8686
}
8787
err = workerPool.Schedule(func() {})
8888
assert.Equal(t, ErrWorkerPoolJobQueueIsFull, err)
89-
err = workerPool.ScheduleWithTimeout(func() {}, 1*time.Millisecond)
89+
err = workerPool.ScheduleWithTimeout(func() {}, 1*time.Millisecond/2)
9090
assert.Equal(t, ErrWorkerPoolScheduleTimeout, err)
9191

92-
// defaultWorkerPool.SetWorkerSizeMaximum(3)
92+
defaultWorkerPool.SetWorkerSizeMaximum(3)
9393
err = workerPool.ScheduleWithTimeout(func() {}, 10*time.Millisecond)
9494
assert.Equal(t, nil, err)
9595
}

0 commit comments

Comments
 (0)