Skip to content

Commit 4ee4500

Browse files
committed
fix initialization
1 parent 9010a8c commit 4ee4500

File tree

4 files changed

+20
-11
lines changed

4 files changed

+20
-11
lines changed

internal/internal_poller_autoscaler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type (
8282
func newPollerScaler(
8383
options pollerAutoScalerOptions,
8484
logger *zap.Logger,
85+
permit worker.Permit,
8586
hooks ...func()) *pollerAutoScaler {
8687
if !options.Enabled {
8788
return nil
@@ -91,7 +92,7 @@ func newPollerScaler(
9192
isDryRun: options.DryRun,
9293
cooldownTime: options.Cooldown,
9394
logger: logger,
94-
permit: worker.NewPermit(options.InitCount),
95+
permit: permit,
9596
wg: &sync.WaitGroup{},
9697
ctx: ctx,
9798
cancel: cancel,

internal/internal_poller_autoscaler_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"time"
2828

2929
"go.uber.org/cadence/internal/common/testlogger"
30+
"go.uber.org/cadence/internal/worker"
3031

3132
"github.com/stretchr/testify/assert"
3233
"go.uber.org/atomic"
@@ -171,6 +172,7 @@ func Test_pollerAutoscaler(t *testing.T) {
171172
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
172173
},
173174
testlogger.NewZap(t),
175+
worker.NewPermit(tt.args.initialPollerCount),
174176
// hook function that collects number of iterations
175177
func() {
176178
autoscalerEpoch.Add(1)

internal/internal_worker_base.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,24 +168,29 @@ func createPollRetryPolicy() backoff.RetryPolicy {
168168
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
169169
ctx, cancel := context.WithCancel(context.Background())
170170

171+
dynamic := &worker.DynamicParams{
172+
PollerPermit: worker.NewPermit(options.pollerCount),
173+
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
174+
}
175+
171176
var pollerAS *pollerAutoScaler
172177
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
178+
dynamic.PollerPermit = worker.NewPermit(pollerOptions.InitCount)
173179
pollerAS = newPollerScaler(
174180
pollerOptions,
175181
logger,
182+
dynamic.PollerPermit,
176183
)
177184
}
178185

179186
bw := &baseWorker{
180-
options: options,
181-
shutdownCh: make(chan struct{}),
182-
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
183-
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
184-
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
185-
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
186-
dynamic: &worker.DynamicParams{
187-
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
188-
},
187+
options: options,
188+
shutdownCh: make(chan struct{}),
189+
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
190+
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
191+
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
192+
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
193+
dynamic: dynamic,
189194
pollerAutoScaler: pollerAS,
190195
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
191196
limiterContext: ctx,

internal/worker/dynamic_params.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ var _ Permit = (*permit)(nil)
3131

3232
// Synchronization contains synchronization primitives for dynamic configuration.
3333
type DynamicParams struct {
34-
TaskPermit Permit
34+
PollerPermit Permit // controls concurrency of pollers
35+
TaskPermit Permit // controlls concurrency of task processings
3536
}
3637

3738
// Permit is an adaptive

0 commit comments

Comments
 (0)