Skip to content

Commit 1785f78

Browse files
authored
fix correct initial poller count for autoscaler (#1440)
What changed? rename pollerCount to less confusing name pollerCountWithoutAutoscaling ensure starting enough goroutines for pollers when autoscaler is enabled Why? Consider a case when autoscaler is enabled with 40 max, 10 initial, 2 min. We need to start 40 poller goroutines first and only allow 10 to pass initially. This is achieved by starting 40 go bw.runPoller() goroutines and set PollerPermit to 10. The bug we have now is we only start 10 goroutines at the beginning so it just won't scale up. How did you test it? bench test with stable tasklist traffic. Pollers are 2 intially with 20 instances with autoscaler enabled. After scaling down 20 -> 10 and 10 -> 2. We saw the poller quota increased to cover the loss of hosts. Schedule to start latency is maintained. Potential risks Additional goroutines will be created but it's ok because we are already creating a max of 10k goroutines by default for sticky executions.
1 parent 40074ec commit 1785f78

File tree

2 files changed

+56
-52
lines changed

2 files changed

+56
-52
lines changed

internal/internal_worker.go

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -276,21 +276,17 @@ func newWorkflowTaskWorkerInternal(
276276
domain,
277277
params,
278278
)
279-
pollerCount := params.MaxConcurrentDecisionTaskPollers
280-
if params.AutoScalerOptions.Enabled {
281-
pollerCount = params.AutoScalerOptions.PollerInitCount
282-
}
283279
worker := newBaseWorker(baseWorkerOptions{
284-
pollerAutoScaler: params.AutoScalerOptions,
285-
pollerCount: pollerCount,
286-
pollerRate: defaultPollerRate,
287-
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
288-
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
289-
taskWorker: poller,
290-
identity: params.Identity,
291-
workerType: "DecisionWorker",
292-
shutdownTimeout: params.WorkerStopTimeout,
293-
pollerTracker: params.WorkerStats.PollerTracker,
280+
pollerAutoScaler: params.AutoScalerOptions,
281+
pollerCountWithoutAutoScaling: params.MaxConcurrentDecisionTaskPollers,
282+
pollerRate: defaultPollerRate,
283+
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
284+
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
285+
taskWorker: poller,
286+
identity: params.Identity,
287+
workerType: "DecisionWorker",
288+
shutdownTimeout: params.WorkerStopTimeout,
289+
pollerTracker: params.WorkerStats.PollerTracker,
294290
},
295291
params.Logger,
296292
params.MetricsScope,
@@ -308,14 +304,14 @@ func newWorkflowTaskWorkerInternal(
308304
// 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel
309305
localActivityTaskPoller := newLocalActivityPoller(params, laTunnel)
310306
localActivityWorker := newBaseWorker(baseWorkerOptions{
311-
pollerCount: 1, // 1 poller (from local channel) is enough for local activity
312-
maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize,
313-
maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond,
314-
taskWorker: localActivityTaskPoller,
315-
identity: params.Identity,
316-
workerType: "LocalActivityWorker",
317-
shutdownTimeout: params.WorkerStopTimeout,
318-
pollerTracker: params.WorkerStats.PollerTracker,
307+
pollerCountWithoutAutoScaling: 1, // 1 poller (from local channel) is enough for local activity
308+
maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize,
309+
maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond,
310+
taskWorker: localActivityTaskPoller,
311+
identity: params.Identity,
312+
workerType: "LocalActivityWorker",
313+
shutdownTimeout: params.WorkerStopTimeout,
314+
pollerTracker: params.WorkerStats.PollerTracker,
319315
},
320316
params.Logger,
321317
params.MetricsScope,
@@ -472,23 +468,19 @@ func newActivityTaskWorker(
472468
workerType string,
473469
) (worker *activityWorker) {
474470
ensureRequiredParams(&workerParams)
475-
pollerCount := workerParams.MaxConcurrentActivityTaskPollers
476-
if workerParams.AutoScalerOptions.Enabled {
477-
pollerCount = workerParams.AutoScalerOptions.PollerInitCount
478-
}
479471
base := newBaseWorker(
480472
baseWorkerOptions{
481-
pollerAutoScaler: workerParams.AutoScalerOptions,
482-
pollerCount: pollerCount,
483-
pollerRate: defaultPollerRate,
484-
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
485-
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
486-
taskWorker: poller,
487-
identity: workerParams.Identity,
488-
workerType: workerType,
489-
shutdownTimeout: workerParams.WorkerStopTimeout,
490-
userContextCancel: workerParams.UserContextCancel,
491-
pollerTracker: workerParams.WorkerStats.PollerTracker,
473+
pollerAutoScaler: workerParams.AutoScalerOptions,
474+
pollerCountWithoutAutoScaling: workerParams.MaxConcurrentActivityTaskPollers,
475+
pollerRate: defaultPollerRate,
476+
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
477+
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
478+
taskWorker: poller,
479+
identity: workerParams.Identity,
480+
workerType: workerType,
481+
shutdownTimeout: workerParams.WorkerStopTimeout,
482+
userContextCancel: workerParams.UserContextCancel,
483+
pollerTracker: workerParams.WorkerStats.PollerTracker,
492484
},
493485

494486
workerParams.Logger,

internal/internal_worker_base.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,18 +115,18 @@ type (
115115

116116
// baseWorkerOptions options to configure base worker.
117117
baseWorkerOptions struct {
118-
pollerAutoScaler AutoScalerOptions
119-
pollerCount int
120-
pollerRate int
121-
maxConcurrentTask int
122-
maxTaskPerSecond float64
123-
taskWorker taskPoller
124-
identity string
125-
workerType string
126-
shutdownTimeout time.Duration
127-
userContextCancel context.CancelFunc
128-
host string
129-
pollerTracker debug.PollerTracker
118+
pollerAutoScaler AutoScalerOptions
119+
pollerCountWithoutAutoScaling int
120+
pollerRate int
121+
maxConcurrentTask int
122+
maxTaskPerSecond float64
123+
taskWorker taskPoller
124+
identity string
125+
workerType string
126+
shutdownTimeout time.Duration
127+
userContextCancel context.CancelFunc
128+
host string
129+
pollerTracker debug.PollerTracker
130130
}
131131

132132
// baseWorker that wraps worker activities.
@@ -172,12 +172,16 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
172172
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)
173173

174174
concurrency := &worker.ConcurrencyLimit{
175-
PollerPermit: worker.NewResizablePermit(options.pollerCount),
175+
PollerPermit: worker.NewResizablePermit(options.pollerCountWithoutAutoScaling),
176176
TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask),
177177
}
178178

179179
var concurrencyAS *worker.ConcurrencyAutoScaler
180180
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
181+
concurrency = &worker.ConcurrencyLimit{
182+
PollerPermit: worker.NewResizablePermit(pollerOptions.PollerInitCount),
183+
TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask),
184+
}
181185
concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{
182186
Concurrency: concurrency,
183187
Cooldown: pollerOptions.Cooldown,
@@ -221,7 +225,11 @@ func (bw *baseWorker) Start() {
221225

222226
bw.concurrencyAutoScaler.Start()
223227

224-
for i := 0; i < bw.options.pollerCount; i++ {
228+
maxPollerCount := bw.options.pollerCountWithoutAutoScaling
229+
if bw.options.pollerAutoScaler.Enabled {
230+
maxPollerCount = bw.options.pollerAutoScaler.PollerMaxCount
231+
}
232+
for i := 0; i < maxPollerCount; i++ {
225233
bw.shutdownWG.Add(1)
226234
go bw.runPoller()
227235
}
@@ -232,9 +240,13 @@ func (bw *baseWorker) Start() {
232240
bw.isWorkerStarted = true
233241
traceLog(func() {
234242
bw.logger.Info("Started Worker",
235-
zap.Int("PollerCount", bw.options.pollerCount),
243+
zap.Int("PollerCountWithoutAutoScaling", bw.options.pollerCountWithoutAutoScaling),
236244
zap.Int("MaxConcurrentTask", bw.options.maxConcurrentTask),
237245
zap.Float64("MaxTaskPerSecond", bw.options.maxTaskPerSecond),
246+
zap.Bool("AutoscalerEnabled", bw.options.pollerAutoScaler.Enabled),
247+
zap.Int("AutoscalerPollerMaxCount", bw.options.pollerAutoScaler.PollerMaxCount),
248+
zap.Int("AutoscalerPollerMinCount", bw.options.pollerAutoScaler.PollerMinCount),
249+
zap.Int("AutoscalerPollerInitialCount", bw.options.pollerAutoScaler.PollerInitCount),
238250
)
239251
})
240252
}

0 commit comments

Comments
 (0)