Skip to content

Commit 9cd0dc9

Browse files
committed
Minor fix to timeout handling; Fixing timeouts for polling
1 parent 61fa1c6 commit 9cd0dc9

File tree

3 files changed

+51
-35
lines changed

3 files changed

+51
-35
lines changed

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,16 @@ public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout)
173173
public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout, boolean enlistOrigin) {
174174
Promise<T> onTimeout = Promises.failAfter(duration);
175175
// Use *async to execute on default "this" executor
176-
return applyToEitherAsync(
177-
onTimeout, Function.identity(),
178-
enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY
179-
).whenComplete((v, e) -> {
180-
if (cancelOnTimeout) {
181-
cancel(true);
182-
}
183-
onTimeout.cancel(true);
184-
}, true);
176+
return this
177+
.applyToEitherAsync(onTimeout, Function.identity(), enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY)
178+
.whenComplete((v, e) -> {
179+
// Result comes from timeout and cancel-on-timeout is set
180+
// If both are done then cancel has no effect anyway
181+
if (onTimeout.isDone() && cancelOnTimeout) {
182+
cancel(true);
183+
}
184+
onTimeout.cancel(true);
185+
}, true);
185186
}
186187

187188
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -232,15 +233,19 @@ public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration, bo
232233
Function<T, Supplier<T>> valueToSupplier = v -> () -> v;
233234

234235
// timeout converted to supplier
235-
Promise<Supplier<T>> onTimeout = Promises.dependent(Promises.delay(duration)).thenApply(d -> supplier, true);
236+
Promise<Supplier<T>> onTimeout = Promises
237+
.dependent(Promises.delay(duration))
238+
.thenApply(d -> supplier, true);
236239

237240
return this
238241
// resolved value converted to supplier
239242
.thenApply(valueToSupplier, enlistOrigin)
240243
// Use *async to execute on default "this" executor
241244
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
242245
.whenComplete((v, e) -> {
243-
if (cancelOnTimeout) {
246+
// Result comes from timeout and cancel-on-timeout is set
247+
// If both are done then cancel has no effect anyway
248+
if (onTimeout.isDone() && cancelOnTimeout) {
244249
cancel(true);
245250
}
246251
onTimeout.cancel(true);

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
7878
return Promises.dependent(this)
7979
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY)
8080
.whenComplete((v, e) -> {
81-
if (cancelOnTimeout) {
81+
// Result comes from timeout and cancel-on-timeout is set
82+
// If both are done then cancel has no effect anyway
83+
if (onTimeout.isDone() && cancelOnTimeout) {
8284
cancel(true);
8385
}
8486
onTimeout.cancel(true);
85-
},
86-
true
87-
);
87+
}, true);
8888
}
8989

9090
default Promise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -119,15 +119,19 @@ default Promise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean ca
119119
Function<T, Supplier<T>> valueToSupplier = v -> () -> v;
120120

121121
// timeout converted to supplier
122-
Promise<Supplier<T>> onTimeout = Promises.dependent(Promises.delay(duration)).thenApply(d -> supplier, true);
122+
Promise<Supplier<T>> onTimeout = Promises
123+
.dependent(Promises.delay(duration))
124+
.thenApply(d -> supplier, true);
123125

124126
return Promises.dependent(this)
125127
// resolved value converted to supplier
126128
.thenApply(valueToSupplier, false)
127129
// Use *async to execute on default "this" executor
128130
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
129131
.whenComplete((v, e) -> {
130-
if (cancelOnTimeout) {
132+
// Result comes from timeout and cancel-on-timeout is set
133+
// If both are done then cancel has no effect anyway
134+
if (onTimeout.isDone() && cancelOnTimeout) {
131135
cancel(true);
132136
}
133137
onTimeout.cancel(true);

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.function.BiConsumer;
3737
import java.util.function.Consumer;
3838
import java.util.function.Function;
39+
import java.util.function.Supplier;
3940

4041
/**
4142
* Utility class to create a resolved (either successfully or faulty) {@link Promise}-s;
@@ -587,29 +588,35 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
587588
}
588589
};
589590

590-
Promise<?> callPromise;
591-
Promise<?> finalPromise;
591+
Supplier<Promise<?>> submittedCall = () -> {
592+
// Call should be done via CompletableTask to let it be interruptible
593+
Promise<?> p = CompletableTask.runAsync(doCall, executor);
594+
if (answer.hasTimeout()) {
595+
p.orTimeout( Duration.ofMillis(Math.max(0, answer.timeoutDelayMillis()) ) );
596+
}
597+
return p;
598+
};
599+
600+
Consumer<Promise<?>> changeCallPromiseRef = p -> {
601+
// If result promise is cancelled after callPromise was set need to stop;
602+
callPromiseRef.set( p );
603+
if (resultPromise.isDone()) {
604+
p.cancel(true);
605+
}
606+
};
607+
608+
592609
long backoffDelayMillis = answer.backoffDelayMillis();
593610
if (backoffDelayMillis > 0) {
594611
// Timeout itself
595-
Promise<?> backoff = delay(Duration.ofMillis(backoffDelayMillis));
596-
// Call should be done via CompletableTask to let it be interruptible
597-
finalPromise = CompletableTask.asyncOn(executor).runAfterBothAsync(backoff, doCall);
598-
callPromise = backoff; // Canceling timeout will cancel the chain above
612+
Promise<?> backoff = delay( Duration.ofMillis(backoffDelayMillis) );
613+
// Invocation after timeout
614+
backoff.thenAccept(d -> changeCallPromiseRef.accept( submittedCall.get() ));
615+
// Canceling timeout will cancel the chain above
616+
changeCallPromiseRef.accept( backoff );
599617
} else {
600618
// Immediately send to executor
601-
callPromise = finalPromise = CompletableTask.runAsync(doCall, executor);
602-
}
603-
callPromiseRef.set(callPromise);
604-
// If result promise is cancelled after callPromise was set need to stop;
605-
if (resultPromise.isDone()) {
606-
callPromise.cancel(true);
607-
} else if (answer.hasTimeout()) {
608-
// Restrict execution time of the final promise
609-
// Timeout should be a sum of policy timeout and policy backoff
610-
// while timer is set up immediately
611-
long totalTimeout = Math.max(0, answer.timeoutDelayMillis()) + Math.max(0, answer.backoffDelayMillis());
612-
finalPromise.orTimeout(Duration.ofMillis(totalTimeout), true);
619+
changeCallPromiseRef.accept( submittedCall.get() );
613620
}
614621

615622
} else {

0 commit comments

Comments
 (0)