Skip to content

Commit 8e15c36

Browse files
committed
Promise.delay methods; refactoring timeout-related stuff to the separate internal class
1 parent 22790ce commit 8e15c36

File tree

6 files changed

+259
-155
lines changed

6 files changed

+259
-155
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ private <R, U> Promise<U> doApplyToEitherAsync(CompletionStage<? extends R> firs
371371
if (failure == null) {
372372
nextStage.onSuccess(result);
373373
} else {
374-
nextStage.onError(wrapException(failure));
374+
nextStage.onError(Promises.wrapException(failure));
375375
}
376376
};
377377
// only the first result is accepted by completion stage,
@@ -403,15 +403,7 @@ private static <R> Function<R, Void> runnableAsFunction(Runnable action) {
403403
}
404404

405405
private static <U> U forwardException(Throwable e) {
406-
throw wrapException(e);
407-
}
408-
409-
private static CompletionException wrapException(Throwable e) {
410-
if (e instanceof CompletionException) {
411-
return (CompletionException) e;
412-
} else {
413-
return new CompletionException(e);
414-
}
406+
throw Promises.wrapException(e);
415407
}
416408

417409
private static ExecutionException rewrapExecutionException(ExecutionException ex) {

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import java.time.Duration;
1819
import java.util.concurrent.Callable;
20+
import java.util.concurrent.CompletionStage;
1921
import java.util.concurrent.Executor;
2022
import java.util.concurrent.RunnableFuture;
23+
import java.util.concurrent.TimeUnit;
2124
import java.util.function.Supplier;
2225

2326
/**
@@ -141,7 +144,26 @@ public static <U> Promise<U> supplyAsync(Supplier<U> supplier, Executor executor
141144
executor.execute(result);
142145
return result;
143146
}
144-
147+
148+
public static <T> Promise<T> await(CompletionStage<? extends T> stage, Executor executor) {
149+
return await(stage, executor, false);
150+
}
151+
152+
public static <T> Promise<T> await(CompletionStage<? extends T> stage, Executor executor, boolean enlistOrigin) {
153+
return asyncOn(executor).dependent()
154+
.thenCombineAsync(
155+
stage, (u, v) -> v, enlistOrigin ? PromiseOrigin.PARAM_ONLY : PromiseOrigin.NONE
156+
);
157+
}
158+
159+
public static Promise<Duration> delay(long timeout, TimeUnit unit, Executor executor) {
160+
return delay(Timeouts.toDuration(timeout, unit), executor);
161+
}
162+
163+
public static Promise<Duration> delay(Duration duration, Executor executor) {
164+
return await(Timeouts.delay(duration), executor, true);
165+
}
166+
145167
@Override
146168
Runnable setupTransition(Callable<T> code) {
147169
throw new UnsupportedOperationException();

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,34 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original, CompletionSta
149149
return result;
150150
}
151151

152+
public DependentPromise<T> delay(long timeout, TimeUnit unit) {
153+
return delay(timeout, unit, true);
154+
}
155+
156+
public DependentPromise<T> delay(long timeout, TimeUnit unit, boolean delayOnError) {
157+
return delay(timeout, unit, delayOnError, false);
158+
}
159+
160+
public DependentPromise<T> delay(long timeout, TimeUnit unit, boolean delayOnError, boolean enlistOrigin) {
161+
return delay(Timeouts.toDuration(timeout, unit), delayOnError, enlistOrigin);
162+
}
163+
164+
public DependentPromise<T> delay(Duration duration) {
165+
return delay(duration, true);
166+
}
167+
168+
public DependentPromise<T> delay(Duration duration, boolean delayOnError) {
169+
return delay(duration, delayOnError, false);
170+
}
171+
172+
public DependentPromise<T> delay(Duration duration, boolean delayOnError, boolean enlistOrigin) {
173+
CompletablePromise<?> delayed = new CompletablePromise<>();
174+
whenComplete(Timeouts.configureDelay(this, delayed, duration, delayOnError));
175+
// Use *async to execute on default "this" executor
176+
return thenCombineAsync(
177+
delayed, (r, d) -> r, enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY
178+
);
179+
}
152180

153181
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit) {
154182
return orTimeout(timeout, unit, true);
@@ -159,7 +187,7 @@ public DependentPromise<T> orTimeout(long timeout, TimeUnit unit, boolean cancel
159187
}
160188

161189
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
162-
return orTimeout(Promises.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
190+
return orTimeout(Timeouts.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
163191
}
164192

165193
public DependentPromise<T> orTimeout(Duration duration) {
@@ -171,11 +199,11 @@ public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout)
171199
}
172200

173201
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
174-
Promise<T> onTimeout = Promises.failAfter(duration);
202+
Promise<T> onTimeout = Timeouts.failAfter(duration);
175203
// Use *async to execute on default "this" executor
176204
DependentPromise<T> result = this
177205
.applyToEitherAsync(onTimeout, Function.identity(), enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY);
178-
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
206+
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
179207
return result;
180208
}
181209

@@ -188,7 +216,7 @@ public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit, boole
188216
}
189217

190218
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
191-
return onTimeout(value, Promises.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
219+
return onTimeout(value, Timeouts.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
192220
}
193221

194222
public DependentPromise<T> onTimeout(T value, Duration duration) {
@@ -208,11 +236,11 @@ public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, long timeou
208236
}
209237

210238
public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
211-
return onTimeout(supplier, Promises.toDuration(timeout, unit), false);
239+
return onTimeout(supplier, Timeouts.toDuration(timeout, unit), false);
212240
}
213241

214242
public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
215-
return onTimeout(supplier, Promises.toDuration(timeout, unit), enlistOrigin);
243+
return onTimeout(supplier, Timeouts.toDuration(timeout, unit), enlistOrigin);
216244
}
217245

218246
public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, Duration duration) {
@@ -227,7 +255,7 @@ public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, Duration du
227255
Function<T, Supplier<? extends T>> valueToSupplier = v -> () -> v;
228256

229257
// timeout converted to supplier
230-
Promise<Supplier<? extends T>> onTimeout = Promises
258+
Promise<Supplier<? extends T>> onTimeout = Timeouts
231259
.delay(duration)
232260
.dependent()
233261
.thenApply(d -> supplier, true);
@@ -238,7 +266,7 @@ public DependentPromise<T> onTimeout(Supplier<? extends T> supplier, Duration du
238266
// Use *async to execute on default "this" executor
239267
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
240268

241-
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
269+
result.whenComplete(Timeouts.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
242270
return result;
243271
}
244272

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,26 +59,48 @@ default T getNow(Supplier<? extends T> valueIfAbsent) {
5959
return valueIfAbsent.get();
6060
}
6161
}
62+
63+
default Promise<T> delay(long timeout, TimeUnit unit) {
64+
return delay(timeout, unit, true);
65+
}
66+
67+
default Promise<T> delay(long timeout, TimeUnit unit, boolean delayOnError) {
68+
return delay(Timeouts.toDuration(timeout, unit), delayOnError);
69+
}
70+
71+
default Promise<T> delay(Duration duration) {
72+
return delay(duration, true);
73+
}
74+
75+
default Promise<T> delay(Duration duration, boolean delayOnError) {
76+
CompletablePromise<?> delayed = new CompletablePromise<>();
77+
whenComplete(Timeouts.configureDelay(this, delayed, duration, delayOnError));
78+
// Use *async to execute on default "this" executor
79+
return dependent().thenCombineAsync(
80+
delayed, (r, d) -> r, PromiseOrigin.PARAM_ONLY
81+
);
82+
}
83+
6284

6385
default Promise<T> orTimeout(long timeout, TimeUnit unit) {
6486
return orTimeout(timeout, unit, true);
6587
}
6688

6789
default Promise<T> orTimeout(long timeout, TimeUnit unit, boolean cancelOnTimeout) {
68-
return orTimeout(Promises.toDuration(timeout, unit), cancelOnTimeout);
90+
return orTimeout(Timeouts.toDuration(timeout, unit), cancelOnTimeout);
6991
}
7092

7193
default Promise<T> orTimeout(Duration duration) {
7294
return orTimeout(duration, true);
7395
}
7496

7597
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
76-
Promise<T> onTimeout = Promises.failAfter(duration);
98+
Promise<T> timeout = Timeouts.failAfter(duration);
7799
// Use *async to execute on default "this" executor
78100
Promise<T> result = dependent()
79-
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY);
101+
.applyToEitherAsync(timeout, Function.identity(), PromiseOrigin.PARAM_ONLY);
80102

81-
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
103+
result.whenComplete(Timeouts.timeoutsCleanup(this, timeout, cancelOnTimeout));
82104
return result;
83105
}
84106

@@ -87,7 +109,7 @@ default Promise<T> onTimeout(T value, long timeout, TimeUnit unit) {
87109
}
88110

89111
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
90-
return onTimeout(value, Promises.toDuration(timeout, unit));
112+
return onTimeout(value, Timeouts.toDuration(timeout, unit));
91113
}
92114

93115
default Promise<T> onTimeout(T value, Duration duration) {
@@ -103,7 +125,7 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeU
103125
}
104126

105127
default Promise<T> onTimeout(Supplier<? extends T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
106-
return onTimeout(supplier, Promises.toDuration(timeout, unit), cancelOnTimeout);
128+
return onTimeout(supplier, Timeouts.toDuration(timeout, unit), cancelOnTimeout);
107129
}
108130

109131
default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration) {
@@ -114,7 +136,7 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration,
114136
Function<T, Supplier<? extends T>> valueToSupplier = v -> () -> v;
115137

116138
// timeout converted to supplier
117-
Promise<Supplier<? extends T>> onTimeout = Promises
139+
Promise<Supplier<? extends T>> timeout = Timeouts
118140
.delay(duration)
119141
.dependent()
120142
.thenApply(d -> supplier, true);
@@ -123,9 +145,9 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration,
123145
// resolved value converted to supplier
124146
.thenApply(valueToSupplier, false)
125147
// Use *async to execute on default "this" executor
126-
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
148+
.applyToEitherAsync(timeout, Supplier::get, PromiseOrigin.ALL);
127149

128-
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
150+
result.whenComplete(Timeouts.timeoutsCleanup(this, timeout, cancelOnTimeout));
129151
return result;
130152
}
131153

0 commit comments

Comments
 (0)