Skip to content

Commit 614a82a

Browse files
authored
fix rate limiter bug (#230)
* fix rate limiter bug * add comments on how to set rate to less than 1
1 parent 1a0a0b7 commit 614a82a

File tree

5 files changed

+49
-67
lines changed

5 files changed

+49
-67
lines changed

internal_public.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,15 @@ func NewActivityTaskWorker(
9494
) Worker {
9595
wOptions := fillWorkerOptionsDefaults(options)
9696
workerParams := workerExecutionParameters{
97-
TaskList: taskList,
98-
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
99-
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
100-
MaxActivityExecutionRate: wOptions.MaxActivityExecutionRate,
101-
MaxActivityExecutionRateRefreshDuration: wOptions.MaxActivityExecutionRateRefreshDuration,
102-
Identity: wOptions.Identity,
103-
MetricsScope: wOptions.MetricsScope,
104-
Logger: wOptions.Logger,
105-
EnableLoggingInReplay: wOptions.EnableLoggingInReplay,
106-
UserContext: wOptions.BackgroundActivityContext,
97+
TaskList: taskList,
98+
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
99+
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
100+
MaxActivityExecutionPerSecond: wOptions.MaxActivityExecutionPerSecond,
101+
Identity: wOptions.Identity,
102+
MetricsScope: wOptions.MetricsScope,
103+
Logger: wOptions.Logger,
104+
EnableLoggingInReplay: wOptions.EnableLoggingInReplay,
105+
UserContext: wOptions.BackgroundActivityContext,
107106
}
108107

109108
processTestTags(&wOptions, &workerParams)

internal_worker.go

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"strconv"
3434
"strings"
3535
"sync"
36-
"time"
3736

3837
"github.com/apache/thrift/lib/go/thrift"
3938
"github.com/uber-go/tally"
@@ -107,11 +106,8 @@ type (
107106
// Defines how many concurrent executions for task list by this worker.
108107
ConcurrentActivityExecutionSize int
109108

110-
// Defines rate limiting on number of activity tasks that can be executed per MaxActivityExecutionRateRefreshDuration.
111-
MaxActivityExecutionRate int
112-
113-
// Defines refresh duration for rate limit. If not specified, it uses 1s as default.
114-
MaxActivityExecutionRateRefreshDuration time.Duration
109+
// Defines rate limiting on number of activity tasks that can be executed per second.
110+
MaxActivityExecutionPerSecond float64
115111

116112
// User can provide an identity for the debuggability. If not provided the framework has
117113
// a default option.
@@ -226,13 +222,12 @@ func newWorkflowTaskWorkerInternal(
226222
params,
227223
)
228224
worker := newBaseWorker(baseWorkerOptions{
229-
pollerCount: params.ConcurrentPollRoutineSize,
230-
maxConcurrentTask: defaultMaxConcurrentWorkflowExecutionSize,
231-
maxTaskRate: defaultMaxWorkflowExecutionRate,
232-
maxTaskRateRefreshDuration: time.Second,
233-
taskWorker: poller,
234-
identity: params.Identity,
235-
workerType: "DecisionWorker"},
225+
pollerCount: params.ConcurrentPollRoutineSize,
226+
maxConcurrentTask: defaultMaxConcurrentWorkflowExecutionSize,
227+
maxTaskPerSecond: defaultMaxWorkflowExecutionRate,
228+
taskWorker: poller,
229+
identity: params.Identity,
230+
workerType: "DecisionWorker"},
236231
params.Logger,
237232
params.MetricsScope,
238233
)
@@ -306,13 +301,12 @@ func newActivityTaskWorker(
306301

307302
base := newBaseWorker(
308303
baseWorkerOptions{
309-
pollerCount: workerParams.ConcurrentPollRoutineSize,
310-
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
311-
maxTaskRate: workerParams.MaxActivityExecutionRate,
312-
maxTaskRateRefreshDuration: workerParams.MaxActivityExecutionRateRefreshDuration,
313-
taskWorker: poller,
314-
identity: workerParams.Identity,
315-
workerType: "ActivityWorker",
304+
pollerCount: workerParams.ConcurrentPollRoutineSize,
305+
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
306+
maxTaskPerSecond: workerParams.MaxActivityExecutionPerSecond,
307+
taskWorker: poller,
308+
identity: workerParams.Identity,
309+
workerType: "ActivityWorker",
316310
},
317311
workerParams.Logger,
318312
workerParams.MetricsScope,
@@ -1005,16 +999,15 @@ func newAggregatedWorker(
1005999
) (worker Worker) {
10061000
wOptions := fillWorkerOptionsDefaults(options)
10071001
workerParams := workerExecutionParameters{
1008-
TaskList: taskList,
1009-
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
1010-
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
1011-
MaxActivityExecutionRate: wOptions.MaxActivityExecutionRate,
1012-
MaxActivityExecutionRateRefreshDuration: wOptions.MaxActivityExecutionRateRefreshDuration,
1013-
Identity: wOptions.Identity,
1014-
MetricsScope: wOptions.MetricsScope,
1015-
Logger: wOptions.Logger,
1016-
EnableLoggingInReplay: wOptions.EnableLoggingInReplay,
1017-
UserContext: wOptions.BackgroundActivityContext,
1002+
TaskList: taskList,
1003+
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
1004+
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
1005+
MaxActivityExecutionPerSecond: wOptions.MaxActivityExecutionPerSecond,
1006+
Identity: wOptions.Identity,
1007+
MetricsScope: wOptions.MetricsScope,
1008+
Logger: wOptions.Logger,
1009+
EnableLoggingInReplay: wOptions.EnableLoggingInReplay,
1010+
UserContext: wOptions.BackgroundActivityContext,
10181011
}
10191012

10201013
ensureRequiredParams(&workerParams)
@@ -1227,11 +1220,8 @@ func fillWorkerOptionsDefaults(options WorkerOptions) WorkerOptions {
12271220
if options.MaxConcurrentActivityExecutionSize == 0 {
12281221
options.MaxConcurrentActivityExecutionSize = defaultMaxConcurrentActivityExecutionSize
12291222
}
1230-
if options.MaxActivityExecutionRate == 0 {
1231-
options.MaxActivityExecutionRate = defaultMaxActivityExecutionRate
1232-
}
1233-
if options.MaxActivityExecutionRateRefreshDuration == 0 {
1234-
options.MaxActivityExecutionRateRefreshDuration = time.Second
1223+
if options.MaxActivityExecutionPerSecond == 0 {
1224+
options.MaxActivityExecutionPerSecond = defaultMaxActivityExecutionRate
12351225
}
12361226
return options
12371227
}

internal_worker_base.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,12 @@ type (
8282

8383
// baseWorkerOptions options to configure base worker.
8484
baseWorkerOptions struct {
85-
pollerCount int
86-
maxConcurrentTask int
87-
maxTaskRate int
88-
maxTaskRateRefreshDuration time.Duration
89-
taskWorker taskPoller
90-
identity string
91-
workerType string
85+
pollerCount int
86+
maxConcurrentTask int
87+
maxTaskPerSecond float64
88+
taskWorker taskPoller
89+
identity string
90+
workerType string
9291
}
9392

9493
// baseWorker that wraps worker activities.
@@ -127,8 +126,8 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
127126
return &baseWorker{
128127
options: options,
129128
shutdownCh: make(chan struct{}),
130-
pollLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 100),
131-
taskLimiter: rate.NewLimiter(rate.Every(options.maxTaskRateRefreshDuration), options.maxTaskRate),
129+
pollLimiter: rate.NewLimiter(rate.Limit(1000), 1),
130+
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
132131
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
133132
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
134133
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
@@ -160,8 +159,7 @@ func (bw *baseWorker) Start() {
160159
bw.logger.Info("Started Worker",
161160
zap.Int("PollerCount", bw.options.pollerCount),
162161
zap.Int("MaxConcurrentTask", bw.options.maxConcurrentTask),
163-
zap.Int("MaxTaskRate", bw.options.maxTaskRate),
164-
zap.Duration("MaxTaskRateRefreshDuration", bw.options.maxTaskRateRefreshDuration),
162+
zap.Float64("MaxTaskPerSecond", bw.options.maxTaskPerSecond),
165163
)
166164
})
167165
}

internal_worker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func createWorker(t *testing.T, service *mocks.TChanWorkflowService) Worker {
340340

341341
// Configure worker options.
342342
workerOptions := WorkerOptions{}
343-
workerOptions.MaxActivityExecutionRate = 20
343+
workerOptions.MaxActivityExecutionPerSecond = 20
344344

345345
// Start Worker.
346346
worker := NewWorker(

worker.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ package cadence
2222

2323
import (
2424
"context"
25-
"time"
2625

2726
"github.com/uber-go/tally"
2827

@@ -53,18 +52,14 @@ type (
5352
// default: defaultMaxConcurrentActivityExecutionSize(1k)
5453
MaxConcurrentActivityExecutionSize int
5554

56-
// Optional: Sets the rate limiting on number of activities that can be executed per refresh duration.
57-
// This can be used to protect down stream services from flooding.
55+
// Optional: Sets the rate limiting on number of activities that can be executed per second. Notice that the
56+
// number is represented in float, so that you can set it to less than 1 if needed. For example, set the number
57+
// to 0.1 means you want your activity to be executed once for every 10 seconds. This can be used to protect
58+
// down stream services from flooding.
5859
// The zero value of this uses the default value.
5960
// default: defaultMaxActivityExecutionRate(100k)
6061
// Warning: activity's StartToCloseTimeout starts ticking even if a task is blocked due to rate limiting.
61-
MaxActivityExecutionRate int
62-
63-
// Optional: Sets the refresh duration for rate limit. If not specified, it uses 1s as default.
64-
// Use this to fine tune your rate limiter. For example, you could set this duration to 10s with max rate set to
65-
// 1 which means we rate limit the activity task to at most 1 per every 10s. You could also set this duration to
66-
// 100ms with max rate set wo 10 which means rate limit to 10 activity task execution per every 100ms.
67-
MaxActivityExecutionRateRefreshDuration time.Duration
62+
MaxActivityExecutionPerSecond float64
6863

6964
// Optional: if the activities need auto heart beating for those activities
7065
// by the framework

0 commit comments

Comments
 (0)