Skip to content

Commit f61374a

Browse files
committed
Refactoring Promise.orTimeout / onTimeout methods (now with option to cancel original promise on timeout)
1 parent a7e288d commit f61374a

File tree

3 files changed

+142
-62
lines changed

3 files changed

+142
-62
lines changed

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 73 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -136,71 +136,106 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original, CompletionSta
136136

137137

138138
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit) {
139-
return orTimeout(timeout, unit, false);
139+
return orTimeout(timeout, unit, true);
140+
}
141+
142+
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit, boolean cancelOnTimeout) {
143+
return orTimeout(timeout, unit, cancelOnTimeout, false);
140144
}
141145

142-
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit, boolean enlistOrigin) {
143-
return orTimeout(Promises.toDuration(timeout, unit), enlistOrigin);
146+
public DependentPromise<T> orTimeout(long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
147+
return orTimeout(Promises.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
144148
}
145149

146150
public DependentPromise<T> orTimeout(Duration duration) {
147-
return orTimeout(duration, false);
151+
return orTimeout(duration, true);
152+
}
153+
154+
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
155+
return orTimeout(duration, cancelOnTimeout, false);
148156
}
149157

150-
public DependentPromise<T> orTimeout(Duration duration, boolean enlistOrigin) {
158+
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
159+
Promise<T> onTimeout = Promises.failAfter(duration);
151160
// Use *async to execute on default "this" executor
152-
return applyToEitherAsync(
153-
Promises.failAfter(duration),
154-
Function.identity(),
161+
return Promises.dependent(this).applyToEitherAsync(
162+
onTimeout,
163+
v -> {
164+
if (cancelOnTimeout) {
165+
cancel(true);
166+
}
167+
onTimeout.cancel(true);
168+
return v;
169+
},
155170
enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY
156-
);
171+
);
172+
}
173+
174+
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit) {
175+
return onTimeout(value, timeout, unit, true);
176+
}
177+
178+
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
179+
return onTimeout(value, timeout, unit, cancelOnTimeout, false);
157180
}
158181

159-
public DependentPromise<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
160-
return completeOnTimeout(value, timeout, unit, false);
182+
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
183+
return onTimeout(value, Promises.toDuration(timeout, unit), cancelOnTimeout, enlistOrigin);
161184
}
162185

163-
public DependentPromise<T> completeOnTimeout(T value, long timeout, TimeUnit unit, boolean enlistOrigin) {
164-
return completeOnTimeout(value, Promises.toDuration(timeout, unit), enlistOrigin);
186+
public DependentPromise<T> onTimeout(T value, Duration duration) {
187+
return onTimeout(value, duration, true);
165188
}
166189

167-
public DependentPromise<T> completeOnTimeout(T value, Duration duration) {
168-
return completeOnTimeout(value, duration, false);
190+
public DependentPromise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout) {
191+
return onTimeout(value, duration, cancelOnTimeout, false);
169192
}
170193

171-
public DependentPromise<T> completeOnTimeout(T value, Duration duration, boolean enlistOrigin) {
172-
// Use *async to execute on default "this" executor
173-
return applyToEitherAsync(
174-
// timeout converted to onTimeout value
175-
DependentPromise.from(Promises.delay(duration)).thenApply(d -> value, true),
176-
Function.identity(),
177-
enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY
178-
);
194+
public DependentPromise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
195+
return onTimeout(() -> value, duration, enlistOrigin);
196+
}
197+
198+
public DependentPromise<T> onTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
199+
return onTimeout(supplier, timeout, unit, true);
179200
}
180201

181-
public DependentPromise<T> completeOnTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
182-
return completeOnTimeout(supplier, timeout, unit, false);
202+
public DependentPromise<T> onTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
203+
return onTimeout(supplier, Promises.toDuration(timeout, unit), false);
183204
}
184205

185-
public DependentPromise<T> completeOnTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, boolean enlistOrigin) {
186-
return completeOnTimeout(supplier, Promises.toDuration(timeout, unit), enlistOrigin);
206+
public DependentPromise<T> onTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout, boolean enlistOrigin) {
207+
return onTimeout(supplier, Promises.toDuration(timeout, unit), enlistOrigin);
187208
}
188209

189-
public DependentPromise<T> completeOnTimeout(Supplier<T> supplier, Duration duration) {
190-
return completeOnTimeout(supplier, duration, false);
210+
public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration) {
211+
return onTimeout(supplier, duration, true);
212+
}
213+
214+
public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean cancelOnTimeout) {
215+
return onTimeout(supplier, duration, cancelOnTimeout, false);
191216
}
192217

193-
public DependentPromise<T> completeOnTimeout(Supplier<T> supplier, Duration duration, boolean enlistOrigin) {
218+
public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
194219
Function<T, Supplier<T>> valueToSupplier = v -> () -> v;
195-
return this
196-
.thenApply(valueToSupplier, enlistOrigin) // ready this value converted to supplier
197-
.applyToEitherAsync(// Use *async to execute on default "this" executor
198-
// timeout converted to supplier of onTimeout value
199-
DependentPromise.from(Promises.delay(duration)).thenApply(d -> supplier, true),
200-
Function.identity(),
220+
221+
// timeout converted to supplier
222+
Promise<Supplier<T>> onTimeout = Promises.dependent(Promises.delay(duration)).thenApply(d -> supplier, true);
223+
224+
return Promises.dependent(this)
225+
// resolved value converted to supplier
226+
.thenApply(valueToSupplier, enlistOrigin)
227+
// Use *async to execute on default "this" executor
228+
.applyToEitherAsync(
229+
onTimeout,
230+
s -> {
231+
if (cancelOnTimeout) {
232+
cancel(true);
233+
}
234+
onTimeout.cancel(true);
235+
return s.get();
236+
},
201237
PromiseOrigin.ALL
202-
)
203-
.thenApply(s -> s.get(), true);
238+
);
204239
}
205240

206241
public <U> DependentPromise<U> thenApply(Function<? super T, ? extends U> fn) {

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,45 +61,82 @@ default public T getNow(Supplier<T> valueIfAbsent) {
6161
}
6262

6363
default Promise<T> orTimeout(long timeout, TimeUnit unit) {
64-
return orTimeout(Promises.toDuration(timeout, unit));
64+
return orTimeout(timeout, unit, true);
65+
}
66+
67+
default Promise<T> orTimeout(long timeout, TimeUnit unit, boolean cancelOnTimeout) {
68+
return orTimeout(Promises.toDuration(timeout, unit), cancelOnTimeout);
6569
}
6670

6771
default Promise<T> orTimeout(Duration duration) {
72+
return orTimeout(duration, true);
73+
}
74+
75+
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
76+
Promise<T> onTimeout = Promises.failAfter(duration);
6877
// Use *async to execute on default "this" executor
69-
return DependentPromise.from(this).applyToEitherAsync(
70-
Promises.failAfter(duration), Function.identity(), PromiseOrigin.PARAM_ONLY
78+
return Promises.dependent(this).applyToEitherAsync(
79+
onTimeout,
80+
v -> {
81+
if (cancelOnTimeout) {
82+
cancel(true);
83+
}
84+
onTimeout.cancel(true);
85+
return v;
86+
},
87+
PromiseOrigin.PARAM_ONLY
7188
);
7289
}
7390

74-
default Promise<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
75-
return completeOnTimeout(value, Promises.toDuration(timeout, unit));
91+
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit) {
92+
return onTimeout(value, timeout, unit, true);
93+
}
94+
95+
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
96+
return onTimeout(value, Promises.toDuration(timeout, unit));
7697
}
7798

78-
default Promise<T> completeOnTimeout(T value, Duration duration) {
79-
return DependentPromise.from(this)
80-
.applyToEitherAsync(// Use *async to execute on default "this" executor
81-
// timeout converted to onTimeout value
82-
DependentPromise.from(Promises.delay(duration)).thenApply(d -> value, true),
83-
Function.identity(),
84-
PromiseOrigin.PARAM_ONLY
85-
);
99+
default Promise<T> onTimeout(T value, Duration duration) {
100+
return onTimeout(value, duration, true);
101+
}
102+
103+
default Promise<T> onTimeout(T value, Duration duration, boolean cancelOnTimeout) {
104+
return onTimeout(() -> value, duration, cancelOnTimeout);
105+
}
106+
107+
default Promise<T> onTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
108+
return onTimeout(supplier, timeout, unit, true);
109+
}
110+
111+
default Promise<T> onTimeout(Supplier<T> supplier, long timeout, TimeUnit unit, boolean cancelOnTimeout) {
112+
return onTimeout(supplier, Promises.toDuration(timeout, unit), cancelOnTimeout);
86113
}
87114

88-
default Promise<T> completeOnTimeout(Supplier<T> supplier, long timeout, TimeUnit unit) {
89-
return completeOnTimeout(supplier, Promises.toDuration(timeout, unit));
115+
default Promise<T> onTimeout(Supplier<T> supplier, Duration duration) {
116+
return onTimeout(supplier, duration, true);
90117
}
91118

92-
default Promise<T> completeOnTimeout(Supplier<T> supplier, Duration duration) {
119+
default Promise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean cancelOnTimeout) {
93120
Function<T, Supplier<T>> valueToSupplier = v -> () -> v;
94-
return DependentPromise.from(this)
95-
.thenApply(valueToSupplier, false) // ready this value converted to supplier
96-
.applyToEitherAsync(// Use *async to execute on default "this" executor
97-
// timeout converted to supplier of onTimeout value
98-
DependentPromise.from(Promises.delay(duration)).thenApply(d -> supplier, true),
99-
Function.identity(),
121+
122+
// timeout converted to supplier
123+
Promise<Supplier<T>> onTimeout = Promises.dependent(Promises.delay(duration)).thenApply(d -> supplier, true);
124+
125+
return Promises.dependent(this)
126+
// resolved value converted to supplier
127+
.thenApply(valueToSupplier, false)
128+
// Use *async to execute on default "this" executor
129+
.applyToEitherAsync(
130+
onTimeout,
131+
s -> {
132+
if (cancelOnTimeout) {
133+
cancel(true);
134+
}
135+
onTimeout.cancel(true);
136+
return s.get();
137+
},
100138
PromiseOrigin.ALL
101-
)
102-
.thenApply(s -> s.get(), true);
139+
);
103140
}
104141

105142
public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn);

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import java.time.Duration;
1819
import java.util.Arrays;
1920
import java.util.concurrent.ExecutionException;
2021
import java.util.stream.Collectors;
@@ -24,8 +25,14 @@ public class J8Examples {
2425
public static void main(final String[] argv) throws InterruptedException, ExecutionException {
2526
final TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(3);
2627

28+
Promises.success("ABC").applyToEither(Promises.failure(new IllegalArgumentException()),r -> {
29+
System.out.println("Race won by value: " + r);
30+
return r;
31+
});
32+
2733
CompletableTask
2834
.supplyAsync(() -> awaitAndProduceN(73), executorService)
35+
.onTimeout(123456789, Duration.ofMillis(200))
2936
.thenAcceptAsync(J8Examples::onComplete)
3037
.get();
3138

@@ -127,6 +134,7 @@ private static int awaitAndProduceN(int i) {
127134
return i * 1000;
128135
} catch (final InterruptedException ex) {
129136
Thread.currentThread().interrupt();
137+
System.out.println("awaitAndProduceN interrupted, requested value " + i);
130138
return -1;
131139
}
132140
}

0 commit comments

Comments
 (0)