Skip to content

Commit 0d11cc6

Browse files
committed
fix correct initial poller count for autoscaler
1 parent 40074ec commit 0d11cc6

File tree

2 files changed

+56
-52
lines changed

2 files changed

+56
-52
lines changed

internal/internal_worker.go

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -276,21 +276,17 @@ func newWorkflowTaskWorkerInternal(
276276
domain,
277277
params,
278278
)
279-
pollerCount := params.MaxConcurrentDecisionTaskPollers
280-
if params.AutoScalerOptions.Enabled {
281-
pollerCount = params.AutoScalerOptions.PollerInitCount
282-
}
283279
worker := newBaseWorker(baseWorkerOptions{
284-
pollerAutoScaler: params.AutoScalerOptions,
285-
pollerCount: pollerCount,
286-
pollerRate: defaultPollerRate,
287-
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
288-
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
289-
taskWorker: poller,
290-
identity: params.Identity,
291-
workerType: "DecisionWorker",
292-
shutdownTimeout: params.WorkerStopTimeout,
293-
pollerTracker: params.WorkerStats.PollerTracker,
280+
pollerAutoScaler: params.AutoScalerOptions,
281+
pollerCountWithoutAutoScaling: params.MaxConcurrentDecisionTaskPollers,
282+
pollerRate: defaultPollerRate,
283+
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
284+
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
285+
taskWorker: poller,
286+
identity: params.Identity,
287+
workerType: "DecisionWorker",
288+
shutdownTimeout: params.WorkerStopTimeout,
289+
pollerTracker: params.WorkerStats.PollerTracker,
294290
},
295291
params.Logger,
296292
params.MetricsScope,
@@ -308,14 +304,14 @@ func newWorkflowTaskWorkerInternal(
308304
// 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel
309305
localActivityTaskPoller := newLocalActivityPoller(params, laTunnel)
310306
localActivityWorker := newBaseWorker(baseWorkerOptions{
311-
pollerCount: 1, // 1 poller (from local channel) is enough for local activity
312-
maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize,
313-
maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond,
314-
taskWorker: localActivityTaskPoller,
315-
identity: params.Identity,
316-
workerType: "LocalActivityWorker",
317-
shutdownTimeout: params.WorkerStopTimeout,
318-
pollerTracker: params.WorkerStats.PollerTracker,
307+
pollerCountWithoutAutoScaling: 1, // 1 poller (from local channel) is enough for local activity
308+
maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize,
309+
maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond,
310+
taskWorker: localActivityTaskPoller,
311+
identity: params.Identity,
312+
workerType: "LocalActivityWorker",
313+
shutdownTimeout: params.WorkerStopTimeout,
314+
pollerTracker: params.WorkerStats.PollerTracker,
319315
},
320316
params.Logger,
321317
params.MetricsScope,
@@ -472,23 +468,19 @@ func newActivityTaskWorker(
472468
workerType string,
473469
) (worker *activityWorker) {
474470
ensureRequiredParams(&workerParams)
475-
pollerCount := workerParams.MaxConcurrentActivityTaskPollers
476-
if workerParams.AutoScalerOptions.Enabled {
477-
pollerCount = workerParams.AutoScalerOptions.PollerInitCount
478-
}
479471
base := newBaseWorker(
480472
baseWorkerOptions{
481-
pollerAutoScaler: workerParams.AutoScalerOptions,
482-
pollerCount: pollerCount,
483-
pollerRate: defaultPollerRate,
484-
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
485-
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
486-
taskWorker: poller,
487-
identity: workerParams.Identity,
488-
workerType: workerType,
489-
shutdownTimeout: workerParams.WorkerStopTimeout,
490-
userContextCancel: workerParams.UserContextCancel,
491-
pollerTracker: workerParams.WorkerStats.PollerTracker,
473+
pollerAutoScaler: workerParams.AutoScalerOptions,
474+
pollerCountWithoutAutoScaling: workerParams.MaxConcurrentActivityTaskPollers,
475+
pollerRate: defaultPollerRate,
476+
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
477+
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
478+
taskWorker: poller,
479+
identity: workerParams.Identity,
480+
workerType: workerType,
481+
shutdownTimeout: workerParams.WorkerStopTimeout,
482+
userContextCancel: workerParams.UserContextCancel,
483+
pollerTracker: workerParams.WorkerStats.PollerTracker,
492484
},
493485

494486
workerParams.Logger,

internal/internal_worker_base.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,18 +115,18 @@ type (
115115

116116
// baseWorkerOptions options to configure base worker.
117117
baseWorkerOptions struct {
118-
pollerAutoScaler AutoScalerOptions
119-
pollerCount int
120-
pollerRate int
121-
maxConcurrentTask int
122-
maxTaskPerSecond float64
123-
taskWorker taskPoller
124-
identity string
125-
workerType string
126-
shutdownTimeout time.Duration
127-
userContextCancel context.CancelFunc
128-
host string
129-
pollerTracker debug.PollerTracker
118+
pollerAutoScaler AutoScalerOptions
119+
pollerCountWithoutAutoScaling int
120+
pollerRate int
121+
maxConcurrentTask int
122+
maxTaskPerSecond float64
123+
taskWorker taskPoller
124+
identity string
125+
workerType string
126+
shutdownTimeout time.Duration
127+
userContextCancel context.CancelFunc
128+
host string
129+
pollerTracker debug.PollerTracker
130130
}
131131

132132
// baseWorker that wraps worker activities.
@@ -172,12 +172,16 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
172172
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)
173173

174174
concurrency := &worker.ConcurrencyLimit{
175-
PollerPermit: worker.NewResizablePermit(options.pollerCount),
175+
PollerPermit: worker.NewResizablePermit(options.pollerCountWithoutAutoScaling),
176176
TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask),
177177
}
178178

179179
var concurrencyAS *worker.ConcurrencyAutoScaler
180180
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
181+
concurrency = &worker.ConcurrencyLimit{
182+
PollerPermit: worker.NewResizablePermit(pollerOptions.PollerInitCount),
183+
TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask),
184+
}
181185
concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{
182186
Concurrency: concurrency,
183187
Cooldown: pollerOptions.Cooldown,
@@ -221,7 +225,11 @@ func (bw *baseWorker) Start() {
221225

222226
bw.concurrencyAutoScaler.Start()
223227

224-
for i := 0; i < bw.options.pollerCount; i++ {
228+
maxPollerCount := bw.options.pollerCountWithoutAutoScaling
229+
if bw.options.pollerAutoScaler.Enabled {
230+
maxPollerCount = bw.options.pollerAutoScaler.PollerMaxCount
231+
}
232+
for i := 0; i < maxPollerCount; i++ {
225233
bw.shutdownWG.Add(1)
226234
go bw.runPoller()
227235
}
@@ -232,9 +240,13 @@ func (bw *baseWorker) Start() {
232240
bw.isWorkerStarted = true
233241
traceLog(func() {
234242
bw.logger.Info("Started Worker",
235-
zap.Int("PollerCount", bw.options.pollerCount),
243+
zap.Int("PollerCountWithoutAutoScaling", bw.options.pollerCountWithoutAutoScaling),
236244
zap.Int("MaxConcurrentTask", bw.options.maxConcurrentTask),
237245
zap.Float64("MaxTaskPerSecond", bw.options.maxTaskPerSecond),
246+
zap.Bool("AutoscalerEnabled", bw.options.pollerAutoScaler.Enabled),
247+
zap.Int("AutoscalerPollerMaxCount", bw.options.pollerAutoScaler.PollerMaxCount),
248+
zap.Int("AutoscalerPollerMinCount", bw.options.pollerAutoScaler.PollerMinCount),
249+
zap.Int("AutoscalerPollerInitialCount", bw.options.pollerAutoScaler.PollerInitCount),
238250
)
239251
})
240252
}

0 commit comments

Comments
 (0)