Skip to content

Commit 14e3c01

Browse files
committed
add concurrencylimit entity to worker
1 parent 641e4a7 commit 14e3c01

File tree

8 files changed

+340
-50
lines changed

8 files changed

+340
-50
lines changed

internal/common/autoscaler/autoscaler.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,6 @@ package autoscaler
2424
type (
2525
AutoScaler interface {
2626
Estimator
27-
// Acquire X ResourceUnit of resource
28-
Acquire(ResourceUnit) error
29-
// Release X ResourceUnit of resource
30-
Release(ResourceUnit)
31-
// GetCurrent ResourceUnit of resource
32-
GetCurrent() ResourceUnit
3327
// Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator
3428
Start()
3529
// Stop stops the autoscaler if started or do nothing if not yet started

internal/internal_poller_autoscaler.go

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"sync"
2727
"time"
2828

29-
"github.com/marusama/semaphore/v2"
3029
"go.uber.org/atomic"
3130
"go.uber.org/zap"
3231

3332
"go.uber.org/cadence/internal/common/autoscaler"
33+
"go.uber.org/cadence/internal/worker"
3434
)
3535

3636
// defaultPollerScalerCooldownInSeconds
@@ -53,7 +53,7 @@ type (
5353
isDryRun bool
5454
cooldownTime time.Duration
5555
logger *zap.Logger
56-
sem semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
56+
permit worker.Permit
5757
ctx context.Context
5858
cancel context.CancelFunc
5959
wg *sync.WaitGroup // graceful stop
@@ -82,6 +82,7 @@ type (
8282
func newPollerScaler(
8383
options pollerAutoScalerOptions,
8484
logger *zap.Logger,
85+
permit worker.Permit,
8586
hooks ...func()) *pollerAutoScaler {
8687
if !options.Enabled {
8788
return nil
@@ -91,7 +92,7 @@ func newPollerScaler(
9192
isDryRun: options.DryRun,
9293
cooldownTime: options.Cooldown,
9394
logger: logger,
94-
sem: semaphore.New(options.InitCount),
95+
permit: permit,
9596
wg: &sync.WaitGroup{},
9697
ctx: ctx,
9798
cancel: cancel,
@@ -107,21 +108,6 @@ func newPollerScaler(
107108
}
108109
}
109110

110-
// Acquire concurrent poll quota
111-
func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
112-
return p.sem.Acquire(p.ctx, int(resource))
113-
}
114-
115-
// Release concurrent poll quota
116-
func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
117-
p.sem.Release(int(resource))
118-
}
119-
120-
// GetCurrent poll quota
121-
func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
122-
return autoscaler.ResourceUnit(p.sem.GetLimit())
123-
}
124-
125111
// Start an auto-scaler go routine and returns a done to stop it
126112
func (p *pollerAutoScaler) Start() {
127113
logger := p.logger.Sugar()
@@ -133,7 +119,7 @@ func (p *pollerAutoScaler) Start() {
133119
case <-p.ctx.Done():
134120
return
135121
case <-time.After(p.cooldownTime):
136-
currentResource := autoscaler.ResourceUnit(p.sem.GetLimit())
122+
currentResource := autoscaler.ResourceUnit(p.permit.Quota())
137123
currentUsages, err := p.pollerUsageEstimator.Estimate()
138124
if err != nil {
139125
logger.Warnw("poller autoscaler skip due to estimator error", "error", err)
@@ -146,14 +132,9 @@ func (p *pollerAutoScaler) Start() {
146132
"recommend", uint64(proposedResource),
147133
"isDryRun", p.isDryRun)
148134
if !p.isDryRun {
149-
p.sem.SetLimit(int(proposedResource))
135+
p.permit.SetQuota(int(proposedResource))
150136
}
151137
p.pollerUsageEstimator.Reset()
152-
153-
// hooks
154-
for i := range p.onAutoScale {
155-
p.onAutoScale[i]()
156-
}
157138
}
158139
}
159140
}()

internal/internal_poller_autoscaler_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
package internal
2222

2323
import (
24+
"context"
2425
"math/rand"
2526
"sync"
2627
"testing"
2728
"time"
2829

2930
"go.uber.org/cadence/internal/common/testlogger"
31+
"go.uber.org/cadence/internal/worker"
3032

3133
"github.com/stretchr/testify/assert"
3234
"go.uber.org/atomic"
@@ -171,6 +173,7 @@ func Test_pollerAutoscaler(t *testing.T) {
171173
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
172174
},
173175
testlogger.NewZap(t),
176+
worker.NewResizablePermit(tt.args.initialPollerCount),
174177
// hook function that collects number of iterations
175178
func() {
176179
autoscalerEpoch.Add(1)
@@ -190,9 +193,9 @@ func Test_pollerAutoscaler(t *testing.T) {
190193
go func() {
191194
defer wg.Done()
192195
for pollResult := range pollChan {
193-
pollerScaler.Acquire(1)
196+
pollerScaler.permit.Acquire(context.Background())
194197
pollerScaler.CollectUsage(pollResult)
195-
pollerScaler.Release(1)
198+
pollerScaler.permit.Release()
196199
}
197200
}()
198201
}
@@ -201,7 +204,7 @@ func Test_pollerAutoscaler(t *testing.T) {
201204
return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch)
202205
}, tt.args.cooldownTime+20*time.Millisecond, 10*time.Millisecond)
203206
pollerScaler.Stop()
204-
res := pollerScaler.GetCurrent()
207+
res := pollerScaler.permit.Quota() - pollerScaler.permit.Count()
205208
assert.Equal(t, tt.want, int(res))
206209
})
207210
}

internal/internal_worker_base.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"time"
3434

3535
"go.uber.org/cadence/internal/common/debug"
36+
"go.uber.org/cadence/internal/worker"
3637

3738
"github.com/uber-go/tally"
3839
"go.uber.org/zap"
@@ -141,7 +142,7 @@ type (
141142
logger *zap.Logger
142143
metricsScope tally.Scope
143144

144-
pollerRequestCh chan struct{}
145+
concurrency *worker.ConcurrencyLimit
145146
pollerAutoScaler *pollerAutoScaler
146147
taskQueueCh chan interface{}
147148
sessionTokenBucket *sessionTokenBucket
@@ -167,11 +168,18 @@ func createPollRetryPolicy() backoff.RetryPolicy {
167168
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
168169
ctx, cancel := context.WithCancel(context.Background())
169170

171+
concurrency := &worker.ConcurrencyLimit{
172+
PollerPermit: worker.NewResizablePermit(options.pollerCount),
173+
TaskPermit: worker.NewChannelPermit(options.maxConcurrentTask),
174+
}
175+
170176
var pollerAS *pollerAutoScaler
171177
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
178+
concurrency.PollerPermit = worker.NewResizablePermit(pollerOptions.InitCount)
172179
pollerAS = newPollerScaler(
173180
pollerOptions,
174181
logger,
182+
concurrency.PollerPermit,
175183
)
176184
}
177185

@@ -182,7 +190,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
182190
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
183191
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
184192
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
185-
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
193+
concurrency: concurrency,
186194
pollerAutoScaler: pollerAS,
187195
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
188196
limiterContext: ctx,
@@ -244,11 +252,13 @@ func (bw *baseWorker) runPoller() {
244252
select {
245253
case <-bw.shutdownCh:
246254
return
247-
case <-bw.pollerRequestCh:
248-
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(cap(bw.pollerRequestCh)))
249-
// This metric is used to monitor how many poll requests have been allocated
250-
// and can be used to approximate number of concurrent task running (not pinpoint accurate)
251-
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(cap(bw.pollerRequestCh) - len(bw.pollerRequestCh)))
255+
case <-bw.concurrency.TaskPermit.GetChan(): // don't poll unless there is a task permit
256+
// TODO move to a centralized place inside the worker
257+
// emit metrics on concurrent task permit quota and current task permit count
258+
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
259+
// thus the metrics is only an estimated value of how many tasks are running concurrently
260+
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(bw.concurrency.TaskPermit.Quota()))
261+
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(bw.concurrency.TaskPermit.Count()))
252262
if bw.sessionTokenBucket != nil {
253263
bw.sessionTokenBucket.waitForAvailableToken()
254264
}
@@ -260,10 +270,6 @@ func (bw *baseWorker) runPoller() {
260270
func (bw *baseWorker) runTaskDispatcher() {
261271
defer bw.shutdownWG.Done()
262272

263-
for i := 0; i < bw.options.maxConcurrentTask; i++ {
264-
bw.pollerRequestCh <- struct{}{}
265-
}
266-
267273
for {
268274
// wait for new task or shutdown
269275
select {
@@ -294,10 +300,10 @@ func (bw *baseWorker) pollTask() {
294300
var task interface{}
295301

296302
if bw.pollerAutoScaler != nil {
297-
if pErr := bw.pollerAutoScaler.Acquire(1); pErr == nil {
298-
defer bw.pollerAutoScaler.Release(1)
303+
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
304+
defer bw.concurrency.PollerPermit.Release()
299305
} else {
300-
bw.logger.Warn("poller auto scaler acquire error", zap.Error(pErr))
306+
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
301307
}
302308
}
303309

@@ -333,7 +339,7 @@ func (bw *baseWorker) pollTask() {
333339
case <-bw.shutdownCh:
334340
}
335341
} else {
336-
bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
342+
bw.concurrency.TaskPermit.Release() // poll failed, trigger a new poll by returning a task permit
337343
}
338344
}
339345

@@ -368,7 +374,7 @@ func (bw *baseWorker) processTask(task interface{}) {
368374
}
369375

370376
if isPolledTask {
371-
bw.pollerRequestCh <- struct{}{}
377+
bw.concurrency.TaskPermit.Release() // task processed, trigger a new poll by returning a task permit
372378
}
373379
}()
374380
err := bw.options.taskWorker.ProcessTask(task)

internal/worker/channel_permit.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package worker
22+
23+
import (
24+
"context"
25+
"fmt"
26+
)
27+
28+
// ChannelPermit is a handy wrapper entity over a buffered channel
29+
type ChannelPermit interface {
30+
Acquire(context.Context) error
31+
Count() int
32+
Quota() int
33+
Release()
34+
GetChan() <-chan struct{} // fetch the underlying channel
35+
}
36+
37+
type channelPermit struct {
38+
channel chan struct{}
39+
}
40+
41+
// NewChannelPermit creates a static permit that's not resizable
42+
func NewChannelPermit(count int) ChannelPermit {
43+
channel := make(chan struct{}, count)
44+
for i := 0; i < count; i++ {
45+
channel <- struct{}{}
46+
}
47+
return &channelPermit{channel: channel}
48+
}
49+
50+
func (p *channelPermit) Acquire(ctx context.Context) error {
51+
select {
52+
case <-ctx.Done():
53+
return fmt.Errorf("failed to acquire permit before context is done")
54+
case p.channel <- struct{}{}:
55+
return nil
56+
}
57+
}
58+
59+
// AcquireChan returns a permit ready channel
60+
func (p *channelPermit) GetChan() <-chan struct{} {
61+
return p.channel
62+
}
63+
64+
func (p *channelPermit) Release() {
65+
p.channel <- struct{}{}
66+
}
67+
68+
// Count returns the number of permits available
69+
func (p *channelPermit) Count() int {
70+
return len(p.channel)
71+
}
72+
73+
func (p *channelPermit) Quota() int {
74+
return cap(p.channel)
75+
}

internal/worker/concurrency.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package worker
22+
23+
import "context"
24+
25+
var _ Permit = (*resizablePermit)(nil)
26+
var _ ChannelPermit = (*channelPermit)(nil)
27+
28+
// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
29+
type ConcurrencyLimit struct {
30+
PollerPermit Permit // controls concurrency of pollers
31+
TaskPermit ChannelPermit // controls concurrency of task processing
32+
}
33+
34+
// Permit is an adaptive permit issuer to control concurrency
35+
type Permit interface {
36+
Acquire(context.Context) error
37+
Count() int
38+
Quota() int
39+
Release()
40+
SetQuota(int)
41+
}

0 commit comments

Comments
 (0)