Skip to content

Commit e1772bd

Browse files
committed
Remove non-used freeWorker mechanisms(Just let them Expired and no need to take care about them)
1 parent 7edcfbe commit e1772bd

File tree

2 files changed

+6
-71
lines changed

2 files changed

+6
-71
lines changed

worker/pool.go

Lines changed: 6 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package worker
22

33
import (
44
"errors"
5-
"fmt"
65
"log"
76
"runtime"
87
"sync"
@@ -55,10 +54,8 @@ type DefaultWorkerPool struct {
5554
jobQueue *fpgo.BufferedChannelQueue[func()]
5655

5756
workerCount int
58-
workerRecordMap map[time.Time]*workerRecord
59-
workerRecordPool sync.Pool
6057
spawnWorkerCh fpgo.ChannelQueue[int]
61-
scaleDownWorkerCh fpgo.ChannelQueue[int]
58+
lastAccessTime time.Time
6259

6360
// Settings
6461

@@ -72,7 +69,6 @@ type DefaultWorkerPool struct {
7269
workerSizeStandBy int
7370
workerSizeMaximum int
7471
spawnWorkerDuration time.Duration
75-
freeWorkerDuration time.Duration
7672
workerExpiryDuration time.Duration
7773

7874
// Panic Handler
@@ -85,26 +81,18 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()]) *DefaultW
8581
workerPool := &DefaultWorkerPool{
8682
jobQueue: jobQueue,
8783

88-
workerRecordMap: make(map[time.Time]*workerRecord, cap(jobQueue.GetChannel())),
89-
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
84+
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
9085

9186
// Settings
9287
isJobQueueClosedWhenClose: true,
9388
workerBatchSize: 5,
9489
workerSizeStandBy: 5,
9590
workerSizeMaximum: 1000,
9691
spawnWorkerDuration: 100 * time.Millisecond,
97-
freeWorkerDuration: 1000 * time.Millisecond,
9892
workerExpiryDuration: 5000 * time.Millisecond,
9993
panicHandler: defaultPanicHandler,
10094
}
101-
workerPool.workerRecordPool.New = func() interface{} {
102-
return &workerRecord{
103-
TerminatedCh: fpgo.NewChannelQueue[int](1),
104-
}
105-
}
10695
go workerPool.spawnLoop()
107-
go workerPool.scaleDownLoop()
10896

10997
return workerPool
11098
}
@@ -127,11 +115,10 @@ func (workerPoolSelf *DefaultWorkerPool) trySpawn() {
127115
expectedWorkerCount > workerPoolSelf.workerSizeMaximum {
128116
expectedWorkerCount = workerPoolSelf.workerSizeMaximum
129117
}
130-
workerCount := workerPoolSelf.workerCount
131118
workerPoolSelf.lock.RUnlock()
132119

133-
if workerCount < expectedWorkerCount {
134-
for i := workerCount; i < expectedWorkerCount; i++ {
120+
if workerPoolSelf.workerCount < expectedWorkerCount {
121+
for i := workerPoolSelf.workerCount; i < expectedWorkerCount; i++ {
135122
workerPoolSelf.generateWorker()
136123
}
137124
}
@@ -162,50 +149,17 @@ func (workerPoolSelf *DefaultWorkerPool) spawnLoop() {
162149
}
163150
}
164151

165-
func (workerPoolSelf *DefaultWorkerPool) scaleDownLoop() {
166-
defer func() {
167-
if panic := recover(); panic != nil {
168-
defaultPanicHandler(panic)
169-
}
170-
}()
171-
172-
for range workerPoolSelf.scaleDownWorkerCh {
173-
if workerPoolSelf.IsClosed() {
174-
break
175-
}
176-
177-
time.Sleep(workerPoolSelf.freeWorkerDuration)
178-
179-
workerPoolSelf.lock.RLock()
180-
fmt.Println("check")
181-
for _, workerRecord := range workerPoolSelf.workerRecordMap {
182-
fmt.Println("loop")
183-
if time.Now().Sub(workerRecord.LastAccessTime) > workerPoolSelf.workerExpiryDuration {
184-
fmt.Println("1")
185-
workerRecord.TerminatedCh.Offer(1)
186-
fmt.Println("1 done")
187-
}
188-
}
189-
workerPoolSelf.lock.RUnlock()
190-
}
191-
}
192-
193152
func (workerPoolSelf *DefaultWorkerPool) notifyWorkers() {
194153
if workerPoolSelf.workerCount < workerPoolSelf.workerSizeStandBy || workerPoolSelf.jobQueue.Count() > 0 {
195154
workerPoolSelf.spawnWorkerCh.Offer(1)
196155
}
197-
if workerPoolSelf.workerCount > workerPoolSelf.workerSizeStandBy {
198-
workerPoolSelf.scaleDownWorkerCh.Offer(1)
199-
}
200156
}
201157

202158
func (workerPoolSelf *DefaultWorkerPool) generateWorker() {
203159
// Initial
204160
workerID := time.Now()
205-
myRecord := workerPoolSelf.workerRecordPool.Get().(*workerRecord)
206-
myRecord.LastAccessTime = workerID
161+
workerPoolSelf.lastAccessTime = workerID
207162
workerPoolSelf.lock.Lock()
208-
workerPoolSelf.workerRecordMap[workerID] = myRecord
209163
workerPoolSelf.workerCount++
210164
workerPoolSelf.lock.Unlock()
211165

@@ -219,25 +173,15 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorker() {
219173
}
220174

221175
workerPoolSelf.lock.Lock()
222-
workerPoolSelf.workerRecordMap[workerID] = nil
223176
workerPoolSelf.workerCount--
224-
delete(workerPoolSelf.workerRecordMap, workerID)
225-
// close(myRecord.TerminatedCh)
226-
for len(myRecord.TerminatedCh) > 0 {
227-
// Clear
228-
if _, err := myRecord.TerminatedCh.Poll(); err != nil {
229-
break
230-
}
231-
}
232177
workerPoolSelf.lock.Unlock()
233-
workerPoolSelf.workerRecordPool.Put(myRecord)
234178
// fmt.Println("Terminated")
235179
}()
236180

237181
// Do Jobs
238182
loopLabel:
239183
for {
240-
myRecord.LastAccessTime = time.Now()
184+
workerPoolSelf.lastAccessTime = time.Now()
241185

242186
select {
243187
case job := <-workerPoolSelf.jobQueue.GetChannel():
@@ -246,8 +190,6 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorker() {
246190
job()
247191
// fmt.Println("DoJob")
248192
}
249-
case <-myRecord.TerminatedCh:
250-
break loopLabel
251193
case <-time.After(workerPoolSelf.workerExpiryDuration):
252194
workerPoolSelf.lock.RLock()
253195
workerCount := workerPoolSelf.workerCount
@@ -307,12 +249,6 @@ func (workerPoolSelf *DefaultWorkerPool) SetSpawnWorkerDuration(spawnWorkerDurat
307249
return workerPoolSelf
308250
}
309251

310-
// SetFreeWorkerDuration Set the freeWorkerDuration
311-
func (workerPoolSelf *DefaultWorkerPool) SetFreeWorkerDuration(freeWorkerDuration time.Duration) *DefaultWorkerPool {
312-
workerPoolSelf.freeWorkerDuration = freeWorkerDuration
313-
return workerPoolSelf
314-
}
315-
316252
// SetWorkerExpiryDuration Set the workerExpiryDuration
317253
func (workerPoolSelf *DefaultWorkerPool) SetWorkerExpiryDuration(workerExpiryDuration time.Duration) *DefaultWorkerPool {
318254
workerPoolSelf.workerExpiryDuration = workerExpiryDuration

worker/pool_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ func TestWorkerPool(t *testing.T) {
1515
var err error
1616
defaultWorkerPool := NewDefaultWorkerPool(fpgo.NewBufferedChannelQueue[func()](3, 10000, 100)).
1717
SetSpawnWorkerDuration(1 * time.Millisecond / 10).
18-
SetFreeWorkerDuration(1 * time.Millisecond / 10).
1918
SetWorkerExpiryDuration(2 * time.Millisecond).
2019
SetWorkerSizeMaximum(5).
2120
SetWorkerSizeStandBy(1).

0 commit comments

Comments
 (0)