Skip to content

Commit c5f2d53

Browse files
committed
add PermitChannel
1 parent 082d125 commit c5f2d53

File tree

6 files changed

+89
-143
lines changed

6 files changed

+89
-143
lines changed

internal/internal_worker_base.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,11 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
170170

171171
concurrency := &worker.ConcurrencyLimit{
172172
PollerPermit: worker.NewResizablePermit(options.pollerCount),
173-
TaskPermit: worker.NewChannelPermit(options.maxConcurrentTask),
173+
TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask),
174174
}
175175

176176
var pollerAS *pollerAutoScaler
177177
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
178-
concurrency.PollerPermit = worker.NewResizablePermit(pollerOptions.InitCount)
179178
pollerAS = newPollerScaler(
180179
pollerOptions,
181180
logger,
@@ -249,10 +248,14 @@ func (bw *baseWorker) runPoller() {
249248
bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)
250249

251250
for {
251+
// permitChannel can be blocking without passing context because shutdownCh is used
252+
permitChannel := bw.concurrency.PollerPermit.AcquireChan(context.Background())
252253
select {
253254
case <-bw.shutdownCh:
255+
permitChannel.Close()
254256
return
255-
case <-bw.concurrency.TaskPermit.GetChan(): // don't poll unless there is a task permit
257+
case <-permitChannel.C(): // don't poll unless there is a task permit
258+
permitChannel.Close()
256259
// TODO move to a centralized place inside the worker
257260
// emit metrics on concurrent task permit quota and current task permit count
258261
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process

internal/worker/channel_permit.go

Lines changed: 0 additions & 75 deletions
This file was deleted.

internal/worker/concurrency.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,26 @@ package worker
2323
import "context"
2424

2525
var _ Permit = (*resizablePermit)(nil)
26-
var _ ChannelPermit = (*channelPermit)(nil)
2726

2827
// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
2928
type ConcurrencyLimit struct {
30-
PollerPermit Permit // controls concurrency of pollers
31-
TaskPermit ChannelPermit // controls concurrency of task processing
29+
PollerPermit Permit // controls concurrency of pollers
30+
TaskPermit Permit // controls concurrency of task processing
3231
}
3332

3433
// Permit is an adaptive permit issuer to control concurrency
3534
type Permit interface {
3635
Acquire(context.Context) error
36+
AcquireChan(context.Context) PermitChannel
3737
Count() int
3838
Quota() int
3939
Release()
4040
SetQuota(int)
4141
}
42+
43+
// PermitChannel is a channel that can be used to wait for a permit to be available
44+
// Remember to call Close() to avoid goroutine leak
45+
type PermitChannel interface {
46+
C() <-chan struct{}
47+
Close()
48+
}

internal/worker/concurrency_auto_scaler.go

Lines changed: 0 additions & 60 deletions
This file was deleted.

internal/worker/resizable_permit.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package worker
2323
import (
2424
"context"
2525
"fmt"
26+
"sync"
2627

2728
"github.com/marusama/semaphore/v2"
2829
)
@@ -45,14 +46,17 @@ func (p *resizablePermit) Acquire(ctx context.Context) error {
4546
return nil
4647
}
4748

49+
// Release release one permit
4850
func (p *resizablePermit) Release() {
4951
p.sem.Release(1)
5052
}
5153

54+
// Quota returns the maximum number of permits that can be acquired
5255
func (p *resizablePermit) Quota() int {
5356
return p.sem.GetLimit()
5457
}
5558

59+
// SetQuota sets the maximum number of permits that can be acquired
5660
func (p *resizablePermit) SetQuota(c int) {
5761
p.sem.SetLimit(c)
5862
}
@@ -61,3 +65,53 @@ func (p *resizablePermit) SetQuota(c int) {
6165
func (p *resizablePermit) Count() int {
6266
return p.sem.GetCount()
6367
}
68+
69+
// AcquireChan creates a PermitChannel that can be used to wait for a permit
70+
// After usage:
71+
// 1. avoid goroutine leak by calling permitChannel.Close()
72+
// 2. release permit by calling permit.Release()
73+
func (p *resizablePermit) AcquireChan(ctx context.Context) PermitChannel {
74+
ctx, cancel := context.WithCancel(ctx)
75+
pc := &permitChannel{
76+
p: p,
77+
c: make(chan struct{}),
78+
ctx: ctx,
79+
cancel: cancel,
80+
wg: &sync.WaitGroup{},
81+
}
82+
pc.start()
83+
return pc
84+
}
85+
86+
type permitChannel struct {
87+
p Permit
88+
c chan struct{}
89+
ctx context.Context
90+
cancel context.CancelFunc
91+
wg *sync.WaitGroup
92+
}
93+
94+
func (ch *permitChannel) C() <-chan struct{} {
95+
return ch.c
96+
}
97+
98+
func (ch *permitChannel) start() {
99+
ch.wg.Add(1)
100+
go func() {
101+
defer ch.wg.Done()
102+
if err := ch.p.Acquire(ch.ctx); err != nil {
103+
return
104+
}
105+
// avoid blocking on sending to the channel
106+
select {
107+
case ch.c <- struct{}{}:
108+
case <-ch.ctx.Done(): // release if acquire is successful but fail to send to the channel
109+
ch.p.Release()
110+
}
111+
}()
112+
}
113+
114+
func (ch *permitChannel) Close() {
115+
ch.cancel()
116+
ch.wg.Wait()
117+
}

internal/worker/resizable_permit_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ func TestPermit_Simulation(t *testing.T) {
9999
failures := atomic.NewInt32(0)
100100
ctx, cancel := context.WithTimeout(context.Background(), tt.maxTestDuration)
101101
defer cancel()
102-
for i := 0; i < tt.goroutines; i++ {
102+
103+
aquireChan := tt.goroutines / 2
104+
for i := 0; i < tt.goroutines-aquireChan; i++ {
103105
wg.Add(1)
104106
go func() {
105107
defer wg.Done()
@@ -111,10 +113,25 @@ func TestPermit_Simulation(t *testing.T) {
111113
permit.Release()
112114
}()
113115
}
116+
for i := 0; i < aquireChan; i++ {
117+
wg.Add(1)
118+
go func() {
119+
defer wg.Done()
120+
permitChan := permit.AcquireChan(ctx)
121+
select {
122+
case <-permitChan.C():
123+
time.Sleep(time.Duration(100+rand.Intn(50)) * time.Millisecond)
124+
permit.Release()
125+
case <-ctx.Done():
126+
failures.Inc()
127+
}
128+
permitChan.Close()
129+
}()
130+
}
114131

115132
wg.Wait()
116133
// sanity check
117-
assert.Equal(t, 0, permit.Count())
134+
assert.Equal(t, 0, permit.Count(), "all permit should be released")
118135
assert.Equal(t, tt.capacity[len(tt.capacity)-1], permit.Quota())
119136

120137
// expect failures in range

0 commit comments

Comments
 (0)