Skip to content

Commit 926be8a

Browse files
committed
Fix ActivityCompletionClient cancellation and failure by WorkflowExecution
When sending RespondActivityTaskFailedByIDRequest or RespondActivityTaskCanceledByIDRequest we don't include the ActivityID. Correctly include the id and add test coverage.
1 parent 2afe97e commit 926be8a

File tree

3 files changed

+255
-0
lines changed

3 files changed

+255
-0
lines changed

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public void fail(Throwable failure) {
153153
}
154154
} else {
155155
RespondActivityTaskFailedByIDRequest request = new RespondActivityTaskFailedByIDRequest();
156+
request.setActivityID(activityId);
156157
request.setReason(failure.getClass().getName());
157158
request.setDetails(dataConverter.toData(failure));
158159
request.setDomain(domain);
@@ -212,6 +213,7 @@ public void reportCancellation(Object details) {
212213
}
213214
} else {
214215
RespondActivityTaskCanceledByIDRequest request = new RespondActivityTaskCanceledByIDRequest();
216+
request.setActivityID(activityId);
215217
request.setDetails(dataConverter.toData(details));
216218
request.setDomain(domain);
217219
request.setWorkflowID(execution.getWorkflowId());

src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public void evaluate() throws Throwable {
6161
};
6262
}
6363

64+
public <T> T newWorkflowStub(Class<T> tClass) {
65+
return getWorkflowClient().newWorkflowStub(tClass, workflowOptionsBuilder().build());
66+
}
67+
6468
public TracingWorkflowInterceptorFactory getTracer() {
6569
return context.getTracer();
6670
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow;
19+
20+
import com.google.common.base.Preconditions;
21+
import com.uber.cadence.WorkflowExecution;
22+
import com.uber.cadence.activity.Activity;
23+
import com.uber.cadence.activity.ActivityMethod;
24+
import com.uber.cadence.activity.ActivityOptions;
25+
import com.uber.cadence.activity.ActivityTask;
26+
import com.uber.cadence.client.ActivityCompletionClient;
27+
import com.uber.cadence.testUtils.CadenceTestRule;
28+
import java.time.Duration;
29+
import java.util.concurrent.CancellationException;
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
33+
public class ManualActivityCompletionWorkflowTest {
34+
@Rule
35+
public CadenceTestRule testRule =
36+
CadenceTestRule.builder()
37+
.withActivities(new ManualCompletionActivitiesImpl())
38+
.withWorkflowTypes(ManualCompletionWorkflowImpl.class)
39+
.startWorkersAutomatically()
40+
.build();
41+
42+
@Test
43+
public void testManualActivityCompletion() {
44+
testRule.newWorkflowStub(ManualCompletionWorkflow.class).run();
45+
}
46+
47+
public interface ManualCompletionActivities {
48+
@ActivityMethod
49+
String asyncActivity();
50+
51+
@ActivityMethod
52+
void completeAsyncActivity(String result);
53+
54+
@ActivityMethod
55+
void completeAsyncActivityById(String result);
56+
57+
@ActivityMethod
58+
void failAsyncActivity(String details);
59+
60+
@ActivityMethod
61+
void failAsyncActivityById(String details);
62+
63+
@ActivityMethod
64+
void cancelAsyncActivity(String details);
65+
66+
@ActivityMethod
67+
void cancelAsyncActivityById(String details);
68+
}
69+
70+
private class ManualCompletionActivitiesImpl implements ManualCompletionActivities {
71+
private ActivityTask openTask;
72+
73+
@Override
74+
public synchronized String asyncActivity() {
75+
openTask = Activity.getTask();
76+
77+
Activity.doNotCompleteOnReturn();
78+
return null;
79+
}
80+
81+
@Override
82+
public synchronized void completeAsyncActivity(String details) {
83+
Preconditions.checkState(openTask != null);
84+
getClient().complete(openTask.getTaskToken(), details);
85+
}
86+
87+
@Override
88+
public synchronized void completeAsyncActivityById(String details) {
89+
Preconditions.checkState(openTask != null);
90+
getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details);
91+
}
92+
93+
@Override
94+
public synchronized void failAsyncActivity(String details) {
95+
Preconditions.checkState(openTask != null);
96+
getClient()
97+
.completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details));
98+
}
99+
100+
@Override
101+
public synchronized void failAsyncActivityById(String details) {
102+
Preconditions.checkState(openTask != null);
103+
getClient()
104+
.completeExceptionally(
105+
getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details));
106+
}
107+
108+
@Override
109+
public synchronized void cancelAsyncActivity(String details) {
110+
Preconditions.checkState(openTask != null);
111+
getClient().reportCancellation(openTask.getTaskToken(), details);
112+
}
113+
114+
@Override
115+
public synchronized void cancelAsyncActivityById(String details) {
116+
Preconditions.checkState(openTask != null);
117+
getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details);
118+
}
119+
120+
private WorkflowExecution getCurrentWorkflow() {
121+
return Activity.getWorkflowExecution();
122+
}
123+
124+
private ActivityCompletionClient getClient() {
125+
return testRule.getWorkflowClient().newActivityCompletionClient();
126+
}
127+
}
128+
129+
public interface ManualCompletionWorkflow {
130+
@WorkflowMethod
131+
void run();
132+
}
133+
134+
public static class ManualCompletionWorkflowImpl implements ManualCompletionWorkflow {
135+
private final ManualCompletionActivities activities =
136+
Workflow.newActivityStub(
137+
ManualCompletionActivities.class,
138+
new ActivityOptions.Builder()
139+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
140+
.build());
141+
142+
@Override
143+
public void run() {
144+
Promise<String> result = Async.function(activities::asyncActivity);
145+
activities.completeAsyncActivity("1");
146+
expectSuccess("1", result);
147+
expectFailure(() -> activities.completeAsyncActivity("again"));
148+
149+
result = Async.function(activities::asyncActivity);
150+
activities.completeAsyncActivityById("2");
151+
expectSuccess("2", result);
152+
expectFailure(() -> activities.completeAsyncActivityById("again"));
153+
154+
result = Async.function(activities::asyncActivity);
155+
activities.failAsyncActivity("3");
156+
expectFailureWithDetails(result, "3");
157+
expectFailure(() -> activities.failAsyncActivity("again"));
158+
159+
result = Async.function(activities::asyncActivity);
160+
activities.failAsyncActivityById("4");
161+
expectFailureWithDetails(result, "4");
162+
expectFailure(() -> activities.failAsyncActivityById("again"));
163+
164+
// Need to request cancellation, then the activity can respond with the cancel
165+
CompletablePromise<String> completablePromise = Workflow.newPromise();
166+
CancellationScope scope =
167+
Workflow.newCancellationScope(
168+
() -> {
169+
completablePromise.completeFrom(Async.function(activities::asyncActivity));
170+
});
171+
result = completablePromise;
172+
scope.run();
173+
// Need to force a separate decision, otherwise the activity gets skipped since it's started
174+
// and cancelled
175+
// in the same decision
176+
Workflow.sleep(1);
177+
scope.cancel();
178+
activities.cancelAsyncActivity("5");
179+
expectCancelled(result);
180+
181+
// Need to request cancellation, then the activity can respond with the cancel
182+
CompletablePromise<String> completablePromise2 = Workflow.newPromise();
183+
scope =
184+
Workflow.newCancellationScope(
185+
() -> {
186+
completablePromise2.completeFrom(Async.function(activities::asyncActivity));
187+
});
188+
scope.run();
189+
// Need to force a separate decision, otherwise the activity gets skipped since it's started
190+
// and cancelled in the same decision
191+
Workflow.sleep(1);
192+
scope.cancel();
193+
result = completablePromise2;
194+
activities.cancelAsyncActivityById("6");
195+
expectCancelled(result);
196+
}
197+
198+
private void expectCancelled(Promise<String> promise) {
199+
try {
200+
promise.get();
201+
throw new IllegalStateException("expected activity failure");
202+
} catch (CancellationException e) {
203+
// good
204+
}
205+
}
206+
207+
private void expectFailureWithDetails(Promise<String> promise, String expectedDetails) {
208+
try {
209+
promise.get();
210+
throw new IllegalStateException("expected activity failure");
211+
} catch (ActivityFailureException e) {
212+
if (!(e.getCause() instanceof ExceptionWithDetaills)) {
213+
throw new IllegalStateException(
214+
"Didn't receive correct cause, instead found this:", e.getCause());
215+
}
216+
String details = ((ExceptionWithDetaills) e.getCause()).details;
217+
if (!expectedDetails.equals(details)) {
218+
throw new IllegalStateException(
219+
"Expected: '" + expectedDetails + "', got: '" + details + "'");
220+
}
221+
}
222+
}
223+
224+
private void expectFailure(Runnable runnable) {
225+
try {
226+
runnable.run();
227+
throw new IllegalStateException("expected activity failure");
228+
} catch (ActivityFailureException e) {
229+
// good
230+
}
231+
}
232+
233+
private void expectSuccess(String expected, Promise<String> actual) {
234+
if (!expected.equals(actual.get())) {
235+
throw new IllegalStateException("Expected: '" + expected + "', got: '" + actual + "'");
236+
}
237+
}
238+
}
239+
240+
public static class ExceptionWithDetaills extends RuntimeException {
241+
public String details;
242+
243+
public ExceptionWithDetaills() {}
244+
245+
public ExceptionWithDetaills(String details) {
246+
this.details = details;
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)