diff --git a/internal/common/autoscaler/autoscaler.go b/internal/common/autoscaler/autoscaler.go deleted file mode 100644 index c080917ce..000000000 --- a/internal/common/autoscaler/autoscaler.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -// AutoScaler collects data and estimate usage -type ( - AutoScaler interface { - Estimator - // Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator - Start() - // Stop stops the autoscaler if started or do nothing if not yet started - Stop() - } -) diff --git a/internal/common/autoscaler/estimator.go b/internal/common/autoscaler/estimator.go deleted file mode 100644 index 3814b2d21..000000000 --- a/internal/common/autoscaler/estimator.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package autoscaler - -type ( - // Estimator collects data and estimate usage - Estimator interface { - CollectUsage(data interface{}) error - Estimate() (Usages, error) - Reset() - } -) diff --git a/internal/common/autoscaler/recommender.go b/internal/common/autoscaler/recommender.go deleted file mode 100644 index 4338886a7..000000000 --- a/internal/common/autoscaler/recommender.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package autoscaler - -import "math" - -// Recommender a recommendation generator for ResourceUnit -type Recommender interface { - Recommend(currentResource ResourceUnit, currentUsages Usages) ResourceUnit -} - -type linearRecommender struct { - lower, upper ResourceUnit - targetUsages Usages -} - -// NewLinearRecommender create a linear Recommender -func NewLinearRecommender(lower, upper ResourceUnit, targetUsages Usages) Recommender { - return &linearRecommender{ - lower: lower, - upper: upper, - targetUsages: targetUsages, - } -} - -// Recommend recommends the new value -func (l *linearRecommender) Recommend(currentResource ResourceUnit, currentUsages Usages) ResourceUnit { - var recommend float64 - - // average recommendation over all UsageType - for usageType := range currentUsages { - var r float64 - if l.targetUsages[usageType] == 0 { // avoid division by zero - r = math.MaxFloat64 - } else { - if currentUsages[usageType].Value() == float64(1000) { - r = l.upper.Value() - } else { - r = currentResource.Value() * currentUsages[usageType].Value() / l.targetUsages[usageType].Value() - } - } - // boundary check - r = math.Min(l.upper.Value(), math.Max(l.lower.Value(), r)) - recommend += r - } - recommend /= float64(len(currentUsages)) - return ResourceUnit(recommend) -} diff --git a/internal/common/autoscaler/recommender_test.go b/internal/common/autoscaler/recommender_test.go deleted file mode 100644 index 8b716e4cb..000000000 --- a/internal/common/autoscaler/recommender_test.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package autoscaler - -import "testing" - -func Test_linearRecommender_Recommend(t *testing.T) { - type fields struct { - lower ResourceUnit - upper ResourceUnit - targetUsages Usages - } - type args struct { - currentResource ResourceUnit - currentUsages Usages - } - - defaultFields := fields{ - lower: 5, - upper: 15, - targetUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - } - - highUpperValue := fields{ - lower: 5, - upper: 100, - targetUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - } - - tests := []struct { - name string - fields fields - args args - want ResourceUnit - }{ - { - name: "on target usage", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 500, - }, - }, - want: ResourceUnit(10), - }, - { - name: "under utilized, scale down", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 400, - }, - }, - want: ResourceUnit(8), - }, - { - name: "under utilized, scale down but bounded", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 200, - }, - }, - want: ResourceUnit(5), - }, - { - name: "zero utilization, scale down to min", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 0, - }, - }, - want: ResourceUnit(5), - }, - { - name: "over utilized, scale up", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 600, - }, - }, - want: ResourceUnit(12), - }, - { - name: "over utilized, scale up but bounded", - fields: defaultFields, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 1000, - }, - }, - want: ResourceUnit(15), - }, - { - name: "over utilized, since we do not how 
many tasks are in the queue (because poller usage at 100%), scale up to max", - fields: highUpperValue, - args: args{ - currentResource: 10, - currentUsages: map[UsageType]MilliUsage{ - PollerUtilizationRate: 1000, - }, - }, - want: ResourceUnit(100), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := NewLinearRecommender(tt.fields.lower, tt.fields.upper, tt.fields.targetUsages) - if got := l.Recommend(tt.args.currentResource, tt.args.currentUsages); got != tt.want { - t.Errorf("Recommend() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/internal/common/autoscaler/types.go b/internal/common/autoscaler/types.go deleted file mode 100644 index e7a8c699a..000000000 --- a/internal/common/autoscaler/types.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package autoscaler - -type ( - // ResourceUnit is the unit of scalable resources - ResourceUnit uint - - // MilliUsage is the custom defined usage of ResourceUnit times 1000 - MilliUsage uint64 - - // Usages are different measurements used by a Recommender to provide a recommended ResourceUnit - Usages map[UsageType]MilliUsage - - // UsageType type of usage - UsageType string -) - -const ( - // PollerUtilizationRate is a scale from 0 to 1 to indicate poller usages - PollerUtilizationRate UsageType = "pollerUtilizationRate" -) - -// Value helper method for type conversion -func (r ResourceUnit) Value() float64 { - return float64(r) -} - -// Value helper method for type conversion -func (u MilliUsage) Value() float64 { - return float64(u) -} diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index a20298443..b09b2bf8a 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -114,4 +114,7 @@ const ( ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size" ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota" PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage" + + // Concurrency Auto Scaler + ConcurrencyAutoScalerScope = CadenceMetricsPrefix + "concurrency-auto-scaler" ) diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 2dc81e7ba..00510c8e0 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -21,179 +21,44 @@ package internal import ( - "context" - "errors" - "sync" "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "go.uber.org/cadence/internal/common/autoscaler" - "go.uber.org/cadence/internal/worker" ) // defaultPollerScalerCooldownInSeconds const ( - defaultPollerAutoScalerCooldown = time.Minute - defaultPollerAutoScalerTargetUtilization = 0.6 - defaultMinConcurrentActivityPollerSize = 1 - defaultMinConcurrentDecisionPollerSize 
= 2 -) - -var ( - _ autoscaler.AutoScaler = (*pollerAutoScaler)(nil) - _ autoscaler.Estimator = (*pollerUsageEstimator)(nil) + defaultPollerAutoScalerCooldown = 10 * time.Second + defaultMinPollerSize = 2 + defaultMaxPollerSize = 200 + defaultPollerAutoScalerWaitTimeUpperBound = 256 * time.Millisecond + defaultPollerAutoScalerWaitTimeLowerBound = 16 * time.Millisecond ) type ( - pollerAutoScaler struct { - pollerUsageEstimator - - isDryRun bool - cooldownTime time.Duration - logger *zap.Logger - permit worker.Permit - ctx context.Context - cancel context.CancelFunc - wg *sync.WaitGroup // graceful stop - recommender autoscaler.Recommender - onAutoScale []func() // hook functions that run post autoscale - } - - pollerUsageEstimator struct { - // This single atomic variable stores two variables: - // left 32 bits is noTaskCounts, right 32 bits is taskCounts. - // This avoids unnecessary usage of CompareAndSwap - atomicBits *atomic.Uint64 - } - - pollerAutoScalerOptions struct { - Enabled bool - InitCount int - MinCount int - MaxCount int - Cooldown time.Duration - DryRun bool - TargetUtilization float64 + AutoScalerOptions struct { + // Optional: Enable the auto scaler. + // default: false + Enabled bool + + // Optional: The cooldown period after a scale up or down. + // default: 10 seconds + Cooldown time.Duration + + // Optional: The minimum number of pollers to start with. + // default: 2 + PollerMinCount int + + // Optional: The maximum number of pollers to start with. + // default: 200 + PollerMaxCount int + + // Optional: The upper bound of poller wait time for poller autoscaler to scale down. + // default: 256ms + // NOTE: This is normally not needed to be set by user. + PollerWaitTimeUpperBound time.Duration + + // Optional: The lower bound of poller wait time for poller autoscaler to scale up. + // default: 16ms + // NOTE: This is normally not needed to be set by user. 
+ PollerWaitTimeLowerBound time.Duration } ) - -func newPollerScaler( - options pollerAutoScalerOptions, - logger *zap.Logger, - permit worker.Permit, - hooks ...func()) *pollerAutoScaler { - if !options.Enabled { - return nil - } - ctx, cancel := context.WithCancel(context.Background()) - return &pollerAutoScaler{ - isDryRun: options.DryRun, - cooldownTime: options.Cooldown, - logger: logger, - permit: permit, - wg: &sync.WaitGroup{}, - ctx: ctx, - cancel: cancel, - pollerUsageEstimator: pollerUsageEstimator{atomicBits: atomic.NewUint64(0)}, - recommender: autoscaler.NewLinearRecommender( - autoscaler.ResourceUnit(options.MinCount), - autoscaler.ResourceUnit(options.MaxCount), - autoscaler.Usages{ - autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(options.TargetUtilization * 1000), - }, - ), - onAutoScale: hooks, - } -} - -// Start an auto-scaler go routine and returns a done to stop it -func (p *pollerAutoScaler) Start() { - logger := p.logger.Sugar() - p.wg.Add(1) - go func() { - defer p.wg.Done() - for { - select { - case <-p.ctx.Done(): - return - case <-time.After(p.cooldownTime): - currentResource := autoscaler.ResourceUnit(p.permit.Quota()) - currentUsages, err := p.pollerUsageEstimator.Estimate() - if err != nil { - logger.Warnw("poller autoscaler skip due to estimator error", "error", err) - continue - } - proposedResource := p.recommender.Recommend(currentResource, currentUsages) - logger.Debugw("poller autoscaler recommendation", - "currentUsage", currentUsages, - "current", uint64(currentResource), - "recommend", uint64(proposedResource), - "isDryRun", p.isDryRun) - if !p.isDryRun { - p.permit.SetQuota(int(proposedResource)) - } - p.pollerUsageEstimator.Reset() - - // hooks - for i := range p.onAutoScale { - p.onAutoScale[i]() - } - } - } - }() - return -} - -// Stop stops the poller autoscaler -func (p *pollerAutoScaler) Stop() { - p.cancel() - p.wg.Wait() -} - -// Reset metrics from the start -func (m *pollerUsageEstimator) Reset() { - 
m.atomicBits.Store(0) -} - -// CollectUsage counts past poll results to estimate autoscaler.Usages -func (m *pollerUsageEstimator) CollectUsage(data interface{}) error { - isEmpty, err := isTaskEmpty(data) - if err != nil { - return err - } - if isEmpty { // no-task poll - m.atomicBits.Add(1 << 32) - } else { - m.atomicBits.Add(1) - } - return nil -} - -func isTaskEmpty(task interface{}) (bool, error) { - switch t := task.(type) { - case *workflowTask: - return t == nil || t.task == nil, nil - case *activityTask: - return t == nil || t.task == nil, nil - case *localActivityTask: - return t == nil || t.workflowTask == nil, nil - default: - return false, errors.New("unknown task type") - } -} - -// Estimate is based on past poll counts -func (m *pollerUsageEstimator) Estimate() (autoscaler.Usages, error) { - bits := m.atomicBits.Load() - noTaskCounts := bits >> 32 // left 32 bits - taskCounts := bits & ((1 << 32) - 1) // right 32 bits - if noTaskCounts+taskCounts == 0 { - return nil, errors.New("autoscaler.Estimator::Estimate error: not enough data") - } - - return autoscaler.Usages{ - autoscaler.PollerUtilizationRate: autoscaler.MilliUsage(taskCounts * 1000 / (noTaskCounts + taskCounts)), - }, nil -} diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go deleted file mode 100644 index 4a441b642..000000000 --- a/internal/internal_poller_autoscaler_test.go +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package internal - -import ( - "context" - "math/rand" - "sync" - "testing" - "time" - - "go.uber.org/cadence/internal/common/testlogger" - "go.uber.org/cadence/internal/worker" - - "github.com/stretchr/testify/assert" - "go.uber.org/atomic" - - s "go.uber.org/cadence/.gen/go/shared" - "go.uber.org/cadence/internal/common/autoscaler" -) - -func Test_pollerAutoscaler(t *testing.T) { - type args struct { - disabled bool - noTaskPoll, taskPoll, unrelated int - initialPollerCount int - minPollerCount int - maxPollerCount int - targetMilliUsage uint64 - cooldownTime time.Duration - autoScalerEpoch int - isDryRun bool - } - - coolDownTime := time.Millisecond * 50 - - tests := []struct { - name string - args args - want int - }{ - { - name: "dry run doesn't change anything", - args: args{ - noTaskPoll: 100, - taskPoll: 0, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 2, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: true, - }, - want: 10, - }, - { - name: "no utilization, scale to min", - args: args{ - noTaskPoll: 100, - taskPoll: 0, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 1, - }, - { - name: "low utilization, scale down", - args: args{ - noTaskPoll: 75, - taskPoll: 25, - unrelated: 0, - initialPollerCount: 10, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 5, - }, - { - name: "over utilized, scale up", - args: args{ - noTaskPoll: 0, - taskPoll: 100, - unrelated: 0, - initialPollerCount: 2, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 10, - }, - { - name: "over utilized, scale up to max", - args: args{ - noTaskPoll: 0, - taskPoll: 
100, - unrelated: 0, - initialPollerCount: 6, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 1, - isDryRun: false, - }, - want: 10, - }, - { - name: "over utilized, but wait time less than cooldown time", - args: args{ - noTaskPoll: 0, - taskPoll: 100, - unrelated: 0, - initialPollerCount: 6, - minPollerCount: 1, - maxPollerCount: 10, - targetMilliUsage: 500, - cooldownTime: coolDownTime, - autoScalerEpoch: 0, - isDryRun: false, - }, - want: 6, - }, - { - name: "disabled", - args: args{disabled: true}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - autoscalerEpoch := atomic.NewUint64(0) - pollerScaler := newPollerScaler( - pollerAutoScalerOptions{ - Enabled: !tt.args.disabled, - InitCount: tt.args.initialPollerCount, - MinCount: tt.args.minPollerCount, - MaxCount: tt.args.maxPollerCount, - Cooldown: tt.args.cooldownTime, - DryRun: tt.args.isDryRun, - TargetUtilization: float64(tt.args.targetMilliUsage) / 1000, - }, - testlogger.NewZap(t), - worker.NewResizablePermit(tt.args.initialPollerCount), - // hook function that collects number of iterations - func() { - autoscalerEpoch.Add(1) - }) - if tt.args.disabled { - assert.Nil(t, pollerScaler) - return - } - - pollerScaler.Start() - - // simulate concurrent polling - pollChan := generateRandomPollResults(tt.args.noTaskPoll, tt.args.taskPoll, tt.args.unrelated) - wg := &sync.WaitGroup{} - for i := 0; i < tt.args.maxPollerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for pollResult := range pollChan { - err := pollerScaler.permit.Acquire(context.Background()) - assert.NoError(t, err) - pollerScaler.CollectUsage(pollResult) - pollerScaler.permit.Release() - } - }() - } - - assert.Eventually(t, func() bool { - return autoscalerEpoch.Load() == uint64(tt.args.autoScalerEpoch) - }, tt.args.cooldownTime+100*time.Millisecond, 10*time.Millisecond) - pollerScaler.Stop() - res := pollerScaler.permit.Quota() - 
pollerScaler.permit.Count() - assert.Equal(t, tt.want, int(res)) - }) - } -} - -func Test_pollerUsageEstimator(t *testing.T) { - type args struct { - noTaskPoll, taskPoll, unrelated int - pollerCount int - } - tests := []struct { - name string - args args - want autoscaler.Usages - wantErr bool - }{ - { - name: "400 no-task, 100 task, 100 unrelated", - args: args{ - noTaskPoll: 400, - taskPoll: 100, - unrelated: 100, - pollerCount: 5, - }, - want: autoscaler.Usages{ - autoscaler.PollerUtilizationRate: 200, - }, - }, - { - name: "0 no-task, 0 task, 100 unrelated", - args: args{ - noTaskPoll: 0, - taskPoll: 0, - unrelated: 100, - pollerCount: 5, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - estimator := &pollerUsageEstimator{atomicBits: atomic.NewUint64(0)} - pollChan := generateRandomPollResults(tt.args.noTaskPoll, tt.args.taskPoll, tt.args.unrelated) - wg := &sync.WaitGroup{} - for i := 0; i < tt.args.pollerCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for pollResult := range pollChan { - estimator.CollectUsage(pollResult) - } - }() - } - wg.Wait() - - res, err := estimator.Estimate() - if tt.wantErr { - assert.Error(t, err) - assert.Nil(t, res) - } else { - assert.NoError(t, err) - assert.Equal(t, tt.want, res) - } - }) - } -} - -type unrelatedPolledTask struct{} - -func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} { - var result []interface{} - for i := 0; i < noTaskPoll; i++ { - result = append(result, &activityTask{}) - } - for i := 0; i < taskPoll; i++ { - result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}}) - } - for i := 0; i < unrelated; i++ { - result = append(result, &unrelatedPolledTask{}) - } - rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(result), func(i, j int) { - result[i], result[j] = result[j], result[i] - }) - - pollChan := make(chan interface{}, len(result)) - defer close(pollChan) - for i := range result { - 
pollChan <- result[i] - } - return pollChan -} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index a1b206f26..841b98199 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -59,6 +59,11 @@ const ( defaultMediumLivedWorkflowTimeoutUpperLimitInSec = 8 * 3600 ) +var ( + _ autoConfigHintAwareTask = (*workflowTask)(nil) + _ autoConfigHintAwareTask = (*activityTask)(nil) +) + type ( // workflowExecutionEventHandler process a single event. workflowExecutionEventHandler interface { @@ -72,9 +77,15 @@ type ( Close() } + // autoConfigHintAwareTask is a task that can provide auto config hint + autoConfigHintAwareTask interface { + getAutoConfigHint() *s.AutoConfigHint + } + // workflowTask wraps a decision task. workflowTask struct { task *s.PollForDecisionTaskResponse + autoConfigHint *s.AutoConfigHint historyIterator HistoryIterator doneCh chan struct{} laResultCh chan *localActivityResult @@ -82,8 +93,9 @@ type ( // activityTask wraps a activity task. activityTask struct { - task *s.PollForActivityTaskResponse - pollStartTime time.Time + task *s.PollForActivityTaskResponse + autoConfigHint *s.AutoConfigHint + pollStartTime time.Time } // resetStickinessTask wraps a ResetStickyTaskListRequest. 
@@ -153,6 +165,26 @@ type ( } ) +func (t *workflowTask) getAutoConfigHint() *s.AutoConfigHint { + if t.autoConfigHint != nil { + return t.autoConfigHint + } + if t.task != nil { + return t.task.AutoConfigHint + } + return nil +} + +func (t *activityTask) getAutoConfigHint() *s.AutoConfigHint { + if t.autoConfigHint != nil { + return t.autoConfigHint + } + if t.task != nil { + return t.task.AutoConfigHint + } + return nil +} + func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandlerImpl) *history { result := &history{ workflowTask: task, diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 8ad408a07..74d385f6e 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -847,7 +847,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) { if response == nil || len(response.TaskToken) == 0 { wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1) wtp.updateBacklog(request.TaskList.GetKind(), 0) - return &workflowTask{}, nil + return &workflowTask{autoConfigHint: response.GetAutoConfigHint()}, nil } wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint()) @@ -1115,7 +1115,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context, return nil, err } if response == nil || len(response.TaskToken) == 0 { - return &activityTask{}, nil + return &activityTask{autoConfigHint: response.GetAutoConfigHint()}, nil } workflowType := response.WorkflowType.GetName() diff --git a/internal/internal_utils.go b/internal/internal_utils.go index f3a3695aa..b0e5f09f2 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -74,7 +74,8 @@ const ( type ( FeatureFlags struct { WorkflowExecutionAlreadyCompletedErrorEnabled bool - PollerAutoScalerEnabled bool + // Deprecated: use AutoScalerOptions instead + PollerAutoScalerEnabled bool } ) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 
4319d0b54..f661b982f 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -277,15 +277,7 @@ func newWorkflowTaskWorkerInternal( params, ) worker := newBaseWorker(baseWorkerOptions{ - pollerAutoScaler: pollerAutoScalerOptions{ - Enabled: params.FeatureFlags.PollerAutoScalerEnabled, - InitCount: params.MaxConcurrentDecisionTaskPollers, - MinCount: params.MinConcurrentDecisionTaskPollers, - MaxCount: params.MaxConcurrentDecisionTaskPollers, - Cooldown: params.PollerAutoScalerCooldown, - DryRun: params.PollerAutoScalerDryRun, - TargetUtilization: params.PollerAutoScalerTargetUtilization, - }, + pollerAutoScaler: params.AutoScalerOptions, pollerCount: params.MaxConcurrentDecisionTaskPollers, pollerRate: defaultPollerRate, maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize, @@ -476,18 +468,14 @@ func newActivityTaskWorker( workerType string, ) (worker *activityWorker) { ensureRequiredParams(&workerParams) + pollerCount := workerParams.MaxConcurrentActivityTaskPollers + if workerParams.AutoScalerOptions.Enabled { + pollerCount = workerParams.AutoScalerOptions.PollerMaxCount + } base := newBaseWorker( baseWorkerOptions{ - pollerAutoScaler: pollerAutoScalerOptions{ - Enabled: workerParams.FeatureFlags.PollerAutoScalerEnabled, - InitCount: workerParams.MaxConcurrentActivityTaskPollers, - MinCount: workerParams.MinConcurrentActivityTaskPollers, - MaxCount: workerParams.MaxConcurrentActivityTaskPollers, - Cooldown: workerParams.PollerAutoScalerCooldown, - DryRun: workerParams.PollerAutoScalerDryRun, - TargetUtilization: workerParams.PollerAutoScalerTargetUtilization, - }, - pollerCount: workerParams.MaxConcurrentActivityTaskPollers, + pollerAutoScaler: workerParams.AutoScalerOptions, + pollerCount: pollerCount, pollerRate: defaultPollerRate, maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize, maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond, @@ -1287,18 +1275,7 @@ func AugmentWorkerOptions(options WorkerOptions) 
WorkerOptions { if options.MaxConcurrentSessionExecutionSize == 0 { options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize } - if options.MinConcurrentActivityTaskPollers == 0 { - options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize - } - if options.MinConcurrentDecisionTaskPollers == 0 { - options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize - } - if options.PollerAutoScalerCooldown == 0 { - options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown - } - if options.PollerAutoScalerTargetUtilization == 0 { - options.PollerAutoScalerTargetUtilization = defaultPollerAutoScalerTargetUtilization - } + options.AutoScalerOptions = augmentAutoScalerOptions(options.AutoScalerOptions) // if the user passes in a tracer then add a tracing context propagator if options.Tracer != nil { @@ -1316,6 +1293,26 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions { return options } +func augmentAutoScalerOptions(options AutoScalerOptions) AutoScalerOptions { + // need at least 2 for sticky task polling to work + if options.PollerMinCount <= 1 { + options.PollerMinCount = defaultMinPollerSize + } + if options.PollerMaxCount <= 1 { + options.PollerMaxCount = defaultMaxPollerSize + } + if options.Cooldown == 0 { + options.Cooldown = defaultPollerAutoScalerCooldown + } + if options.PollerWaitTimeUpperBound == 0 { + options.PollerWaitTimeUpperBound = defaultPollerAutoScalerWaitTimeUpperBound + } + if options.PollerWaitTimeLowerBound == 0 { + options.PollerWaitTimeLowerBound = defaultPollerAutoScalerWaitTimeLowerBound + } + return options +} + // getTestTags returns the test tags in the context. 
func getTestTags(ctx context.Context) map[string]map[string]string { if ctx != nil { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b4bfb0ad6..9da58a0a5 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -35,9 +35,9 @@ import ( "go.uber.org/cadence/internal/common/debug" "go.uber.org/cadence/internal/worker" + "github.com/jonboulle/clockwork" "github.com/uber-go/tally" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/time/rate" "go.uber.org/cadence/.gen/go/shared" @@ -114,7 +114,7 @@ type ( // baseWorkerOptions options to configure base worker. baseWorkerOptions struct { - pollerAutoScaler pollerAutoScalerOptions + pollerAutoScaler AutoScalerOptions pollerCount int pollerRate int maxConcurrentTask int @@ -142,10 +142,10 @@ type ( logger *zap.Logger metricsScope tally.Scope - concurrency *worker.ConcurrencyLimit - pollerAutoScaler *pollerAutoScaler - taskQueueCh chan interface{} - sessionTokenBucket *sessionTokenBucket + concurrency *worker.ConcurrencyLimit + concurrencyAutoScaler *worker.ConcurrencyAutoScaler + taskQueueCh chan interface{} + sessionTokenBucket *sessionTokenBucket } polledTask struct { @@ -167,34 +167,42 @@ func createPollRetryPolicy() backoff.RetryPolicy { func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker { ctx, cancel := context.WithCancel(context.Background()) + logger = logger.With(zap.String(tagWorkerType, options.workerType)) + metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType) concurrency := &worker.ConcurrencyLimit{ PollerPermit: worker.NewResizablePermit(options.pollerCount), TaskPermit: worker.NewResizablePermit(options.maxConcurrentTask), } - var pollerAS *pollerAutoScaler + var concurrencyAS *worker.ConcurrencyAutoScaler if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled { - pollerAS = newPollerScaler( - pollerOptions, - 
logger, - concurrency.PollerPermit, - ) + concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{ + Concurrency: concurrency, + Cooldown: pollerOptions.Cooldown, + PollerMaxCount: pollerOptions.PollerMaxCount, + PollerMinCount: pollerOptions.PollerMinCount, + PollerWaitTimeUpperBound: pollerOptions.PollerWaitTimeUpperBound, + PollerWaitTimeLowerBound: pollerOptions.PollerWaitTimeLowerBound, + Logger: logger, + Scope: metricsScope, + Clock: clockwork.NewRealClock(), + }) } bw := &baseWorker{ - options: options, - shutdownCh: make(chan struct{}), - taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), - retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), - logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), - metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), - concurrency: concurrency, - pollerAutoScaler: pollerAS, - taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. - limiterContext: ctx, - limiterContextCancel: cancel, - sessionTokenBucket: sessionTokenBucket, + options: options, + shutdownCh: make(chan struct{}), + taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), + retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), + logger: logger, + metricsScope: metricsScope, + concurrency: concurrency, + concurrencyAutoScaler: concurrencyAS, + taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. 
+ limiterContext: ctx, + limiterContextCancel: cancel, + sessionTokenBucket: sessionTokenBucket, } if options.pollerRate > 0 { bw.pollLimiter = rate.NewLimiter(rate.Limit(options.pollerRate), 1) @@ -210,9 +218,7 @@ func (bw *baseWorker) Start() { bw.metricsScope.Counter(metrics.WorkerStartCounter).Inc(1) - if bw.pollerAutoScaler != nil { - bw.pollerAutoScaler.Start() - } + bw.concurrencyAutoScaler.Start() for i := 0; i < bw.options.pollerCount; i++ { bw.shutdownWG.Add(1) @@ -301,12 +307,10 @@ func (bw *baseWorker) pollTask() { var err error var task interface{} - if bw.pollerAutoScaler != nil { - if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil { - defer bw.concurrency.PollerPermit.Release() - } else { - bw.logger.Warn("poller permit acquire error", zap.Error(pErr)) - } + if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil { + defer bw.concurrency.PollerPermit.Release() + } else { + bw.logger.Warn("poller permit acquire error", zap.Error(pErr)) } bw.retrier.Throttle() @@ -324,14 +328,10 @@ func (bw *baseWorker) pollTask() { } bw.retrier.Failed() } else { - if bw.pollerAutoScaler != nil { - if pErr := bw.pollerAutoScaler.CollectUsage(task); pErr != nil { - bw.logger.Sugar().Warnw("poller auto scaler collect usage error", - "error", pErr, - "task", task) - } - } bw.retrier.Succeeded() + if t, ok := task.(autoConfigHintAwareTask); ok { + bw.concurrencyAutoScaler.ProcessPollerHint(t.getAutoConfigHint()) + } } } @@ -405,9 +405,7 @@ func (bw *baseWorker) Stop() { } close(bw.shutdownCh) bw.limiterContextCancel() - if bw.pollerAutoScaler != nil { - bw.pollerAutoScaler.Stop() - } + bw.concurrencyAutoScaler.Stop() if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success { traceLog(func() { diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 2a2848350..1cb85b275 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ 
-1298,14 +1298,9 @@ func Test_augmentWorkerOptions(t *testing.T) { WorkerLocalActivitiesPerSecond: 20, TaskListActivitiesPerSecond: 30, MaxConcurrentActivityTaskPollers: 10, - MinConcurrentActivityTaskPollers: 2, MaxConcurrentDecisionTaskExecutionSize: 40, WorkerDecisionTasksPerSecond: 50, MaxConcurrentDecisionTaskPollers: 15, - MinConcurrentDecisionTaskPollers: 4, - PollerAutoScalerCooldown: time.Minute * 2, - PollerAutoScalerTargetUtilization: 0.8, - PollerAutoScalerDryRun: false, Identity: "identity", MetricsScope: tally.NoopScope, Logger: zap.NewNop(), @@ -1327,6 +1322,14 @@ func Test_augmentWorkerOptions(t *testing.T) { ShadowOptions: ShadowOptions{}, FeatureFlags: FeatureFlags{}, Authorization: nil, + AutoScalerOptions: AutoScalerOptions{ + Enabled: true, + PollerMinCount: 10, + PollerMaxCount: 20, + Cooldown: time.Minute * 3, + PollerWaitTimeUpperBound: time.Millisecond * 200, + PollerWaitTimeLowerBound: time.Millisecond * 100, + }, }}, want: WorkerOptions{ MaxConcurrentActivityExecutionSize: 3, @@ -1335,14 +1338,9 @@ func Test_augmentWorkerOptions(t *testing.T) { WorkerLocalActivitiesPerSecond: 20, TaskListActivitiesPerSecond: 30, MaxConcurrentActivityTaskPollers: 10, - MinConcurrentActivityTaskPollers: 2, MaxConcurrentDecisionTaskExecutionSize: 40, WorkerDecisionTasksPerSecond: 50, MaxConcurrentDecisionTaskPollers: 15, - MinConcurrentDecisionTaskPollers: 4, - PollerAutoScalerCooldown: time.Minute * 2, - PollerAutoScalerTargetUtilization: 0.8, - PollerAutoScalerDryRun: false, Identity: "identity", MetricsScope: tally.NoopScope, Logger: zap.NewNop(), @@ -1364,6 +1362,14 @@ func Test_augmentWorkerOptions(t *testing.T) { ShadowOptions: ShadowOptions{}, FeatureFlags: FeatureFlags{}, Authorization: nil, + AutoScalerOptions: AutoScalerOptions{ + Enabled: true, + PollerMinCount: 10, + PollerMaxCount: 20, + Cooldown: time.Minute * 3, + PollerWaitTimeUpperBound: time.Millisecond * 200, + PollerWaitTimeLowerBound: time.Millisecond * 100, + }, }, }, { @@ -1376,14 
+1382,9 @@ func Test_augmentWorkerOptions(t *testing.T) { WorkerLocalActivitiesPerSecond: 100000, TaskListActivitiesPerSecond: 100000, MaxConcurrentActivityTaskPollers: 2, - MinConcurrentActivityTaskPollers: 1, MaxConcurrentDecisionTaskExecutionSize: 1000, WorkerDecisionTasksPerSecond: 100000, MaxConcurrentDecisionTaskPollers: 2, - MinConcurrentDecisionTaskPollers: 2, - PollerAutoScalerCooldown: time.Minute, - PollerAutoScalerTargetUtilization: 0.6, - PollerAutoScalerDryRun: false, Identity: "", MetricsScope: nil, Logger: nil, @@ -1405,6 +1406,14 @@ func Test_augmentWorkerOptions(t *testing.T) { ShadowOptions: ShadowOptions{}, FeatureFlags: FeatureFlags{}, Authorization: nil, + AutoScalerOptions: AutoScalerOptions{ + Enabled: false, + PollerMinCount: 2, + PollerMaxCount: 200, + Cooldown: time.Second * 10, + PollerWaitTimeUpperBound: time.Millisecond * 256, + PollerWaitTimeLowerBound: time.Millisecond * 16, + }, }, }, } @@ -1502,3 +1511,57 @@ func TestTestValidateFnFormat_Workflow(t *testing.T) { }) } } + +func TestGetTaskAutoConfigHint(t *testing.T) { + + hint := shared.AutoConfigHint{ + EnableAutoConfig: common.BoolPtr(true), + PollerWaitTimeInMs: common.Int64Ptr(100), + } + + for _, tt := range []struct { + name string + task interface{} + want *shared.AutoConfigHint + }{ + { + "decision task", + &workflowTask{ + task: &shared.PollForDecisionTaskResponse{AutoConfigHint: &hint}}, + &hint, + }, + { + "empty decision task", + &workflowTask{ + autoConfigHint: &hint}, + &hint, + }, + { + "activity task", + &activityTask{ + task: &shared.PollForActivityTaskResponse{AutoConfigHint: &hint}}, + &hint, + }, + { + "empty activity task", + &activityTask{ + autoConfigHint: &hint}, + &hint, + }, + { + "localactivity task", + &localActivityTask{}, + nil, + }, + } { + t.Run(tt.name, func(t *testing.T) { + task, ok := tt.task.(autoConfigHintAwareTask) + if tt.want == nil { + assert.False(t, ok) + } else { + assert.True(t, ok) + assert.Equal(t, tt.want, task.getAutoConfigHint()) + 
} + }) + } +} diff --git a/internal/worker.go b/internal/worker.go index 6c8e5cbf4..236351ccf 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -193,6 +193,7 @@ type ( // cadence-server to retrieve activity tasks. Changing this value will affect the // rate at which the worker is able to consume tasks from a task list. // Default value is 2 + // NOTE: if AutoScalerOptions.Enabled is set to true, this value will be ignored and AutoScalerOptions.PollerMaxCount will be used instead MaxConcurrentActivityTaskPollers int // optional: Sets the minimum number of goroutines that will concurrently poll the @@ -200,6 +201,7 @@ type ( // rate at which the worker is able to consume tasks from a task list, // unless FeatureFlags.PollerAutoScalerEnabled is set to true. // Default value is 1 + // Deprecated: has no effect; use AutoScalerOptions instead. MinConcurrentActivityTaskPollers int // Optional: To set the maximum concurrent decision task executions this worker can have. @@ -216,29 +218,37 @@ type ( // cadence-server to retrieve decision tasks. Changing this value will affect the // rate at which the worker is able to consume tasks from a task list. // Default value is 2 + // NOTE: if AutoScalerOptions.Enabled is set to true, this value will be the initial value of poller count that scales automatically MaxConcurrentDecisionTaskPollers int // optional: Sets the minimum number of goroutines that will concurrently poll the // cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true, // changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list. // Default value is 2 + // Deprecated: has no effect; use AutoScalerOptions instead. MinConcurrentDecisionTaskPollers int // optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count // based on poll result. It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true. 
// Default value is 1 min + // Deprecated: Use AutoScalerOptions instead. PollerAutoScalerCooldown time.Duration // optional: Sets the target utilization rate between [0,1]. // Utilization Rate = pollResultWithTask / (pollResultWithTask + pollResultWithNoTask) // It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true. // Default value is 0.6 + // Deprecated: not used any more PollerAutoScalerTargetUtilization float64 // optional: Sets whether to start dry run mode of autoscaler. // Default value is false + // Deprecated: not used any more PollerAutoScalerDryRun bool + // optional: Sets the options for poller autoscaler + AutoScalerOptions AutoScalerOptions + // Optional: Sets an identify that can be used to track this host for debugging. // default: default identity that include hostname, groupName and process ID. Identity string @@ -496,7 +506,7 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s // Validate sanity validation of WorkerOptions func (o WorkerOptions) Validate() error { // decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369 - if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) { + if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1) { return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value") } return nil diff --git a/internal/worker/concurrency_auto_scaler.go b/internal/worker/concurrency_auto_scaler.go new file mode 100644 index 000000000..c9cce9960 --- /dev/null +++ b/internal/worker/concurrency_auto_scaler.go @@ -0,0 +1,333 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package worker + +import ( + "math" + "sync" + "time" + + "github.com/jonboulle/clockwork" + "github.com/uber-go/tally" + "go.uber.org/zap" + + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common/metrics" +) + +const ( + defaultAutoScalerUpdateTick = time.Second + numberOfPollsInRollingAverage = 5 // control flakiness of the auto scaler signal + + autoScalerEventPollerScaleUp autoScalerEvent = "poller-limit-scale-up" + autoScalerEventPollerScaleDown autoScalerEvent = "poller-limit-scale-down" + autoScalerEventPollerSkipUpdateCooldown autoScalerEvent = "poller-limit-skip-update-cooldown" + autoScalerEventPollerSkipUpdateNoChange autoScalerEvent = "poller-limit-skip-update-no-change" + autoScalerEventPollerSkipUpdateNotEnabled autoScalerEvent = "poller-limit-skip-update-not-enabled" + autoScalerEventEmitMetrics autoScalerEvent = "auto-scaler-emit-metrics" + autoScalerEventEnable autoScalerEvent = "auto-scaler-enable" + autoScalerEventDisable autoScalerEvent = "auto-scaler-disable" + autoScalerEventStart autoScalerEvent = "auto-scaler-start" + autoScalerEventStop autoScalerEvent = "auto-scaler-stop" + autoScalerEventLogMsg string = "concurrency auto scaler event" + + metricsEnabled = "enabled" + metricsDisabled = "disabled" + metricsPollerQuota = "poller-quota" + metricsPollerWaitTime = "poller-wait-time" +) + +var ( + metricsPollerQuotaBuckets = tally.MustMakeExponentialValueBuckets(1, 2, 10) // 10 buckets: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 + metricsPollerWaitTimeBuckets = tally.MustMakeExponentialDurationBuckets(1*time.Millisecond, 2, 14) // 14 buckets: 1ms, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms, 256ms, 512ms, 1024ms, 2048ms, 4096ms, 8192ms +) + +type ( + ConcurrencyAutoScaler struct { + shutdownChan chan struct{} + wg sync.WaitGroup + log *zap.Logger + scope tally.Scope + clock clockwork.Clock + + concurrency *ConcurrencyLimit + cooldown time.Duration + updateTick time.Duration + + // state of autoscaler + lock sync.RWMutex + enabled bool + + 
// poller + pollerInitCount int + pollerMaxCount int + pollerMinCount int + pollerWaitTime *rollingAverage[time.Duration] + pollerPermitLastUpdate time.Time + pollerWaitTimeUpperBound time.Duration + pollerWaitTimeLowerBound time.Duration + } + + ConcurrencyAutoScalerInput struct { + Concurrency *ConcurrencyLimit + Cooldown time.Duration // cooldown time of update + Tick time.Duration // frequency of update check + PollerMaxCount int + PollerMinCount int + PollerWaitTimeUpperBound time.Duration + PollerWaitTimeLowerBound time.Duration + Logger *zap.Logger + Scope tally.Scope + Clock clockwork.Clock + } + + autoScalerEvent string +) + +func NewConcurrencyAutoScaler(input ConcurrencyAutoScalerInput) *ConcurrencyAutoScaler { + tick := defaultAutoScalerUpdateTick + if input.Tick != 0 { + tick = input.Tick + } + return &ConcurrencyAutoScaler{ + shutdownChan: make(chan struct{}), + concurrency: input.Concurrency, + cooldown: input.Cooldown, + log: input.Logger.With(zap.String("component", metrics.ConcurrencyAutoScalerScope)), + scope: input.Scope.SubScope(metrics.ConcurrencyAutoScalerScope), + clock: input.Clock, + updateTick: tick, + enabled: false, // initial value should be false and is only turned on from auto config hint + pollerInitCount: input.Concurrency.PollerPermit.Quota(), + pollerMaxCount: input.PollerMaxCount, + pollerMinCount: input.PollerMinCount, + pollerWaitTime: newRollingAverage[time.Duration](numberOfPollsInRollingAverage), + pollerWaitTimeUpperBound: input.PollerWaitTimeUpperBound, + pollerWaitTimeLowerBound: input.PollerWaitTimeLowerBound, + pollerPermitLastUpdate: input.Clock.Now(), + } +} + +func (c *ConcurrencyAutoScaler) Start() { + if c == nil { + return // no-op if auto scaler is not set + } + c.logEvent(autoScalerEventStart) + + c.wg.Add(1) + + go func() { + defer c.wg.Done() + defer func() { + if r := recover(); r != nil { + c.log.Error("panic in concurrency auto scaler, stopping the auto scaler", zap.Any("error", r)) + } + }() + ticker := 
c.clock.NewTicker(c.updateTick) + defer ticker.Stop() + for { + select { + case <-c.shutdownChan: + return + case <-ticker.Chan(): + c.lock.Lock() + c.logEvent(autoScalerEventEmitMetrics) + c.updatePollerPermit() + c.lock.Unlock() + } + } + }() +} + +func (c *ConcurrencyAutoScaler) Stop() { + if c == nil { + return // no-op if auto scaler is not set + } + c.lock.Lock() + c.logEvent(autoScalerEventStop) + c.lock.Unlock() + close(c.shutdownChan) + c.wg.Wait() +} + +// ProcessPollerHint reads the poller response hint and takes actions in a transactional way +// 1. update poller wait time +// 2. enable/disable auto scaler +func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) { + if c == nil { + return // no-op if auto scaler is not set + } + + if hint == nil { + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + if hint.PollerWaitTimeInMs != nil && *hint.PollerWaitTimeInMs >= 0 { + waitTimeInMs := *hint.PollerWaitTimeInMs + c.pollerWaitTime.Add(time.Millisecond * time.Duration(waitTimeInMs)) + } + + var shouldEnable bool + if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig { + shouldEnable = true + } + if shouldEnable != c.enabled { // flag switched + c.enabled = shouldEnable + if shouldEnable { + c.logEvent(autoScalerEventEnable) + } else { + c.resetConcurrency() + c.logEvent(autoScalerEventDisable) + } + } +} + +// resetConcurrency resets the poller quota to the initial value. 
This will be used for gracefully switching the auto scaler off to avoid workers stuck in the wrong state +func (c *ConcurrencyAutoScaler) resetConcurrency() { + c.concurrency.PollerPermit.SetQuota(c.pollerInitCount) +} + +func (c *ConcurrencyAutoScaler) logEvent(event autoScalerEvent) { + if c.enabled { + c.scope.Counter(metricsEnabled).Inc(1) + } else { + c.scope.Counter(metricsDisabled).Inc(1) + } + c.scope.Histogram(metricsPollerQuota, metricsPollerQuotaBuckets).RecordValue(float64(c.concurrency.PollerPermit.Quota())) + c.scope.Histogram(metricsPollerWaitTime, metricsPollerWaitTimeBuckets).RecordDuration(c.pollerWaitTime.Average()) + c.log.Debug(autoScalerEventLogMsg, + zap.String("event", string(event)), + zap.Bool("enabled", c.enabled), + zap.Int("poller_quota", c.concurrency.PollerPermit.Quota()), + ) +} + +func (c *ConcurrencyAutoScaler) updatePollerPermit() { + if !c.enabled { // skip update if auto scaler is disabled + c.logEvent(autoScalerEventPollerSkipUpdateNotEnabled) + return + } + updateTime := c.clock.Now() + if updateTime.Before(c.pollerPermitLastUpdate.Add(c.cooldown)) { // before cooldown + c.logEvent(autoScalerEventPollerSkipUpdateCooldown) + return + } + + var newQuota int + pollerWaitTime := c.pollerWaitTime.Average() + if pollerWaitTime < c.pollerWaitTimeLowerBound { // pollers are busy + newQuota = c.scaleUpPollerPermit(pollerWaitTime) + c.concurrency.PollerPermit.SetQuota(newQuota) + c.pollerPermitLastUpdate = updateTime + c.logEvent(autoScalerEventPollerScaleUp) + } else if pollerWaitTime > c.pollerWaitTimeUpperBound { // pollers are idle + newQuota = c.scaleDownPollerPermit(pollerWaitTime) + c.concurrency.PollerPermit.SetQuota(newQuota) + c.pollerPermitLastUpdate = updateTime + c.logEvent(autoScalerEventPollerScaleDown) + } else { + c.logEvent(autoScalerEventPollerSkipUpdateNoChange) + } +} + +func (c *ConcurrencyAutoScaler) scaleUpPollerPermit(pollerWaitTime time.Duration) int { + currentQuota := c.concurrency.PollerPermit.Quota() + + // 
inverse scaling with edge case of 0 wait time + // use logarithm to smooth the scaling to avoid drastic change + newQuota := math.Round( + float64(currentQuota) * smoothingFunc(c.pollerWaitTimeLowerBound) / smoothingFunc(pollerWaitTime)) + newQuota = math.Max( + float64(c.pollerMinCount), + math.Min(float64(c.pollerMaxCount), newQuota), + ) + return int(newQuota) +} + +func (c *ConcurrencyAutoScaler) scaleDownPollerPermit(pollerWaitTime time.Duration) int { + currentQuota := c.concurrency.PollerPermit.Quota() + + // inverse scaling with edge case of 0 wait time + // use logarithm to smooth the scaling to avoid drastic change + newQuota := math.Round( + float64(currentQuota) * smoothingFunc(c.pollerWaitTimeUpperBound) / smoothingFunc(pollerWaitTime)) + newQuota = math.Max( + float64(c.pollerMinCount), + math.Min(float64(c.pollerMaxCount), newQuota), + ) + return int(newQuota) +} + +// smoothingFunc is a log2 function with offset to smooth the scaling and address 0 values +func smoothingFunc(x time.Duration) float64 { + return math.Log2(2 + float64(x/time.Millisecond)) +} + +type number interface { + int64 | float64 | time.Duration +} + +type rollingAverage[T number] struct { + mu sync.RWMutex + window []T + index int + sum T + count int +} + +func newRollingAverage[T number](capacity int) *rollingAverage[T] { + return &rollingAverage[T]{ + window: make([]T, capacity), + } +} + +// Add records value in the rolling window, overwriting the oldest slot once the window is full; callers should only add non-negative numbers +func (r *rollingAverage[T]) Add(value T) { + // no op on zero rolling window + if len(r.window) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + // replace the old value with the new value + r.sum += value - r.window[r.index] + r.window[r.index] = value + r.index++ + r.index %= len(r.window) + + if r.count < len(r.window) { + r.count++ + } +} + +func (r *rollingAverage[T]) Average() T { + r.mu.RLock() + defer r.mu.RUnlock() + if r.count == 0 { + return 0 + } + return r.sum / T(r.count) +} diff --git 
a/internal/worker/concurrency_auto_scaler_test.go b/internal/worker/concurrency_auto_scaler_test.go new file mode 100644 index 000000000..9298c053d --- /dev/null +++ b/internal/worker/concurrency_auto_scaler_test.go @@ -0,0 +1,390 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package worker + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + "go.uber.org/atomic" + "go.uber.org/goleak" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "go.uber.org/cadence/.gen/go/shared" + "go.uber.org/cadence/internal/common" +) + +const ( + testTickTime = 1 * time.Second + testTimeFormat = time.TimeOnly +) + +func createTestConcurrencyAutoScaler(t *testing.T, logger *zap.Logger, clock clockwork.Clock) *ConcurrencyAutoScaler { + return NewConcurrencyAutoScaler(ConcurrencyAutoScalerInput{ + Concurrency: &ConcurrencyLimit{ + PollerPermit: NewResizablePermit(100), + TaskPermit: NewResizablePermit(1000), + }, + Cooldown: 2 * testTickTime, + Tick: testTickTime, + PollerMaxCount: 200, + PollerMinCount: 50, + Logger: logger, + Scope: tally.NoopScope, + Clock: clock, + PollerWaitTimeLowerBound: 16 * time.Millisecond, + PollerWaitTimeUpperBound: 256 * time.Millisecond, + }) +} + +func TestConcurrencyAutoScaler(t *testing.T) { + + type eventLog struct { + eventType autoScalerEvent + enabled bool + pollerQuota int64 + } + + for _, tt := range []struct { + name string + pollAutoConfigHint []*shared.AutoConfigHint + expectedEvents []eventLog + }{ + { + "start and stop immediately", + []*shared.AutoConfigHint{}, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventStop, false, 100}, + }, + }, + { + "just enough pollers", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(16))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(16))}, // <- tick, no update + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerSkipUpdateNoChange, true, 100}, + {autoScalerEventStop, true, 100}, + }, + }, + { + "poller slightly idle but no change", + []*shared.AutoConfigHint{ + 
{common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(100))}, // <- tick, no update + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerSkipUpdateNoChange, true, 100}, + {autoScalerEventStop, true, 100}, + }, + }, + { + "busy pollers", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, scale up + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleUp, true, 116}, + {autoScalerEventStop, true, 116}, + }, + }, + { + "very busy pollers, scale up to maximum", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(0))}, // <- tick, scale up significantly to maximum + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleUp, true, 200}, + {autoScalerEventStop, true, 200}, + }, + }, + { + "busy pollers, scale up, and then scale down slowly", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, scale up + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down further + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + 
{autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleUp, true, 116}, + {autoScalerEventPollerSkipUpdateCooldown, true, 116}, + {autoScalerEventPollerScaleDown, true, 70}, + {autoScalerEventPollerSkipUpdateCooldown, true, 70}, + {autoScalerEventPollerScaleDown, true, 50}, + {autoScalerEventStop, true, 50}, + }, + }, + { + "pollers, scale up and down multiple times", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, scale up + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, skip due to cooldown + {common.PtrOf(true), common.PtrOf(int64(10))}, // <- tick, scale up again + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, skip due to cooldown + {common.PtrOf(true), common.PtrOf(int64(10000))}, // <- tick, scale down again + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleUp, true, 116}, + {autoScalerEventPollerSkipUpdateCooldown, true, 116}, + {autoScalerEventPollerScaleDown, true, 70}, + {autoScalerEventPollerSkipUpdateCooldown, true, 70}, + {autoScalerEventPollerScaleUp, true, 81}, + {autoScalerEventPollerSkipUpdateCooldown, true, 81}, + {autoScalerEventPollerScaleDown, true, 50}, + {autoScalerEventStop, true, 50}, + }, + }, + { + "idle pollers waiting for tasks", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale down + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + 
{autoScalerEventPollerScaleDown, true, 80}, + {autoScalerEventStop, true, 80}, + }, + }, + { + "idle pollers, scale down to minimum", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(60000))}, // <- tick, scale down + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleDown, true, 50}, + {autoScalerEventStop, true, 50}, + }, + }, + { + "idle pollers but disabled scaling", + []*shared.AutoConfigHint{ + {common.PtrOf(false), common.PtrOf(int64(60000))}, // <- tick, in cool down + {common.PtrOf(false), common.PtrOf(int64(60000))}, // <- tick, no update due to disabled + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventPollerSkipUpdateNotEnabled, false, 100}, + {autoScalerEventPollerSkipUpdateNotEnabled, false, 100}, + {autoScalerEventStop, false, 100}, + }, + }, + { + "idle pollers but disabled scaling at a later time", + []*shared.AutoConfigHint{ + {common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, in cool down + {common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, scale down + {common.PtrOf(false), common.PtrOf(int64(1000))}, // <- disable + }, + []eventLog{ + {autoScalerEventStart, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerSkipUpdateCooldown, true, 100}, + {autoScalerEventPollerScaleDown, true, 80}, + {autoScalerEventDisable, false, 100}, + {autoScalerEventPollerSkipUpdateNotEnabled, false, 100}, + {autoScalerEventStop, false, 100}, + }, + }, + { + "idle pollers and enabled at a later time", + []*shared.AutoConfigHint{ + {common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, in cool down + {common.PtrOf(false), common.PtrOf(int64(1000))}, // <- tick, not enabled + {common.PtrOf(true), common.PtrOf(int64(1000))}, // <- tick, enable scale up + }, + []eventLog{ + 
{autoScalerEventStart, false, 100}, + {autoScalerEventPollerSkipUpdateNotEnabled, false, 100}, + {autoScalerEventPollerSkipUpdateNotEnabled, false, 100}, + {autoScalerEventEnable, true, 100}, + {autoScalerEventPollerScaleDown, true, 80}, + {autoScalerEventStop, true, 80}, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + defer goleak.VerifyNone(t) + core, obs := observer.New(zap.DebugLevel) + logger := zap.New(core) + clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + scaler := createTestConcurrencyAutoScaler(t, logger, clock) + + // mock poller: submit one batch of hints per tick + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + clock.Sleep(testTickTime / 2) // offset polls by half a tick to avoid test flakiness + for _, hint := range tt.pollAutoConfigHint { + t.Log("hint process time: ", clock.Now().Format(testTimeFormat)) + for i := 0; i < numberOfPollsInRollingAverage; i++ { // simulate polling multiple times to avoid flakiness + scaler.ProcessPollerHint(hint) + } + clock.Sleep(testTickTime) + } + }() + + scaler.Start() + clock.BlockUntil(2) + + // advance the fake clock in half-tick steps + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < len(tt.pollAutoConfigHint)*2+1; i++ { + clock.Advance(testTickTime / 2) + time.Sleep(100 * time.Millisecond) // real-time pause so non-time logic can run + } + }() + + wg.Wait() + scaler.Stop() + + var actualEvents []eventLog + for _, event := range obs.FilterMessage(autoScalerEventLogMsg).All() { + if event.ContextMap()["event"] != string(autoScalerEventEmitMetrics) { + t.Log("event: ", event.ContextMap()) + + actualEvents = append(actualEvents, eventLog{ + eventType: autoScalerEvent(event.ContextMap()["event"].(string)), + enabled: event.ContextMap()["enabled"].(bool), + pollerQuota: event.ContextMap()["poller_quota"].(int64), + }) + } + } + assert.ElementsMatch(t, tt.expectedEvents, actualEvents) + }) + } +} + +func TestRollingAverage(t *testing.T) { + for _, tt := range []struct { + name string + cap int + input 
[]float64 + expected []float64 + }{ + { + "cap is 0", + 0, + []float64{1, 2, 3, 4, 5, 6, 7}, + []float64{0, 0, 0, 0, 0, 0, 0}, + }, + { + "cap is 1", + 1, + []float64{1, 2, 3, 4, 5, 6, 7}, + []float64{1, 2, 3, 4, 5, 6, 7}, + }, + { + "cap is 2", + 2, + []float64{1, 2, 3, 4, 5, 6, 7}, + []float64{1, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5}, + }, + { + "cap is 3", + 3, + []float64{1, 2, 3, 4, 5, 6, 7}, + []float64{1, 1.5, 2, 3, 4, 5, 6}, + }, + { + "cap is 4", + 4, + []float64{1, 2, 3, 4, 5, 6, 7}, + []float64{1, 1.5, 2, 2.5, 3.5, 4.5, 5.5}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + defer goleak.VerifyNone(t) + r := newRollingAverage[float64](tt.cap) + for i := range tt.input { + r.Add(tt.input[i]) + assert.Equal(t, tt.expected[i], r.Average()) + } + }) + } +} + +func TestRollingAverage_Race(t *testing.T) { + total := 100000 + r := newRollingAverage[float64](total) + trueSum := atomic.NewFloat64(0) + var wg sync.WaitGroup + for i := 0; i < total; i++ { + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) + v := rand.Float64() + r.Add(v) + trueSum.Add(v) + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) + r.Average() + }() + } + + wg.Wait() + + // sanity check: cap equals total, so the rolling average should match the true mean of all samples + assert.InDelta(t, trueSum.Load()/float64(total), r.Average(), 0.001) +} diff --git a/worker/worker.go b/worker/worker.go index 3dd3cee85..401d058dd 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,6 +219,9 @@ type ( // ShadowOptions is used to configure a WorkflowShadower. ShadowOptions = internal.ShadowOptions + // AutoScalerOptions is used to configure the auto scaler. + AutoScalerOptions = internal.AutoScalerOptions + // ShadowMode is an enum for configuring if shadowing should continue after all workflows matches the WorkflowQuery have been replayed. ShadowMode = internal.ShadowMode // TimeFilter represents a time range through the min and max timestamp