Skip to content

Commit d7fb63c

Browse files
authored
use noop worker when local dispatch disabled (#569)
1 parent 5f20884 commit d7fb63c

File tree

2 files changed

+18
-31
lines changed

2 files changed

+18
-31
lines changed

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.cadence.internal.worker.LocalActivityWorker;
2929
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker;
3030
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task;
31+
import com.uber.cadence.internal.worker.NoopSuspendableWorker;
3132
import com.uber.cadence.internal.worker.SingleWorkerOptions;
3233
import com.uber.cadence.internal.worker.SuspendableWorker;
3334
import com.uber.cadence.internal.worker.WorkflowWorker;
@@ -45,9 +46,7 @@
4546
import java.util.function.Consumer;
4647
import java.util.function.Function;
4748

48-
/**
49-
* Workflow worker that supports POJO workflow implementations.
50-
*/
49+
/** Workflow worker that supports POJO workflow implementations. */
5150
public class SyncWorkflowWorker
5251
implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
5352

@@ -58,7 +57,7 @@ public class SyncWorkflowWorker
5857
private final POJOActivityTaskHandler laTaskHandler;
5958
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
6059
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
61-
private LocallyDispatchedActivityWorker ldaWorker;
60+
private SuspendableWorker ldaWorker;
6261
private POJOActivityTaskHandler ldaTaskHandler;
6362

6463
public SyncWorkflowWorker(
@@ -112,7 +111,10 @@ public SyncWorkflowWorker(
112111
ldaWorker =
113112
new LocallyDispatchedActivityWorker(
114113
service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);
115-
locallyDispatchedActivityTaskPoller = ldaWorker.getLocallyDispatchedActivityTaskPoller();
114+
locallyDispatchedActivityTaskPoller =
115+
((LocallyDispatchedActivityWorker) ldaWorker).getLocallyDispatchedActivityTaskPoller();
116+
} else {
117+
ldaWorker = new NoopSuspendableWorker();
116118
}
117119

118120
workflowWorker =
@@ -157,84 +159,69 @@ public void start() {
157159
// to start LocalActivity Worker.
158160
if (workflowWorker.isStarted()) {
159161
laWorker.start();
160-
if (ldaWorker != null) {
161-
ldaWorker.start();
162-
}
162+
ldaWorker.start();
163163
}
164164
}
165165

166166
@Override
167167
public boolean isStarted() {
168-
return workflowWorker.isStarted() && laWorker.isStarted() && (ldaWorker == null || ldaWorker
169-
.isStarted());
168+
return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted();
170169
}
171170

172171
@Override
173172
public boolean isShutdown() {
174-
return workflowWorker.isShutdown() && laWorker.isShutdown() && (ldaWorker == null || ldaWorker
175-
.isShutdown());
173+
return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown();
176174
}
177175

178176
@Override
179177
public boolean isTerminated() {
180178
return workflowWorker.isTerminated()
181179
&& laWorker.isTerminated()
182180
&& ldaHeartbeatExecutor.isTerminated()
183-
&& (ldaWorker == null || ldaWorker.isTerminated());
181+
&& ldaWorker.isTerminated();
184182
}
185183

186184
@Override
187185
public void shutdown() {
188186
laWorker.shutdown();
189187
ldaHeartbeatExecutor.shutdown();
190-
if (ldaWorker != null) {
191-
ldaWorker.shutdown();
192-
}
188+
ldaWorker.shutdown();
193189
workflowWorker.shutdown();
194190
}
195191

196192
@Override
197193
public void shutdownNow() {
198194
laWorker.shutdownNow();
199195
ldaHeartbeatExecutor.shutdownNow();
200-
if (ldaWorker != null) {
201-
ldaWorker.shutdownNow();
202-
}
196+
ldaWorker.shutdownNow();
203197
workflowWorker.shutdownNow();
204198
}
205199

206200
@Override
207201
public void awaitTermination(long timeout, TimeUnit unit) {
208202
long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout));
209203
timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis);
210-
if (ldaWorker != null) {
211-
timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
212-
}
204+
timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
213205
InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
214206
}
215207

216208
@Override
217209
public void suspendPolling() {
218210
workflowWorker.suspendPolling();
219211
laWorker.suspendPolling();
220-
if (ldaWorker != null) {
221-
ldaWorker.suspendPolling();
222-
}
212+
ldaWorker.suspendPolling();
223213
}
224214

225215
@Override
226216
public void resumePolling() {
227217
workflowWorker.resumePolling();
228218
laWorker.resumePolling();
229-
if (ldaWorker != null) {
230-
ldaWorker.resumePolling();
231-
}
219+
ldaWorker.resumePolling();
232220
}
233221

234222
@Override
235223
public boolean isSuspended() {
236-
return workflowWorker.isSuspended() && laWorker.isSuspended() && (ldaWorker == null || ldaWorker
237-
.isSuspended());
224+
return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
238225
}
239226

240227
public <R> R queryWorkflowExecution(

src/main/java/com/uber/cadence/internal/worker/NoopSuspendableWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* Helper class that is used instead of null for non initialized worker. This eliminates needs for
2525
* null checks when calling into it.
2626
*/
27-
class NoopSuspendableWorker implements SuspendableWorker {
27+
public class NoopSuspendableWorker implements SuspendableWorker {
2828

2929
private AtomicBoolean shutdown = new AtomicBoolean();
3030

0 commit comments

Comments
 (0)