Skip to content

Commit 9ebe8ea

Browse files
authored
worker_options: expose poller count as a user option (#940)
1 parent 968e0ce commit 9ebe8ea

File tree

5 files changed

+179
-31
lines changed

5 files changed

+179
-31
lines changed

internal/internal_worker.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,21 +124,24 @@ type (
124124
// Task list name to poll.
125125
TaskList string
126126

127-
// Defines how many concurrent poll requests for the task list by this worker.
128-
ConcurrentPollRoutineSize int
129-
130127
// Defines how many concurrent activity executions by this worker.
131128
ConcurrentActivityExecutionSize int
132129

133130
// Defines rate limiting on number of activity tasks that can be executed per second per worker.
134131
WorkerActivitiesPerSecond float64
135132

133+
// MaxConcurrentActivityPollers is the max number of pollers for activity task list
134+
MaxConcurrentActivityPollers int
135+
136136
// Defines how many concurrent decision task executions by this worker.
137137
ConcurrentDecisionTaskExecutionSize int
138138

139139
// Defines rate limiting on number of decision tasks that can be executed per second per worker.
140140
WorkerDecisionTasksPerSecond float64
141141

142+
// MaxConcurrentDecisionPollers is the max number of pollers for decision task list
143+
MaxConcurrentDecisionPollers int
144+
142145
// Defines how many concurrent local activity executions by this worker.
143146
ConcurrentLocalActivityExecutionSize int
144147

@@ -294,7 +297,7 @@ func newWorkflowTaskWorkerInternal(
294297
params,
295298
)
296299
worker := newBaseWorker(baseWorkerOptions{
297-
pollerCount: params.ConcurrentPollRoutineSize,
300+
pollerCount: params.MaxConcurrentDecisionPollers,
298301
pollerRate: defaultPollerRate,
299302
maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize,
300303
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
@@ -395,7 +398,7 @@ func newSessionWorker(service workflowserviceclient.Interface,
395398
params.TaskList = sessionEnvironment.GetResourceSpecificTasklist()
396399
activityWorker := newActivityWorker(service, domain, params, overrides, env, nil)
397400

398-
params.ConcurrentPollRoutineSize = 1
401+
params.MaxConcurrentActivityPollers = 1
399402
params.TaskList = creationTasklist
400403
creationWorker := newActivityWorker(service, domain, params, overrides, env, sessionEnvironment.GetTokenBucket())
401404

@@ -473,7 +476,7 @@ func newActivityTaskWorker(
473476

474477
base := newBaseWorker(
475478
baseWorkerOptions{
476-
pollerCount: workerParams.ConcurrentPollRoutineSize,
479+
pollerCount: workerParams.MaxConcurrentActivityPollers,
477480
pollerRate: defaultPollerRate,
478481
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
479482
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
@@ -1117,9 +1120,7 @@ func (aw *aggregatedWorker) Stop() {
11171120
aw.logger.Info("Stopped Worker")
11181121
}
11191122

1120-
// aggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as
1121-
// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms.
1122-
// With 2 poller, we could achieve around 300~400 RPS.
1123+
// aggregatedWorker returns an instance to manage both activity and decision workers
11231124
func newAggregatedWorker(
11241125
service workflowserviceclient.Interface,
11251126
domain string,
@@ -1135,13 +1136,14 @@ func newAggregatedWorker(
11351136

11361137
workerParams := workerExecutionParameters{
11371138
TaskList: taskList,
1138-
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
11391139
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
11401140
WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond,
1141+
MaxConcurrentActivityPollers: wOptions.MaxConcurrentActivityTaskPollers,
11411142
ConcurrentLocalActivityExecutionSize: wOptions.MaxConcurrentLocalActivityExecutionSize,
11421143
WorkerLocalActivitiesPerSecond: wOptions.WorkerLocalActivitiesPerSecond,
11431144
ConcurrentDecisionTaskExecutionSize: wOptions.MaxConcurrentDecisionTaskExecutionSize,
11441145
WorkerDecisionTasksPerSecond: wOptions.WorkerDecisionTasksPerSecond,
1146+
MaxConcurrentDecisionPollers: wOptions.MaxConcurrentDecisionTaskPollers,
11451147
Identity: wOptions.Identity,
11461148
MetricsScope: wOptions.MetricsScope,
11471149
Logger: wOptions.Logger,
@@ -1253,7 +1255,8 @@ func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) {
12531255
switch key {
12541256
case workerOptionsConfigConcurrentPollRoutineSize:
12551257
if size, err := strconv.Atoi(val); err == nil {
1256-
ep.ConcurrentPollRoutineSize = size
1258+
ep.MaxConcurrentActivityPollers = size
1259+
ep.MaxConcurrentDecisionPollers = size
12571260
}
12581261
}
12591262
}
@@ -1388,12 +1391,18 @@ func augmentWorkerOptions(options WorkerOptions) WorkerOptions {
13881391
if options.WorkerActivitiesPerSecond == 0 {
13891392
options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond
13901393
}
1394+
if options.MaxConcurrentActivityTaskPollers <= 0 {
1395+
options.MaxConcurrentActivityTaskPollers = defaultConcurrentPollRoutineSize
1396+
}
13911397
if options.MaxConcurrentDecisionTaskExecutionSize == 0 {
13921398
options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize
13931399
}
13941400
if options.WorkerDecisionTasksPerSecond == 0 {
13951401
options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate
13961402
}
1403+
if options.MaxConcurrentDecisionTaskPollers <= 0 {
1404+
options.MaxConcurrentDecisionTaskPollers = defaultConcurrentPollRoutineSize
1405+
}
13971406
if options.MaxConcurrentLocalActivityExecutionSize == 0 {
13981407
options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize
13991408
}

internal/internal_worker_interfaces_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,11 @@ func (s *InterfacesTestSuite) TestInterface() {
174174
domain := "testDomain"
175175
// Workflow execution parameters.
176176
workflowExecutionParameters := workerExecutionParameters{
177-
TaskList: "testTaskList",
178-
ConcurrentPollRoutineSize: 4,
179-
Logger: logger,
180-
Tracer: opentracing.NoopTracer{},
177+
TaskList: "testTaskList",
178+
MaxConcurrentActivityPollers: 4,
179+
MaxConcurrentDecisionPollers: 4,
180+
Logger: logger,
181+
Tracer: opentracing.NoopTracer{},
181182
}
182183

183184
domainStatus := m.DomainStatusRegistered
@@ -204,10 +205,11 @@ func (s *InterfacesTestSuite) TestInterface() {
204205

205206
// Create activity execution parameters.
206207
activityExecutionParameters := workerExecutionParameters{
207-
TaskList: "testTaskList",
208-
ConcurrentPollRoutineSize: 10,
209-
Logger: logger,
210-
Tracer: opentracing.NoopTracer{},
208+
TaskList: "testTaskList",
209+
MaxConcurrentActivityPollers: 10,
210+
MaxConcurrentDecisionPollers: 10,
211+
Logger: logger,
212+
Tracer: opentracing.NoopTracer{},
211213
}
212214

213215
// Register activity instances and launch the worker.

internal/internal_worker_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ import (
3131
"time"
3232

3333
"github.com/golang/mock/gomock"
34+
"github.com/opentracing/opentracing-go"
3435
"github.com/stretchr/testify/assert"
3536
"github.com/stretchr/testify/require"
3637
"github.com/stretchr/testify/suite"
38+
"github.com/uber-go/tally"
3739
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
3840
"go.uber.org/cadence/.gen/go/shared"
3941
"go.uber.org/cadence/internal/common"
@@ -1133,6 +1135,129 @@ func TestActivityNilArgs_WithDataConverter(t *testing.T) {
11331135
require.Error(t, err) // testDataConverter cannot encode nil value
11341136
}
11351137

1138+
func TestWorkerOptionDefaults(t *testing.T) {
1139+
domain := "worker-options-test"
1140+
taskList := "worker-options-tl"
1141+
worker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
1142+
aggWorker, ok := worker.(*aggregatedWorker)
1143+
require.True(t, ok)
1144+
1145+
decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker)
1146+
require.True(t, ok)
1147+
require.True(t, decisionWorker.executionParameters.Identity != "")
1148+
require.NotNil(t, decisionWorker.executionParameters.Logger)
1149+
require.NotNil(t, decisionWorker.executionParameters.MetricsScope)
1150+
require.Nil(t, decisionWorker.executionParameters.ContextPropagators)
1151+
1152+
expected := workerExecutionParameters{
1153+
TaskList: taskList,
1154+
MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize,
1155+
MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize,
1156+
ConcurrentLocalActivityExecutionSize: defaultMaxConcurrentLocalActivityExecutionSize,
1157+
ConcurrentActivityExecutionSize: defaultMaxConcurrentActivityExecutionSize,
1158+
ConcurrentDecisionTaskExecutionSize: defaultMaxConcurrentTaskExecutionSize,
1159+
WorkerActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
1160+
WorkerDecisionTasksPerSecond: defaultWorkerTaskExecutionRate,
1161+
TaskListActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
1162+
WorkerLocalActivitiesPerSecond: defaultWorkerLocalActivitiesPerSecond,
1163+
StickyScheduleToStartTimeout: stickyDecisionScheduleToStartTimeoutSeconds * time.Second,
1164+
DataConverter: getDefaultDataConverter(),
1165+
Tracer: opentracing.NoopTracer{},
1166+
Logger: decisionWorker.executionParameters.Logger,
1167+
MetricsScope: decisionWorker.executionParameters.MetricsScope,
1168+
Identity: decisionWorker.executionParameters.Identity,
1169+
UserContext: decisionWorker.executionParameters.UserContext,
1170+
}
1171+
1172+
assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)
1173+
1174+
activityWorker, ok := aggWorker.activityWorker.(*activityWorker)
1175+
require.True(t, ok)
1176+
require.True(t, activityWorker.executionParameters.Identity != "")
1177+
require.NotNil(t, activityWorker.executionParameters.Logger)
1178+
require.NotNil(t, activityWorker.executionParameters.MetricsScope)
1179+
require.Nil(t, activityWorker.executionParameters.ContextPropagators)
1180+
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
1181+
}
1182+
1183+
func TestWorkerOptionNonDefaults(t *testing.T) {
1184+
domain := "worker-options-test"
1185+
taskList := "worker-options-tl"
1186+
1187+
options := WorkerOptions{
1188+
Identity: "143@worker-options-test-1",
1189+
TaskListActivitiesPerSecond: 8888,
1190+
MaxConcurrentSessionExecutionSize: 3333,
1191+
MaxConcurrentDecisionTaskExecutionSize: 2222,
1192+
MaxConcurrentActivityExecutionSize: 1111,
1193+
MaxConcurrentLocalActivityExecutionSize: 101,
1194+
MaxConcurrentDecisionTaskPollers: 11,
1195+
MaxConcurrentActivityTaskPollers: 12,
1196+
WorkerLocalActivitiesPerSecond: 222,
1197+
WorkerDecisionTasksPerSecond: 111,
1198+
WorkerActivitiesPerSecond: 99,
1199+
StickyScheduleToStartTimeout: 555 * time.Minute,
1200+
DataConverter: &defaultDataConverter{},
1201+
BackgroundActivityContext: context.Background(),
1202+
Logger: zap.NewNop(),
1203+
MetricsScope: tally.NoopScope,
1204+
Tracer: opentracing.NoopTracer{},
1205+
}
1206+
1207+
worker := newAggregatedWorker(nil, domain, taskList, options)
1208+
aggWorker, ok := worker.(*aggregatedWorker)
1209+
require.True(t, ok)
1210+
1211+
decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker)
1212+
require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0)
1213+
require.True(t, ok)
1214+
1215+
expected := workerExecutionParameters{
1216+
TaskList: taskList,
1217+
MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers,
1218+
MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers,
1219+
ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize,
1220+
ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize,
1221+
ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize,
1222+
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
1223+
WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond,
1224+
TaskListActivitiesPerSecond: options.TaskListActivitiesPerSecond,
1225+
WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond,
1226+
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
1227+
DataConverter: options.DataConverter,
1228+
Tracer: options.Tracer,
1229+
Logger: options.Logger,
1230+
MetricsScope: options.MetricsScope,
1231+
Identity: options.Identity,
1232+
}
1233+
1234+
assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)
1235+
1236+
activityWorker, ok := aggWorker.activityWorker.(*activityWorker)
1237+
require.True(t, ok)
1238+
require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0)
1239+
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
1240+
}
1241+
1242+
func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {
1243+
require.Equal(t, paramsA.TaskList, paramsA.TaskList)
1244+
require.Equal(t, paramsA.Identity, paramsB.Identity)
1245+
require.Equal(t, paramsA.DataConverter, paramsB.DataConverter)
1246+
require.Equal(t, paramsA.Tracer, paramsB.Tracer)
1247+
require.Equal(t, paramsA.ConcurrentLocalActivityExecutionSize, paramsB.ConcurrentLocalActivityExecutionSize)
1248+
require.Equal(t, paramsA.ConcurrentActivityExecutionSize, paramsB.ConcurrentActivityExecutionSize)
1249+
require.Equal(t, paramsA.ConcurrentDecisionTaskExecutionSize, paramsB.ConcurrentDecisionTaskExecutionSize)
1250+
require.Equal(t, paramsA.WorkerActivitiesPerSecond, paramsB.WorkerActivitiesPerSecond)
1251+
require.Equal(t, paramsA.WorkerDecisionTasksPerSecond, paramsB.WorkerDecisionTasksPerSecond)
1252+
require.Equal(t, paramsA.TaskListActivitiesPerSecond, paramsB.TaskListActivitiesPerSecond)
1253+
require.Equal(t, paramsA.StickyScheduleToStartTimeout, paramsB.StickyScheduleToStartTimeout)
1254+
require.Equal(t, paramsA.MaxConcurrentDecisionPollers, paramsB.MaxConcurrentDecisionPollers)
1255+
require.Equal(t, paramsA.MaxConcurrentActivityPollers, paramsB.MaxConcurrentActivityPollers)
1256+
require.Equal(t, paramsA.NonDeterministicWorkflowPolicy, paramsB.NonDeterministicWorkflowPolicy)
1257+
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
1258+
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
1259+
}
1260+
11361261
/*
11371262
var testWorkflowID1 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID"), RunId: common.StringPtr("runID")}
11381263
var testWorkflowID2 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID2"), RunId: common.StringPtr("runID2")}

internal/internal_workers_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,11 @@ func (s *WorkersTestSuite) TestWorkflowWorker() {
9494

9595
ctx, cancel := context.WithCancel(context.Background())
9696
executionParameters := workerExecutionParameters{
97-
TaskList: "testTaskList",
98-
ConcurrentPollRoutineSize: 5,
99-
Logger: logger,
100-
UserContext: ctx,
101-
UserContextCancel: cancel,
97+
TaskList: "testTaskList",
98+
MaxConcurrentDecisionPollers: 5,
99+
Logger: logger,
100+
UserContext: ctx,
101+
UserContextCancel: cancel,
102102
}
103103
overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()}
104104
workflowWorker := newWorkflowWorkerInternal(
@@ -119,9 +119,9 @@ func (s *WorkersTestSuite) TestActivityWorker() {
119119
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).Return(nil).AnyTimes()
120120

121121
executionParameters := workerExecutionParameters{
122-
TaskList: "testTaskList",
123-
ConcurrentPollRoutineSize: 5,
124-
Logger: logger,
122+
TaskList: "testTaskList",
123+
MaxConcurrentActivityPollers: 5,
124+
Logger: logger,
125125
}
126126
overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()}
127127
a := &greeterActivity{}
@@ -163,7 +163,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
163163
ctx, cancel := context.WithCancel(context.Background())
164164
executionParameters := workerExecutionParameters{
165165
TaskList: "testTaskList",
166-
ConcurrentPollRoutineSize: 5,
166+
MaxConcurrentActivityPollers: 5,
167167
ConcurrentActivityExecutionSize: 2,
168168
Logger: logger,
169169
UserContext: ctx,
@@ -202,9 +202,9 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() {
202202
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()
203203

204204
executionParameters := workerExecutionParameters{
205-
TaskList: "testDecisionTaskList",
206-
ConcurrentPollRoutineSize: 5,
207-
Logger: zap.NewNop(),
205+
TaskList: "testDecisionTaskList",
206+
MaxConcurrentDecisionPollers: 5,
207+
Logger: zap.NewNop(),
208208
}
209209
overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()}
210210
workflowWorker := newWorkflowWorkerInternal(

internal/worker.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ type (
9292
// The zero value of this uses the default value. Default: 100k
9393
TaskListActivitiesPerSecond float64
9494

95+
// optional: Sets the maximum number of goroutines that will concurrently poll the
96+
// cadence-server to retrieve activity tasks. Changing this value will affect the
97+
// rate at which the worker is able to consume tasks from a task list.
98+
// Default value is 2
99+
MaxConcurrentActivityTaskPollers int
100+
95101
// Optional: To set the maximum concurrent decision task executions this worker can have.
96102
// The zero value of this uses the default value.
97103
// default: defaultMaxConcurrentTaskExecutionSize(1k)
@@ -102,6 +108,12 @@ type (
102108
// The zero value of this uses the default value. Default: 100k
103109
WorkerDecisionTasksPerSecond float64
104110

111+
// optional: Sets the maximum number of goroutines that will concurrently poll the
112+
// cadence-server to retrieve decision tasks. Changing this value will affect the
113+
// rate at which the worker is able to consume tasks from a task list.
114+
// Default value is 2
115+
MaxConcurrentDecisionTaskPollers int
116+
105117
// Optional: if the activities need auto heart beating for those activities
106118
// by the framework
107119
// default: false not to heartbeat.

0 commit comments

Comments
 (0)