Skip to content

Commit 524d7e8

Browse files
committed
Handle exceptions in AbstractCompletableTask as in CompletableFuture (wrapping & propagation); Optimizing Promises to use only Promise.dependent() instead of ad-hoc decorators.
1 parent 7f417d1 commit 524d7e8

File tree

2 files changed

+72
-71
lines changed

2 files changed

+72
-71
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -296,15 +296,27 @@ public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> act
296296
addCallbacks(
297297
nextStage,
298298
result -> {
299-
action.accept(result, null);
300-
return result;
299+
try {
300+
action.accept(result, null);
301+
return result;
302+
} catch (Throwable e) {
303+
// CompletableFuture wraps exception here
304+
// Copying this behavior
305+
return forwardException(e);
306+
}
301307
},
308+
// exceptions are handled in regular way
302309
failure -> {
303310
try {
304311
action.accept(null, failure);
305312
return forwardException(failure);
306313
} catch (Throwable e) {
307-
return forwardException(e);
314+
// CompletableFuture does not override exception here
315+
// unlike as in handle[Async](BiFunction)
316+
// Preserve this behavior, but let us add at least
317+
// suppressed exception
318+
failure.addSuppressed(e);
319+
return forwardException(failure);
308320
}
309321
},
310322
executor
@@ -317,9 +329,27 @@ public <U> Promise<U> handleAsync(BiFunction<? super T, Throwable, ? extends U>
317329
AbstractCompletableTask<U> nextStage = internalCreateCompletionStage(executor);
318330
addCallbacks(
319331
nextStage,
320-
result -> fn.apply(result, null),
321-
// exceptions are treated as success
322-
error -> fn.apply(null, error), executor
332+
result -> {
333+
try {
334+
return fn.apply(result, null);
335+
} catch (Throwable e) {
336+
// CompletableFuture wraps exception here
337+
// Copying this behavior
338+
return forwardException(e);
339+
}
340+
},
341+
// exceptions are handled in regular way
342+
failure -> {
343+
try {
344+
return fn.apply(null, failure);
345+
} catch (Throwable e) {
346+
// CompletableFuture handle[Async](BiFunction)
347+
// allows to overwrite exception for resulting stage.
348+
// Copying this behavior
349+
return forwardException(e);
350+
}
351+
},
352+
executor
323353
);
324354
return nextStage;
325355
}
@@ -371,7 +401,7 @@ private <R, U> Promise<U> doApplyToEitherAsync(CompletionStage<? extends R> firs
371401
if (failure == null) {
372402
nextStage.onSuccess(result);
373403
} else {
374-
nextStage.onError(Promises.wrapException(failure));
404+
nextStage.onError(forwardException(failure));
375405
}
376406
};
377407
// only the first result is accepted by completion stage,

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 35 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import java.util.concurrent.CompletionStage;
2828
import java.util.concurrent.Executor;
2929
import java.util.concurrent.atomic.AtomicReference;
30-
import java.util.function.BiConsumer;
3130
import java.util.function.Consumer;
32-
import java.util.function.Function;
3331
import java.util.function.Supplier;
3432

3533
/**
@@ -54,9 +52,9 @@ private Promises() {}
5452
* a successfully resolved {@link Promise} with a value provided
5553
*/
5654
public static <T> Promise<T> success(T value) {
57-
CompletablePromise<T> result = new CompletablePromise<>();
58-
result.onSuccess(value);
59-
return result;
55+
return new CompletablePromise<>(
56+
CompletableFuture.completedFuture(value)
57+
);
6058
}
6159

6260
/**
@@ -69,9 +67,9 @@ public static <T> Promise<T> success(T value) {
6967
* a faulty resolved {@link Promise} with an exception provided
7068
*/
7169
public static <T> Promise<T> failure(Throwable exception) {
72-
CompletablePromise<T> result = new CompletablePromise<>();
73-
result.onFailure(exception);
74-
return result;
70+
CompletableFuture<T> delegate = new CompletableFuture<>();
71+
delegate.completeExceptionally(exception);
72+
return new CompletablePromise<>(delegate);
7573
}
7674

7775
/**
@@ -92,35 +90,7 @@ public static <T> Promise<T> from(CompletionStage<T> stage) {
9290
return new CompletablePromise<>((CompletableFuture<T>)stage);
9391
}
9492

95-
CompletablePromise<T> result = createLinkedPromise(stage);
96-
stage.whenComplete(handler(result::onSuccess, result::onFailure));
97-
return result;
98-
}
99-
100-
static <T, R> Promise<R> from(CompletionStage<? extends T> stage,
101-
Function<? super T, ? extends R> resultConverter,
102-
Function<? super Throwable, ? extends Throwable> errorConverter) {
103-
104-
CompletablePromise<R> result = createLinkedPromise(stage);
105-
stage.whenComplete(handler(
106-
acceptConverted(result::onSuccess, resultConverter),
107-
acceptConverted(result::onFailure, errorConverter)
108-
));
109-
return result;
110-
}
111-
112-
private static <T, R> CompletablePromise<R> createLinkedPromise(CompletionStage<? extends T> stage) {
113-
return new CompletablePromise<R>() {
114-
@Override
115-
public boolean cancel(boolean mayInterruptIfRunning) {
116-
if (super.cancel(mayInterruptIfRunning)) {
117-
cancelPromise(stage, mayInterruptIfRunning);
118-
return true;
119-
} else {
120-
return false;
121-
}
122-
}
123-
};
93+
return COMPLETED_DEPENDENT_PROMISE.thenCombine(stage, (u, v) -> v, PromiseOrigin.PARAM_ONLY).raw();
12494
}
12595

12696
/**
@@ -425,7 +395,10 @@ public static <T> Promise<List<T>> atLeast(int minResultsCount, int maxErrorsCou
425395
} else if (minResultsCount == 0) {
426396
return success(Collections.emptyList());
427397
} else if (size == 1) {
428-
return from(promises.get(0), Collections::singletonList, Function.<Throwable> identity());
398+
return from(promises.get(0))
399+
.dependent()
400+
.thenApply((T r) -> Collections.singletonList(r), true)
401+
.raw();
429402
} else {
430403
return new AggregatingPromise<>(minResultsCount, maxErrorsCount, cancelRemaining, promises);
431404
}
@@ -541,6 +514,14 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
541514
resultPromise.onFailure(ctx.asFailure());
542515
}
543516
}
517+
518+
public static Throwable unwrapException(Throwable ex) {
519+
Throwable nested = ex;
520+
while (nested instanceof CompletionException) {
521+
nested = nested.getCause();
522+
}
523+
return null == nested ? ex : nested;
524+
}
544525

545526
static CompletionException wrapException(Throwable e) {
546527
if (e instanceof CompletionException) {
@@ -551,34 +532,23 @@ static CompletionException wrapException(Throwable e) {
551532
}
552533

553534
private static <T> Promise<T> unwrap(CompletionStage<List<T>> original, boolean unwrapException) {
554-
return from(
555-
original,
556-
c -> c.stream().filter(Objects::nonNull).findFirst().get(),
557-
unwrapException ?
558-
ex -> ex instanceof MultitargetException ? ((MultitargetException)ex).getFirstException().get() : ex
559-
:
560-
Function.identity()
561-
);
562-
}
563-
564-
private static <T> BiConsumer<T, ? super Throwable> handler(Consumer<? super T> onResult, Consumer<? super Throwable> onError) {
565-
return (r, e) -> {
566-
if (null != e) {
567-
onError.accept(e);
568-
} else {
569-
try {
570-
onResult.accept(r);
571-
} catch (Exception ex) {
572-
onError.accept(ex);
535+
return from(original)
536+
.dependent()
537+
.handle((r, e) -> {
538+
if (null != e) {
539+
if (unwrapException) {
540+
Throwable targetException = unwrapException(e);
541+
if (targetException instanceof MultitargetException) {
542+
throw wrapException( ((MultitargetException)targetException).getFirstException().get() );
543+
}
544+
}
545+
throw wrapException(e);
546+
} else {
547+
return r.stream().filter(Objects::nonNull).findFirst().get();
573548
}
574-
}
575-
};
549+
}, true)
550+
.raw();
576551
}
577-
578-
private static <T, U> Consumer<? super T> acceptConverted(Consumer<? super U> target, Function<? super T, ? extends U> converter) {
579-
return t -> target.accept(converter.apply(t));
580-
}
581-
582552

583553
private static class ObjectRef<T> {
584554
private final T reference;
@@ -593,4 +563,5 @@ T dereference() {
593563
}
594564

595565
private static final Object IGNORE = new Object();
566+
private static final DependentPromise<Object> COMPLETED_DEPENDENT_PROMISE = success(null).dependent();
596567
}

0 commit comments

Comments
 (0)