@@ -80,10 +80,6 @@ type DefaultWorkerPool struct {
8080 spawnWorkerCh fpgo.ChannelQueue [int ]
8181 lastAliveTime time.Time
8282
83- // scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]
84- // scheduleWaitChPool sync.Pool
85- // scheduleWaitCount int
86-
8783 // Settings
8884 DefaultWorkerPoolSettings
8985}
@@ -97,14 +93,10 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()], settings
9793 jobQueue : jobQueue ,
9894
9995 spawnWorkerCh : fpgo.NewChannelQueue [int ](1 ),
100- // scheduleWaitQueue: fpgo.NewConcurrentQueue[fpgo.ChannelQueue[int]](fpgo.NewLinkedListQueue[fpgo.ChannelQueue[int]]()),
10196
10297 // Settings
10398 DefaultWorkerPoolSettings : * settings ,
10499 }
105- // workerPool.scheduleWaitChPool.New = func() interface{} {
106- // return fpgo.NewChannelQueue[int](1)
107- // }
108100 go workerPool .spawnLoop ()
109101
110102 return workerPool
@@ -197,33 +189,21 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
197189 workerPoolSelf .lock .Lock ()
198190 workerPoolSelf .workerCount --
199191 workerPoolSelf .lock .Unlock ()
200- // fmt.Println("Terminated")
201192 }()
202193
203194 // Do Jobs
204195 loopLabel:
205196 for {
206197 workerPoolSelf .lastAliveTime = time .Now ()
207- // workerPoolSelf.lock.RLock()
208- // if workerPoolSelf.scheduleWaitCount > 0 {
209- // ch, _ := workerPoolSelf.scheduleWaitQueue.Poll()
210- // if ch != nil {
211- // ch.Offer(1)
212- // defer workerPoolSelf.scheduleWaitChPool.Put(ch)
213- // }
214- // }
215- // workerPoolSelf.lock.RUnlock()
216198
217199 if workerPoolSelf .IsClosed () {
218200 return
219201 }
220202
221203 select {
222204 case job := <- workerPoolSelf .jobQueue .GetChannel ():
223- // fmt.Println("GetJob")
224205 if job != nil {
225206 job ()
226- // fmt.Println("DoJob")
227207 }
228208 case <- time .After (workerPoolSelf .workerExpiryDuration ):
229209 workerPoolSelf .lock .RLock ()
@@ -245,12 +225,6 @@ func (workerPoolSelf *DefaultWorkerPool) SetJobQueue(jobQueue *fpgo.BufferedChan
245225 return workerPoolSelf
246226}
247227
248- // // SetScheduleWaitQueue Set the ScheduleWaitQueue(WARNING: if the pool has started to use, doing this is not safe)
249- // func (workerPoolSelf *DefaultWorkerPool) SetScheduleWaitQueue(scheduleWaitQueue fpgo.Queue[fpgo.ChannelQueue[int]]) *DefaultWorkerPool {
250- // workerPoolSelf.scheduleWaitQueue = scheduleWaitQueue
251- // return workerPoolSelf
252- // }
253-
254228// SetIsJobQueueClosedWhenClose Set is the JobQueue closed when the WorkerPool.Close()
255229func (workerPoolSelf * DefaultWorkerPool ) SetIsJobQueueClosedWhenClose (isJobQueueClosedWhenClose bool ) * DefaultWorkerPool {
256230 workerPoolSelf .isJobQueueClosedWhenClose = isJobQueueClosedWhenClose
@@ -357,25 +331,12 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
357331 return err
358332 }
359333
360- // if retry < 1 {
361- // retry = 1
362- // }
363- // retryInterval := timeout / time.Duration(retry+1)
364334 retryInterval := workerPoolSelf .scheduleRetryInterval
365335 if retryInterval > timeout / 3 {
366336 // retryInterval = timeout * 95 / 100 / 3
367- retryInterval = timeout / 3
337+ retryInterval = timeout / 3
368338 }
369339 deadline := time .Now ().Add (timeout )
370- // workerPoolSelf.addScheduleWaitCount(1)
371- // defer workerPoolSelf.addScheduleWaitCount(-1)
372-
373- // scheduleWaitCh := workerPoolSelf.scheduleWaitChPool.Get().(fpgo.ChannelQueue[int])
374- // defer workerPoolSelf.scheduleWaitChPool.Put(scheduleWaitCh)
375- // err = workerPoolSelf.scheduleWaitQueue.Offer(scheduleWaitCh)
376- // if err != nil {
377- // return err
378- // }
379340
380341 for {
381342 if workerPoolSelf .IsClosed () {
@@ -387,29 +348,15 @@ func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout
387348 return err
388349 }
389350
390- // fmt.Println(retryInterval)
391- // fmt.Println(deadline.Sub(now))
392351 if time .Now ().After (deadline ) {
393352 return ErrWorkerPoolScheduleTimeout
394353 }
395354 time .Sleep (retryInterval )
396355
397- // select {
398- // case <-scheduleWaitCh:
399- // continue
400- // case <-time.After(deadline.Sub(time.Now())):
401- // return ErrWorkerPoolScheduleTimeout
402- // }
403356 }
404357 return err
405358}
406359
407- // func (workerPoolSelf *DefaultWorkerPool) addScheduleWaitCount(amount int) {
408- // workerPoolSelf.lock.Lock()
409- // workerPoolSelf.scheduleWaitCount += amount
410- // workerPoolSelf.lock.Unlock()
411- // }
412-
413360// Invokable
414361
415362// Invokable Invokable inspired by Java ExecutorService
0 commit comments