diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java index 0576e1168..74bfc6b89 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java @@ -22,6 +22,7 @@ import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.TaskList; +import com.uber.cadence.TaskListMetadata; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Stopwatch; @@ -56,6 +57,13 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException { pollRequest.setDomain(domain); pollRequest.setIdentity(options.getIdentity()); pollRequest.setTaskList(new TaskList().setName(taskList)); + + if (options.getTaskListActivitiesPerSecond() > 0) { + TaskListMetadata metadata = new TaskListMetadata(); + metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond()); + pollRequest.setTaskListMetadata(metadata); + } + if (log.isDebugEnabled()) { log.debug("poll request begin: " + pollRequest); } diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index 3825ef906..643d95e74 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -30,20 +30,15 @@ public final class SingleWorkerOptions { public static final class Builder { private String identity; - private DataConverter dataConverter; - private int taskExecutorThreadPoolSize = 100; - + private double taskListActivitiesPerSecond; private PollerOptions pollerOptions; - /** TODO: Dynamic expiration based on activity timeout */ private RetryOptions reportCompletionRetryOptions; private RetryOptions reportFailureRetryOptions; - private Scope metricsScope; - private boolean enableLoggingInReplay; public Builder() {} @@ -52,6 +47,7 @@ public Builder(SingleWorkerOptions options) { this.identity = options.getIdentity(); this.dataConverter = options.getDataConverter(); this.pollerOptions = options.getPollerOptions(); + this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond(); this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize(); this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions(); this.reportFailureRetryOptions = options.getReportFailureRetryOptions(); @@ -89,6 +85,21 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; + return this; + } + + public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { + this.reportCompletionRetryOptions = reportCompletionRetryOptions; + return this; + } + + public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { + this.reportFailureRetryOptions = reportFailureRetryOptions; + return this; + } + public SingleWorkerOptions build() { if (reportCompletionRetryOptions == null) { reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; @@ -119,44 +130,30 @@ public SingleWorkerOptions build() { identity, dataConverter, taskExecutorThreadPoolSize, + taskListActivitiesPerSecond, pollerOptions, reportCompletionRetryOptions, reportFailureRetryOptions, metricsScope, enableLoggingInReplay); } - - public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - return this; - } - - public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { - this.reportFailureRetryOptions = reportFailureRetryOptions; - return this; - } } private final String identity; - private final DataConverter dataConverter; - private final int taskExecutorThreadPoolSize; - + private final double taskListActivitiesPerSecond; private final PollerOptions pollerOptions; - private final RetryOptions reportCompletionRetryOptions; - private final RetryOptions reportFailureRetryOptions; - private final Scope metricsScope; - private final boolean enableLoggingInReplay; private SingleWorkerOptions( String identity, DataConverter dataConverter, int taskExecutorThreadPoolSize, + double taskListActivitiesPerSecond, PollerOptions pollerOptions, RetryOptions reportCompletionRetryOptions, RetryOptions reportFailureRetryOptions, @@ -165,6 +162,7 @@ private SingleWorkerOptions( this.identity = identity; this.dataConverter = dataConverter; this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.pollerOptions = pollerOptions; this.reportCompletionRetryOptions = reportCompletionRetryOptions; this.reportFailureRetryOptions = reportFailureRetryOptions; @@ -180,22 +178,26 @@ public DataConverter getDataConverter() { return dataConverter; } - public int getTaskExecutorThreadPoolSize() { + int getTaskExecutorThreadPoolSize() { return taskExecutorThreadPoolSize; } - public PollerOptions getPollerOptions() { + PollerOptions getPollerOptions() { return pollerOptions; } - public RetryOptions getReportCompletionRetryOptions() { + RetryOptions getReportCompletionRetryOptions() { return reportCompletionRetryOptions; } - public RetryOptions getReportFailureRetryOptions() { + RetryOptions getReportFailureRetryOptions() { return reportFailureRetryOptions; } + double getTaskListActivitiesPerSecond() { + return taskListActivitiesPerSecond; + } + public Scope getMetricsScope() { return metricsScope; } diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index c191677eb..b74ad779d 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -40,6 +40,7 @@ public static final class Builder { private int maxConcurrentActivityExecutionSize = 100; private int maxConcurrentWorkflowExecutionSize = 50; private int maxConcurrentLocalActivityExecutionSize = 100; + private double taskListActivitiesPerSecond = 100000; private PollerOptions activityPollerOptions; private PollerOptions workflowPollerOptions; private RetryOptions reportActivityCompletionRetryOptions; @@ -186,6 +187,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + /** + * Optional: Sets the rate limiting on number of activities that can be executed per second. + * This is managed by the server and controls activities per second for your entire tasklist. + * Notice that the number is represented in double, so that you can set it to less than 1 if + * needed. For example, set the number to 0.1 means you want your activity to be executed once + * every 10 seconds. This can be used to protect down stream services from flooding. The zero + * value of this uses the default value. Default: 100k + */ + public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; + return this; + } + public WorkerOptions build() { if (identity == null) { identity = ManagementFactory.getRuntimeMXBean().getName(); @@ -204,6 +218,7 @@ public WorkerOptions build() { maxConcurrentActivityExecutionSize, maxConcurrentWorkflowExecutionSize, maxConcurrentLocalActivityExecutionSize, + taskListActivitiesPerSecond, activityPollerOptions, workflowPollerOptions, reportActivityCompletionRetryOptions, @@ -224,6 +239,7 @@ public WorkerOptions build() { private final int maxConcurrentActivityExecutionSize; private final int maxConcurrentWorkflowExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; + private final double taskListActivitiesPerSecond; private final PollerOptions activityPollerOptions; private final PollerOptions workflowPollerOptions; private final RetryOptions reportActivityCompletionRetryOptions; @@ -243,6 +259,7 @@ private WorkerOptions( int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowExecutionSize, int maxConcurrentLocalActivityExecutionSize, + double taskListActivitiesPerSecond, PollerOptions activityPollerOptions, PollerOptions workflowPollerOptions, RetryOptions reportActivityCompletionRetryOptions, @@ -260,6 +277,7 @@ private WorkerOptions( this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.activityPollerOptions = activityPollerOptions; this.workflowPollerOptions = workflowPollerOptions; this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions; @@ -359,6 +377,8 @@ public String toString() { + maxConcurrentWorkflowExecutionSize + ", maxConcurrentLocalActivityExecutionSize=" + maxConcurrentLocalActivityExecutionSize + + ", taskListActivitiesPerSecond=" + + taskListActivitiesPerSecond + ", activityPollerOptions=" + activityPollerOptions + ", workflowPollerOptions="