Skip to content

Commit 082d125

Browse files
committed
wip
1 parent 14e3c01 commit 082d125

File tree

1 file changed

+60
-0
lines changed

1 file changed

+60
-0
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package worker
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
)
8+
9+
type ConcurrencyAutoScaler struct {
10+
ctx context.Context
11+
cancel context.CancelFunc
12+
wg *sync.WaitGroup
13+
14+
concurrency ConcurrencyLimit
15+
tick time.Duration
16+
17+
PollerPermitLastUpdate time.Time
18+
}
19+
20+
type ConcurrencyAutoScalerOptions struct {
21+
Concurrency ConcurrencyLimit
22+
Tick time.Duration // frequency of auto tuning
23+
Cooldown time.Duration // cooldown time of update
24+
25+
}
26+
27+
func NewConcurrencyAutoScaler(options ConcurrencyAutoScalerOptions) *ConcurrencyAutoScaler {
28+
ctx, cancel := context.WithCancel(context.Background())
29+
30+
return &ConcurrencyAutoScaler{
31+
ctx: ctx,
32+
cancel: cancel,
33+
wg: &sync.WaitGroup{},
34+
concurrency: options.Concurrency,
35+
tick: options.Tick,
36+
}
37+
}
38+
39+
func (c *ConcurrencyAutoScaler) Start() {
40+
c.wg.Add(1)
41+
go func () {
42+
defer c.wg.Done()
43+
for {
44+
select {
45+
case <-c.ctx.Done():
46+
case <-time.Tick(c.tick):
47+
48+
}
49+
}
50+
}()
51+
}
52+
53+
func (c *ConcurrencyAutoScaler) updatePollerPermit() {
54+
c.PollerPermitLastUpdate = time.Now()
55+
}
56+
57+
func (c *ConcurrencyAutoScaler) Stop() {
58+
c.cancel()
59+
c.wg.Wait()
60+
}

0 commit comments

Comments
 (0)