Skip to content

Commit 478b4e9

Browse files
committed
More cancellation options for third-party CompletionStage impl. Revisiting Promises combiner methods to return exceptional future instead of throwing exception
1 parent 84962ba commit 478b4e9

File tree

4 files changed

+82
-29
lines changed

4 files changed

+82
-29
lines changed

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collection;
2626
import java.util.Collections;
2727
import java.util.List;
28+
import java.util.NoSuchElementException;
2829
import java.util.Objects;
2930
import java.util.Optional;
3031
import java.util.concurrent.Callable;
@@ -46,7 +47,7 @@
4647
* @author vsilaev
4748
*
4849
*/
49-
public class Promises {
50+
public final class Promises {
5051

5152
private Promises() {}
5253

@@ -703,8 +704,13 @@ private static <T> Promise<T> insufficientNumberOfArguments(int minResultCount,
703704
String message = String.format(
704705
"The number of futures supplied (%d) is less than a number of futures to await (%d)", size, minResultCount
705706
);
706-
//return failure(new NoSuchElementException(message));
707+
Exception ex = new NoSuchElementException(message);
708+
//TODO: exceptional completion vs runtime exception on combined promise construction?
709+
ex.fillInStackTrace();
710+
return failure(ex);
711+
/*
707712
throw new IllegalArgumentException(message);
713+
*/
708714
}
709715

710716
private static <V, T> RetryCallable<V, T> toRetryCallable(Callable<? extends V> callable) {
@@ -715,10 +721,8 @@ private static Duration duration(long startTime, long finishTime) {
715721
return Duration.ofNanos(finishTime - startTime);
716722
}
717723

718-
static final Object IGNORE = new Object();
719-
720724
@FunctionalInterface
721-
static interface F3<T, U, V> {
725+
private static interface F3<T, U, V> {
722726
void apply(T p1, U p2, V p3);
723727
}
724728

src/main/java/net/tascalate/concurrent/RetryPolicy.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,6 @@ protected static final class PositiveVerdict implements Verdict {
8585
private final DelayPolicy<? super T> backoff;
8686
private final DelayPolicy<? super T> timeout;
8787

88-
public static <T> RetryPolicy<T> create() {
89-
return new RetryPolicy<>(-1);
90-
}
91-
9288
@SafeVarargs
9389
public final RetryPolicy<T> retryOn(Class<? extends Throwable>... retryOnThrowables) {
9490
return retryOn(Arrays.asList(retryOnThrowables));

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717

1818
import java.lang.reflect.Method;
1919
import java.time.Duration;
20+
import java.util.Optional;
2021
import java.util.concurrent.CancellationException;
2122
import java.util.concurrent.CompletionException;
2223
import java.util.concurrent.CompletionStage;
2324
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.Future;
2526
import java.util.concurrent.TimeoutException;
2627
import java.util.function.BiFunction;
28+
import java.util.function.Function;
2729
import java.util.function.Supplier;
30+
import java.util.stream.Stream;
31+
2832

2933
class SharedFunctions {
3034

@@ -68,25 +72,17 @@ static boolean cancelPromise(CompletionStage<?> promise, boolean mayInterruptIfR
6872
Future<?> future = (Future<?>) promise;
6973
return future.cancel(mayInterruptIfRunning);
7074
} else {
71-
Method m = completeExceptionallyMethodOf(promise);
72-
if (null != m) {
73-
try {
74-
return (Boolean) m.invoke(promise, new CancellationException());
75-
} catch (final ReflectiveOperationException ex) {
76-
return false;
77-
}
78-
} else {
79-
return false;
80-
}
81-
}
82-
}
83-
84-
private static Method completeExceptionallyMethodOf(CompletionStage<?> promise) {
85-
try {
86-
Class<?> clazz = promise.getClass();
87-
return clazz.getMethod("completeExceptionally", Throwable.class);
88-
} catch (ReflectiveOperationException | SecurityException ex) {
89-
return null;
75+
Stream<Function<Class<?>, ExceptionalCancellation>> options = Stream.of(
76+
SharedFunctions::cancelInterruptibleMethodOf,
77+
SharedFunctions::cancelMethodOf,
78+
SharedFunctions::completeExceptionallyMethodOf
79+
);
80+
81+
return options.map(f -> tryCancellation(f, promise, mayInterruptIfRunning))
82+
.filter(Optional::isPresent)
83+
.map(Optional::get)
84+
.findFirst()
85+
.orElse(Boolean.FALSE);
9086
}
9187
}
9288

@@ -113,6 +109,63 @@ static <T, E extends Throwable> T sneakyThrow(Throwable e) throws E {
113109
throw (E) e;
114110
}
115111

112+
113+
private static Optional<Boolean> tryCancellation(Function<Class<?>, ExceptionalCancellation> option,
114+
CompletionStage<?> promise,
115+
boolean mayInterruptIfRunning) {
116+
117+
118+
return Optional.ofNullable( option.apply(promise.getClass()) )
119+
.map(SharedFunctions::uncheckedReflectionException) // TODO: False for reflective operation or runtime exception?
120+
.map(bf -> bf.apply(promise, mayInterruptIfRunning ? Boolean.TRUE : Boolean.FALSE))
121+
;
122+
}
123+
124+
private static ExceptionalCancellation completeExceptionallyMethodOf(Class<?> clazz) {
125+
try {
126+
Method m = clazz.getMethod("completeExceptionally", Throwable.class);
127+
return (p, b) -> (Boolean)m.invoke(p, new CancellationException());
128+
} catch (ReflectiveOperationException | SecurityException ex) {
129+
return null;
130+
}
131+
}
132+
133+
private static ExceptionalCancellation cancelInterruptibleMethodOf(Class<?> clazz) {
134+
try {
135+
Method m = clazz.getMethod("cancel", boolean.class);
136+
return (p, b) -> (Boolean)m.invoke(p, b);
137+
} catch (ReflectiveOperationException | SecurityException ex) {
138+
return null;
139+
}
140+
}
141+
142+
private static ExceptionalCancellation cancelMethodOf(Class<?> clazz) {
143+
try {
144+
Method m = clazz.getMethod("cancel");
145+
return (p, b) -> (Boolean)m.invoke(p);
146+
} catch (ReflectiveOperationException | SecurityException ex) {
147+
return null;
148+
}
149+
}
150+
151+
private static <T, U> Cancellation uncheckedReflectionException(ExceptionalCancellation original ) {
152+
return (a, b) -> {
153+
try {
154+
return original.apply(a, b);
155+
} catch (ReflectiveOperationException ex) {
156+
throw new RuntimeException(ex);
157+
}
158+
};
159+
}
160+
116161
private static final BiFunction<Object, Object, Object> SELECT_FIRST = (u, v) -> u;
117162
private static final BiFunction<Object, Object, Object> SELECT_SECOND = (u, v) -> v;
163+
164+
@FunctionalInterface
165+
private static interface Cancellation extends BiFunction<CompletionStage<?>, Boolean, Boolean> { }
166+
167+
@FunctionalInterface
168+
public interface ExceptionalCancellation {
169+
Boolean apply(CompletionStage<?> p, Boolean b) throws ReflectiveOperationException;
170+
}
118171
}

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public static void main(final String[] argv) throws InterruptedException, Execut
5353

5454
Promise<BigInteger> tryTyping = Promises.retry(
5555
J8Examples::tryCalc, executorService,
56-
RetryPolicy.<Number>create().withResultValidator(v -> v.intValue() > 0).withMaxRetries(2)
56+
new RetryPolicy<Number>().withResultValidator(v -> v.intValue() > 0).withMaxRetries(2)
5757
);
5858
System.out.println( tryTyping.get() );
5959

0 commit comments

Comments
 (0)