Skip to content

Commit 47c9097

Browse files
authored
Remove disable activity and workflow worker options (#540)
- Workers will now always start, and poller will start only if types are registered. - Simplified worker start and shutdown logic. - Fixed a bug in worker.isSuspended() - Move common method implementations to base
1 parent a11b74f commit 47c9097

File tree

10 files changed

+127
-279
lines changed

10 files changed

+127
-279
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ license {
6868
header rootProject.file('license-header.txt')
6969
skipExistingHeaders true
7070
exclude 'com/uber/cadence/*.java' // generated code
71+
excludes(["**/*.json"])
7172
}
7273

7374
task initDlsSubmodule(type: Exec) {

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,11 @@ public void setLocalActivitiesImplementation(Object... activitiesImplementation)
118118
@Override
119119
public void start() {
120120
workflowWorker.start();
121-
laWorker.start();
121+
// workflowWorker doesn't start if no types are registered with it. In that case we don't need
122+
// to start LocalActivity Worker.
123+
if (workflowWorker.isStarted()) {
124+
laWorker.start();
125+
}
122126
}
123127

124128
@Override

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,10 @@
4747
import org.apache.thrift.TException;
4848
import org.slf4j.MDC;
4949

50-
public final class ActivityWorker implements SuspendableWorker {
50+
public final class ActivityWorker extends SuspendableWorkerBase {
5151

5252
private static final String POLL_THREAD_NAME_PREFIX = "Activity Poller taskList=";
5353

54-
private SuspendableWorker poller = new NoopSuspendableWorker();
5554
private final ActivityTaskHandler handler;
5655
private final IWorkflowService service;
5756
private final String domain;
@@ -83,63 +82,19 @@ public ActivityWorker(
8382
@Override
8483
public void start() {
8584
if (handler.isAnyTypeSupported()) {
86-
poller =
85+
SuspendableWorker poller =
8786
new Poller<>(
8887
options.getIdentity(),
8988
new ActivityPollTask(service, domain, taskList, options),
9089
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
9190
options.getPollerOptions(),
9291
options.getMetricsScope());
9392
poller.start();
93+
setPoller(poller);
9494
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
9595
}
9696
}
9797

98-
@Override
99-
public boolean isStarted() {
100-
return poller.isStarted();
101-
}
102-
103-
@Override
104-
public boolean isShutdown() {
105-
return poller.isShutdown();
106-
}
107-
108-
@Override
109-
public boolean isTerminated() {
110-
return poller.isTerminated();
111-
}
112-
113-
@Override
114-
public void shutdown() {
115-
poller.shutdown();
116-
}
117-
118-
@Override
119-
public void shutdownNow() {
120-
poller.shutdownNow();
121-
}
122-
123-
@Override
124-
public void awaitTermination(long timeout, TimeUnit unit) {
125-
poller.awaitTermination(timeout, unit);
126-
}
127-
128-
@Override
129-
public void suspendPolling() {
130-
poller.suspendPolling();
131-
}
132-
133-
@Override
134-
public void resumePolling() {
135-
poller.resumePolling();
136-
}
137-
138-
@Override
139-
public boolean isSuspended() {
140-
return poller.isSuspended();
141-
}
142-
14398
private class TaskHandlerImpl
14499
implements PollTaskExecutor.TaskHandler<PollForActivityTaskResponse> {
145100

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,14 @@
3434
import java.util.Map;
3535
import java.util.Objects;
3636
import java.util.Optional;
37-
import java.util.concurrent.TimeUnit;
3837
import java.util.function.BiFunction;
3938
import java.util.function.Consumer;
4039
import java.util.function.LongSupplier;
4140

42-
public final class LocalActivityWorker implements SuspendableWorker {
41+
public final class LocalActivityWorker extends SuspendableWorkerBase {
4342

4443
private static final String POLL_THREAD_NAME_PREFIX = "Local Activity Poller taskList=";
4544

46-
private SuspendableWorker poller = new NoopSuspendableWorker();
4745
private final ActivityTaskHandler handler;
4846
private final String domain;
4947
private final String taskList;
@@ -71,63 +69,19 @@ public LocalActivityWorker(
7169
@Override
7270
public void start() {
7371
if (handler.isAnyTypeSupported()) {
74-
poller =
72+
SuspendableWorker poller =
7573
new Poller<>(
7674
options.getIdentity(),
7775
laPollTask,
7876
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
7977
options.getPollerOptions(),
8078
options.getMetricsScope());
8179
poller.start();
80+
setPoller(poller);
8281
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
8382
}
8483
}
8584

86-
@Override
87-
public boolean isStarted() {
88-
return poller.isStarted();
89-
}
90-
91-
@Override
92-
public boolean isShutdown() {
93-
return poller.isShutdown();
94-
}
95-
96-
@Override
97-
public boolean isTerminated() {
98-
return poller.isTerminated();
99-
}
100-
101-
@Override
102-
public void shutdown() {
103-
poller.shutdown();
104-
}
105-
106-
@Override
107-
public void shutdownNow() {
108-
poller.shutdownNow();
109-
}
110-
111-
@Override
112-
public void awaitTermination(long timeout, TimeUnit unit) {
113-
poller.awaitTermination(timeout, unit);
114-
}
115-
116-
@Override
117-
public void suspendPolling() {
118-
poller.suspendPolling();
119-
}
120-
121-
@Override
122-
public void resumePolling() {
123-
poller.resumePolling();
124-
}
125-
126-
@Override
127-
public boolean isSuspended() {
128-
return poller.isSuspended();
129-
}
130-
13185
public static class Task {
13286
private final ExecuteLocalActivityParameters params;
13387
private final Consumer<HistoryEvent> eventConsumer;

src/main/java/com/uber/cadence/internal/worker/NoopSuspendableWorker.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,27 +52,21 @@ public void shutdownNow() {
5252
public void awaitTermination(long timeout, TimeUnit unit) {}
5353

5454
@Override
55-
public void start() {
56-
throw new IllegalStateException("Non startable");
57-
}
55+
public void start() {}
5856

5957
@Override
6058
public boolean isStarted() {
6159
return false;
6260
}
6361

6462
@Override
65-
public void suspendPolling() {
66-
throw new IllegalStateException("Non suspendable");
67-
}
63+
public void suspendPolling() {}
6864

6965
@Override
70-
public void resumePolling() {
71-
throw new IllegalStateException("Non resumable");
72-
}
66+
public void resumePolling() {}
7367

7468
@Override
7569
public boolean isSuspended() {
76-
return false;
70+
return true;
7771
}
7872
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.worker;
19+
20+
import java.util.concurrent.TimeUnit;
21+
22+
public abstract class SuspendableWorkerBase implements SuspendableWorker {
23+
private SuspendableWorker poller = new NoopSuspendableWorker();
24+
25+
final void setPoller(SuspendableWorker poller) {
26+
this.poller = poller;
27+
}
28+
29+
@Override
30+
public boolean isStarted() {
31+
return poller.isStarted();
32+
}
33+
34+
@Override
35+
public boolean isShutdown() {
36+
return poller.isShutdown();
37+
}
38+
39+
@Override
40+
public boolean isTerminated() {
41+
return poller.isTerminated();
42+
}
43+
44+
@Override
45+
public void shutdown() {
46+
poller.shutdown();
47+
}
48+
49+
@Override
50+
public void shutdownNow() {
51+
poller.shutdownNow();
52+
}
53+
54+
@Override
55+
public void awaitTermination(long timeout, TimeUnit unit) {
56+
if (!poller.isStarted()) {
57+
return;
58+
}
59+
poller.awaitTermination(timeout, unit);
60+
}
61+
62+
@Override
63+
public void suspendPolling() {
64+
poller.suspendPolling();
65+
}
66+
67+
@Override
68+
public void resumePolling() {
69+
poller.resumePolling();
70+
}
71+
72+
@Override
73+
public boolean isSuspended() {
74+
return poller.isSuspended();
75+
}
76+
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,16 @@
4646
import java.util.Arrays;
4747
import java.util.List;
4848
import java.util.Objects;
49-
import java.util.concurrent.TimeUnit;
5049
import java.util.concurrent.locks.Lock;
5150
import java.util.function.Consumer;
5251
import org.apache.thrift.TException;
5352
import org.slf4j.MDC;
5453

55-
public final class WorkflowWorker
56-
implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
54+
public final class WorkflowWorker extends SuspendableWorkerBase
55+
implements Consumer<PollForDecisionTaskResponse> {
5756

5857
private static final String POLL_THREAD_NAME_PREFIX = "Workflow Poller taskList=";
5958

60-
private SuspendableWorker poller = new NoopSuspendableWorker();
6159
private PollTaskExecutor<PollForDecisionTaskResponse> pollTaskExecutor;
6260
private final DecisionTaskHandler handler;
6361
private final IWorkflowService service;
@@ -96,7 +94,7 @@ public void start() {
9694
if (handler.isAnyTypeSupported()) {
9795
pollTaskExecutor =
9896
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler));
99-
poller =
97+
SuspendableWorker poller =
10098
new Poller<>(
10199
options.getIdentity(),
102100
new WorkflowPollTask(
@@ -105,25 +103,11 @@ public void start() {
105103
options.getPollerOptions(),
106104
options.getMetricsScope());
107105
poller.start();
106+
setPoller(poller);
108107
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
109108
}
110109
}
111110

112-
@Override
113-
public boolean isStarted() {
114-
return poller.isStarted();
115-
}
116-
117-
@Override
118-
public boolean isShutdown() {
119-
return poller.isShutdown();
120-
}
121-
122-
@Override
123-
public boolean isTerminated() {
124-
return poller.isTerminated();
125-
}
126-
127111
public byte[] queryWorkflowExecution(WorkflowExecution exec, String queryType, byte[] args)
128112
throws Exception {
129113
GetWorkflowExecutionHistoryResponse historyResponse =
@@ -187,40 +171,6 @@ private byte[] queryWorkflowExecution(
187171
throw new RuntimeException("Query returned wrong response: " + result);
188172
}
189173

190-
@Override
191-
public void shutdown() {
192-
poller.shutdown();
193-
}
194-
195-
@Override
196-
public void shutdownNow() {
197-
poller.shutdownNow();
198-
}
199-
200-
@Override
201-
public void awaitTermination(long timeout, TimeUnit unit) {
202-
if (!poller.isStarted()) {
203-
return;
204-
}
205-
206-
poller.awaitTermination(timeout, unit);
207-
}
208-
209-
@Override
210-
public void suspendPolling() {
211-
poller.suspendPolling();
212-
}
213-
214-
@Override
215-
public void resumePolling() {
216-
poller.resumePolling();
217-
}
218-
219-
@Override
220-
public boolean isSuspended() {
221-
return poller.isSuspended();
222-
}
223-
224174
@Override
225175
public void accept(PollForDecisionTaskResponse pollForDecisionTaskResponse) {
226176
pollTaskExecutor.process(pollForDecisionTaskResponse);

0 commit comments

Comments
 (0)