Skip to content

Commit f7437bf

Browse files
committed
Simplify timeouts cleanup
1 parent 9cd0dc9 commit f7437bf

File tree

3 files changed

+30
-40
lines changed

3 files changed

+30
-40
lines changed

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,10 @@ public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout)
173173
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
174174
Promise<T> onTimeout = Promises.failAfter(duration);
175175
// Use *async to execute on default "this" executor
176-
return this
177-
.applyToEitherAsync(onTimeout, Function.identity(), enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY)
178-
.whenComplete((v, e) -> {
179-
// Result comes from timeout and cancel-on-timeout is set
180-
// If both are done then cancel has no effect anyway
181-
if (onTimeout.isDone() && cancelOnTimeout) {
182-
cancel(true);
183-
}
184-
onTimeout.cancel(true);
185-
}, true);
176+
DependentPromise<T> result = this
177+
.applyToEitherAsync(onTimeout, Function.identity(), enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY);
178+
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
179+
return result;
186180
}
187181

188182
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -237,19 +231,14 @@ public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration, bo
237231
.dependent(Promises.delay(duration))
238232
.thenApply(d -> supplier, true);
239233

240-
return this
234+
DependentPromise<T> result = this
241235
// resolved value converted to supplier
242236
.thenApply(valueToSupplier, enlistOrigin)
243237
// Use *async to execute on default "this" executor
244-
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
245-
.whenComplete((v, e) -> {
246-
// Result comes from timeout and cancel-on-timeout is set
247-
// If both are done then cancel has no effect anyway
248-
if (onTimeout.isDone() && cancelOnTimeout) {
249-
cancel(true);
250-
}
251-
onTimeout.cancel(true);
252-
}, true);
238+
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
239+
240+
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
241+
return result;
253242
}
254243

255244
public <U> DependentPromise<U> thenApply(Function<? super T, ? extends U> fn) {

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,11 @@ default Promise<T> orTimeout(Duration duration) {
7575
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
7676
Promise<T> onTimeout = Promises.failAfter(duration);
7777
// Use *async to execute on default "this" executor
78-
return Promises.dependent(this)
79-
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY)
80-
.whenComplete((v, e) -> {
81-
// Result comes from timeout and cancel-on-timeout is set
82-
// If both are done then cancel has no effect anyway
83-
if (onTimeout.isDone() && cancelOnTimeout) {
84-
cancel(true);
85-
}
86-
onTimeout.cancel(true);
87-
}, true);
78+
Promise<T> result = Promises.dependent(this)
79+
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY);
80+
81+
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
82+
return result;
8883
}
8984

9085
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -123,19 +118,14 @@ default Promise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean ca
123118
.dependent(Promises.delay(duration))
124119
.thenApply(d -> supplier, true);
125120

126-
return Promises.dependent(this)
121+
Promise<T> result = Promises.dependent(this)
127122
// resolved value converted to supplier
128123
.thenApply(valueToSupplier, false)
129124
// Use *async to execute on default "this" executor
130-
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
131-
.whenComplete((v, e) -> {
132-
// Result comes from timeout and cancel-on-timeout is set
133-
// If both are done then cancel has no effect anyway
134-
if (onTimeout.isDone() && cancelOnTimeout) {
135-
cancel(true);
136-
}
137-
onTimeout.cancel(true);
138-
}, true);
125+
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL);
126+
127+
result.whenComplete(Promises.timeoutsCleanup(this, onTimeout, cancelOnTimeout));
128+
return result;
139129
}
140130

141131
public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn);

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,17 @@ static Duration toDuration(long delay, TimeUnit timeUnit) {
657657
return Duration.of(delay, toChronoUnit(timeUnit));
658658
}
659659

660+
static <T, U> BiConsumer<T, U> timeoutsCleanup(Promise<T> self, Promise<?> onTimeout, boolean cancelOnTimeout) {
661+
return (r, e) -> {
662+
// Result comes from timeout and cancel-on-timeout is set
663+
// If both are done then cancel has no effect anyway
664+
if ((onTimeout.isDone() && !onTimeout.isCancelled()) && cancelOnTimeout) {
665+
self.cancel(true);
666+
}
667+
onTimeout.cancel(true);
668+
};
669+
}
670+
660671
private static ChronoUnit toChronoUnit(TimeUnit unit) {
661672
Objects.requireNonNull(unit, "unit");
662673
switch (unit) {

0 commit comments

Comments
 (0)