Skip to content

Commit 35d6348

Browse files
committed
Move responsibilities between Timeouts & PromiseHelper; optimize Promise.delay implementation
1 parent 946e300 commit 35d6348

File tree

9 files changed

+91
-54
lines changed

9 files changed

+91
-54
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,7 @@ public CompletableFuture<T> toCompletableFuture() {
382382
return result;
383383
}
384384

385-
private static <T> T op(boolean v) {
386-
return null;
387-
}
385+
private static <T> T op(boolean v) { return null; }
388386

389387
/**
390388
* This method exists just to reconcile generics when called from

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,12 @@ private CompletionStage<?>[] origin(boolean enlist) {
865865
}
866866

867867
private CompletionStage<?>[] originAndParam(CompletionStage<?> param, Set<PromiseOrigin> enlistOptions) {
868-
final CompletionStage<?>[] result = new CompletionStage<?>[enlistOptions.size()];
868+
int size = enlistOptions.size();
869+
if (size == 0) {
870+
return null;
871+
}
872+
873+
CompletionStage<?>[] result = new CompletionStage<?>[size];
869874
int idx = 0;
870875
if (enlistOptions.contains(PromiseOrigin.THIS)) {
871876
result[idx++] = this;

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.time.Duration;
2525
import java.util.Set;
2626
import java.util.concurrent.CancellationException;
27-
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.CompletionException;
2928
import java.util.concurrent.CompletionStage;
3029
import java.util.concurrent.ExecutionException;
@@ -136,14 +135,15 @@ default Promise<T> delay(Duration duration, boolean delayOnError) {
136135
.unwrap()
137136
);
138137
}
139-
CompletableFuture<Try<? super T>> delayed = new CompletableFuture<>();
140-
whenComplete(Timeouts.configureDelay(this, delayed, duration, delayOnError));
141-
return this.dependent()
142-
.handle(Try.lift(), false)
143-
// Use *Async to execute on default "this" executor
144-
.thenCombineAsync(delayed, selectFirst(), PromiseOrigin.ALL)
145-
.thenCompose(Try::asPromise, true)
146-
.unwrap();
138+
DependentPromise<Try<T>> h = dependent().handle(Try.lift(), false);
139+
return h.thenCompose(t -> t.isSuccess() || !(isCancelled() || t.isCancel()) ?
140+
// "this" is already completed promise here (in both cases)
141+
// Use *Async to execute on default "this" executor
142+
h.thenCombineAsync(Timeouts.delay(duration), (_1, _2) -> join(), PromiseOrigin.PARAM_ONLY)
143+
:
144+
this,
145+
true
146+
).unwrap();
147147
}
148148

149149
default Promise<T> orTimeout(long timeout, TimeUnit unit) {
@@ -167,7 +167,7 @@ default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
167167
.applyToEitherAsync(onTimeout, v -> Try.doneOrTimeout(v, duration), PromiseOrigin.ALL)
168168
.thenCompose(Try::asPromise, true);
169169

170-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
170+
result.whenComplete(PromiseHelper.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
171171
return result.unwrap();
172172
}
173173

@@ -192,7 +192,7 @@ default Promise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout
192192
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.ALL)
193193
.thenCompose(Try::asPromise, true);
194194

195-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
195+
result.whenComplete(PromiseHelper.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
196196
return result.unwrap();
197197
}
198198

@@ -220,7 +220,7 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration,
220220
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
221221
.thenCompose(Try::asPromise, true);
222222

223-
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
223+
result.whenComplete(PromiseHelper.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
224224
return result.unwrap();
225225
}
226226

src/main/java/net/tascalate/concurrent/PromiseHelper.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.CompletionStage;
1919
import java.util.concurrent.Executor;
20+
import java.util.function.BiConsumer;
2021
import java.util.function.Function;
2122

2223
class PromiseHelper {
@@ -87,4 +88,16 @@ static <T> Promise<T> exceptionallyComposeAsync(Promise<T> p,
8788
h.thenCompose(e -> e.isLeft() ? p : h.thenComposeAsync(in -> fn.apply(in.right()), executor), true)
8889
.unwrap();
8990
}
91+
92+
93+
static <T, U> BiConsumer<T, U> timeoutsCleanup(Promise<T> self, Promise<?> timeout, boolean cancelOnTimeout) {
94+
return (_1, _2) -> {
95+
// Result comes from timeout and cancel-on-timeout is set
96+
// If both are done then cancel has no effect anyway
97+
if (cancelOnTimeout && timeout.isDone() && !timeout.isCancelled()) {
98+
self.cancel(true);
99+
}
100+
timeout.cancel(true);
101+
};
102+
}
90103
}

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,6 @@ static <T> Supplier<T> supply(T value) {
9999
}
100100

101101
static void iif(boolean v) {}
102-
/*
103-
static <T> void voided(T v) {}
104-
*/
105102

106103
static final Function<Object, Throwable> NO_SUCH_ELEMENT = t -> new NoSuchElementException("Result rejected by filter: " + t);
107104

src/main/java/net/tascalate/concurrent/Timeouts.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818
import java.time.Duration;
1919
import java.time.temporal.ChronoUnit;
2020
import java.util.Objects;
21-
import java.util.concurrent.CompletableFuture;
2221
import java.util.concurrent.Executors;
2322
import java.util.concurrent.Future;
2423
import java.util.concurrent.ScheduledExecutorService;
2524
import java.util.concurrent.TimeUnit;
2625
import java.util.concurrent.TimeoutException;
27-
import java.util.function.BiConsumer;
2826

2927
class Timeouts {
3028

@@ -103,33 +101,6 @@ static Duration toDuration(long delay, TimeUnit timeUnit) {
103101
return Duration.of(delay, toChronoUnit(timeUnit));
104102
}
105103

106-
static <T, U> BiConsumer<T, U> timeoutsCleanup(Promise<T> self, Promise<?> timeout, boolean cancelOnTimeout) {
107-
return (r, e) -> {
108-
// Result comes from timeout and cancel-on-timeout is set
109-
// If both are done then cancel has no effect anyway
110-
if (cancelOnTimeout && timeout.isDone() && !timeout.isCancelled()) {
111-
self.cancel(true);
112-
}
113-
timeout.cancel(true);
114-
};
115-
}
116-
117-
static <T, E extends Throwable> BiConsumer<T, E> configureDelay(Promise<? extends T> self,
118-
CompletableFuture<Try<? super T>> delayed,
119-
Duration duration,
120-
boolean delayOnError) {
121-
return (result, error) -> {
122-
if (error == null || (delayOnError && !self.isCancelled())) {
123-
Promise<?> timeout = delay(duration);
124-
delayed.whenComplete( (r, e) -> timeout.cancel(true) );
125-
timeout.whenComplete( (r, e) -> delayed.complete(Try.nothing()) );
126-
} else {
127-
// when error and should not delay on error
128-
delayed.complete(Try.nothing());
129-
}
130-
};
131-
}
132-
133104
private static ChronoUnit toChronoUnit(TimeUnit unit) {
134105
Objects.requireNonNull(unit, "unit");
135106
switch (unit) {

src/main/java/net/tascalate/concurrent/Try.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ abstract class Try<R> {
2525

2626
abstract R done();
2727
abstract boolean isSuccess();
28+
abstract boolean isCancel();
2829
abstract Promise<R> asPromise();
2930

3031
static final class Success<R> extends Try<R> {
@@ -49,6 +50,11 @@ Promise<R> asPromise() {
4950
boolean isSuccess() {
5051
return true;
5152
}
53+
54+
@Override
55+
boolean isCancel() {
56+
return false;
57+
}
5258
}
5359

5460
static final class Failure<R> extends Try<R> {
@@ -61,9 +67,6 @@ static final class Failure<R> extends Try<R> {
6167

6268
@Override
6369
R done() {
64-
/*
65-
return sneakyThrow(error);
66-
*/
6770
if (error instanceof Error) {
6871
throw (Error)error;
6972
} else if (error instanceof CancellationException) {
@@ -82,6 +85,13 @@ Promise<R> asPromise() {
8285
boolean isSuccess() {
8386
return false;
8487
}
88+
89+
@Override
90+
boolean isCancel() {
91+
Throwable ex = SharedFunctions.unwrapCompletionException(error);
92+
return ex instanceof CancellationException;
93+
}
94+
8595
}
8696

8797
static <R> Try<R> success(R result) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package net.tascalate.concurrent;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.ExecutionException;
5+
import java.util.concurrent.ThreadFactory;
6+
7+
import net.tascalate.concurrent.core.CompletionStageAPI;
8+
9+
public class Exceptions {
10+
public static void main(final String[] argv) throws InterruptedException, ExecutionException {
11+
CompletionStageAPI.current();
12+
13+
CompletableFuture<String> stage1 = CompletableFuture.supplyAsync(() -> {throw new IllegalStateException();});
14+
stage1.whenComplete((r, e) -> {
15+
System.out.println(e);
16+
});
17+
18+
CompletableFuture<String> stage2 = CompletableFuture.completedFuture("ABC");
19+
stage2.thenApply(s -> {throw new RuntimeException("Z");})
20+
.handle((r, e) -> {
21+
System.out.println(e);
22+
return null;
23+
});
24+
25+
final ThreadFactory tf = TaskExecutors.newThreadFactory()
26+
.withNameFormat("CTX-MY-THREAD-%d-OF-%s")
27+
.withThreadGroup(
28+
TaskExecutors.newThreadGroup()
29+
.withName("Tascalate-Tasks")
30+
.withMaxPriority(Thread.NORM_PRIORITY)
31+
.build()
32+
)
33+
.withContextClassLoader(J8Examples.class.getClassLoader())
34+
.build();
35+
36+
TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(6, tf);
37+
Promise<String> task1 = CompletableTask.completed("ABC", executorService);
38+
task1.thenApply(s -> {throw new RuntimeException("Z");})
39+
.handle((r, e) -> {
40+
System.out.println(e);
41+
return null;
42+
});
43+
executorService.shutdownNow();
44+
}
45+
}

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,7 @@ public static void main(final String[] argv) throws InterruptedException, Execut
234234
//Promise<Object> k = CompletableTask.supplyAsync(() -> {throw new RuntimeException();}, executorService);
235235
//Promise<Object> k = Promises.success("ABC");
236236
//Promise<Object> k = Promises.failure(new RuntimeException());
237-
k.dependent().delay(Duration.ofMillis(1), true).whenComplete((r, e) -> System.out.println(Thread.currentThread() + " ==> " + r + ", " + e));
238-
239-
237+
k.dependent().delay(Duration.ofMillis(50), true, true).whenComplete((r, e) -> System.out.println(Thread.currentThread() + " ==> " + r + ", " + e));
240238

241239
Promise<Object> k1 = CompletableTask.supplyAsync(() -> produceStringSlow("-onTimeout1"), executorService);
242240
k1.onTimeout("ALTERNATE1", Duration.ofMillis(50))

0 commit comments

Comments
 (0)