Skip to content

Commit 5f20884

Browse files
authored
do not dispatch activities locally if taskListActivitiesPerSecond is set (#567)
1 parent e33ed95 commit 5f20884

File tree

3 files changed

+86
-59
lines changed

3 files changed

+86
-59
lines changed

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.worker.DecisionTaskHandler;
2828
import com.uber.cadence.internal.worker.LocalActivityWorker;
2929
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker;
30+
import com.uber.cadence.internal.worker.LocallyDispatchedActivityWorker.Task;
3031
import com.uber.cadence.internal.worker.SingleWorkerOptions;
3132
import com.uber.cadence.internal.worker.SuspendableWorker;
3233
import com.uber.cadence.internal.worker.WorkflowWorker;
@@ -44,20 +45,21 @@
4445
import java.util.function.Consumer;
4546
import java.util.function.Function;
4647

47-
/** Workflow worker that supports POJO workflow implementations. */
48+
/**
49+
* Workflow worker that supports POJO workflow implementations.
50+
*/
4851
public class SyncWorkflowWorker
4952
implements SuspendableWorker, Consumer<PollForDecisionTaskResponse> {
5053

5154
private final WorkflowWorker workflowWorker;
5255
private final LocalActivityWorker laWorker;
53-
private final LocallyDispatchedActivityWorker ldaWorker;
54-
5556
private final POJOWorkflowImplementationFactory factory;
5657
private final DataConverter dataConverter;
5758
private final POJOActivityTaskHandler laTaskHandler;
58-
private final POJOActivityTaskHandler ldaTaskHandler;
5959
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
6060
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
61+
private LocallyDispatchedActivityWorker ldaWorker;
62+
private POJOActivityTaskHandler ldaTaskHandler;
6163

6264
public SyncWorkflowWorker(
6365
IWorkflowService service,
@@ -97,15 +99,21 @@ public SyncWorkflowWorker(
9799
stickyDecisionScheduleToStartTimeout,
98100
service,
99101
laWorker.getLocalActivityTaskPoller());
100-
ldaTaskHandler =
101-
new POJOActivityTaskHandler(
102-
service,
103-
domain,
104-
locallyDispatchedActivityOptions.getDataConverter(),
105-
ldaHeartbeatExecutor);
106-
ldaWorker =
107-
new LocallyDispatchedActivityWorker(
108-
service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);
102+
103+
Function<Task, Boolean> locallyDispatchedActivityTaskPoller = null;
104+
// do not dispatch locally if TaskListActivitiesPerSecond is set
105+
if (locallyDispatchedActivityOptions.getTaskListActivitiesPerSecond() == 0) {
106+
ldaTaskHandler =
107+
new POJOActivityTaskHandler(
108+
service,
109+
domain,
110+
locallyDispatchedActivityOptions.getDataConverter(),
111+
ldaHeartbeatExecutor);
112+
ldaWorker =
113+
new LocallyDispatchedActivityWorker(
114+
service, domain, taskList, locallyDispatchedActivityOptions, ldaTaskHandler);
115+
locallyDispatchedActivityTaskPoller = ldaWorker.getLocallyDispatchedActivityTaskPoller();
116+
}
109117

110118
workflowWorker =
111119
new WorkflowWorker(
@@ -114,7 +122,7 @@ public SyncWorkflowWorker(
114122
taskList,
115123
workflowOptions,
116124
taskHandler,
117-
ldaWorker.getLocallyDispatchedActivityTaskPoller(),
125+
locallyDispatchedActivityTaskPoller,
118126
stickyTaskListName);
119127
}
120128

@@ -137,7 +145,9 @@ public void setLocalActivitiesImplementation(Object... activitiesImplementation)
137145
}
138146

139147
public void setActivitiesImplementationToDispatchLocally(Object... activitiesImplementation) {
140-
this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation);
148+
if (this.ldaTaskHandler != null) {
149+
this.ldaTaskHandler.setActivitiesImplementation(activitiesImplementation);
150+
}
141151
}
142152

143153
@Override
@@ -147,69 +157,84 @@ public void start() {
147157
// to start LocalActivity Worker.
148158
if (workflowWorker.isStarted()) {
149159
laWorker.start();
150-
ldaWorker.start();
160+
if (ldaWorker != null) {
161+
ldaWorker.start();
162+
}
151163
}
152164
}
153165

154166
@Override
155167
public boolean isStarted() {
156-
return workflowWorker.isStarted() && laWorker.isStarted() && ldaWorker.isStarted();
168+
return workflowWorker.isStarted() && laWorker.isStarted() && (ldaWorker == null || ldaWorker
169+
.isStarted());
157170
}
158171

159172
@Override
160173
public boolean isShutdown() {
161-
return workflowWorker.isShutdown() && laWorker.isShutdown() && ldaWorker.isShutdown();
174+
return workflowWorker.isShutdown() && laWorker.isShutdown() && (ldaWorker == null || ldaWorker
175+
.isShutdown());
162176
}
163177

164178
@Override
165179
public boolean isTerminated() {
166180
return workflowWorker.isTerminated()
167181
&& laWorker.isTerminated()
168182
&& ldaHeartbeatExecutor.isTerminated()
169-
&& ldaWorker.isTerminated();
183+
&& (ldaWorker == null || ldaWorker.isTerminated());
170184
}
171185

172186
@Override
173187
public void shutdown() {
174188
laWorker.shutdown();
175189
ldaHeartbeatExecutor.shutdown();
176-
ldaWorker.shutdown();
190+
if (ldaWorker != null) {
191+
ldaWorker.shutdown();
192+
}
177193
workflowWorker.shutdown();
178194
}
179195

180196
@Override
181197
public void shutdownNow() {
182198
laWorker.shutdownNow();
183199
ldaHeartbeatExecutor.shutdownNow();
184-
ldaWorker.shutdownNow();
200+
if (ldaWorker != null) {
201+
ldaWorker.shutdownNow();
202+
}
185203
workflowWorker.shutdownNow();
186204
}
187205

188206
@Override
189207
public void awaitTermination(long timeout, TimeUnit unit) {
190208
long timeoutMillis = InternalUtils.awaitTermination(laWorker, unit.toMillis(timeout));
191209
timeoutMillis = InternalUtils.awaitTermination(ldaHeartbeatExecutor, timeoutMillis);
192-
timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
210+
if (ldaWorker != null) {
211+
timeoutMillis = InternalUtils.awaitTermination(ldaWorker, timeoutMillis);
212+
}
193213
InternalUtils.awaitTermination(workflowWorker, timeoutMillis);
194214
}
195215

196216
@Override
197217
public void suspendPolling() {
198218
workflowWorker.suspendPolling();
199219
laWorker.suspendPolling();
200-
ldaWorker.suspendPolling();
220+
if (ldaWorker != null) {
221+
ldaWorker.suspendPolling();
222+
}
201223
}
202224

203225
@Override
204226
public void resumePolling() {
205227
workflowWorker.resumePolling();
206228
laWorker.resumePolling();
207-
ldaWorker.resumePolling();
229+
if (ldaWorker != null) {
230+
ldaWorker.resumePolling();
231+
}
208232
}
209233

210234
@Override
211235
public boolean isSuspended() {
212-
return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
236+
return workflowWorker.isSuspended() && laWorker.isSuspended() && (ldaWorker == null || ldaWorker
237+
.isSuspended());
213238
}
214239

215240
public <R> R queryWorkflowExecution(

src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ PollerOptions getPollerOptions() {
173173
return pollerOptions;
174174
}
175175

176-
double getTaskListActivitiesPerSecond() {
176+
public double getTaskListActivitiesPerSecond() {
177177
return taskListActivitiesPerSecond;
178178
}
179179

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -253,39 +253,41 @@ private void sendReply(
253253
RespondDecisionTaskCompletedResponse taskCompletedResponse = null;
254254
List<Task> activityTasks = new ArrayList<>();
255255
try {
256-
for (Decision decision : taskCompleted.getDecisions()) {
257-
ScheduleActivityTaskDecisionAttributes attr =
258-
decision.getScheduleActivityTaskDecisionAttributes();
259-
if (attr != null && taskList.equals(attr.getTaskList().getName())) {
260-
// assume the activity type is in registry otherwise the activity would be
261-
// failed and retried from server
262-
Task activityTask =
263-
new Task(
264-
attr.getActivityId(),
265-
attr.getActivityType(),
266-
attr.bufferForInput(),
267-
attr.getScheduleToCloseTimeoutSeconds(),
268-
attr.getStartToCloseTimeoutSeconds(),
269-
attr.getHeartbeatTimeoutSeconds(),
270-
task.getWorkflowType(),
271-
domain,
272-
attr.getHeader(),
273-
task.getWorkflowExecution());
274-
if (ldaTaskPoller.apply(activityTask)) {
275-
options
276-
.getMetricsScope()
277-
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER)
278-
.inc(1);
279-
decision
280-
.getScheduleActivityTaskDecisionAttributes()
281-
.setRequestLocalDispatch(true);
282-
activityTasks.add(activityTask);
283-
} else {
284-
// all pollers are busy - no room to optimize
285-
options
286-
.getMetricsScope()
287-
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER)
288-
.inc(1);
256+
if (ldaTaskPoller != null) {
257+
for (Decision decision : taskCompleted.getDecisions()) {
258+
ScheduleActivityTaskDecisionAttributes attr =
259+
decision.getScheduleActivityTaskDecisionAttributes();
260+
if (attr != null && taskList.equals(attr.getTaskList().getName())) {
261+
// assume the activity type is in registry otherwise the activity would be
262+
// failed and retried from server
263+
Task activityTask =
264+
new Task(
265+
attr.getActivityId(),
266+
attr.getActivityType(),
267+
attr.bufferForInput(),
268+
attr.getScheduleToCloseTimeoutSeconds(),
269+
attr.getStartToCloseTimeoutSeconds(),
270+
attr.getHeartbeatTimeoutSeconds(),
271+
task.getWorkflowType(),
272+
domain,
273+
attr.getHeader(),
274+
task.getWorkflowExecution());
275+
if (ldaTaskPoller.apply(activityTask)) {
276+
options
277+
.getMetricsScope()
278+
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_SUCCEED_COUNTER)
279+
.inc(1);
280+
decision
281+
.getScheduleActivityTaskDecisionAttributes()
282+
.setRequestLocalDispatch(true);
283+
activityTasks.add(activityTask);
284+
} else {
285+
// all pollers are busy - no room to optimize
286+
options
287+
.getMetricsScope()
288+
.counter(MetricsType.ACTIVITY_LOCAL_DISPATCH_FAILED_COUNTER)
289+
.inc(1);
290+
}
289291
}
290292
}
291293
}

0 commit comments

Comments
 (0)