Skip to content

Commit 3d0a676

Browse files
committed
Efficient implementation for exceptionallyComposeAsync - symmetrical with thenComposeAsync
1 parent 2202dcd commit 3d0a676

File tree

1 file changed

+50
-18
lines changed

1 file changed

+50
-18
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,14 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
281281
// Synchronous, while transition to tempStage is asynchronous already
282282
returned.whenComplete(biConsumer(onResult, onError));
283283
}
284-
} catch (Throwable ex) {
284+
} catch (Throwable e) {
285285
// must-have if fn.apply above failed
286286
nextStage.resetCancellableOrigins((CompletionStage<U>)null);
287287
// no need to check nextStage.isCancelled()
288288
// while there are no origins to cancel
289289
// propagate error immediately
290290
// synchronous, while transition to tempStage is asynchronous already
291-
onError.accept(ex);
291+
onError.accept(e);
292292
}
293293
}),
294294
consumerAsFunction(onError),
@@ -298,30 +298,58 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
298298
return nextStage;
299299
}
300300

301+
@Override
302+
public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
303+
// Symmetrical with thenApplyAsync
304+
AbstractCompletableTask<T> nextStage = internalCreateCompletionStage(executor);
305+
addCallbacks(nextStage, Function.identity(), fn, executor);
306+
return nextStage;
307+
}
308+
301309
// Provide better impl. rather than several nested stages by default
302-
/*
303310
@Override
304311
public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
305-
return null;
312+
// Symmetrical with thenComposeAsync
313+
// See comments for thenComposeAsync -- all are valid here, this is just a different path in Either (left vs right)
314+
AbstractCompletableTask<Void> tempStage = internalCreateCompletionStage(executor);
315+
AbstractCompletableTask<T> nextStage = internalCreateCompletionStage(executor);
316+
317+
nextStage.resetCancellableOrigins(tempStage);
318+
319+
Consumer<? super T> onResult = nextStage.runTransition(Function.identity());
320+
Consumer<? super Throwable> onError = nextStage.runTransition(AbstractCompletableTask::forwardException);
321+
322+
addCallbacks(
323+
tempStage,
324+
consumerAsFunction(onResult),
325+
consumerAsFunction(error -> {
326+
try {
327+
CompletionStage<T> returned = fn.apply(error);
328+
nextStage.resetCancellableOrigins(returned);
329+
if (nextStage.isCancelled()) {
330+
nextStage.cancelOrigins(true);
331+
} else {
332+
returned.whenComplete(biConsumer(onResult, onError));
333+
}
334+
} catch (Throwable e) {
335+
nextStage.resetCancellableOrigins((CompletionStage<T>)null);
336+
// In JDK 12 CompletionStage.composeExceptionally[Async] uses *.handle[Async]
337+
// So overwrite returned error with the latest one - as in handle()
338+
e.addSuppressed(error);
339+
onError.accept(e);
340+
}
341+
}),
342+
executor
343+
);
344+
345+
return nextStage;
306346
}
307-
*/
308347

309-
// Default opertaion in Promise is ok
348+
// Default operation in Promise is ok - it just delegates to thenCompose[Async]
310349
/*
311350
@Override
312351
public Promise<T> thenFilterAsync(Predicate<? super T> predicate, Function<? super T, Throwable> errorSupplier, Executor executor) {
313352
*/
314-
315-
private <U> Consumer<? super U> runTransition(Function<? super U, ? extends T> converter) {
316-
return u -> fireTransition(() -> converter.apply(u));
317-
}
318-
319-
@Override
320-
public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
321-
AbstractCompletableTask<T> nextStage = internalCreateCompletionStage(executor);
322-
addCallbacks(nextStage, Function.identity(), fn, executor);
323-
return nextStage;
324-
}
325353

326354
@Override
327355
public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
@@ -426,7 +454,7 @@ private <R, U> Promise<U> doApplyToEitherAsync(CompletionStage<? extends R> firs
426454
// Next stage is not exposed to the client, so we can
427455
// short-circuit its initiation - just fire callbacks
428456
// without task execution (unlike as in other methods,
429-
// event in thenComposeAsync with its ad-hoc execution)
457+
// even in thenComposeAsync with its ad-hoc execution)
430458

431459
// In certain sense, nextStage here is bogus: neither
432460
// of Future-defined methods are functional.
@@ -450,6 +478,10 @@ private <U> AbstractCompletableTask<U> internalCreateCompletionStage(Executor ex
450478
// But don't let SAME_THREAD_EXECUTOR to be a default async executor
451479
return createCompletionStage(executor == SAME_THREAD_EXECUTOR ? getDefaultExecutor() : executor);
452480
}
481+
482+
private <U> Consumer<? super U> runTransition(Function<? super U, ? extends T> converter) {
483+
return u -> fireTransition(() -> converter.apply(u));
484+
}
453485

454486
private static <V, R> Function<V, R> consumerAsFunction(Consumer<? super V> action) {
455487
return result -> {

0 commit comments

Comments
 (0)