Skip to content

Commit c2b22c3

Browse files
committed
Implement workerJamDuration
1 parent 0a31ba6 commit c2b22c3

File tree

1 file changed

+21
-7
lines changed

1 file changed

+21
-7
lines changed

worker/pool.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type DefaultWorkerPoolSettings struct {
4545
workerSizeMaximum int
4646
spawnWorkerDuration time.Duration
4747
workerExpiryDuration time.Duration
48+
workerJamDuration time.Duration
4849

4950
// Panic Handler
5051

@@ -64,6 +65,7 @@ var defaultDefaultWorkerSettings = &DefaultWorkerPoolSettings{
6465
workerSizeMaximum: 1000,
6566
spawnWorkerDuration: 100 * time.Millisecond,
6667
workerExpiryDuration: 5000 * time.Millisecond,
68+
workerJamDuration: 1000 * time.Millisecond,
6769
panicHandler: defaultPanicHandler,
6870
}
6971

@@ -74,9 +76,9 @@ type DefaultWorkerPool struct {
7476

7577
jobQueue *fpgo.BufferedChannelQueue[func()]
7678

77-
workerCount int
78-
spawnWorkerCh fpgo.ChannelQueue[int]
79-
lastAccessTime time.Time
79+
workerCount int
80+
spawnWorkerCh fpgo.ChannelQueue[int]
81+
lastAliveTime time.Time
8082

8183
// Settings
8284
DefaultWorkerPoolSettings
@@ -118,6 +120,11 @@ func (workerPoolSelf *DefaultWorkerPool) trySpawn() {
118120
expectedWorkerCount > workerPoolSelf.workerSizeMaximum {
119121
expectedWorkerCount = workerPoolSelf.workerSizeMaximum
120122
}
123+
// Avoid Jam if (now - lastAliveTime) is over workerJamDuration
124+
if time.Now().Sub(workerPoolSelf.lastAliveTime) > workerPoolSelf.workerJamDuration &&
125+
workerPoolSelf.workerCount >= expectedWorkerCount {
126+
expectedWorkerCount++
127+
}
121128
workerPoolSelf.lock.RUnlock()
122129

123130
if workerPoolSelf.workerCount < expectedWorkerCount {
@@ -167,7 +174,7 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
167174
return
168175
}
169176
// workerID := time.Now()
170-
workerPoolSelf.lastAccessTime = time.Now()
177+
workerPoolSelf.lastAliveTime = time.Now()
171178
workerPoolSelf.workerCount++
172179

173180
go func() {
@@ -188,7 +195,7 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
188195
// Do Jobs
189196
loopLabel:
190197
for {
191-
workerPoolSelf.lastAccessTime = time.Now()
198+
workerPoolSelf.lastAliveTime = time.Now()
192199

193200
select {
194201
case job := <-workerPoolSelf.jobQueue.GetChannel():
@@ -250,19 +257,26 @@ func (workerPoolSelf *DefaultWorkerPool) SetWorkerSizeMaximum(workerSizeMaximum
250257
return workerPoolSelf
251258
}
252259

253-
// SetSpawnWorkerDuration Set the spawnWorkerDuration
260+
// SetSpawnWorkerDuration Set the spawnWorkerDuration(Checking repeating by the interval/duration)
254261
func (workerPoolSelf *DefaultWorkerPool) SetSpawnWorkerDuration(spawnWorkerDuration time.Duration) *DefaultWorkerPool {
255262
workerPoolSelf.spawnWorkerDuration = spawnWorkerDuration
256263
return workerPoolSelf
257264
}
258265

259-
// SetWorkerExpiryDuration Set the workerExpiryDuration
266+
// SetWorkerExpiryDuration The worker would be dead if the worker is idle without jobs over the duration
260267
func (workerPoolSelf *DefaultWorkerPool) SetWorkerExpiryDuration(workerExpiryDuration time.Duration) *DefaultWorkerPool {
261268
workerPoolSelf.workerExpiryDuration = workerExpiryDuration
262269
workerPoolSelf.notifyWorkers()
263270
return workerPoolSelf
264271
}
265272

273+
// SetWorkerJamDuration A new worker would be created if there's no available worker to do jobs over the duration
274+
func (workerPoolSelf *DefaultWorkerPool) SetWorkerJamDuration(workerJamDuration time.Duration) *DefaultWorkerPool {
275+
workerPoolSelf.workerJamDuration = workerJamDuration
276+
workerPoolSelf.notifyWorkers()
277+
return workerPoolSelf
278+
}
279+
266280
// SetDefaultWorkerPoolSettings Set the defaultWorkerPoolSettings
267281
func (workerPoolSelf *DefaultWorkerPool) SetDefaultWorkerPoolSettings(defaultWorkerPoolSettings DefaultWorkerPoolSettings) *DefaultWorkerPool {
268282
workerPoolSelf.DefaultWorkerPoolSettings = defaultWorkerPoolSettings

0 commit comments

Comments
 (0)