Commit 9ffbb1f

add ConcurrencyLimit to worker to enable dynamic tuning of concurrencies (#1410)
What changed?
[High Risk] Replaced the buffered channel with a resizable semaphore to control task concurrency.
[Low Risk] Added a worker package for modularity, added a ConcurrencyLimit entity to the worker, and removed unused methods from the autoscaler interface.

Why?
Needed as a first step toward dynamic tuning of poller and task concurrencies.

How did you test it?
Unit tests.
1 parent 641e4a7 commit 9ffbb1f
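
For context on the core idea, here is a sketch for illustration only (not code from this commit): a buffered channel's capacity, the old concurrency limit, is fixed when the channel is created, whereas the resizable semaphore (github.com/marusama/semaphore/v2, the library the poller autoscaler already imported in the diff below) can have its limit changed while permits are outstanding, which is what makes dynamic tuning possible.

package main

import (
    "context"
    "fmt"

    "github.com/marusama/semaphore/v2"
)

func main() {
    // Old approach: a buffered channel as a token bucket.
    // Its capacity is fixed at make() time and cannot be tuned at runtime.
    tokens := make(chan struct{}, 10)
    tokens <- struct{}{}                          // acquire a slot
    <-tokens                                      // release it
    fmt.Println("channel capacity:", cap(tokens)) // always 10

    // New approach: a resizable semaphore whose limit can change while in use.
    sem := semaphore.New(10)
    if err := sem.Acquire(context.Background(), 1); err != nil {
        panic(err)
    }
    sem.SetLimit(20)                                // e.g. an autoscaler raising the quota
    fmt.Println("semaphore limit:", sem.GetLimit()) // now 20, with one permit still held
    sem.Release(1)
}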

File tree

7 files changed: +482 additions, -46 deletions

internal/common/autoscaler/autoscaler.go

Lines changed: 0 additions & 6 deletions
@@ -24,12 +24,6 @@ package autoscaler
 type (
     AutoScaler interface {
         Estimator
-        // Acquire X ResourceUnit of resource
-        Acquire(ResourceUnit) error
-        // Release X ResourceUnit of resource
-        Release(ResourceUnit)
-        // GetCurrent ResourceUnit of resource
-        GetCurrent() ResourceUnit
         // Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator
         Start()
         // Stop stops the autoscaler if started or do nothing if not yet started

internal/internal_poller_autoscaler.go

Lines changed: 6 additions & 20 deletions
@@ -26,11 +26,11 @@ import (
     "sync"
     "time"
 
-    "github.com/marusama/semaphore/v2"
     "go.uber.org/atomic"
     "go.uber.org/zap"
 
     "go.uber.org/cadence/internal/common/autoscaler"
+    "go.uber.org/cadence/internal/worker"
 )
 
 // defaultPollerScalerCooldownInSeconds
@@ -53,7 +53,7 @@ type (
         isDryRun     bool
         cooldownTime time.Duration
         logger       *zap.Logger
-        sem          semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
+        permit       worker.Permit
         ctx          context.Context
         cancel       context.CancelFunc
         wg           *sync.WaitGroup // graceful stop
@@ -82,6 +82,7 @@ type (
 func newPollerScaler(
     options pollerAutoScalerOptions,
     logger *zap.Logger,
+    permit worker.Permit,
     hooks ...func()) *pollerAutoScaler {
     if !options.Enabled {
         return nil
@@ -91,7 +92,7 @@
         isDryRun:     options.DryRun,
         cooldownTime: options.Cooldown,
         logger:       logger,
-        sem:          semaphore.New(options.InitCount),
+        permit:       permit,
         wg:           &sync.WaitGroup{},
         ctx:          ctx,
         cancel:       cancel,
@@ -107,21 +108,6 @@
     }
 }
 
-// Acquire concurrent poll quota
-func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
-    return p.sem.Acquire(p.ctx, int(resource))
-}
-
-// Release concurrent poll quota
-func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
-    p.sem.Release(int(resource))
-}
-
-// GetCurrent poll quota
-func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
-    return autoscaler.ResourceUnit(p.sem.GetLimit())
-}
-
 // Start an auto-scaler go routine and returns a done to stop it
 func (p *pollerAutoScaler) Start() {
     logger := p.logger.Sugar()
@@ -133,7 +119,7 @@ func (p *pollerAutoScaler) Start() {
             case <-p.ctx.Done():
                 return
             case <-time.After(p.cooldownTime):
-                currentResource := autoscaler.ResourceUnit(p.sem.GetLimit())
+                currentResource := autoscaler.ResourceUnit(p.permit.Quota())
                 currentUsages, err := p.pollerUsageEstimator.Estimate()
                 if err != nil {
                     logger.Warnw("poller autoscaler skip due to estimator error", "error", err)
@@ -146,7 +132,7 @@
                     "recommend", uint64(proposedResource),
                     "isDryRun", p.isDryRun)
                 if !p.isDryRun {
-                    p.sem.SetLimit(int(proposedResource))
+                    p.permit.SetQuota(int(proposedResource))
                 }
                 p.pollerUsageEstimator.Reset()

internal/internal_poller_autoscaler_test.go

Lines changed: 8 additions & 4 deletions
@@ -21,12 +21,14 @@
 package internal
 
 import (
+    "context"
     "math/rand"
     "sync"
     "testing"
    "time"
 
     "go.uber.org/cadence/internal/common/testlogger"
+    "go.uber.org/cadence/internal/worker"
 
     "github.com/stretchr/testify/assert"
     "go.uber.org/atomic"
@@ -171,6 +173,7 @@ func Test_pollerAutoscaler(t *testing.T) {
                     TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
                 },
                 testlogger.NewZap(t),
+                worker.NewResizablePermit(tt.args.initialPollerCount),
                 // hook function that collects number of iterations
                 func() {
                     autoscalerEpoch.Add(1)
@@ -190,18 +193,19 @@
                 go func() {
                     defer wg.Done()
                     for pollResult := range pollChan {
-                        pollerScaler.Acquire(1)
+                        err := pollerScaler.permit.Acquire(context.Background())
+                        assert.NoError(t, err)
                         pollerScaler.CollectUsage(pollResult)
-                        pollerScaler.Release(1)
+                        pollerScaler.permit.Release()
                     }
                 }()
             }
 
             assert.Eventually(t, func() bool {
                 return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch)
-            }, tt.args.cooldownTime+20*time.Millisecond, 10*time.Millisecond)
+            }, tt.args.cooldownTime+100*time.Millisecond, 10*time.Millisecond)
             pollerScaler.Stop()
-            res := pollerScaler.GetCurrent()
+            res := pollerScaler.permit.Quota() - pollerScaler.permit.Count()
             assert.Equal(t, tt.want, int(res))
         })
     }

internal/internal_worker_base.go

Lines changed: 24 additions & 16 deletions
@@ -33,6 +33,7 @@ import (
     "time"
 
     "go.uber.org/cadence/internal/common/debug"
+    "go.uber.org/cadence/internal/worker"
 
     "github.com/uber-go/tally"
     "go.uber.org/zap"
@@ -141,7 +142,7 @@
         logger       *zap.Logger
         metricsScope tally.Scope
 
-        pollerRequestCh    chan struct{}
+        concurrency        *worker.ConcurrencyLimit
         pollerAutoScaler   *pollerAutoScaler
         taskQueueCh        chan interface{}
         sessionTokenBucket *sessionTokenBucket
@@ -167,11 +168,17 @@ func createPollRetryPolicy() backoff.RetryPolicy {
 func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
     ctx, cancel := context.WithCancel(context.Background())
 
+    concurrency := &worker.ConcurrencyLimit{
+        PollerPermit: worker.NewResizablePermit(options.pollerCount),
+        TaskPermit:   worker.NewResizablePermit(options.maxConcurrentTask),
+    }
+
     var pollerAS *pollerAutoScaler
     if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
         pollerAS = newPollerScaler(
             pollerOptions,
             logger,
+            concurrency.PollerPermit,
         )
     }
 
@@ -182,7 +189,7 @@
         retrier:          backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
         logger:           logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
         metricsScope:     tagScope(metricsScope, tagWorkerType, options.workerType),
-        pollerRequestCh:  make(chan struct{}, options.maxConcurrentTask),
+        concurrency:      concurrency,
         pollerAutoScaler: pollerAS,
         taskQueueCh:      make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
         limiterContext:   ctx,
@@ -241,14 +248,19 @@ func (bw *baseWorker) runPoller() {
     bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)
 
     for {
+        permitChannel, channelDone := bw.concurrency.TaskPermit.AcquireChan(bw.limiterContext)
         select {
         case <-bw.shutdownCh:
+            channelDone()
             return
-        case <-bw.pollerRequestCh:
-            bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(cap(bw.pollerRequestCh)))
-            // This metric is used to monitor how many poll requests have been allocated
-            // and can be used to approximate number of concurrent task running (not pinpoint accurate)
-            bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(cap(bw.pollerRequestCh) - len(bw.pollerRequestCh)))
+        case <-permitChannel: // don't poll unless there is a task permit
+            channelDone()
+            // TODO move to a centralized place inside the worker
+            // emit metrics on concurrent task permit quota and current task permit count
+            // NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
+            // thus the metrics is only an estimated value of how many tasks are running concurrently
+            bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(bw.concurrency.TaskPermit.Quota()))
+            bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(bw.concurrency.TaskPermit.Count()))
             if bw.sessionTokenBucket != nil {
                 bw.sessionTokenBucket.waitForAvailableToken()
             }
@@ -260,10 +272,6 @@
 func (bw *baseWorker) runTaskDispatcher() {
     defer bw.shutdownWG.Done()
 
-    for i := 0; i < bw.options.maxConcurrentTask; i++ {
-        bw.pollerRequestCh <- struct{}{}
-    }
-
     for {
         // wait for new task or shutdown
         select {
@@ -294,10 +302,10 @@ func (bw *baseWorker) pollTask() {
     var task interface{}
 
     if bw.pollerAutoScaler != nil {
-        if pErr := bw.pollerAutoScaler.Acquire(1); pErr == nil {
-            defer bw.pollerAutoScaler.Release(1)
+        if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
+            defer bw.concurrency.PollerPermit.Release()
         } else {
-            bw.logger.Warn("poller auto scaler acquire error", zap.Error(pErr))
+            bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
         }
     }
 
@@ -333,7 +341,7 @@
         case <-bw.shutdownCh:
         }
     } else {
-        bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
+        bw.concurrency.TaskPermit.Release() // poll failed, trigger a new poll by returning a task permit
     }
 }
 
@@ -368,7 +376,7 @@ func (bw *baseWorker) processTask(task interface{}) {
         }
 
         if isPolledTask {
-            bw.pollerRequestCh <- struct{}{}
+            bw.concurrency.TaskPermit.Release() // task processed, trigger a new poll by returning a task permit
         }
     }()
     err := bw.options.taskWorker.ProcessTask(task)
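
Taken together, the runPoller, pollTask, and processTask changes above replace "take a token from pollerRequestCh before polling" with "hold a task permit before polling, and Release it when the poll returns nothing or the task finishes". The standalone sketch below illustrates that gating pattern with the same semaphore library; acquireChan here is a hypothetical stand-in for the worker package's AcquireChan, and poll/process are stubs, so this is not the library's actual code path.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/marusama/semaphore/v2"
)

// acquireChan bridges a blocking semaphore acquire into a channel so the caller can
// select on it alongside a shutdown signal. It mimics (but is not) worker.Permit.AcquireChan:
// the returned cancel func must be called once, and it hands the permit back if the
// caller never consumed it.
func acquireChan(ctx context.Context, sem semaphore.Semaphore) (<-chan struct{}, func()) {
    ctx, cancel := context.WithCancel(ctx)
    ch := make(chan struct{}) // unbuffered: hand-off only happens if the caller is still waiting
    go func() {
        if err := sem.Acquire(ctx, 1); err != nil {
            return // cancelled before a permit became available
        }
        select {
        case ch <- struct{}{}: // the caller now owns the permit and must Release it
        case <-ctx.Done():
            sem.Release(1) // the caller gave up: return the permit
        }
    }()
    return ch, cancel
}

func main() {
    taskPermits := semaphore.New(2) // allow two in-flight "tasks"
    shutdown := make(chan struct{})
    go func() { time.Sleep(50 * time.Millisecond); close(shutdown) }()

    for i := 0; ; i++ {
        permitCh, done := acquireChan(context.Background(), taskPermits)
        select {
        case <-shutdown:
            done()
            fmt.Println("shutting down")
            return
        case <-permitCh: // don't "poll" unless a permit is held
            done()
            go func(n int) {
                defer taskPermits.Release(1) // permit returned once the task is processed
                fmt.Println("processing task", n)
                time.Sleep(10 * time.Millisecond)
            }(i)
        }
    }
}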

internal/worker/concurrency.go

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+// Copyright (c) 2017-2021 Uber Technologies Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package worker
+
+import "context"
+
+var _ Permit = (*resizablePermit)(nil)
+
+// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
+type ConcurrencyLimit struct {
+    PollerPermit Permit // controls concurrency of pollers
+    TaskPermit   Permit // controls concurrency of task processing
+}
+
+// Permit is an adaptive permit issuer to control concurrency
+type Permit interface {
+    Acquire(context.Context) error
+    AcquireChan(context.Context) (channel <-chan struct{}, done func())
+    Count() int
+    Quota() int
+    Release()
+    SetQuota(int)
+}
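
The blank-identifier assertion above references a resizablePermit type, and the worker code calls worker.NewResizablePermit, but that implementation lives in one of the two files of this commit not shown on this page. As a rough sketch only (not the actual implementation), the interface could be satisfied with the same resizable semaphore the poller autoscaler used before; the GetCount accessor and the AcquireChan hand-off details below are assumptions.

package worker

import (
    "context"

    "github.com/marusama/semaphore/v2"
)

// resizablePermit: a minimal sketch of one way to satisfy the Permit interface
// with a resizable semaphore. The real type in this commit may differ.
type resizablePermit struct {
    sem semaphore.Semaphore
}

// NewResizablePermit creates a permit with an initial quota.
func NewResizablePermit(initCount int) Permit {
    return &resizablePermit{sem: semaphore.New(initCount)}
}

func (p *resizablePermit) Acquire(ctx context.Context) error { return p.sem.Acquire(ctx, 1) }
func (p *resizablePermit) Release()                          { p.sem.Release(1) }
func (p *resizablePermit) Quota() int                        { return p.sem.GetLimit() }
func (p *resizablePermit) Count() int                        { return p.sem.GetCount() } // assumed accessor
func (p *resizablePermit) SetQuota(quota int)                { p.sem.SetLimit(quota) }

// AcquireChan bridges Acquire into a channel so callers can select on it together
// with a shutdown signal; done() must be called exactly once and returns the permit
// if the caller never consumed it.
func (p *resizablePermit) AcquireChan(ctx context.Context) (<-chan struct{}, func()) {
    ctx, cancel := context.WithCancel(ctx)
    ch := make(chan struct{}) // unbuffered: hand-off only if the caller is still waiting
    go func() {
        if err := p.sem.Acquire(ctx, 1); err != nil {
            return // cancelled before a permit became available
        }
        select {
        case ch <- struct{}{}: // caller now owns the permit and must Release it
        case <-ctx.Done():
            p.sem.Release(1) // caller gave up: return the permit
        }
    }()
    return ch, cancel
}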
