Skip to content

Commit 14144a2

Browse files
committed
add AutoscalerOptions and deprecate several first level WorkerOptions fields related
1 parent ffdfc41 commit 14144a2

File tree

9 files changed

+129
-108
lines changed

9 files changed

+129
-108
lines changed

internal/internal_poller_autoscaler.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,20 @@ import (
2626

2727
// defaultPollerScalerCooldownInSeconds
2828
const (
29-
defaultPollerAutoScalerCooldown = 10 * time.Second
30-
defaultPollerAutoScalerTargetUtilization = 0.6
31-
defaultMinConcurrentActivityPollerSize = 1
32-
defaultMinConcurrentDecisionPollerSize = 2
29+
defaultPollerAutoScalerCooldown = 10 * time.Second
30+
defaultMinPollerSize = 2
31+
defaultMaxPollerSize = 200
32+
defaultPollerAutoScalerWaitTimeUpperBound = 256 * time.Millisecond
33+
defaultPollerAutoScalerWaitTimeLowerBound = 16 * time.Millisecond
3334
)
3435

3536
type (
36-
pollerAutoScalerOptions struct {
37-
Enabled bool
38-
InitCount int
39-
MinCount int
40-
MaxCount int
41-
Cooldown time.Duration
42-
DryRun bool
43-
TargetUtilization float64
37+
AutoScalerOptions struct {
38+
Enabled bool
39+
MinCount int
40+
MaxCount int
41+
Cooldown time.Duration
42+
PollerWaitTimeUpperBound time.Duration
43+
PollerWaitTimeLowerBound time.Duration
4444
}
4545
)

internal/internal_utils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ const (
7474
type (
7575
FeatureFlags struct {
7676
WorkflowExecutionAlreadyCompletedErrorEnabled bool
77-
PollerAutoScalerEnabled bool
77+
// Deprecated: use AutoScalerOptions instead
78+
PollerAutoScalerEnabled bool
7879
}
7980
)
8081

internal/internal_worker.go

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,7 @@ func newWorkflowTaskWorkerInternal(
277277
params,
278278
)
279279
worker := newBaseWorker(baseWorkerOptions{
280-
pollerAutoScaler: pollerAutoScalerOptions{
281-
Enabled: params.FeatureFlags.PollerAutoScalerEnabled,
282-
InitCount: params.MaxConcurrentDecisionTaskPollers,
283-
MinCount: params.MinConcurrentDecisionTaskPollers,
284-
MaxCount: params.MaxConcurrentDecisionTaskPollers,
285-
Cooldown: params.PollerAutoScalerCooldown,
286-
DryRun: params.PollerAutoScalerDryRun,
287-
TargetUtilization: params.PollerAutoScalerTargetUtilization,
288-
},
280+
pollerAutoScaler: params.AutoScalerOptions,
289281
pollerCount: params.MaxConcurrentDecisionTaskPollers,
290282
pollerRate: defaultPollerRate,
291283
maxConcurrentTask: params.MaxConcurrentDecisionTaskExecutionSize,
@@ -478,15 +470,7 @@ func newActivityTaskWorker(
478470
ensureRequiredParams(&workerParams)
479471
base := newBaseWorker(
480472
baseWorkerOptions{
481-
pollerAutoScaler: pollerAutoScalerOptions{
482-
Enabled: workerParams.FeatureFlags.PollerAutoScalerEnabled,
483-
InitCount: workerParams.MaxConcurrentActivityTaskPollers,
484-
MinCount: workerParams.MinConcurrentActivityTaskPollers,
485-
MaxCount: workerParams.MaxConcurrentActivityTaskPollers,
486-
Cooldown: workerParams.PollerAutoScalerCooldown,
487-
DryRun: workerParams.PollerAutoScalerDryRun,
488-
TargetUtilization: workerParams.PollerAutoScalerTargetUtilization,
489-
},
473+
pollerAutoScaler: workerParams.AutoScalerOptions,
490474
pollerCount: workerParams.MaxConcurrentActivityTaskPollers,
491475
pollerRate: defaultPollerRate,
492476
maxConcurrentTask: workerParams.MaxConcurrentActivityExecutionSize,
@@ -1287,18 +1271,7 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
12871271
if options.MaxConcurrentSessionExecutionSize == 0 {
12881272
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
12891273
}
1290-
if options.MinConcurrentActivityTaskPollers == 0 {
1291-
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize
1292-
}
1293-
if options.MinConcurrentDecisionTaskPollers == 0 {
1294-
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize
1295-
}
1296-
if options.PollerAutoScalerCooldown == 0 {
1297-
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown
1298-
}
1299-
if options.PollerAutoScalerTargetUtilization == 0 {
1300-
options.PollerAutoScalerTargetUtilization = defaultPollerAutoScalerTargetUtilization
1301-
}
1274+
options.AutoScalerOptions = augmentAutoScalerOptions(options.AutoScalerOptions)
13021275

13031276
// if the user passes in a tracer then add a tracing context propagator
13041277
if options.Tracer != nil {
@@ -1316,6 +1289,25 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
13161289
return options
13171290
}
13181291

1292+
func augmentAutoScalerOptions(options AutoScalerOptions) AutoScalerOptions {
1293+
if options.MinCount <= 1 {
1294+
options.MinCount = defaultMinPollerSize
1295+
}
1296+
if options.MaxCount <= 1 {
1297+
options.MaxCount = defaultMaxPollerSize
1298+
}
1299+
if options.Cooldown == 0 {
1300+
options.Cooldown = defaultPollerAutoScalerCooldown
1301+
}
1302+
if options.PollerWaitTimeUpperBound == 0 {
1303+
options.PollerWaitTimeUpperBound = defaultPollerAutoScalerWaitTimeUpperBound
1304+
}
1305+
if options.PollerWaitTimeLowerBound == 0 {
1306+
options.PollerWaitTimeLowerBound = defaultPollerAutoScalerWaitTimeLowerBound
1307+
}
1308+
return options
1309+
}
1310+
13191311
// getTestTags returns the test tags in the context.
13201312
func getTestTags(ctx context.Context) map[string]map[string]string {
13211313
if ctx != nil {

internal/internal_worker_base.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ type (
115115

116116
// baseWorkerOptions options to configure base worker.
117117
baseWorkerOptions struct {
118-
pollerAutoScaler pollerAutoScalerOptions
118+
pollerAutoScaler AutoScalerOptions
119119
pollerCount int
120120
pollerRate int
121121
maxConcurrentTask int
@@ -179,13 +179,15 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
179179
var concurrencyAS *worker.ConcurrencyAutoScaler
180180
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
181181
concurrencyAS = worker.NewConcurrencyAutoScaler(worker.ConcurrencyAutoScalerInput{
182-
Concurrency: concurrency,
183-
Cooldown: pollerOptions.Cooldown,
184-
PollerMaxCount: pollerOptions.MaxCount,
185-
PollerMinCount: pollerOptions.MinCount,
186-
Logger: logger,
187-
Scope: metricsScope,
188-
Clock: clockwork.NewRealClock(),
182+
Concurrency: concurrency,
183+
Cooldown: pollerOptions.Cooldown,
184+
PollerMaxCount: pollerOptions.MaxCount,
185+
PollerMinCount: pollerOptions.MinCount,
186+
PollerWaitTimeUpperBound: pollerOptions.PollerWaitTimeUpperBound,
187+
PollerWaitTimeLowerBound: pollerOptions.PollerWaitTimeLowerBound,
188+
Logger: logger,
189+
Scope: metricsScope,
190+
Clock: clockwork.NewRealClock(),
189191
})
190192
}
191193

internal/internal_worker_test.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,14 +1294,9 @@ func Test_augmentWorkerOptions(t *testing.T) {
12941294
WorkerLocalActivitiesPerSecond: 20,
12951295
TaskListActivitiesPerSecond: 30,
12961296
MaxConcurrentActivityTaskPollers: 10,
1297-
MinConcurrentActivityTaskPollers: 2,
12981297
MaxConcurrentDecisionTaskExecutionSize: 40,
12991298
WorkerDecisionTasksPerSecond: 50,
13001299
MaxConcurrentDecisionTaskPollers: 15,
1301-
MinConcurrentDecisionTaskPollers: 4,
1302-
PollerAutoScalerCooldown: time.Minute * 2,
1303-
PollerAutoScalerTargetUtilization: 0.8,
1304-
PollerAutoScalerDryRun: false,
13051300
Identity: "identity",
13061301
MetricsScope: tally.NoopScope,
13071302
Logger: zap.NewNop(),
@@ -1323,6 +1318,14 @@ func Test_augmentWorkerOptions(t *testing.T) {
13231318
ShadowOptions: ShadowOptions{},
13241319
FeatureFlags: FeatureFlags{},
13251320
Authorization: nil,
1321+
AutoScalerOptions: AutoScalerOptions{
1322+
Enabled: true,
1323+
MinCount: 10,
1324+
MaxCount: 20,
1325+
Cooldown: time.Minute * 3,
1326+
PollerWaitTimeUpperBound: time.Millisecond * 200,
1327+
PollerWaitTimeLowerBound: time.Millisecond * 100,
1328+
},
13261329
}},
13271330
want: WorkerOptions{
13281331
MaxConcurrentActivityExecutionSize: 3,
@@ -1331,14 +1334,9 @@ func Test_augmentWorkerOptions(t *testing.T) {
13311334
WorkerLocalActivitiesPerSecond: 20,
13321335
TaskListActivitiesPerSecond: 30,
13331336
MaxConcurrentActivityTaskPollers: 10,
1334-
MinConcurrentActivityTaskPollers: 2,
13351337
MaxConcurrentDecisionTaskExecutionSize: 40,
13361338
WorkerDecisionTasksPerSecond: 50,
13371339
MaxConcurrentDecisionTaskPollers: 15,
1338-
MinConcurrentDecisionTaskPollers: 4,
1339-
PollerAutoScalerCooldown: time.Minute * 2,
1340-
PollerAutoScalerTargetUtilization: 0.8,
1341-
PollerAutoScalerDryRun: false,
13421340
Identity: "identity",
13431341
MetricsScope: tally.NoopScope,
13441342
Logger: zap.NewNop(),
@@ -1360,6 +1358,14 @@ func Test_augmentWorkerOptions(t *testing.T) {
13601358
ShadowOptions: ShadowOptions{},
13611359
FeatureFlags: FeatureFlags{},
13621360
Authorization: nil,
1361+
AutoScalerOptions: AutoScalerOptions{
1362+
Enabled: true,
1363+
MinCount: 10,
1364+
MaxCount: 20,
1365+
Cooldown: time.Minute * 3,
1366+
PollerWaitTimeUpperBound: time.Millisecond * 200,
1367+
PollerWaitTimeLowerBound: time.Millisecond * 100,
1368+
},
13631369
},
13641370
},
13651371
{
@@ -1372,14 +1378,9 @@ func Test_augmentWorkerOptions(t *testing.T) {
13721378
WorkerLocalActivitiesPerSecond: 100000,
13731379
TaskListActivitiesPerSecond: 100000,
13741380
MaxConcurrentActivityTaskPollers: 2,
1375-
MinConcurrentActivityTaskPollers: 1,
13761381
MaxConcurrentDecisionTaskExecutionSize: 1000,
13771382
WorkerDecisionTasksPerSecond: 100000,
13781383
MaxConcurrentDecisionTaskPollers: 2,
1379-
MinConcurrentDecisionTaskPollers: 2,
1380-
PollerAutoScalerCooldown: time.Second * 10,
1381-
PollerAutoScalerTargetUtilization: 0.6,
1382-
PollerAutoScalerDryRun: false,
13831384
Identity: "",
13841385
MetricsScope: nil,
13851386
Logger: nil,
@@ -1401,6 +1402,14 @@ func Test_augmentWorkerOptions(t *testing.T) {
14011402
ShadowOptions: ShadowOptions{},
14021403
FeatureFlags: FeatureFlags{},
14031404
Authorization: nil,
1405+
AutoScalerOptions: AutoScalerOptions{
1406+
Enabled: false,
1407+
MinCount: 2,
1408+
MaxCount: 200,
1409+
Cooldown: time.Second * 10,
1410+
PollerWaitTimeUpperBound: time.Millisecond * 256,
1411+
PollerWaitTimeLowerBound: time.Millisecond * 16,
1412+
},
14041413
},
14051414
},
14061415
}

internal/worker.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type (
8888
// rate at which the worker is able to consume tasks from a task list,
8989
// unless FeatureFlags.PollerAutoScalerEnabled is set to true.
9090
// Default value is 1
91+
// Deprecated: Use AutoScalerOptions instead.
9192
MinConcurrentActivityTaskPollers int
9293

9394
// Optional: To set the maximum concurrent decision task executions this worker can have.
@@ -110,23 +111,30 @@ type (
110111
// cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true,
111112
// changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list.
112113
// Default value is 2
114+
// Deprecated: Use AutoScalerOptions instead.
113115
MinConcurrentDecisionTaskPollers int
114116

115117
// optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count
116118
// based on poll result. It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true.
117119
// Default value is 1 min
120+
// Deprecated: Use AutoScalerOptions instead.
118121
PollerAutoScalerCooldown time.Duration
119122

120123
// optional: Sets the target utilization rate between [0,1].
121124
// Utilization Rate = pollResultWithTask / (pollResultWithTask + pollResultWithNoTask)
122125
// It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true.
123126
// Default value is 0.6
127+
// Deprecated: not used any more
124128
PollerAutoScalerTargetUtilization float64
125129

126130
// optional: Sets whether to start dry run mode of autoscaler.
127131
// Default value is false
132+
// Deprecated: not used any more
128133
PollerAutoScalerDryRun bool
129134

135+
// optional: Sets the options for poller autoscaler
136+
AutoScalerOptions AutoScalerOptions
137+
130138
// Optional: Sets an identify that can be used to track this host for debugging.
131139
// default: default identity that include hostname, groupName and process ID.
132140
Identity string
@@ -384,7 +392,7 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s
384392
// Validate sanity validation of WorkerOptions
385393
func (o WorkerOptions) Validate() error {
386394
// decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369
387-
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) {
395+
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1) {
388396
return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value")
389397
}
390398
return nil

0 commit comments

Comments
 (0)