From 910c4963dc8023984490ce3afa669a8442d4e9fd Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 28 Jun 2019 16:52:31 -0700 Subject: [PATCH 1/3] Add activity tasklist rate limiter option to worker options --- .../internal/worker/ActivityPollTask.java | 9 ++- .../internal/worker/SingleWorkerOptions.java | 56 ++++++++++--------- .../uber/cadence/worker/WorkerOptions.java | 20 +++++++ 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java index 0576e1168..3d2efe5e2 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java @@ -17,11 +17,7 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.InternalServiceError; -import com.uber.cadence.PollForActivityTaskRequest; -import com.uber.cadence.PollForActivityTaskResponse; -import com.uber.cadence.ServiceBusyError; -import com.uber.cadence.TaskList; +import com.uber.cadence.*; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Stopwatch; @@ -56,6 +52,9 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException { pollRequest.setDomain(domain); pollRequest.setIdentity(options.getIdentity()); pollRequest.setTaskList(new TaskList().setName(taskList)); + TaskListMetadata metadata = new TaskListMetadata(); + metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond()); + pollRequest.setTaskListMetadata(metadata); if (log.isDebugEnabled()) { log.debug("poll request begin: " + pollRequest); } diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index 3825ef906..643d95e74 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -30,20 +30,15 @@ public final class SingleWorkerOptions { public static final class Builder { private String identity; - private DataConverter dataConverter; - private int taskExecutorThreadPoolSize = 100; - + private double taskListActivitiesPerSecond; private PollerOptions pollerOptions; - /** TODO: Dynamic expiration based on activity timeout */ private RetryOptions reportCompletionRetryOptions; private RetryOptions reportFailureRetryOptions; - private Scope metricsScope; - private boolean enableLoggingInReplay; public Builder() {} @@ -52,6 +47,7 @@ public Builder(SingleWorkerOptions options) { this.identity = options.getIdentity(); this.dataConverter = options.getDataConverter(); this.pollerOptions = options.getPollerOptions(); + this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond(); this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize(); this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions(); this.reportFailureRetryOptions = options.getReportFailureRetryOptions(); @@ -89,6 +85,21 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; + return this; + } + + public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { + this.reportCompletionRetryOptions = reportCompletionRetryOptions; + return this; + } + + public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { + this.reportFailureRetryOptions = reportFailureRetryOptions; + return this; + } + public SingleWorkerOptions build() { if (reportCompletionRetryOptions == null) { reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; @@ -119,44 +130,30 @@ public SingleWorkerOptions build() { identity, dataConverter, taskExecutorThreadPoolSize, + taskListActivitiesPerSecond, pollerOptions, reportCompletionRetryOptions, reportFailureRetryOptions, metricsScope, enableLoggingInReplay); } - - public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - return this; - } - - public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { - this.reportFailureRetryOptions = reportFailureRetryOptions; - return this; - } } private final String identity; - private final DataConverter dataConverter; - private final int taskExecutorThreadPoolSize; - + private final double taskListActivitiesPerSecond; private final PollerOptions pollerOptions; - private final RetryOptions reportCompletionRetryOptions; - private final RetryOptions reportFailureRetryOptions; - private final Scope metricsScope; - private final boolean enableLoggingInReplay; private SingleWorkerOptions( String identity, DataConverter dataConverter, int taskExecutorThreadPoolSize, + double taskListActivitiesPerSecond, PollerOptions pollerOptions, RetryOptions reportCompletionRetryOptions, RetryOptions reportFailureRetryOptions, @@ -165,6 +162,7 @@ private SingleWorkerOptions( this.identity = identity; this.dataConverter = dataConverter; this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.pollerOptions = pollerOptions; this.reportCompletionRetryOptions = reportCompletionRetryOptions; this.reportFailureRetryOptions = reportFailureRetryOptions; @@ -180,22 +178,26 @@ public DataConverter getDataConverter() { return dataConverter; } - public int getTaskExecutorThreadPoolSize() { + int getTaskExecutorThreadPoolSize() { return taskExecutorThreadPoolSize; } - public PollerOptions getPollerOptions() { + PollerOptions getPollerOptions() { return pollerOptions; } - public RetryOptions getReportCompletionRetryOptions() { + RetryOptions getReportCompletionRetryOptions() { return reportCompletionRetryOptions; } - public RetryOptions getReportFailureRetryOptions() { + RetryOptions getReportFailureRetryOptions() { return reportFailureRetryOptions; } + double getTaskListActivitiesPerSecond() { + return taskListActivitiesPerSecond; + } + public Scope getMetricsScope() { return metricsScope; } diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index c191677eb..b74ad779d 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -40,6 +40,7 @@ public static final class Builder { private int maxConcurrentActivityExecutionSize = 100; private int maxConcurrentWorkflowExecutionSize = 50; private int maxConcurrentLocalActivityExecutionSize = 100; + private double taskListActivitiesPerSecond = 100000; private PollerOptions activityPollerOptions; private PollerOptions workflowPollerOptions; private RetryOptions reportActivityCompletionRetryOptions; @@ -186,6 +187,19 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + /** + * Optional: Sets the rate limiting on number of activities that can be executed per second. + * This is managed by the server and controls activities per second for your entire tasklist. + * Notice that the number is represented in double, so that you can set it to less than 1 if + * needed. For example, set the number to 0.1 means you want your activity to be executed once + * every 10 seconds. This can be used to protect down stream services from flooding. The zero + * value of this uses the default value. Default: 100k + */ + public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; + return this; + } + public WorkerOptions build() { if (identity == null) { identity = ManagementFactory.getRuntimeMXBean().getName(); @@ -204,6 +218,7 @@ public WorkerOptions build() { maxConcurrentActivityExecutionSize, maxConcurrentWorkflowExecutionSize, maxConcurrentLocalActivityExecutionSize, + taskListActivitiesPerSecond, activityPollerOptions, workflowPollerOptions, reportActivityCompletionRetryOptions, @@ -224,6 +239,7 @@ public WorkerOptions build() { private final int maxConcurrentActivityExecutionSize; private final int maxConcurrentWorkflowExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; + private final double taskListActivitiesPerSecond; private final PollerOptions activityPollerOptions; private final PollerOptions workflowPollerOptions; private final RetryOptions reportActivityCompletionRetryOptions; @@ -243,6 +259,7 @@ private WorkerOptions( int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowExecutionSize, int maxConcurrentLocalActivityExecutionSize, + double taskListActivitiesPerSecond, PollerOptions activityPollerOptions, PollerOptions workflowPollerOptions, RetryOptions reportActivityCompletionRetryOptions, @@ -260,6 +277,7 @@ private WorkerOptions( this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.activityPollerOptions = activityPollerOptions; this.workflowPollerOptions = workflowPollerOptions; this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions; @@ -359,6 +377,8 @@ public String toString() { + maxConcurrentWorkflowExecutionSize + ", maxConcurrentLocalActivityExecutionSize=" + maxConcurrentLocalActivityExecutionSize + + ", taskListActivitiesPerSecond=" + + taskListActivitiesPerSecond + ", activityPollerOptions=" + activityPollerOptions + ", workflowPollerOptions=" From 51fd578b03f189b9c261096bfc836e89a3e64f4e Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 28 Jun 2019 16:54:17 -0700 Subject: [PATCH 2/3] expand import --- .../com/uber/cadence/internal/worker/ActivityPollTask.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java index 3d2efe5e2..04bc4cf3a 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java @@ -17,7 +17,12 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.*; +import com.uber.cadence.InternalServiceError; +import com.uber.cadence.PollForActivityTaskRequest; +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.TaskList; +import com.uber.cadence.TaskListMetadata; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Stopwatch; From d84e80e077a029a3608002a2061904da1e6065e2 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Mon, 8 Jul 2019 11:48:13 -0700 Subject: [PATCH 3/3] Fix bug --- .../uber/cadence/internal/worker/ActivityPollTask.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java index 04bc4cf3a..74bfc6b89 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityPollTask.java @@ -57,9 +57,13 @@ public ActivityWorker.MeasurableActivityTask poll() throws TException { pollRequest.setDomain(domain); pollRequest.setIdentity(options.getIdentity()); pollRequest.setTaskList(new TaskList().setName(taskList)); - TaskListMetadata metadata = new TaskListMetadata(); - metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond()); - pollRequest.setTaskListMetadata(metadata); + + if (options.getTaskListActivitiesPerSecond() > 0) { + TaskListMetadata metadata = new TaskListMetadata(); + metadata.setMaxTasksPerSecond(options.getTaskListActivitiesPerSecond()); + pollRequest.setTaskListMetadata(metadata); + } + if (log.isDebugEnabled()) { log.debug("poll request begin: " + pollRequest); }