Skip to content

Commit 948814c

Browse files
authored
Fix ActivityCompletionClient cancellation and failure by WorkflowExecution (#930)
When sending RespondActivityTaskFailedByIDRequest or RespondActivityTaskCanceledByIDRequest we don't include the ActivityID. Correctly include the id and add test coverage.
1 parent fa60a85 commit 948814c

File tree

3 files changed

+255
-0
lines changed

3 files changed

+255
-0
lines changed

src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public void fail(Throwable failure) {
153153
}
154154
} else {
155155
RespondActivityTaskFailedByIDRequest request = new RespondActivityTaskFailedByIDRequest();
156+
request.setActivityID(activityId);
156157
request.setReason(failure.getClass().getName());
157158
request.setDetails(dataConverter.toData(failure));
158159
request.setDomain(domain);
@@ -212,6 +213,7 @@ public void reportCancellation(Object details) {
212213
}
213214
} else {
214215
RespondActivityTaskCanceledByIDRequest request = new RespondActivityTaskCanceledByIDRequest();
216+
request.setActivityID(activityId);
215217
request.setDetails(dataConverter.toData(details));
216218
request.setDomain(domain);
217219
request.setWorkflowID(execution.getWorkflowId());

src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public void evaluate() throws Throwable {
6161
};
6262
}
6363

64+
public <T> T newWorkflowStub(Class<T> tClass) {
65+
return getWorkflowClient().newWorkflowStub(tClass, workflowOptionsBuilder().build());
66+
}
67+
6468
public TracingWorkflowInterceptorFactory getTracer() {
6569
return context.getTracer();
6670
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.workflow;
19+
20+
import com.google.common.base.Preconditions;
21+
import com.uber.cadence.WorkflowExecution;
22+
import com.uber.cadence.activity.Activity;
23+
import com.uber.cadence.activity.ActivityMethod;
24+
import com.uber.cadence.activity.ActivityOptions;
25+
import com.uber.cadence.activity.ActivityTask;
26+
import com.uber.cadence.client.ActivityCompletionClient;
27+
import com.uber.cadence.testUtils.CadenceTestRule;
28+
import java.time.Duration;
29+
import java.util.concurrent.CancellationException;
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
33+
public class ManualActivityCompletionWorkflowTest {
34+
@Rule
35+
public CadenceTestRule testRule =
36+
CadenceTestRule.builder()
37+
.withActivities(new ManualCompletionActivitiesImpl())
38+
.withWorkflowTypes(ManualCompletionWorkflowImpl.class)
39+
.startWorkersAutomatically()
40+
.build();
41+
42+
@Test
43+
public void testManualActivityCompletion() {
44+
testRule.newWorkflowStub(ManualCompletionWorkflow.class).run();
45+
}
46+
47+
public interface ManualCompletionActivities {
48+
@ActivityMethod
49+
String asyncActivity();
50+
51+
@ActivityMethod
52+
void completeAsyncActivity(String result);
53+
54+
@ActivityMethod
55+
void completeAsyncActivityById(String result);
56+
57+
@ActivityMethod
58+
void failAsyncActivity(String details);
59+
60+
@ActivityMethod
61+
void failAsyncActivityById(String details);
62+
63+
@ActivityMethod
64+
void cancelAsyncActivity(String details);
65+
66+
@ActivityMethod
67+
void cancelAsyncActivityById(String details);
68+
}
69+
70+
private class ManualCompletionActivitiesImpl implements ManualCompletionActivities {
71+
private ActivityTask openTask;
72+
73+
@Override
74+
public synchronized String asyncActivity() {
75+
openTask = Activity.getTask();
76+
77+
Activity.doNotCompleteOnReturn();
78+
return null;
79+
}
80+
81+
@Override
82+
public synchronized void completeAsyncActivity(String details) {
83+
Preconditions.checkState(openTask != null);
84+
getClient().complete(openTask.getTaskToken(), details);
85+
}
86+
87+
@Override
88+
public synchronized void completeAsyncActivityById(String details) {
89+
Preconditions.checkState(openTask != null);
90+
getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details);
91+
}
92+
93+
@Override
94+
public synchronized void failAsyncActivity(String details) {
95+
Preconditions.checkState(openTask != null);
96+
getClient()
97+
.completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details));
98+
}
99+
100+
@Override
101+
public synchronized void failAsyncActivityById(String details) {
102+
Preconditions.checkState(openTask != null);
103+
getClient()
104+
.completeExceptionally(
105+
getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details));
106+
}
107+
108+
@Override
109+
public synchronized void cancelAsyncActivity(String details) {
110+
Preconditions.checkState(openTask != null);
111+
getClient().reportCancellation(openTask.getTaskToken(), details);
112+
}
113+
114+
@Override
115+
public synchronized void cancelAsyncActivityById(String details) {
116+
Preconditions.checkState(openTask != null);
117+
getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details);
118+
}
119+
120+
private WorkflowExecution getCurrentWorkflow() {
121+
return Activity.getWorkflowExecution();
122+
}
123+
124+
private ActivityCompletionClient getClient() {
125+
return testRule.getWorkflowClient().newActivityCompletionClient();
126+
}
127+
}
128+
129+
public interface ManualCompletionWorkflow {
130+
@WorkflowMethod
131+
void run();
132+
}
133+
134+
public static class ManualCompletionWorkflowImpl implements ManualCompletionWorkflow {
135+
private final ManualCompletionActivities activities =
136+
Workflow.newActivityStub(
137+
ManualCompletionActivities.class,
138+
new ActivityOptions.Builder()
139+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
140+
.build());
141+
142+
@Override
143+
public void run() {
144+
Promise<String> result = Async.function(activities::asyncActivity);
145+
activities.completeAsyncActivity("1");
146+
expectSuccess("1", result);
147+
expectFailure(() -> activities.completeAsyncActivity("again"));
148+
149+
result = Async.function(activities::asyncActivity);
150+
activities.completeAsyncActivityById("2");
151+
expectSuccess("2", result);
152+
expectFailure(() -> activities.completeAsyncActivityById("again"));
153+
154+
result = Async.function(activities::asyncActivity);
155+
activities.failAsyncActivity("3");
156+
expectFailureWithDetails(result, "3");
157+
expectFailure(() -> activities.failAsyncActivity("again"));
158+
159+
result = Async.function(activities::asyncActivity);
160+
activities.failAsyncActivityById("4");
161+
expectFailureWithDetails(result, "4");
162+
expectFailure(() -> activities.failAsyncActivityById("again"));
163+
164+
// Need to request cancellation, then the activity can respond with the cancel
165+
CompletablePromise<String> completablePromise = Workflow.newPromise();
166+
CancellationScope scope =
167+
Workflow.newCancellationScope(
168+
() -> {
169+
completablePromise.completeFrom(Async.function(activities::asyncActivity));
170+
});
171+
result = completablePromise;
172+
scope.run();
173+
// Need to force a separate decision, otherwise the activity gets skipped since it's started
174+
// and cancelled
175+
// in the same decision
176+
Workflow.sleep(1);
177+
scope.cancel();
178+
activities.cancelAsyncActivity("5");
179+
expectCancelled(result);
180+
181+
// Need to request cancellation, then the activity can respond with the cancel
182+
CompletablePromise<String> completablePromise2 = Workflow.newPromise();
183+
scope =
184+
Workflow.newCancellationScope(
185+
() -> {
186+
completablePromise2.completeFrom(Async.function(activities::asyncActivity));
187+
});
188+
scope.run();
189+
// Need to force a separate decision, otherwise the activity gets skipped since it's started
190+
// and cancelled in the same decision
191+
Workflow.sleep(1);
192+
scope.cancel();
193+
result = completablePromise2;
194+
activities.cancelAsyncActivityById("6");
195+
expectCancelled(result);
196+
}
197+
198+
private void expectCancelled(Promise<String> promise) {
199+
try {
200+
promise.get();
201+
throw new IllegalStateException("expected activity failure");
202+
} catch (CancellationException e) {
203+
// good
204+
}
205+
}
206+
207+
private void expectFailureWithDetails(Promise<String> promise, String expectedDetails) {
208+
try {
209+
promise.get();
210+
throw new IllegalStateException("expected activity failure");
211+
} catch (ActivityFailureException e) {
212+
if (!(e.getCause() instanceof ExceptionWithDetaills)) {
213+
throw new IllegalStateException(
214+
"Didn't receive correct cause, instead found this:", e.getCause());
215+
}
216+
String details = ((ExceptionWithDetaills) e.getCause()).details;
217+
if (!expectedDetails.equals(details)) {
218+
throw new IllegalStateException(
219+
"Expected: '" + expectedDetails + "', got: '" + details + "'");
220+
}
221+
}
222+
}
223+
224+
private void expectFailure(Runnable runnable) {
225+
try {
226+
runnable.run();
227+
throw new IllegalStateException("expected activity failure");
228+
} catch (ActivityFailureException e) {
229+
// good
230+
}
231+
}
232+
233+
private void expectSuccess(String expected, Promise<String> actual) {
234+
if (!expected.equals(actual.get())) {
235+
throw new IllegalStateException("Expected: '" + expected + "', got: '" + actual + "'");
236+
}
237+
}
238+
}
239+
240+
public static class ExceptionWithDetaills extends RuntimeException {
241+
public String details;
242+
243+
public ExceptionWithDetaills() {}
244+
245+
public ExceptionWithDetaills(String details) {
246+
this.details = details;
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)