Skip to content

Commit d56612f

Browse files
authored
Add activity tasklist rate limiter option to worker options (#332)
1 parent 2c66816 commit d56612f

File tree

3 files changed

+57
-27
lines changed

3 files changed

+57
-27
lines changed

src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.PollForActivityTaskResponse;
2323
import com.uber.cadence.ServiceBusyError;
2424
import com.uber.cadence.TaskList;
25+
import com.uber.cadence.TaskListMetadata;
2526
import com.uber.cadence.internal.metrics.MetricsType;
2627
import com.uber.cadence.serviceclient.IWorkflowService;
2728
import com.uber.m3.tally.Stopwatch;
@@ -56,6 +57,13 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException {
5657
pollRequest.setDomain(domain);
5758
pollRequest.setIdentity(options.getIdentity());
5859
pollRequest.setTaskList(new TaskList().setName(taskList));
60+
61+
if (options.getTaskListActivitiesPerSecond() > 0) {
62+
TaskListMetadata metadata = new TaskListMetadata();
63+
metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond());
64+
pollRequest.setTaskListMetadata(metadata);
65+
}
66+
5967
if (log.isDebugEnabled()) {
6068
log.debug("poll request begin: " + pollRequest);
6169
}

src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,15 @@ public final class SingleWorkerOptions {
3030
public static final class Builder {
3131

3232
private String identity;
33-
3433
private DataConverter dataConverter;
35-
3634
private int taskExecutorThreadPoolSize = 100;
37-
35+
private double taskListActivitiesPerSecond;
3836
private PollerOptions pollerOptions;
39-
4037
/** TODO: Dynamic expiration based on activity timeout */
4138
private RetryOptions reportCompletionRetryOptions;
4239

4340
private RetryOptions reportFailureRetryOptions;
44-
4541
private Scope metricsScope;
46-
4742
private boolean enableLoggingInReplay;
4843

4944
public Builder() {}
@@ -52,6 +47,7 @@ public Builder(SingleWorkerOptions options) {
5247
this.identity = options.getIdentity();
5348
this.dataConverter = options.getDataConverter();
5449
this.pollerOptions = options.getPollerOptions();
50+
this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond();
5551
this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize();
5652
this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions();
5753
this.reportFailureRetryOptions = options.getReportFailureRetryOptions();
@@ -89,6 +85,21 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
8985
return this;
9086
}
9187

88+
public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
89+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
90+
return this;
91+
}
92+
93+
public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) {
94+
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
95+
return this;
96+
}
97+
98+
public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) {
99+
this.reportFailureRetryOptions = reportFailureRetryOptions;
100+
return this;
101+
}
102+
92103
public SingleWorkerOptions build() {
93104
if (reportCompletionRetryOptions == null) {
94105
reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS;
@@ -119,44 +130,30 @@ public SingleWorkerOptions build() {
119130
identity,
120131
dataConverter,
121132
taskExecutorThreadPoolSize,
133+
taskListActivitiesPerSecond,
122134
pollerOptions,
123135
reportCompletionRetryOptions,
124136
reportFailureRetryOptions,
125137
metricsScope,
126138
enableLoggingInReplay);
127139
}
128-
129-
public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) {
130-
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
131-
return this;
132-
}
133-
134-
public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) {
135-
this.reportFailureRetryOptions = reportFailureRetryOptions;
136-
return this;
137-
}
138140
}
139141

140142
private final String identity;
141-
142143
private final DataConverter dataConverter;
143-
144144
private final int taskExecutorThreadPoolSize;
145-
145+
private final double taskListActivitiesPerSecond;
146146
private final PollerOptions pollerOptions;
147-
148147
private final RetryOptions reportCompletionRetryOptions;
149-
150148
private final RetryOptions reportFailureRetryOptions;
151-
152149
private final Scope metricsScope;
153-
154150
private final boolean enableLoggingInReplay;
155151

156152
private SingleWorkerOptions(
157153
String identity,
158154
DataConverter dataConverter,
159155
int taskExecutorThreadPoolSize,
156+
double taskListActivitiesPerSecond,
160157
PollerOptions pollerOptions,
161158
RetryOptions reportCompletionRetryOptions,
162159
RetryOptions reportFailureRetryOptions,
@@ -165,6 +162,7 @@ private SingleWorkerOptions(
165162
this.identity = identity;
166163
this.dataConverter = dataConverter;
167164
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
165+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
168166
this.pollerOptions = pollerOptions;
169167
this.reportCompletionRetryOptions = reportCompletionRetryOptions;
170168
this.reportFailureRetryOptions = reportFailureRetryOptions;
@@ -180,22 +178,26 @@ public DataConverter getDataConverter() {
180178
return dataConverter;
181179
}
182180

183-
public int getTaskExecutorThreadPoolSize() {
181+
int getTaskExecutorThreadPoolSize() {
184182
return taskExecutorThreadPoolSize;
185183
}
186184

187-
public PollerOptions getPollerOptions() {
185+
PollerOptions getPollerOptions() {
188186
return pollerOptions;
189187
}
190188

191-
public RetryOptions getReportCompletionRetryOptions() {
189+
RetryOptions getReportCompletionRetryOptions() {
192190
return reportCompletionRetryOptions;
193191
}
194192

195-
public RetryOptions getReportFailureRetryOptions() {
193+
RetryOptions getReportFailureRetryOptions() {
196194
return reportFailureRetryOptions;
197195
}
198196

197+
double getTaskListActivitiesPerSecond() {
198+
return taskListActivitiesPerSecond;
199+
}
200+
199201
public Scope getMetricsScope() {
200202
return metricsScope;
201203
}

src/main/java/com/uber/cadence/worker/WorkerOptions.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public static final class Builder {
4040
private int maxConcurrentActivityExecutionSize = 100;
4141
private int maxConcurrentWorkflowExecutionSize = 50;
4242
private int maxConcurrentLocalActivityExecutionSize = 100;
43+
private double taskListActivitiesPerSecond = 100000;
4344
private PollerOptions activityPollerOptions;
4445
private PollerOptions workflowPollerOptions;
4546
private RetryOptions reportActivityCompletionRetryOptions;
@@ -186,6 +187,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
186187
return this;
187188
}
188189

190+
/**
191+
* Optional: Sets the rate limiting on number of activities that can be executed per second.
192+
* This is managed by the server and controls activities per second for your entire tasklist.
193+
* Notice that the number is represented in double, so that you can set it to less than 1 if
194+
* needed. For example, set the number to 0.1 means you want your activity to be executed once
195+
* every 10 seconds. This can be used to protect down stream services from flooding. The zero
196+
* value of this uses the default value. Default: 100k
197+
*/
198+
public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
199+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
200+
return this;
201+
}
202+
189203
public WorkerOptions build() {
190204
if (identity == null) {
191205
identity = ManagementFactory.getRuntimeMXBean().getName();
@@ -204,6 +218,7 @@ public WorkerOptions build() {
204218
maxConcurrentActivityExecutionSize,
205219
maxConcurrentWorkflowExecutionSize,
206220
maxConcurrentLocalActivityExecutionSize,
221+
taskListActivitiesPerSecond,
207222
activityPollerOptions,
208223
workflowPollerOptions,
209224
reportActivityCompletionRetryOptions,
@@ -224,6 +239,7 @@ public WorkerOptions build() {
224239
private final int maxConcurrentActivityExecutionSize;
225240
private final int maxConcurrentWorkflowExecutionSize;
226241
private final int maxConcurrentLocalActivityExecutionSize;
242+
private final double taskListActivitiesPerSecond;
227243
private final PollerOptions activityPollerOptions;
228244
private final PollerOptions workflowPollerOptions;
229245
private final RetryOptions reportActivityCompletionRetryOptions;
@@ -243,6 +259,7 @@ private WorkerOptions(
243259
int maxConcurrentActivityExecutionSize,
244260
int maxConcurrentWorkflowExecutionSize,
245261
int maxConcurrentLocalActivityExecutionSize,
262+
double taskListActivitiesPerSecond,
246263
PollerOptions activityPollerOptions,
247264
PollerOptions workflowPollerOptions,
248265
RetryOptions reportActivityCompletionRetryOptions,
@@ -260,6 +277,7 @@ private WorkerOptions(
260277
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
261278
this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize;
262279
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
280+
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
263281
this.activityPollerOptions = activityPollerOptions;
264282
this.workflowPollerOptions = workflowPollerOptions;
265283
this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions;
@@ -359,6 +377,8 @@ public String toString() {
359377
+ maxConcurrentWorkflowExecutionSize
360378
+ ", maxConcurrentLocalActivityExecutionSize="
361379
+ maxConcurrentLocalActivityExecutionSize
380+
+ ", taskListActivitiesPerSecond="
381+
+ taskListActivitiesPerSecond
362382
+ ", activityPollerOptions="
363383
+ activityPollerOptions
364384
+ ", workflowPollerOptions="

0 commit comments

Comments
 (0)