Skip to content

Commit 896a197

Browse files
committed
Remove duplicated code via delegating Promise code to DependentPromise (); avoid async call in exception handling when no exception occured
1 parent e440726 commit 896a197

File tree

9 files changed

+122
-105
lines changed

9 files changed

+122
-105
lines changed

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static net.tascalate.concurrent.LinkedCompletion.FutureCompletion;
1919
import static net.tascalate.concurrent.SharedFunctions.cancelPromise;
2020
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
21+
import static net.tascalate.concurrent.SharedFunctions.updateReference;
2122

2223
import java.time.Duration;
2324
import java.util.Arrays;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.Executor;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicReference;
3335
import java.util.function.BiConsumer;
3436
import java.util.function.BiFunction;
3537
import java.util.function.Consumer;
@@ -126,6 +128,20 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original,
126128
return result;
127129
}
128130

131+
@Override
132+
public DependentPromise<T> onCancel(Runnable code) {
133+
return new ConfigurableDependentPromise<T>(this, defaultEnlistOptions, null) {
134+
@Override
135+
public boolean cancel(boolean mayInterruptIfRunning) {
136+
if (super.cancel(mayInterruptIfRunning)) {
137+
code.run();
138+
return true;
139+
} else {
140+
return false;
141+
}
142+
}
143+
};
144+
}
129145

130146
// All delay overloads delegate to these methods
131147
@Override
@@ -376,32 +392,54 @@ public DependentPromise<T> exceptionally(Function<Throwable, ? extends T> fn, bo
376392

377393
@Override
378394
public DependentPromise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, boolean enlistOrigin) {
379-
return handleAsync(SharedFunctions.exceptionallyApply(fn), enlistOrigin);
395+
AtomicReference<Promise<T>> onException = new AtomicReference<>(this);
396+
return
397+
this.handle((r, ex) -> ex == null ?
398+
this :
399+
updateReference(handleAsync((r1, ex1) -> fn.apply(ex1)), onException), enlistOrigin)
400+
.thenCompose(Function.identity(), true)
401+
.onCancel(() -> onException.get().cancel(true));
380402
}
381403

382404
@Override
383405
public DependentPromise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor, boolean enlistOrigin) {
384-
return handleAsync(SharedFunctions.exceptionallyApply(fn), executor, enlistOrigin);
406+
AtomicReference<Promise<T>> onException = new AtomicReference<>(this);
407+
return
408+
this.handle((r, ex) -> ex == null ?
409+
this :
410+
updateReference(handleAsync((r1, ex1) -> fn.apply(ex1), executor), onException), enlistOrigin)
411+
.thenCompose(Function.identity(), true)
412+
.onCancel(() -> onException.get().cancel(true));
385413
}
386414

387415
@Override
388416
public DependentPromise<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn, boolean enlistOrigin) {
389-
return this.handle(SharedFunctions.exceptionallyCompose(fn), enlistOrigin)
417+
return this.handle((r, ex) -> ex == null ? this : fn.apply(ex), enlistOrigin)
390418
.thenCompose(Function.identity(), true);
391419
}
392420

393421
@Override
394422
public DependentPromise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, boolean enlistOrigin) {
395-
return this.handleAsync(SharedFunctions.exceptionallyCompose(fn), enlistOrigin)
396-
.thenCompose(Function.identity(), true);
423+
AtomicReference<Promise<T>> onException = new AtomicReference<>(this);
424+
return this.handle((r, ex) -> ex == null ?
425+
this :
426+
updateReference(this.handleAsync((r1, ex1) -> fn.apply(ex1), enlistOrigin)
427+
.thenCompose(Function.identity(), true), onException))
428+
.thenCompose(Function.identity(), true)
429+
.onCancel(() -> onException.get().cancel(true));
397430
}
398431

399432
@Override
400433
public DependentPromise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn,
401434
Executor executor,
402435
boolean enlistOrigin) {
403-
return this.handleAsync(SharedFunctions.exceptionallyCompose(fn), executor, enlistOrigin)
404-
.thenCompose(Function.identity(), true);
436+
AtomicReference<Promise<T>> onException = new AtomicReference<>(this);
437+
return this.handle((r, ex) -> ex == null ?
438+
this :
439+
updateReference(this.handleAsync((r1, ex1) -> fn.apply(ex1), executor, enlistOrigin)
440+
.thenCompose(Function.identity(), true), onException))
441+
.thenCompose(Function.identity(), true)
442+
.onCancel(() -> onException.get().cancel(true));
405443
}
406444

407445
public DependentPromise<T> whenComplete(BiConsumer<? super T, ? super Throwable> action, boolean enlistOrigin) {
@@ -792,8 +830,8 @@ private boolean defaultEnlistOrigin() {
792830
static void cancelPromises(CompletionStage<?>[] promises, boolean mayInterruptIfRunning) {
793831
if (null != promises) {
794832
Arrays.stream(promises)
795-
.filter(p -> p != null)
796-
.forEach(p -> cancelPromise(p, mayInterruptIfRunning));
833+
.filter(p -> p != null)
834+
.forEach(p -> cancelPromise(p, mayInterruptIfRunning));
797835
}
798836
}
799837

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ public static <U> DependentPromise<U> from(Promise<U> source, Set<PromiseOrigin>
7777
return ConfigurableDependentPromise.from(source, defaultEnlistOptions);
7878
}
7979

80+
@Override
81+
DependentPromise<T> onCancel(Runnable code);
82+
8083
@Override
8184
default DependentPromise<T> defaultAsyncOn(Executor executor) {
8285
return new ExecutorBoundDependentPromise<>(this, executor);

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 13 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18-
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
1918
import static net.tascalate.concurrent.SharedFunctions.unwrapExecutionException;
2019
import static net.tascalate.concurrent.SharedFunctions.wrapCompletionException;
2120

2221
import java.time.Duration;
2322
import java.util.Set;
2423
import java.util.concurrent.CancellationException;
25-
import java.util.concurrent.CompletableFuture;
2624
import java.util.concurrent.CompletionException;
2725
import java.util.concurrent.CompletionStage;
2826
import java.util.concurrent.ExecutionException;
@@ -77,6 +75,10 @@ default T join() throws CancellationException, CompletionException {
7775
throw wrapCompletionException(unwrapExecutionException(ex));
7876
}
7977
}
78+
79+
default Promise<T> onCancel(Runnable code) {
80+
return dependent().onCancel(code).raw();
81+
}
8082

8183
default Promise<T> delay(long timeout, TimeUnit unit) {
8284
return delay(timeout, unit, true);
@@ -91,23 +93,7 @@ default Promise<T> delay(Duration duration) {
9193
}
9294

9395
default Promise<T> delay(Duration duration, boolean delayOnError) {
94-
if (!delayOnError) {
95-
// Fast route
96-
return thenCompose(v ->
97-
this.dependent()
98-
.thenCombineAsync(Timeouts.delay(duration), selectFirst(), PromiseOrigin.PARAM_ONLY)
99-
.raw()
100-
);
101-
}
102-
CompletableFuture<Try<? super T>> delayed = new CompletableFuture<>();
103-
whenComplete(Timeouts.configureDelay(this, delayed, duration, delayOnError));
104-
// Use *Async to execute on default "this" executor
105-
return
106-
this.dependent()
107-
.thenApply(Try::success, false)
108-
.exceptionally(Try::failure, true)
109-
.thenCombineAsync(delayed, (u, v) -> u.done(), PromiseOrigin.ALL)
110-
.raw();
96+
return dependent().delay(duration, delayOnError).raw();
11197
}
11298

11399
default Promise<T> orTimeout(long timeout, TimeUnit unit) {
@@ -123,16 +109,7 @@ default Promise<T> orTimeout(Duration duration) {
123109
}
124110

125111
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
126-
Promise<? extends Try<T>> onTimeout = Timeouts.delayed(null, duration);
127-
Promise<T> result =
128-
this.dependent()
129-
.thenApply(Try::success, false)
130-
.exceptionally(Try::failure, true)
131-
// Use *Async to execute on default "this" executor
132-
.applyToEitherAsync(onTimeout, v -> Try.doneOrTimeout(v, duration), PromiseOrigin.ALL);
133-
134-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
135-
return result.raw();
112+
return dependent().orTimeout(duration, cancelOnTimeout).raw();
136113
}
137114

138115
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -148,17 +125,7 @@ default Promise<T> onTimeout(T value, Duration duration) {
148125
}
149126

150127
default Promise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout) {
151-
// timeout converted to supplier
152-
Promise<Try<T>> onTimeout = Timeouts.delayed(Try.success(value), duration);
153-
Promise<T> result =
154-
this.dependent()
155-
.thenApply(Try::success, false)
156-
.exceptionally(Try::failure, true)
157-
// Use *Async to execute on default "this" executor
158-
.applyToEitherAsync(onTimeout, Try::done, PromiseOrigin.ALL);
159-
160-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
161-
return result.raw();
128+
return dependent().onTimeout(value, duration, cancelOnTimeout).raw();
162129
}
163130

164131
default Promise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeUnit unit) {
@@ -174,20 +141,7 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration)
174141
}
175142

176143
default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration, boolean cancelOnTimeout) {
177-
// timeout converted to supplier
178-
Promise<Supplier<Try<T>>> onTimeout = Timeouts.delayed(Try.with(supplier), duration);
179-
180-
Promise<T> result =
181-
this.dependent()
182-
// resolved value converted to supplier of Try
183-
.thenApply(Try::success, false)
184-
.exceptionally(Try::failure, true)
185-
.thenApply(SharedFunctions::supply, true)
186-
// Use *Async to execute on default "this" executor
187-
.applyToEitherAsync(onTimeout, s -> s.get().done(), PromiseOrigin.ALL);
188-
189-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
190-
return result.raw();
144+
return dependent().onTimeout(supplier, duration, cancelOnTimeout).raw();
191145
}
192146

193147
/**
@@ -319,23 +273,23 @@ Promise<Void> runAfterEitherAsync(CompletionStage<?> other,
319273
Promise<T> exceptionally(Function<Throwable, ? extends T> fn);
320274

321275
default Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) {
322-
return handleAsync(SharedFunctions.exceptionallyApply(fn));
276+
return dependent().exceptionallyAsync(fn).raw();
323277
}
324278

325279
default Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
326-
return handleAsync(SharedFunctions.exceptionallyApply(fn), executor);
280+
return dependent().exceptionallyAsync(fn, executor).raw();
327281
}
328282

329283
default Promise<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn) {
330-
return Promises.flatMap(handle(SharedFunctions.exceptionallyCompose(fn)));
284+
return dependent().exceptionallyCompose(fn).raw();
331285
}
332286

333287
default Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn) {
334-
return Promises.flatMap(handleAsync(SharedFunctions.exceptionallyCompose(fn)));
288+
return dependent().exceptionallyComposeAsync(fn).raw();
335289
}
336290

337291
default Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
338-
return Promises.flatMap(handleAsync(SharedFunctions.exceptionallyCompose(fn), executor));
292+
return dependent().exceptionallyComposeAsync(fn, executor).raw();
339293
}
340294

341295
Promise<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -109,17 +109,6 @@ public static <T> CompletionStage<T> withDefaultExecutor(CompletionStage<T> stag
109109
return new ExecutorBoundCompletionStage<>(stage, executor);
110110
}
111111

112-
113-
public static <T> Promise<T> flatMap(CompletionStage<? extends CompletionStage<T>> stage) {
114-
return flatMap(from(stage));
115-
}
116-
117-
public static <T> Promise<T> flatMap(Promise<? extends CompletionStage<T>> promise) {
118-
return promise.dependent()
119-
.thenCompose(Function.identity(), true)
120-
.raw();
121-
}
122-
123112
/**
124113
* <p>Returns a promise that is resolved successfully when all {@link CompletionStage}-s passed as parameters
125114
* are completed normally; if any promise completed exceptionally, then resulting promise is resolved faulty
@@ -488,29 +477,44 @@ public static <T> Promise<List<T>> atLeast(int minResultsCount, int maxErrorsCou
488477
}
489478
}
490479

491-
public static <T> CompletionStage<T> exceptionallyApplyAsync(CompletionStage<? extends T> delegate,
480+
public static <T> CompletionStage<T> exceptionallyApplyAsync(CompletionStage<T> delegate,
492481
Function<Throwable, ? extends T> fn) {
493-
return delegate.handleAsync(SharedFunctions.exceptionallyApply(fn));
482+
return delegate.handle((r, ex) -> ex == null ?
483+
delegate :
484+
delegate.<T>handleAsync((r1, ex1) -> fn.apply(ex1)))
485+
.thenCompose(Function.identity());
494486
}
495487

496-
public static <T> CompletionStage<T> exceptionallyApplyAsync(CompletionStage<? extends T> delegate,
488+
public static <T> CompletionStage<T> exceptionallyApplyAsync(CompletionStage<T> delegate,
497489
Function<Throwable, ? extends T> fn, Executor executor) {
498-
return delegate.handleAsync(SharedFunctions.exceptionallyApply(fn), executor);
499-
}
500-
501-
public static <T> Promise<T> exceptionallyCompose(CompletionStage<? extends T> delegate,
502-
Function<Throwable, ? extends CompletionStage<T>> fn) {
503-
return flatMap( delegate.handle(SharedFunctions.exceptionallyCompose(fn)) );
504-
}
505-
506-
public static <T> Promise<T> exceptionallyComposeAsync(CompletionStage<? extends T> delegate,
507-
Function<Throwable, ? extends CompletionStage<T>> fn) {
508-
return flatMap( delegate.handleAsync(SharedFunctions.exceptionallyCompose(fn)) );
509-
}
510-
511-
public static <T> Promise<T> exceptionallyComposeAsync(CompletionStage<? extends T> delegate,
512-
Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
513-
return flatMap( delegate.handleAsync(SharedFunctions.exceptionallyCompose(fn), executor) );
490+
return delegate.handle((r, ex) -> ex == null ?
491+
delegate :
492+
delegate.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor))
493+
.thenCompose(Function.identity());
494+
}
495+
496+
public static <T> CompletionStage<T> exceptionallyCompose(CompletionStage<T> delegate,
497+
Function<Throwable, ? extends CompletionStage<T>> fn) {
498+
return delegate.handle((r, ex) -> ex == null ? delegate : fn.apply(ex))
499+
.thenCompose(Function.identity());
500+
}
501+
502+
public static <T> CompletionStage<T> exceptionallyComposeAsync(CompletionStage<T> delegate,
503+
Function<Throwable, ? extends CompletionStage<T>> fn) {
504+
return delegate.handle((r, ex) -> ex == null ?
505+
delegate :
506+
delegate.handleAsync((r1, ex1) -> fn.apply(ex1))
507+
.thenCompose(Function.identity()))
508+
.thenCompose(Function.identity());
509+
}
510+
511+
public static <T> CompletionStage<T> exceptionallyComposeAsync(CompletionStage<T> delegate,
512+
Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
513+
return delegate.handle((r, ex) -> ex == null ?
514+
delegate :
515+
delegate.handleAsync((r1, ex1) -> fn.apply(ex1), executor)
516+
.thenCompose(Function.identity()))
517+
.thenCompose(Function.identity());
514518
}
515519

516520
public static Promise<Void> retry(Runnable codeBlock, Executor executor,

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletionStage;
2323
import java.util.concurrent.ExecutionException;
2424
import java.util.concurrent.Future;
25+
import java.util.concurrent.atomic.AtomicReference;
2526
import java.util.function.BiFunction;
2627
import java.util.function.Function;
2728
import java.util.function.Supplier;
@@ -82,13 +83,10 @@ static boolean cancelPromise(CompletionStage<?> promise, boolean mayInterruptIfR
8283
.orElse(Boolean.FALSE);
8384
}
8485
}
85-
86-
static <T> BiFunction<T, Throwable, T> exceptionallyApply(Function<Throwable, ? extends T> fn) {
87-
return (r, ex) -> null != ex ? fn.apply(ex) : r;
88-
}
89-
90-
static <T> BiFunction<T, Throwable, Promise<T>> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn) {
91-
return (r, ex) -> ex != null ? Promises.from(fn.apply(ex)) : Promises.success(r);
86+
87+
static <T> T updateReference(T value, AtomicReference<? super T> ref) {
88+
ref.set(value);
89+
return value;
9290
}
9391

9492

src/main/java/net/tascalate/concurrent/decorators/AbstractDependentPromiseDecorator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public DependentPromise<T> defaultAsyncOn(Executor executor) {
6666
return (DependentPromise<T>)super.defaultAsyncOn(executor);
6767
}
6868

69+
@Override
70+
public DependentPromise<T> onCancel(Runnable code) {
71+
return (DependentPromise<T>)super.onCancel(code);
72+
}
73+
6974
@Override
7075
public DependentPromise<T> delay(long timeout, TimeUnit unit) {
7176
return (DependentPromise<T>)super.delay(timeout, unit);

src/main/java/net/tascalate/concurrent/decorators/AbstractPromiseDecorator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public Promise<T> defaultAsyncOn(Executor executor) {
8888
}
8989
}
9090

91+
@Override
92+
public Promise<T> onCancel(Runnable code) {
93+
return wrap(delegate.onCancel(code));
94+
}
95+
9196
@Override
9297
public Promise<T> delay(long timeout, TimeUnit unit) {
9398
return wrap(delegate.delay(timeout, unit));

0 commit comments

Comments
 (0)