Skip to content

Commit 3304410

Browse files
authored
Bug bash feedback (#111)
* Implemented options.getMaxWorkflowThreads value propagation to executor * Pull request comments
1 parent 77fccb2 commit 3304410

File tree

5 files changed

+51
-11
lines changed

5 files changed

+51
-11
lines changed

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.internal.worker.WorkflowWorker;
2626

2727
import java.util.concurrent.ArrayBlockingQueue;
28+
import java.util.concurrent.SynchronousQueue;
2829
import java.util.concurrent.ThreadPoolExecutor;
2930
import java.util.concurrent.TimeUnit;
3031

@@ -38,9 +39,10 @@ public class SyncWorkflowWorker {
3839
private final POJOWorkflowImplementationFactory factory;
3940
private final SingleWorkerOptions options;
4041

41-
public SyncWorkflowWorker(WorkflowService.Iface service, String domain, String taskList, SingleWorkerOptions options) {
42-
ThreadPoolExecutor workflowThreadPool = new ThreadPoolExecutor(1000, 1000,
43-
10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));
42+
public SyncWorkflowWorker(WorkflowService.Iface service, String domain, String taskList, SingleWorkerOptions options,
43+
int workflowThreadPoolSize) {
44+
ThreadPoolExecutor workflowThreadPool = new ThreadPoolExecutor(workflowThreadPoolSize, workflowThreadPoolSize,
45+
10, TimeUnit.SECONDS, new SynchronousQueue<>());
4446
factory = new POJOWorkflowImplementationFactory(options.getDataConverter(), workflowThreadPool);
4547
taskHandler = new ReplayDecisionTaskHandler(domain, factory);
4648
worker = new WorkflowWorker(service, domain, taskList, options, taskHandler);

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutionException;
2828
import java.util.concurrent.ExecutorService;
2929
import java.util.concurrent.Future;
30+
import java.util.concurrent.RejectedExecutionException;
3031
import java.util.function.Consumer;
3132
import java.util.function.Supplier;
3233

@@ -188,7 +189,13 @@ public void start() {
188189
}
189190
log.trace(String.format("Workflow thread \"%s\" started", getName()));
190191
context.setStatus(Status.RUNNING);
191-
taskFuture = threadPool.submit(task);
192+
try {
193+
taskFuture = threadPool.submit(task);
194+
} catch (RejectedExecutionException e) {
195+
throw new Error("Not enough threads to execute workflows. " +
196+
"If this message appears consistently either WorkerOptions.maxConcurrentWorklfowExecutionSize " +
197+
"should be decreased or WorkerOptions.maxWorkflowThreads increased.");
198+
}
192199
}
193200

194201
public WorkflowThreadContext getContext() {

src/main/java/com/uber/cadence/internal/worker/Poller.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323

24+
import java.util.concurrent.ArrayBlockingQueue;
25+
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.LinkedBlockingQueue;
2628
import java.util.concurrent.ThreadPoolExecutor;
@@ -118,9 +120,12 @@ public void start() {
118120
options.getMaximumPollRateIntervalMilliseconds());
119121
}
120122

123+
// It is important to pass blocking queue of at least options.getPollThreadCount() capacity.
124+
// As task enqueues next task the buffering is needed to queue task until the previous one
125+
// releases a thread.
121126
pollExecutor = new ThreadPoolExecutor(options.getPollThreadCount(), options.getPollThreadCount(),
122127
1, TimeUnit.SECONDS,
123-
new LinkedBlockingQueue<>(options.getPollThreadCount()));
128+
new ArrayBlockingQueue<>(options.getPollThreadCount()));
124129
pollExecutor.setThreadFactory(new ExecutorThreadFactory(options.getPollThreadNamePrefix(),
125130
options.getUncaughtExceptionHandler()));
126131

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.time.Duration;
2121

2222
/**
23-
* TODO: Switch to Duration.
23+
* Options for component that polls Cadence task lists for tasks.
2424
*/
2525
public final class PollerOptions {
2626

@@ -59,41 +59,66 @@ public Builder(PollerOptions o) {
5959
this.uncaughtExceptionHandler = o.getUncaughtExceptionHandler();
6060
}
6161

62+
/**
63+
* Defines interval for measuring poll rate. Larger the interval more spiky can be the load.
64+
*/
6265
public Builder setMaximumPollRateIntervalMilliseconds(int maximumPollRateIntervalMilliseconds) {
6366
this.maximumPollRateIntervalMilliseconds = maximumPollRateIntervalMilliseconds;
6467
return this;
6568
}
6669

70+
/**
71+
* Maximum rate of polling. Measured in the interval set through {@link #setMaximumPollRateIntervalMilliseconds(int)}.
72+
*/
6773
public Builder setMaximumPollRatePerSecond(double maximumPollRatePerSecond) {
6874
this.maximumPollRatePerSecond = maximumPollRatePerSecond;
6975
return this;
7076
}
7177

78+
/**
79+
* Coefficient to use when calculating exponential delay in case of failures
80+
*/
7281
public Builder setPollBackoffCoefficient(double pollBackoffCoefficient) {
7382
this.pollBackoffCoefficient = pollBackoffCoefficient;
7483
return this;
7584
}
7685

86+
/**
87+
* Initial delay in case of failure. If backoff coefficient is 1 then it would be the constant delay
88+
* between failing polls.
89+
*/
7790
public Builder setPollBackoffInitialInterval(Duration pollBackoffInitialInterval) {
7891
this.pollBackoffInitialInterval = pollBackoffInitialInterval;
7992
return this;
8093
}
8194

95+
/**
96+
* Maximum interval between polls in case of failures.
97+
*/
8298
public Builder setPollBackoffMaximumInterval(Duration pollBackoffMaximumInterval) {
8399
this.pollBackoffMaximumInterval = pollBackoffMaximumInterval;
84100
return this;
85101
}
86102

103+
/**
104+
* Number of parallel polling threads.
105+
*/
87106
public Builder setPollThreadCount(int pollThreadCount) {
88107
this.pollThreadCount = pollThreadCount;
89108
return this;
90109
}
91110

111+
/**
112+
* Called to report unexpected exceptions in the poller threads.
113+
*/
92114
public Builder setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
93115
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
94116
return this;
95117
}
96118

119+
/**
120+
* Prefix to use when naming poller threads.
121+
*/
97122
public Builder setPollThreadNamePrefix(String pollThreadNamePrefix) {
98123
this.pollThreadNamePrefix = pollThreadNamePrefix;
99124
return this;

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public Worker(WorkflowService.Iface service, String domain, String taskList, Wor
105105
}
106106
SingleWorkerOptions workflowOptions = toWorkflowOptions(options);
107107
if (!options.isDisableWorkflowWorker()) {
108-
workflowWorker = new SyncWorkflowWorker(service, domain, taskList, workflowOptions);
108+
workflowWorker = new SyncWorkflowWorker(service, domain, taskList, workflowOptions,
109+
options.getMaxWorkflowThreads());
109110
} else {
110111
workflowWorker = null;
111112
}
@@ -134,7 +135,7 @@ private SingleWorkerOptions toWorkflowOptions(WorkerOptions options) {
134135
}
135136

136137
/**
137-
* Register workflow implementation classes with a worker.
138+
* Register workflow implementation classes with a worker. Overwrites previously registered types.
138139
* A workflow implementation class must implement at least one interface with a method annotated with
139140
* {@link com.uber.cadence.workflow.WorkflowMethod}. That method becomes a workflow type
140141
* that this worker supports.
@@ -151,7 +152,7 @@ public void registerWorkflowImplementationTypes(Class<?>... workflowImplementati
151152
}
152153

153154
/**
154-
* Register activity implementation objects with a worker.
155+
* Register activity implementation objects with a worker. Overwrites previously registered objects.
155156
* As activities are reentrant and stateless only one instance per activity type
156157
* is registered.
157158
* <p>
@@ -164,11 +165,11 @@ public void registerActivitiesImplementations(Object... activityImplementations)
164165
if (activityWorker == null) {
165166
throw new IllegalStateException("disableActivityWorker is set in worker options");
166167
}
167-
checkStarted();
168+
checkNotStarted();
168169
activityWorker.setActivitiesImplementation(activityImplementations);
169170
}
170171

171-
private void checkStarted() {
172+
private void checkNotStarted() {
172173
if (started.get()) {
173174
throw new IllegalStateException("already started");
174175
}

0 commit comments

Comments
 (0)