Skip to content

Commit eb4b35c

Browse files
committed
Fix issue #6 -- AbstractCompletableTask.thenComposeAsync does not propagate exception
1 parent 0a44a9a commit eb4b35c

File tree

2 files changed

+57
-29
lines changed

2 files changed

+57
-29
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -248,40 +248,49 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
248248
addCallbacks(
249249
tempStage,
250250
consumerAsFunction(r -> {
251-
CompletionStage<U> returned = fn.apply(r);
252-
// tempStage is completed successfully, so no sense
253-
// to include it in cancellableOrigins
254-
// However, nextStage is in progress
255-
// IMPORTANT: it COULD be shared, but typically is not
256-
// So in very rare case some nasty behavior MAY exist
257-
// if others depends on it
258-
259-
// TEST: There is a race when fn.apply(r) is completed
260-
// normally and nextStage is cancelled before returned is set
261-
// as its nextStage's cancellableOrigins. In this case,
262-
// execution of returned continues as nextStage cannot be
263-
// cancelled for a second time. However, completion stages after
264-
// nextStage are completed exceptionally (correctly) since when
265-
// moveToNextStage is executed nextStage is already completed
266-
// (cancelled) from cancel(...) -> onError(...). In order to
267-
// cancel returned here, I think you need to know whether
268-
// nextStage might have been interrupted.
269-
// try {
270-
// Thread.sleep(100);
271-
//} catch (InterruptedException ex) {
272-
//}
273-
274-
nextStage.resetCancellableOrigins(returned);
275-
if (nextStage.isCancelled()) {
276-
nextStage.cancelOrigins(true);
277-
} else {
278-
returned.whenComplete(moveToNextStage);
251+
try {
252+
CompletionStage<U> returned = fn.apply(r);
253+
// tempStage is completed successfully, so no sense
254+
// to include it in cancellableOrigins
255+
// However, nextStage is in progress
256+
// IMPORTANT: it COULD be shared, but typically is not
257+
// So in very rare case some nasty behavior MAY exist
258+
// if others depends on it
259+
260+
// TEST: There is a race when fn.apply(r) is completed
261+
// normally and nextStage is cancelled before returned is set
262+
// as its nextStage's cancellableOrigins. In this case,
263+
// execution of returned continues as nextStage cannot be
264+
// cancelled for a second time. However, completion stages after
265+
// nextStage are completed exceptionally (correctly) since when
266+
// moveToNextStage is executed nextStage is already completed
267+
// (cancelled) from cancel(...) -> onError(...). In order to
268+
// cancel returned here, I think you need to know whether
269+
// nextStage might have been interrupted.
270+
// try {
271+
// Thread.sleep(100);
272+
//} catch (InterruptedException ex) {
273+
//}
274+
275+
nextStage.resetCancellableOrigins(returned);
276+
if (nextStage.isCancelled()) {
277+
nextStage.cancelOrigins(true);
278+
} else {
279+
returned.whenComplete(moveToNextStage);
280+
}
281+
} catch (Throwable ex) {
282+
// must-have if fn.apply above failed
283+
nextStage.resetCancellableOrigins((CompletionStage<U>)null);
284+
// no need to check nextStage.isCancelled()
285+
// while there are no origins to cancel
286+
// propagate error immediately
287+
moveToNextStage.accept(null, ex);
279288
}
280289
}),
281290
e -> {
282291
moveToNextStage.accept(null, e);
283292
return null;
284-
}, /* must-have if fn.apply above failed */
293+
},
285294
executor
286295
);
287296

src/test/java/net/tascalate/concurrent/ThenComposeAsyncTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import static org.junit.Assert.fail;
19+
1820
import java.util.concurrent.atomic.AtomicBoolean;
1921
import org.junit.After;
2022
import org.junit.Assert;
@@ -76,6 +78,23 @@ public void testThenComposeAsyncRace() {
7678
Assert.assertTrue("Expected cancelled true, but is " + cancelled.get(), cancelled.get());
7779
}
7880

81+
82+
@Test
83+
public void testThenComposeException() {
84+
Promise<Void> p = CompletableTask.runAsync(() -> trySleep(10), executor)
85+
.thenCompose(it -> {
86+
throw new IllegalStateException("oh no!");
87+
});
88+
try {
89+
p.get();
90+
} catch (Exception ex) {
91+
// expected
92+
return;
93+
}
94+
fail("Exception must be thrown");
95+
}
96+
97+
7998
private void trySleep(long millis) {
8099
try {
81100
Thread.sleep(millis);

0 commit comments

Comments
 (0)