Skip to content

Commit 0a0523e

Browse files
committed
Fix for exceptional handling of Promise.orTimeout
1 parent f37379c commit 0a0523e

File tree

7 files changed

+61
-81
lines changed

7 files changed

+61
-81
lines changed

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import static net.tascalate.concurrent.LinkedCompletion.FutureCompletion;
1919
import static net.tascalate.concurrent.SharedFunctions.cancelPromise;
2020
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
21-
import static net.tascalate.concurrent.SharedFunctions.supply;
22-
import static net.tascalate.concurrent.SharedFunctions.timeout;
2321

2422
import java.time.Duration;
2523
import java.util.Arrays;
@@ -154,11 +152,12 @@ public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout)
154152

155153
@Override
156154
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
157-
Promise<? extends Reference<T>> onTimeout = Timeouts.delayed(null, duration);
155+
Promise<? extends Try<T>> onTimeout = Timeouts.delayed(null, duration);
158156
DependentPromise<T> result =
159-
this.thenApply(Reference::new, enlistOrigin)
157+
this.thenApply(Try::success, enlistOrigin)
158+
.exceptionally(Try::failure, true)
160159
// Use *Async to execute on default "this" executor
161-
.applyToEitherAsync(onTimeout, v -> Reference.getOrElse(v, timeout(duration)), PromiseOrigin.ALL);
160+
.applyToEitherAsync(onTimeout, v -> Try.doneOrTimeout(v, duration), PromiseOrigin.ALL);
162161

163162
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
164163
return result;
@@ -171,9 +170,12 @@ public DependentPromise<T> onTimeout(T value, Duration duration, boolean cancelO
171170

172171
@Override
173172
public DependentPromise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
174-
Promise<T> onTimeout = Timeouts.delayed(value, duration);
175-
// Use *Async to execute on default "this" executor
176-
DependentPromise<T> result = applyToEitherAsync(onTimeout, Function.identity(), enlistParamOrAll(enlistOrigin));
173+
Promise<Try<T>> onTimeout = Timeouts.delayed(Try.success(value), duration);
174+
DependentPromise<T> result =
175+
this.thenApply(Try::success, enlistOrigin)
176+
.exceptionally(Try::failure, true)
177+
// Use *Async to execute on default "this" executor
178+
.applyToEitherAsync(onTimeout, Try::done, PromiseOrigin.ALL);
177179

178180
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
179181
return result;
@@ -187,15 +189,15 @@ public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, Duration du
187189

188190
@Override
189191
public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
190-
Function<T, Supplier<? extends T>> valueToSupplier = v -> supply(v);
191-
192192
// timeout converted to supplier
193-
Promise<Supplier<? extends T>> onTimeout = Timeouts.delayed(supplier, duration);
193+
Promise<Supplier<Try<T>>> onTimeout = Timeouts.delayed(Try.with(supplier), duration);
194194

195195
DependentPromise<T> result =
196-
this.thenApply(valueToSupplier, enlistOrigin)
196+
this.thenApply(Try::success, enlistOrigin)
197+
.exceptionally(Try::failure, true)
198+
.thenApply(SharedFunctions::supply, true)
197199
// Use *Async to execute on default "this" executor
198-
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
200+
.applyToEitherAsync(onTimeout, s -> s.get().done(), PromiseOrigin.ALL);
199201

200202
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
201203
return result;
@@ -715,10 +717,6 @@ private CompletionStage<?>[] originAndParam(CompletionStage<?> param, Set<Promis
715717
private boolean defaultEnlistOrigin() {
716718
return defaultEnlistOptions.contains(PromiseOrigin.THIS);
717719
}
718-
719-
private static Set<PromiseOrigin> enlistParamOrAll(boolean enlistThis) {
720-
return enlistThis ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY;
721-
}
722720

723721
static void cancelPromises(CompletionStage<?>[] promises, boolean mayInterruptIfRunning) {
724722
if (null != promises) {

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package net.tascalate.concurrent;
1717

1818
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
19-
import static net.tascalate.concurrent.SharedFunctions.supply;
20-
import static net.tascalate.concurrent.SharedFunctions.timeout;
2119
import static net.tascalate.concurrent.SharedFunctions.unwrapExecutionException;
2220
import static net.tascalate.concurrent.SharedFunctions.wrapCompletionException;
2321

@@ -52,7 +50,7 @@
5250
public interface Promise<T> extends Future<T>, CompletionStage<T> {
5351

5452
default T getNow(T valueIfAbsent) throws CancellationException, CompletionException {
55-
return getNow(supply(valueIfAbsent));
53+
return getNow(SharedFunctions.supply(valueIfAbsent));
5654
}
5755

5856
default T getNow(Supplier<? extends T> valueIfAbsent) throws CancellationException, CompletionException {
@@ -125,12 +123,13 @@ default Promise<T> orTimeout(Duration duration) {
125123
}
126124

127125
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
128-
Promise<? extends Reference<T>> onTimeout = Timeouts.delayed(null, duration);
126+
Promise<? extends Try<T>> onTimeout = Timeouts.delayed(null, duration);
129127
Promise<T> result =
130128
this.dependent()
131-
.thenApply(Reference::new, false)
129+
.thenApply(Try::success, false)
130+
.exceptionally(Try::failure, true)
132131
// Use *Async to execute on default "this" executor
133-
.applyToEitherAsync(onTimeout, v -> Reference.getOrElse(v, timeout(duration)), PromiseOrigin.ALL);
132+
.applyToEitherAsync(onTimeout, v -> Try.doneOrTimeout(v, duration), PromiseOrigin.ALL);
134133

135134
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
136135
return result.raw();
@@ -150,11 +149,13 @@ default Promise<T> onTimeout(T value, Duration duration) {
150149

151150
default Promise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout) {
152151
// timeout converted to supplier
153-
Promise<T> onTimeout = Timeouts.delayed(value, duration);
154-
// Use *Async to execute on default "this" executor
152+
Promise<Try<T>> onTimeout = Timeouts.delayed(Try.success(value), duration);
155153
Promise<T> result =
156154
this.dependent()
157-
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY);
155+
.thenApply(Try::success, false)
156+
.exceptionally(Try::failure, true)
157+
// Use *Async to execute on default "this" executor
158+
.applyToEitherAsync(onTimeout, Try::done, PromiseOrigin.ALL);
158159

159160
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
160161
return result.raw();
@@ -173,17 +174,17 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration)
173174
}
174175

175176
default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration, boolean cancelOnTimeout) {
176-
Function<T, Supplier<? extends T>> valueToSupplier = v -> supply(v);
177-
178177
// timeout converted to supplier
179-
Promise<Supplier<? extends T>> onTimeout = Timeouts.delayed(supplier, duration);
178+
Promise<Supplier<Try<T>>> onTimeout = Timeouts.delayed(Try.with(supplier), duration);
180179

181180
Promise<T> result =
182181
this.dependent()
183-
// resolved value converted to supplier
184-
.thenApply(valueToSupplier, false)
182+
// resolved value converted to supplier of Try
183+
.thenApply(Try::success, false)
184+
.exceptionally(Try::failure, true)
185+
.thenApply(SharedFunctions::supply, true)
185186
// Use *Async to execute on default "this" executor
186-
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
187+
.applyToEitherAsync(onTimeout, s -> s.get().done(), PromiseOrigin.ALL);
187188

188189
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
189190
return result.raw();

src/main/java/net/tascalate/concurrent/Reference.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,17 @@
1616
package net.tascalate.concurrent;
1717

1818
import java.lang.reflect.Method;
19-
import java.time.Duration;
2019
import java.util.Optional;
2120
import java.util.concurrent.CancellationException;
2221
import java.util.concurrent.CompletionException;
2322
import java.util.concurrent.CompletionStage;
2423
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.Future;
26-
import java.util.concurrent.TimeoutException;
2725
import java.util.function.BiFunction;
2826
import java.util.function.Function;
2927
import java.util.function.Supplier;
3028
import java.util.stream.Stream;
3129

32-
3330
class SharedFunctions {
3431

3532
private SharedFunctions() {}
@@ -100,10 +97,6 @@ static <T> Supplier<T> supply(T value) {
10097
return () -> value;
10198
}
10299

103-
static Supplier<TimeoutException> timeout(Duration duration) {
104-
return supply(new TimeoutException("Timeout after " + duration));
105-
}
106-
107100
@SuppressWarnings("unchecked")
108101
static <T, E extends Throwable> T sneakyThrow(Throwable e) throws E {
109102
throw (E) e;

src/main/java/net/tascalate/concurrent/Try.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import java.time.Duration;
19+
import java.util.concurrent.TimeoutException;
20+
import java.util.function.Supplier;
21+
1822
abstract class Try<R> {
1923

2024
abstract R done();
@@ -55,7 +59,24 @@ static <R> Try<R> success(R result) {
5559
static <R> Try<R> failure(Throwable error) {
5660
return new Failure<R>(error);
5761
}
58-
62+
63+
static <R> Supplier<Try<R>> with(Supplier<? extends R> supplier) {
64+
return () -> of(supplier);
65+
}
66+
67+
static <R> Try<R> of(Supplier<? extends R> supplier) {
68+
try {
69+
return success(supplier.get());
70+
} catch (Throwable ex) {
71+
return failure(ex);
72+
}
73+
}
74+
75+
static <R> R doneOrTimeout(Try<R> result, Duration duration) {
76+
Try<R> checkedResult = null != result ? result: failure(new TimeoutException("Timeout after " + duration));
77+
return checkedResult.done();
78+
}
79+
5980
@SuppressWarnings("unchecked")
6081
static <R> Try<R> nothing() {
6182
return (Try<R>)NOTHING;

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public static void main(final String[] argv) throws InterruptedException, Execut
131131
System.out.println("After delay: " + v);
132132
return v;
133133
}, true)
134-
.onTimeout(123456789, Duration.ofMillis(2000))
134+
.onTimeout(() -> 123456789, Duration.ofMillis(2000))
135135
.thenAcceptAsync(J8Examples::onComplete)
136136
.get();
137137

src/test/java/net/tascalate/concurrent/OrTimeoutExceptionTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.Assert.assertTrue;
44

55
import java.time.Duration;
6+
import java.util.concurrent.CompletionException;
67
import java.util.concurrent.TimeUnit;
78

89
import org.junit.After;
@@ -35,13 +36,17 @@ public void testExceptionalOrTimeout() {
3536
else
3637
System.out.println("ERROR: " + err.getMessage());
3738
});
38-
p.join();
39+
try {
40+
p.join();
41+
} catch (CompletionException ex) {
42+
assertTrue(ex.getCause() instanceof IllegalStateException);
43+
}
3944
}
4045

4146
private String doTask () throws Exception {
4247
TimeUnit.SECONDS.sleep(3);
4348
if (null != System.out)
44-
throw new Exception("my error");
49+
throw new IllegalStateException("my error");
4550
return "executed ok";
4651
}
4752

0 commit comments

Comments
 (0)