Skip to content

Commit b811ac4

Browse files
committed
add DynamicParams for worker
1 parent f8bfb87 commit b811ac4

File tree

3 files changed

+120
-27
lines changed

3 files changed

+120
-27
lines changed

internal/internal_poller_autoscaler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import (
2626
"sync"
2727
"time"
2828

29-
"github.com/marusama/semaphore/v2"
3029
"go.uber.org/atomic"
3130
"go.uber.org/zap"
3231

3332
"go.uber.org/cadence/internal/common/autoscaler"
33+
"go.uber.org/cadence/internal/worker"
3434
)
3535

3636
// defaultPollerScalerCooldownInSeconds
@@ -53,7 +53,7 @@ type (
5353
isDryRun bool
5454
cooldownTime time.Duration
5555
logger *zap.Logger
56-
sem semaphore.Semaphore // resizable semaphore to control number of concurrent pollers
56+
permit worker.Permit // resizable semaphore to control number of concurrent pollers
5757
ctx context.Context
5858
cancel context.CancelFunc
5959
wg *sync.WaitGroup // graceful stop
@@ -91,7 +91,7 @@ func newPollerScaler(
9191
isDryRun: options.DryRun,
9292
cooldownTime: options.Cooldown,
9393
logger: logger,
94-
sem: semaphore.New(options.InitCount),
94+
permit: worker.NewPermit(options.InitCount),
9595
wg: &sync.WaitGroup{},
9696
ctx: ctx,
9797
cancel: cancel,
@@ -109,17 +109,17 @@ func newPollerScaler(
109109

110110
// Acquire concurrent poll quota
111111
func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
112-
return p.sem.Acquire(p.ctx, int(resource))
112+
return p.permit.Acquire(p.ctx, int(resource))
113113
}
114114

115115
// Release concurrent poll quota
116116
func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
117-
p.sem.Release(int(resource))
117+
p.permit.Release(int(resource))
118118
}
119119

120120
// GetCurrent poll quota
121121
func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
122-
return autoscaler.ResourceUnit(p.sem.GetLimit())
122+
return autoscaler.ResourceUnit(p.permit.Quota())
123123
}
124124

125125
// Start an auto-scaler go routine and returns a done to stop it
@@ -133,7 +133,7 @@ func (p *pollerAutoScaler) Start() {
133133
case <-p.ctx.Done():
134134
return
135135
case <-time.After(p.cooldownTime):
136-
currentResource := autoscaler.ResourceUnit(p.sem.GetLimit())
136+
currentResource := autoscaler.ResourceUnit(p.permit.Quota())
137137
currentUsages, err := p.pollerUsageEstimator.Estimate()
138138
if err != nil {
139139
logger.Warnw("poller autoscaler skip due to estimator error", "error", err)
@@ -146,7 +146,7 @@ func (p *pollerAutoScaler) Start() {
146146
"recommend", uint64(proposedResource),
147147
"isDryRun", p.isDryRun)
148148
if !p.isDryRun {
149-
p.sem.SetLimit(int(proposedResource))
149+
p.permit.SetQuota(int(proposedResource))
150150
}
151151
p.pollerUsageEstimator.Reset()
152152

internal/internal_worker_base.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"time"
3434

3535
"go.uber.org/cadence/internal/common/debug"
36+
"go.uber.org/cadence/internal/worker"
3637

3738
"github.com/uber-go/tally"
3839
"go.uber.org/zap"
@@ -141,7 +142,7 @@ type (
141142
logger *zap.Logger
142143
metricsScope tally.Scope
143144

144-
pollerRequestCh chan struct{}
145+
dynamic *worker.DynamicParams
145146
pollerAutoScaler *pollerAutoScaler
146147
taskQueueCh chan interface{}
147148
sessionTokenBucket *sessionTokenBucket
@@ -176,13 +177,15 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
176177
}
177178

178179
bw := &baseWorker{
179-
options: options,
180-
shutdownCh: make(chan struct{}),
181-
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
182-
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
183-
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
184-
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
185-
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
180+
options: options,
181+
shutdownCh: make(chan struct{}),
182+
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
183+
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
184+
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
185+
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
186+
dynamic: &worker.DynamicParams{
187+
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
188+
},
186189
pollerAutoScaler: pollerAS,
187190
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
188191
limiterContext: ctx,
@@ -244,11 +247,13 @@ func (bw *baseWorker) runPoller() {
244247
select {
245248
case <-bw.shutdownCh:
246249
return
247-
case <-bw.pollerRequestCh:
248-
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(cap(bw.pollerRequestCh)))
249-
// This metric is used to monitor how many poll requests have been allocated
250-
// and can be used to approximate number of concurrent task running (not pinpoint accurate)
251-
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(cap(bw.pollerRequestCh) - len(bw.pollerRequestCh)))
250+
case <-bw.dynamic.TaskPermit.AcquireChan(): // don't poll unless there is a task permit
251+
// TODO move to a centralized place inside the worker
252+
// emit metrics on concurrent task permit quota and current task permit count
253+
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
254+
// thus the metrics is only an estimated value of how many tasks are running concurrently
255+
bw.metricsScope.Gauge(metrics.ConcurrentTaskQuota).Update(float64(bw.dynamic.TaskPermit.Quota()))
256+
bw.metricsScope.Gauge(metrics.PollerRequestBufferUsage).Update(float64(bw.dynamic.TaskPermit.Count()))
252257
if bw.sessionTokenBucket != nil {
253258
bw.sessionTokenBucket.waitForAvailableToken()
254259
}
@@ -260,10 +265,6 @@ func (bw *baseWorker) runPoller() {
260265
func (bw *baseWorker) runTaskDispatcher() {
261266
defer bw.shutdownWG.Done()
262267

263-
for i := 0; i < bw.options.maxConcurrentTask; i++ {
264-
bw.pollerRequestCh <- struct{}{}
265-
}
266-
267268
for {
268269
// wait for new task or shutdown
269270
select {
@@ -333,7 +334,7 @@ func (bw *baseWorker) pollTask() {
333334
case <-bw.shutdownCh:
334335
}
335336
} else {
336-
bw.pollerRequestCh <- struct{}{} // poll failed, trigger a new poll
337+
bw.dynamic.TaskPermit.Release(1) // poll failed, trigger a new poll by returning a task permit
337338
}
338339
}
339340

@@ -368,7 +369,7 @@ func (bw *baseWorker) processTask(task interface{}) {
368369
}
369370

370371
if isPolledTask {
371-
bw.pollerRequestCh <- struct{}{}
372+
bw.dynamic.TaskPermit.Release(1) // task processed, trigger a new poll by returning a task permit
372373
}
373374
}()
374375
err := bw.options.taskWorker.ProcessTask(task)

internal/worker/dynamic_params.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package worker
22+
23+
import (
24+
"context"
25+
"fmt"
26+
27+
"github.com/marusama/semaphore/v2"
28+
)
29+
30+
var _ Permit = (*permit)(nil)
31+
32+
// Synchronization contains synchronization primitives for dynamic configuration.
33+
type DynamicParams struct {
34+
TaskPermit Permit
35+
}
36+
37+
// Permit is an adaptive
38+
type Permit interface {
39+
Acquire(context.Context, int) error
40+
AcquireChan() <-chan struct{}
41+
Quota() int
42+
SetQuota(int)
43+
Count() int
44+
Release(count int)
45+
}
46+
47+
type permit struct {
48+
sem semaphore.Semaphore
49+
}
50+
51+
// NewPermit creates a dynamic permit that's resizable
52+
func NewPermit(initCount int) Permit {
53+
return &permit{sem: semaphore.New(initCount)}
54+
}
55+
56+
// Acquire is blocking until a permit is acquired or returns error after context is done
57+
func (p *permit) Acquire(ctx context.Context, count int) error {
58+
if err := p.sem.Acquire(ctx, count); err != nil {
59+
return fmt.Errorf("failed to acquire permit before context is done: %w", err)
60+
}
61+
return nil
62+
}
63+
64+
// AcquireChan returns a permit ready channel. It's closed then permit is acquired
65+
func (p *permit) AcquireChan() <-chan struct{} {
66+
ch := make(chan struct{})
67+
go func() {
68+
if err := p.sem.Acquire(nil, 1); err != nil { // nil context indicates no need to exit on context done
69+
close(ch)
70+
return
71+
}
72+
ch <- struct{}{}
73+
close(ch)
74+
}()
75+
return ch
76+
}
77+
78+
func (p *permit) Release(count int) {
79+
p.sem.Release(count)
80+
}
81+
82+
func (p *permit) Quota() int {
83+
return p.sem.GetLimit()
84+
}
85+
86+
func (p *permit) SetQuota(c int) {
87+
p.sem.SetLimit(c)
88+
}
89+
90+
func (p *permit) Count() int {
91+
return p.sem.GetCount()
92+
}

0 commit comments

Comments
 (0)