Skip to content

Commit fa60a85

Browse files
authored
Fix WorkflowQueue#offer and increase test coverage (#932)
1 parent 2afe97e commit fa60a85

File tree

3 files changed

+329
-2
lines changed

3 files changed

+329
-2
lines changed

src/main/java/com/uber/cadence/internal/sync/WorkflowQueueImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,10 @@ public void put(E e) throws InterruptedException {
7474

7575
@Override
7676
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
77-
boolean timedOut =
77+
boolean success =
7878
WorkflowThread.await(
7979
unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity);
80-
if (timedOut) {
80+
if (!success) {
8181
return false;
8282
}
8383
queue.addLast(e);

src/test/java/com/uber/cadence/internal/sync/PromiseTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.workflow.CompletablePromise;
2828
import com.uber.cadence.workflow.Promise;
2929
import com.uber.cadence.workflow.Workflow;
30+
import java.time.Duration;
3031
import java.util.ArrayList;
3132
import java.util.IllegalFormatCodePointException;
3233
import java.util.List;
@@ -84,6 +85,122 @@ public void testFailure() throws Throwable {
8485
trace.setExpected(expected);
8586
}
8687

88+
@Test
89+
public void testTimedFailure() throws Throwable {
90+
DeterministicRunner r =
91+
DeterministicRunner.newRunner(
92+
() -> {
93+
CompletablePromise<Boolean> f = Workflow.newPromise();
94+
trace.add("root begin");
95+
WorkflowInternal.newThread(
96+
false, () -> f.completeExceptionally(new IllegalArgumentException("foo")))
97+
.start();
98+
WorkflowInternal.newThread(
99+
false,
100+
() -> {
101+
try {
102+
f.get(10, TimeUnit.DAYS);
103+
trace.add("thread1 get success");
104+
fail("failure expected");
105+
} catch (Exception e) {
106+
assertEquals(IllegalArgumentException.class, e.getClass());
107+
trace.add("thread1 get failure");
108+
}
109+
})
110+
.start();
111+
trace.add("root done");
112+
});
113+
r.runUntilAllBlocked();
114+
String[] expected =
115+
new String[] {
116+
"root begin", "root done", "thread1 get failure",
117+
};
118+
trace.setExpected(expected);
119+
}
120+
121+
@Test
122+
public void testGetFailure() throws Throwable {
123+
DeterministicRunner r =
124+
DeterministicRunner.newRunner(
125+
() -> {
126+
CompletablePromise<Boolean> f = Workflow.newPromise();
127+
trace.add("root begin");
128+
WorkflowInternal.newThread(
129+
false,
130+
() -> {
131+
trace.add("thread1 begin");
132+
assertEquals(IllegalArgumentException.class, f.getFailure().getClass());
133+
trace.add("thread1 done");
134+
})
135+
.start();
136+
WorkflowInternal.newThread(
137+
false,
138+
() -> {
139+
f.completeExceptionally(new IllegalArgumentException("foo"));
140+
trace.add("thread2 done");
141+
})
142+
.start();
143+
144+
trace.add("root done");
145+
});
146+
r.runUntilAllBlocked();
147+
String[] expected =
148+
new String[] {"root begin", "root done", "thread1 begin", "thread2 done", "thread1 done"};
149+
trace.setExpected(expected);
150+
}
151+
152+
@Test
153+
public void testGetFailureWithTimeout() throws Throwable {
154+
DeterministicRunner r =
155+
DeterministicRunner.newRunner(
156+
() -> currentTime,
157+
() -> {
158+
CompletablePromise<Boolean> f = Workflow.newPromise();
159+
trace.add("root begin");
160+
WorkflowInternal.newThread(
161+
false,
162+
() -> {
163+
try {
164+
trace.add("thread1 begin");
165+
f.get(1, TimeUnit.MINUTES);
166+
trace.add("thread1 get success");
167+
fail("failure expected");
168+
} catch (Exception e) {
169+
assertEquals(IllegalArgumentException.class, e.getClass());
170+
trace.add("thread1 get failure");
171+
}
172+
})
173+
.start();
174+
WorkflowInternal.newThread(
175+
false,
176+
() -> {
177+
Workflow.sleep(Duration.ofSeconds(30));
178+
trace.add("thread2 awake");
179+
f.completeExceptionally(new IllegalArgumentException("foo"));
180+
trace.add("thread2 done");
181+
})
182+
.start();
183+
184+
trace.add("root done");
185+
});
186+
r.runUntilAllBlocked();
187+
String[] expected = new String[] {"root begin", "root done", "thread1 begin"};
188+
trace.setExpected(expected);
189+
190+
currentTime += Duration.ofSeconds(31).toMillis();
191+
r.runUntilAllBlocked();
192+
expected =
193+
new String[] {
194+
"root begin",
195+
"root done",
196+
"thread1 begin",
197+
"thread2 awake",
198+
"thread2 done",
199+
"thread1 get failure"
200+
};
201+
trace.setExpected(expected);
202+
}
203+
87204
@Test
88205
public void testGetTimeout() throws Throwable {
89206
ExecutorService threadPool =
@@ -249,6 +366,46 @@ public void testGetDefaultOnFailure() throws Throwable {
249366
threadPool.awaitTermination(1, TimeUnit.MINUTES);
250367
}
251368

369+
@Test
370+
public void testGetDefault_success() throws Throwable {
371+
ExecutorService threadPool =
372+
new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
373+
374+
DeterministicRunner r =
375+
DeterministicRunner.newRunner(
376+
threadPool,
377+
null,
378+
() -> currentTime,
379+
() -> {
380+
CompletablePromise<String> f = Workflow.newPromise();
381+
trace.add("root begin");
382+
WorkflowInternal.newThread(
383+
false,
384+
() -> {
385+
trace.add("thread1 begin");
386+
assertEquals("success", f.get("default"));
387+
trace.add("thread1 get success");
388+
})
389+
.start();
390+
WorkflowInternal.newThread(
391+
false,
392+
() -> {
393+
trace.add("thread2 begin");
394+
f.complete("success");
395+
})
396+
.start();
397+
trace.add("root done");
398+
});
399+
r.runUntilAllBlocked();
400+
String[] expected =
401+
new String[] {
402+
"root begin", "root done", "thread1 begin", "thread2 begin", "thread1 get success",
403+
};
404+
trace.setExpected(expected);
405+
threadPool.shutdown();
406+
threadPool.awaitTermination(1, TimeUnit.MINUTES);
407+
}
408+
252409
@Test
253410
public void testMultiple() throws Throwable {
254411
DeterministicRunner r =

src/test/java/com/uber/cadence/internal/sync/WorkflowInternalQueueTest.java

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import static org.junit.Assert.assertFalse;
2121
import static org.junit.Assert.assertTrue;
2222

23+
import com.uber.cadence.workflow.QueueConsumer;
2324
import com.uber.cadence.workflow.Workflow;
2425
import com.uber.cadence.workflow.WorkflowQueue;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.Assert;
2528
import org.junit.Before;
2629
import org.junit.Rule;
2730
import org.junit.Test;
@@ -137,4 +140,171 @@ public void testPutBlocking() throws Throwable {
137140
};
138141
trace.setExpected(expected);
139142
}
143+
144+
@Test
145+
public void testPoll() throws Throwable {
146+
DeterministicRunner r =
147+
DeterministicRunner.newRunner(
148+
() -> currentTime,
149+
() -> {
150+
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
151+
f.offer("foo");
152+
trace.add("root begin");
153+
WorkflowInternal.newThread(
154+
false,
155+
() -> {
156+
try {
157+
trace.add("thread1 begin");
158+
Assert.assertEquals("foo", f.poll(1, TimeUnit.SECONDS));
159+
trace.add("thread1 foo");
160+
Assert.assertNull(f.poll(1, TimeUnit.SECONDS));
161+
trace.add("thread1 done");
162+
} catch (InterruptedException e) {
163+
throw new RuntimeException(e);
164+
}
165+
})
166+
.start();
167+
168+
trace.add("root done");
169+
});
170+
r.runUntilAllBlocked();
171+
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
172+
trace.setExpected(expected);
173+
174+
currentTime += 1000;
175+
r.runUntilAllBlocked();
176+
expected =
177+
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
178+
trace.setExpected(expected);
179+
}
180+
181+
@Test
182+
public void testOffer() throws Throwable {
183+
DeterministicRunner r =
184+
DeterministicRunner.newRunner(
185+
() -> currentTime,
186+
() -> {
187+
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
188+
trace.add("root begin");
189+
WorkflowInternal.newThread(
190+
false,
191+
() -> {
192+
Assert.assertTrue(f.offer("foo"));
193+
Assert.assertFalse(f.offer("bar"));
194+
trace.add("thread1 done");
195+
})
196+
.start();
197+
198+
trace.add("root done");
199+
});
200+
r.runUntilAllBlocked();
201+
String[] expected = new String[] {"root begin", "root done", "thread1 done"};
202+
trace.setExpected(expected);
203+
}
204+
205+
@Test
206+
public void testOfferTimed() throws Throwable {
207+
DeterministicRunner r =
208+
DeterministicRunner.newRunner(
209+
() -> currentTime,
210+
() -> {
211+
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
212+
trace.add("root begin");
213+
WorkflowInternal.newThread(
214+
false,
215+
() -> {
216+
try {
217+
trace.add("thread1 begin");
218+
Assert.assertTrue(f.offer("foo", 1, TimeUnit.SECONDS));
219+
trace.add("thread1 foo");
220+
Assert.assertFalse(f.offer("bar", 1, TimeUnit.SECONDS));
221+
trace.add("thread1 done");
222+
} catch (InterruptedException e) {
223+
throw new RuntimeException(e);
224+
}
225+
})
226+
.start();
227+
228+
trace.add("root done");
229+
});
230+
r.runUntilAllBlocked();
231+
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
232+
trace.setExpected(expected);
233+
234+
currentTime += 1000;
235+
r.runUntilAllBlocked();
236+
expected =
237+
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
238+
trace.setExpected(expected);
239+
}
240+
241+
@Test
242+
public void testMappedTake() throws Throwable {
243+
DeterministicRunner r =
244+
DeterministicRunner.newRunner(
245+
() -> currentTime,
246+
() -> {
247+
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
248+
f.offer(true);
249+
trace.add("root begin");
250+
WorkflowInternal.newThread(
251+
false,
252+
() -> {
253+
try {
254+
QueueConsumer<String> mappedQueue = f.map(x -> x ? "yes" : "no");
255+
trace.add("thread1 begin");
256+
Assert.assertEquals("yes", mappedQueue.take());
257+
trace.add("thread1 done");
258+
} catch (InterruptedException e) {
259+
throw new RuntimeException(e);
260+
}
261+
})
262+
.start();
263+
264+
trace.add("root done");
265+
});
266+
r.runUntilAllBlocked();
267+
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 done"};
268+
trace.setExpected(expected);
269+
}
270+
271+
@Test
272+
public void testMappedPoll() throws Throwable {
273+
DeterministicRunner r =
274+
DeterministicRunner.newRunner(
275+
() -> currentTime,
276+
() -> {
277+
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
278+
f.offer(true);
279+
trace.add("root begin");
280+
WorkflowInternal.newThread(
281+
false,
282+
() -> {
283+
try {
284+
QueueConsumer<String> mappedQueue =
285+
f.map(x -> x ? "yes" : "no").map(x -> x);
286+
trace.add("thread1 begin");
287+
Assert.assertEquals("yes", mappedQueue.poll(1, TimeUnit.SECONDS));
288+
trace.add("thread1 yes");
289+
Assert.assertNull(mappedQueue.poll(1, TimeUnit.SECONDS));
290+
291+
trace.add("thread1 done");
292+
} catch (InterruptedException e) {
293+
throw new RuntimeException(e);
294+
}
295+
})
296+
.start();
297+
298+
trace.add("root done");
299+
});
300+
r.runUntilAllBlocked();
301+
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 yes"};
302+
trace.setExpected(expected);
303+
304+
currentTime += 1000;
305+
r.runUntilAllBlocked();
306+
expected =
307+
new String[] {"root begin", "root done", "thread1 begin", "thread1 yes", "thread1 done"};
308+
trace.setExpected(expected);
309+
}
140310
}

0 commit comments

Comments
 (0)