Skip to content

Commit 9746755

Browse files
authored
Fix flakiness in ManualActivityCompletionWorkflowTest (#937)
The operations here are flaky because we're starting an activity and then attempting to cancel/complete/fail it in parallel. We need the second activity to block until the first one has started, and there's no way to orchestrate that within the workflow. The best we can do is make the acitivites block until the ActivityTask is available.
1 parent d4b2d0b commit 9746755

File tree

1 file changed

+38
-13
lines changed

1 file changed

+38
-13
lines changed

src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.uber.cadence.workflow;
1919

20-
import com.google.common.base.Preconditions;
2120
import com.uber.cadence.WorkflowExecution;
2221
import com.uber.cadence.activity.Activity;
2322
import com.uber.cadence.activity.ActivityMethod;
@@ -48,6 +47,9 @@ public interface ManualCompletionActivities {
4847
@ActivityMethod
4948
String asyncActivity();
5049

50+
@ActivityMethod
51+
void reset();
52+
5153
@ActivityMethod
5254
void completeAsyncActivity(String result);
5355

@@ -73,48 +75,61 @@ private class ManualCompletionActivitiesImpl implements ManualCompletionActiviti
7375
@Override
7476
public synchronized String asyncActivity() {
7577
openTask = Activity.getTask();
78+
notifyAll();
7679

7780
Activity.doNotCompleteOnReturn();
7881
return null;
7982
}
8083

84+
@Override
85+
public synchronized void reset() {
86+
openTask = null;
87+
}
88+
8189
@Override
8290
public synchronized void completeAsyncActivity(String details) {
83-
Preconditions.checkState(openTask != null);
84-
getClient().complete(openTask.getTaskToken(), details);
91+
getClient().complete(getOpenTask().getTaskToken(), details);
8592
}
8693

8794
@Override
8895
public synchronized void completeAsyncActivityById(String details) {
89-
Preconditions.checkState(openTask != null);
90-
getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details);
96+
getClient().complete(getCurrentWorkflow(), getOpenTask().getActivityId(), details);
9197
}
9298

9399
@Override
94100
public synchronized void failAsyncActivity(String details) {
95-
Preconditions.checkState(openTask != null);
96101
getClient()
97-
.completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details));
102+
.completeExceptionally(getOpenTask().getTaskToken(), new ExceptionWithDetaills(details));
98103
}
99104

100105
@Override
101106
public synchronized void failAsyncActivityById(String details) {
102-
Preconditions.checkState(openTask != null);
103107
getClient()
104108
.completeExceptionally(
105-
getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details));
109+
getCurrentWorkflow(),
110+
getOpenTask().getActivityId(),
111+
new ExceptionWithDetaills(details));
106112
}
107113

108114
@Override
109115
public synchronized void cancelAsyncActivity(String details) {
110-
Preconditions.checkState(openTask != null);
111-
getClient().reportCancellation(openTask.getTaskToken(), details);
116+
getClient().reportCancellation(getOpenTask().getTaskToken(), details);
112117
}
113118

114119
@Override
115120
public synchronized void cancelAsyncActivityById(String details) {
116-
Preconditions.checkState(openTask != null);
117-
getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details);
121+
getClient().reportCancellation(getCurrentWorkflow(), getOpenTask().getActivityId(), details);
122+
}
123+
124+
private synchronized ActivityTask getOpenTask() {
125+
while (openTask == null) {
126+
try {
127+
wait();
128+
} catch (InterruptedException e) {
129+
throw new RuntimeException(e);
130+
}
131+
}
132+
return openTask;
118133
}
119134

120135
private WorkflowExecution getCurrentWorkflow() {
@@ -146,21 +161,29 @@ public void run() {
146161
expectSuccess("1", result);
147162
expectFailure(() -> activities.completeAsyncActivity("again"));
148163

164+
activities.reset();
165+
149166
result = Async.function(activities::asyncActivity);
150167
activities.completeAsyncActivityById("2");
151168
expectSuccess("2", result);
152169
expectFailure(() -> activities.completeAsyncActivityById("again"));
153170

171+
activities.reset();
172+
154173
result = Async.function(activities::asyncActivity);
155174
activities.failAsyncActivity("3");
156175
expectFailureWithDetails(result, "3");
157176
expectFailure(() -> activities.failAsyncActivity("again"));
158177

178+
activities.reset();
179+
159180
result = Async.function(activities::asyncActivity);
160181
activities.failAsyncActivityById("4");
161182
expectFailureWithDetails(result, "4");
162183
expectFailure(() -> activities.failAsyncActivityById("again"));
163184

185+
activities.reset();
186+
164187
// Need to request cancellation, then the activity can respond with the cancel
165188
CompletablePromise<String> completablePromise = Workflow.newPromise();
166189
CancellationScope scope =
@@ -178,6 +201,8 @@ public void run() {
178201
activities.cancelAsyncActivity("5");
179202
expectCancelled(result);
180203

204+
activities.reset();
205+
181206
// Need to request cancellation, then the activity can respond with the cancel
182207
CompletablePromise<String> completablePromise2 = Workflow.newPromise();
183208
scope =

0 commit comments

Comments
 (0)