diff --git a/internal/internal_worker.go b/internal/internal_worker.go index ef56b2da0..11b6e5c1b 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -276,21 +276,17 @@ func newWorkflowTaskWorkerInternal( domain, params, ) - pollerCount := params.MaxConcurrentDecisionTaskPollers - if params.AutoScalerOptions.Enabled { - pollerCount = params.AutoScalerOptions.PollerInitCount - } worker := newBaseWorker(baseWorkerOptions{ - pollerAutoScaler: params.AutoScalerOptions, - pollerCount: pollerCount, - pollerRate: defaultPollerRate, - maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize, - maxTaskPerSecond: params.WorkerDecisionTasksPerSecond, - taskWorker: poller, - identity: params.Identity, - workerType: "DecisionWorker", - shutdownTimeout: params.WorkerStopTimeout, - pollerTracker: params.WorkerStats.PollerTracker, + pollerAutoScaler: params.AutoScalerOptions, + pollerCountWithoutAutoScaling: params.MaxConcurrentDecisionTaskPollers, + pollerRate: defaultPollerRate, + maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize, + maxTaskPerSecond: params.WorkerDecisionTasksPerSecond, + taskWorker: poller, + identity: params.Identity, + workerType: "DecisionWorker", + shutdownTimeout: params.WorkerStopTimeout, + pollerTracker: params.WorkerStats.PollerTracker, }, params.Logger, params.MetricsScope, @@ -308,14 +304,14 @@ func newWorkflowTaskWorkerInternal( // 2) local activity task poller will poll from laTunnel, and result will be pushed to laTunnel localActivityTaskPoller := newLocalActivityPoller(params, laTunnel) localActivityWorker := newBaseWorker(baseWorkerOptions{ - pollerCount: 1, // 1 poller (from local channel) is enough for local activity - maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize, - maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond, - taskWorker: localActivityTaskPoller, - identity: params.Identity, - workerType: "LocalActivityWorker", - shutdownTimeout: params.WorkerStopTimeout, - pollerTracker: params.WorkerStats.PollerTracker, + pollerCountWithoutAutoScaling: 1, // 1 poller (from local channel) is enough for local activity + maxConcurrentTask: params.MaxConcurrentLocalActivityExecutionSize, + maxTaskPerSecond: params.WorkerLocalActivitiesPerSecond, + taskWorker: localActivityTaskPoller, + identity: params.Identity, + workerType: "LocalActivityWorker", + shutdownTimeout: params.WorkerStopTimeout, + pollerTracker: params.WorkerStats.PollerTracker, }, params.Logger, params.MetricsScope, @@ -472,23 +468,19 @@ func newActivityTaskWorker( workerType string, ) (worker *activityWorker) { ensureRequiredParams(&workerParams) - pollerCount := workerParams.MaxConcurrentActivityTaskPollers - if workerParams.AutoScalerOptions.Enabled { - pollerCount = workerParams.AutoScalerOptions.PollerInitCount - } base := newBaseWorker( baseWorkerOptions{ - pollerAutoScaler: workerParams.AutoScalerOptions, - pollerCount: pollerCount, - pollerRate: defaultPollerRate, - maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize, - maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond, - taskWorker: poller, - identity: workerParams.Identity, - workerType: workerType, - shutdownTimeout: workerParams.WorkerStopTimeout, - userContextCancel: workerParams.UserContextCancel, - pollerTracker: workerParams.WorkerStats.PollerTracker, + pollerAutoScaler: workerParams.AutoScalerOptions, + pollerCountWithoutAutoScaling: workerParams.MaxConcurrentActivityTaskPollers, + pollerRate: defaultPollerRate, + maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize, + maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond, + taskWorker: poller, + identity: workerParams.Identity, + workerType: workerType, + shutdownTimeout: workerParams.WorkerStopTimeout, + userContextCancel: workerParams.UserContextCancel, + pollerTracker: workerParams.WorkerStats.PollerTracker, }, workerParams.Logger, diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 727a6fdda..37368776d 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -115,18 +115,18 @@ type ( // baseWorkerOptions options to configure base worker. baseWorkerOptions struct { - pollerAutoScaler AutoScalerOptions - pollerCount int - pollerRate int - maxConcurrentTask int - maxTaskPerSecond float64 - taskWorker taskPoller - identity string - workerType string - shutdownTimeout time.Duration - userContextCancel context.CancelFunc - host string - pollerTracker debug.PollerTracker + pollerAutoScaler AutoScalerOptions + pollerCountWithoutAutoScaling int + pollerRate int + maxConcurrentTask int + maxTaskPerSecond float64 + taskWorker taskPoller + identity string + workerType string + shutdownTimeout time.Duration + userContextCancel context.CancelFunc + host string + pollerTracker debug.PollerTracker } // baseWorker that wraps worker activities. @@ -172,12 +172,16 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType) concurrency := &worker.ConcurrencyLimit{ - PollerPermit: worker.NewResizablePermit(options.pollerCount), + PollerPermit: worker.NewResizablePermit(options.pollerCountWithoutAutoScaling), TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask), } var concurrencyAS *worker.ConcurrencyAutoScaler if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled { + concurrency = &worker.ConcurrencyLimit{ + PollerPermit: worker.NewResizablePermit(pollerOptions.PollerInitCount), + TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask), + } concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{ Concurrency: concurrency, Cooldown: pollerOptions.Cooldown, @@ -221,7 +225,11 @@ func (bw *baseWorker) Start() { bw.concurrencyAutoScaler.Start() - for i := 0; i < bw.options.pollerCount; i++ { + maxPollerCount := bw.options.pollerCountWithoutAutoScaling + if bw.options.pollerAutoScaler.Enabled { + maxPollerCount = bw.options.pollerAutoScaler.PollerMaxCount + } + for i := 0; i < maxPollerCount; i++ { bw.shutdownWG.Add(1) go bw.runPoller() } @@ -232,9 +240,13 @@ func (bw *baseWorker) Start() { bw.isWorkerStarted = true traceLog(func() { bw.logger.Info("Started Worker", - zap.Int("PollerCount", bw.options.pollerCount), + zap.Int("PollerCountWithoutAutoScaling", bw.options.pollerCountWithoutAutoScaling), zap.Int("MaxConcurrentTask", bw.options.maxConcurrentTask), zap.Float64("MaxTaskPerSecond", bw.options.maxTaskPerSecond), + zap.Bool("AutoscalerEnabled", bw.options.pollerAutoScaler.Enabled), + zap.Int("AutoscalerPollerMaxCount", bw.options.pollerAutoScaler.PollerMaxCount), + zap.Int("AutoscalerPollerMinCount", bw.options.pollerAutoScaler.PollerMinCount), + zap.Int("AutoscalerPollerInitialCount", bw.options.pollerAutoScaler.PollerInitCount), ) }) }