Skip to content

Commit f29fbbd

Browse files
authored
Set up activity throttle limit as input and propagate to server (#331)
1 parent 63fac2e commit f29fbbd

File tree

5 files changed

+91
-36
lines changed

5 files changed

+91
-36
lines changed

internal/internal_public.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ func NewActivityTaskWorker(
104104
TaskList: taskList,
105105
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
106106
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
107-
MaxActivityExecutionPerSecond: wOptions.MaxActivityExecutionPerSecond,
107+
WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond,
108+
TaskListActivitiesPerSecond: wOptions.TaskListActivitiesPerSecond,
108109
Identity: wOptions.Identity,
109110
MetricsScope: wOptions.MetricsScope,
110111
Logger: wOptions.Logger,

internal/internal_task_pollers.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,14 @@ type (
8181

8282
// activityTaskPoller implements polling/processing a workflow task
8383
activityTaskPoller struct {
84-
domain string
85-
taskListName string
86-
identity string
87-
service workflowserviceclient.Interface
88-
taskHandler ActivityTaskHandler
89-
metricsScope tally.Scope
90-
logger *zap.Logger
84+
domain string
85+
taskListName string
86+
identity string
87+
service workflowserviceclient.Interface
88+
taskHandler ActivityTaskHandler
89+
metricsScope tally.Scope
90+
logger *zap.Logger
91+
activitiesPerSecond float64
9192
}
9293

9394
historyIteratorImpl struct {
@@ -431,13 +432,15 @@ func newGetHistoryPageFunc(
431432
func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserviceclient.Interface,
432433
domain string, params workerExecutionParameters) *activityTaskPoller {
433434
return &activityTaskPoller{
434-
taskHandler: taskHandler,
435-
service: metrics.NewWorkflowServiceWrapper(service, params.MetricsScope),
436-
domain: domain,
437-
taskListName: params.TaskList,
438-
identity: params.Identity,
439-
logger: params.Logger,
440-
metricsScope: params.MetricsScope}
435+
taskHandler: taskHandler,
436+
service: metrics.NewWorkflowServiceWrapper(service, params.MetricsScope),
437+
domain: domain,
438+
taskListName: params.TaskList,
439+
identity: params.Identity,
440+
logger: params.Logger,
441+
metricsScope: params.MetricsScope,
442+
activitiesPerSecond: params.TaskListActivitiesPerSecond,
443+
}
441444
}
442445

443446
// Poll for a single activity task from the service
@@ -450,9 +453,10 @@ func (atp *activityTaskPoller) poll() (*activityTask, error) {
450453
atp.logger.Debug("activityTaskPoller::Poll")
451454
})
452455
request := &s.PollForActivityTaskRequest{
453-
Domain: common.StringPtr(atp.domain),
454-
TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(atp.taskListName)}),
455-
Identity: common.StringPtr(atp.identity),
456+
Domain: common.StringPtr(atp.domain),
457+
TaskList: common.TaskListPtr(s.TaskList{Name: common.StringPtr(atp.taskListName)}),
458+
Identity: common.StringPtr(atp.identity),
459+
TaskListMetadata: &s.TaskListMetadata{MaxTasksPerSecond: &atp.activitiesPerSecond},
456460
}
457461

458462
tchCtx, cancel, opt := newChannelContext(context.Background(), chanTimeout(pollTaskServiceTimeOut))

internal/internal_worker.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ const (
5252
defaultConcurrentPollRoutineSize = 2
5353

5454
defaultMaxConcurrentActivityExecutionSize = 1000 // Large concurrent activity execution size (1k)
55-
defaultMaxActivityExecutionRate = 100000 // Large activity execution rate (unlimited)
55+
_defaultWorkerActivitiesPerSecond = 100000 // Large activity executions/sec (unlimited)
56+
57+
_defaultTaskListActivitiesPerSecond = 100000.0 // Large activity executions/sec (unlimited)
5658

5759
defaultMaxConcurrentWorkflowExecutionSize = 50 // hardcoded max workflow execution size.
5860
defaultMaxWorkflowExecutionRate = 100000 // Large workflow execution rate (unlimited)
@@ -108,8 +110,11 @@ type (
108110
// Defines how many concurrent executions for task list by this worker.
109111
ConcurrentActivityExecutionSize int
110112

111-
// Defines rate limiting on number of activity tasks that can be executed per second.
112-
MaxActivityExecutionPerSecond float64
113+
// Defines rate limiting on number of activity tasks that can be executed per second per worker.
114+
WorkerActivitiesPerSecond float64
115+
116+
// TaskListActivitiesPerSecond is the throttling limit for activity tasks controlled by the server
117+
TaskListActivitiesPerSecond float64
113118

114119
// User can provide an identity for the debuggability. If not provided the framework has
115120
// a default option.
@@ -310,7 +315,7 @@ func newActivityTaskWorker(
310315
baseWorkerOptions{
311316
pollerCount: workerParams.ConcurrentPollRoutineSize,
312317
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
313-
maxTaskPerSecond: workerParams.MaxActivityExecutionPerSecond,
318+
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
314319
taskWorker: poller,
315320
identity: workerParams.Identity,
316321
workerType: "ActivityWorker",
@@ -1010,14 +1015,15 @@ func newAggregatedWorker(
10101015
TaskList: taskList,
10111016
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
10121017
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
1013-
MaxActivityExecutionPerSecond: wOptions.MaxActivityExecutionPerSecond,
1018+
WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond,
10141019
Identity: wOptions.Identity,
10151020
MetricsScope: wOptions.MetricsScope,
10161021
Logger: wOptions.Logger,
10171022
EnableLoggingInReplay: wOptions.EnableLoggingInReplay,
10181023
UserContext: wOptions.BackgroundActivityContext,
10191024
DisableStickyExecution: wOptions.DisableStickyExecution,
10201025
StickyScheduleToStartTimeout: wOptions.StickyScheduleToStartTimeout,
1026+
TaskListActivitiesPerSecond: wOptions.TaskListActivitiesPerSecond,
10211027
}
10221028

10231029
ensureRequiredParams(&workerParams)
@@ -1264,8 +1270,11 @@ func fillWorkerOptionsDefaults(options WorkerOptions) WorkerOptions {
12641270
if options.MaxConcurrentActivityExecutionSize == 0 {
12651271
options.MaxConcurrentActivityExecutionSize = defaultMaxConcurrentActivityExecutionSize
12661272
}
1267-
if options.MaxActivityExecutionPerSecond == 0 {
1268-
options.MaxActivityExecutionPerSecond = defaultMaxActivityExecutionRate
1273+
if options.WorkerActivitiesPerSecond == 0 {
1274+
options.WorkerActivitiesPerSecond = _defaultWorkerActivitiesPerSecond
1275+
}
1276+
if options.TaskListActivitiesPerSecond == 0 {
1277+
options.TaskListActivitiesPerSecond = _defaultTaskListActivitiesPerSecond
12691278
}
12701279
if options.StickyScheduleToStartTimeout.Seconds() == 0 {
12711280
options.StickyScheduleToStartTimeout = stickyDecisionScheduleToStartTimeoutSeconds * time.Second

internal/internal_worker_test.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestCreateWorker(t *testing.T) {
214214
mockCtrl := gomock.NewController(t)
215215
service := workflowservicetest.NewMockClient(mockCtrl)
216216

217-
worker := createWorker(t, service)
217+
worker := createWorkerWithThrottle(t, service, float64(500.0))
218218
err := worker.Start()
219219
require.NoError(t, err)
220220
time.Sleep(time.Millisecond * 200)
@@ -293,7 +293,33 @@ func TestWorkerStartFailsWithInvalidDomain(t *testing.T) {
293293
}
294294
}
295295

296+
func ofPollForActivityTaskRequest(tps float64) gomock.Matcher {
297+
return &mockPollForActivityTaskRequest{tps: tps}
298+
}
299+
300+
type mockPollForActivityTaskRequest struct {
301+
tps float64
302+
}
303+
304+
func (m *mockPollForActivityTaskRequest) Matches(x interface{}) bool {
305+
v, ok := x.(*s.PollForActivityTaskRequest)
306+
if !ok {
307+
return false
308+
}
309+
return *(v.TaskListMetadata.MaxTasksPerSecond) == m.tps
310+
}
311+
312+
func (m *mockPollForActivityTaskRequest) String() string {
313+
return "PollForActivityTaskRequest"
314+
}
315+
296316
func createWorker(t *testing.T, service *workflowservicetest.MockClient) Worker {
317+
return createWorkerWithThrottle(t, service, float64(0.0))
318+
}
319+
320+
func createWorkerWithThrottle(
321+
t *testing.T, service *workflowservicetest.MockClient, activitiesPerSecond float64,
322+
) Worker {
297323
domain := "testDomain"
298324
domainStatus := s.DomainStatusRegistered
299325
domainDesc := &s.DescribeDomainResponse{
@@ -309,7 +335,13 @@ func createWorker(t *testing.T, service *workflowservicetest.MockClient) Worker
309335
}).AnyTimes()
310336

311337
activityTask := &s.PollForActivityTaskResponse{}
312-
service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptions...).Return(activityTask, nil).AnyTimes()
338+
expectedActivitiesPerSecond := activitiesPerSecond
339+
if expectedActivitiesPerSecond == 0.0 {
340+
expectedActivitiesPerSecond = _defaultTaskListActivitiesPerSecond
341+
}
342+
service.EXPECT().PollForActivityTask(
343+
gomock.Any(), ofPollForActivityTaskRequest(expectedActivitiesPerSecond), callOptions...,
344+
).Return(activityTask, nil).AnyTimes()
313345
service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).Return(nil).AnyTimes()
314346

315347
decisionTask := &s.PollForDecisionTaskResponse{}
@@ -318,7 +350,8 @@ func createWorker(t *testing.T, service *workflowservicetest.MockClient) Worker
318350

319351
// Configure worker options.
320352
workerOptions := WorkerOptions{}
321-
workerOptions.MaxActivityExecutionPerSecond = 20
353+
workerOptions.WorkerActivitiesPerSecond = 20
354+
workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond
322355

323356
// Start Worker.
324357
worker := NewWorker(

internal/worker.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,22 @@ type (
4949
// default: defaultMaxConcurrentActivityExecutionSize(1k)
5050
MaxConcurrentActivityExecutionSize int
5151

52-
// Optional: Sets the rate limiting on number of activities that can be executed per second. Notice that the
53-
// number is represented in float, so that you can set it to less than 1 if needed. For example, set the number
54-
// to 0.1 means you want your activity to be executed once for every 10 seconds. This can be used to protect
55-
// down stream services from flooding.
56-
// The zero value of this uses the default value.
57-
// default: defaultMaxActivityExecutionRate(100k)
58-
// Warning: activity's StartToCloseTimeout starts ticking even if a task is blocked due to rate limiting.
59-
MaxActivityExecutionPerSecond float64
52+
// Optional: Sets the rate limiting on number of activities that can be executed per second per
53+
// worker. This can be used to limit resources used by the worker.
54+
// Notice that the number is represented in float, so that you can set it to less than
55+
// 1 if needed. For example, set the number to 0.1 means you want your activity to be executed
56+
// once for every 10 seconds. This can be used to protect down stream services from flooding.
57+
// The zero value of this uses the default value. Default: 100k
58+
WorkerActivitiesPerSecond float64
59+
60+
// Optional: Sets the rate limiting on number of activities that can be executed per second.
61+
// This is managed by the server and controls activities per second for your entire tasklist
62+
// whereas WorkerActivityTasksPerSecond controls activities only per worker.
63+
// Notice that the number is represented in float, so that you can set it to less than
64+
// 1 if needed. For example, set the number to 0.1 means you want your activity to be executed
65+
// once for every 10 seconds. This can be used to protect down stream services from flooding.
66+
// The zero value of this uses the default value. Default: 100k
67+
TaskListActivitiesPerSecond float64
6068

6169
// Optional: if the activities need auto heart beating for those activities
6270
// by the framework

0 commit comments

Comments
 (0)