Skip to content

Commit 2da954b

Browse files
authored
Expose worker option to set ConcurrentDecisionTaskExecutionSize (#539)
1 parent f6d319a commit 2da954b

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

internal/internal_worker.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const (
6161
defaultTaskListActivitiesPerSecond = 100000.0 // Large activity executions/sec (unlimited)
6262

6363
defaultMaxConcurrentTaskExecutionSize = 1000 // hardcoded max task execution size.
64-
defaultMaxTaskExecutionRate = 100000 // Large task execution rate (unlimited)
64+
defaultWorkerTaskExecutionRate = 100000 // Large task execution rate (unlimited)
6565

6666
defaultPollerRate = 1000
6767

@@ -116,6 +116,12 @@ type (
116116
// Defines rate limiting on number of activity tasks that can be executed per second per worker.
117117
WorkerActivitiesPerSecond float64
118118

119+
// Defines how many concurrent decision task executions by this worker.
120+
ConcurrentDecisionTaskExecutionSize int
121+
122+
// Defines rate limiting on number of decision tasks that can be executed per second per worker.
123+
WorkerDecisionTasksPerSecond float64
124+
119125
// Defines how many concurrent local activity executions by this worker.
120126
ConcurrentLocalActivityExecutionSize int
121127

@@ -257,8 +263,8 @@ func newWorkflowTaskWorkerInternal(
257263
worker := newBaseWorker(baseWorkerOptions{
258264
pollerCount: params.ConcurrentPollRoutineSize,
259265
pollerRate: defaultPollerRate,
260-
maxConcurrentTask: defaultMaxConcurrentTaskExecutionSize,
261-
maxTaskPerSecond: defaultMaxTaskExecutionRate,
266+
maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize,
267+
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
262268
taskWorker: poller,
263269
identity: params.Identity,
264270
workerType: "DecisionWorker"},
@@ -931,6 +937,8 @@ func newAggregatedWorker(
931937
WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond,
932938
ConcurrentLocalActivityExecutionSize: wOptions.MaxConcurrentLocalActivityExecutionSize,
933939
WorkerLocalActivitiesPerSecond: wOptions.WorkerLocalActivitiesPerSecond,
940+
ConcurrentDecisionTaskExecutionSize: wOptions.MaxConcurrentDecisionTaskExecutionSize,
941+
WorkerDecisionTasksPerSecond: wOptions.WorkerDecisionTasksPerSecond,
934942
Identity: wOptions.Identity,
935943
MetricsScope: wOptions.MetricsScope,
936944
Logger: wOptions.Logger,
@@ -1151,6 +1159,12 @@ func fillWorkerOptionsDefaults(options WorkerOptions) WorkerOptions {
11511159
if options.WorkerActivitiesPerSecond == 0 {
11521160
options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond
11531161
}
1162+
if options.MaxConcurrentDecisionTaskExecutionSize == 0 {
1163+
options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize
1164+
}
1165+
if options.WorkerDecisionTasksPerSecond == 0 {
1166+
options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate
1167+
}
11541168
if options.MaxConcurrentLocalActivityExecutionSize == 0 {
11551169
options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize
11561170
}

internal/worker.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ type (
9090
// The zero value of this uses the default value. Default: 100k
9191
TaskListActivitiesPerSecond float64
9292

93+
// Optional: To set the maximum concurrent decision task executions this worker can have.
94+
// The zero value of this uses the default value.
95+
// default: defaultMaxConcurrentTaskExecutionSize(1k)
96+
MaxConcurrentDecisionTaskExecutionSize int
97+
98+
// Optional: Sets the rate limiting on number of decision tasks that can be executed per second per
99+
// worker. This can be used to limit resources used by the worker.
100+
// The zero value of this uses the default value. Default: 100k
101+
WorkerDecisionTasksPerSecond float64
102+
93103
// Optional: if the activities need auto heart beating for those activities
94104
// by the framework
95105
// default: false not to heartbeat.

0 commit comments

Comments
 (0)