Skip to content

Commit 2628dd8

Browse files
committed
DelayPolicy subclasses; fix cancellation in orTimeout/onTimeout; support service-call timeout in polling
1 parent f89bb0f commit 2628dd8

18 files changed

+787
-98
lines changed

src/main/java/net/tascalate/concurrent/Backoff.java

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Original work: copyright 2013 Tomasz Nurkiewicz
3+
* https://github.com/nurkiewicz/async-retry
4+
*
5+
* This class is based on the work create by Tomasz Nurkiewicz
6+
* under the Apache License, Version 2.0. Please see
7+
* https://github.com/nurkiewicz/async-retry/blob/master/src/main/java/com/nurkiewicz/asyncretry/backoff/Backoff.java
8+
*
9+
* Modified work: copyright 2015-2017 Valery Silaev (http://vsilaev.com)
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*/
23+
package net.tascalate.concurrent;
24+
25+
import net.tascalate.concurrent.delays.BoundedMaxDelayPolicy;
26+
import net.tascalate.concurrent.delays.BoundedMinDelayPolicy;
27+
import net.tascalate.concurrent.delays.FirstRetryNoDelayPolicy;
28+
import net.tascalate.concurrent.delays.FixedIntervalDelayPolicy;
29+
import net.tascalate.concurrent.delays.ProportionalRandomDelayPolicy;
30+
import net.tascalate.concurrent.delays.UniformRandomDelayPolicy;
31+
32+
public interface DelayPolicy {
33+
public static final DelayPolicy DEFAULT = new FirstRetryNoDelayPolicy(new FixedIntervalDelayPolicy());
34+
public static final DelayPolicy INVALID_DELAY = ctx -> -1;
35+
36+
abstract public long delayMillis(RetryContext context);
37+
38+
default DelayPolicy withUniformJitter() {
39+
return new UniformRandomDelayPolicy(this);
40+
}
41+
42+
default DelayPolicy withUniformJitter(long range) {
43+
return new UniformRandomDelayPolicy(this, range);
44+
}
45+
46+
default DelayPolicy withProportionalJitter() {
47+
return new ProportionalRandomDelayPolicy(this);
48+
}
49+
50+
default DelayPolicy withProportionalJitter(double multiplier) {
51+
return new ProportionalRandomDelayPolicy(this, multiplier);
52+
}
53+
54+
default DelayPolicy withMinDelay(long minDelayMillis) {
55+
return new BoundedMinDelayPolicy(this, minDelayMillis);
56+
}
57+
58+
default DelayPolicy withMinDelay() {
59+
return new BoundedMinDelayPolicy(this);
60+
}
61+
62+
default DelayPolicy withMaxDelay(long maxDelayMillis) {
63+
return new BoundedMaxDelayPolicy(this, maxDelayMillis);
64+
}
65+
66+
default DelayPolicy withMaxDelay() {
67+
return new BoundedMaxDelayPolicy(this);
68+
}
69+
70+
default DelayPolicy withFirstRetryNoDelay() {
71+
return new FirstRetryNoDelayPolicy(this);
72+
}
73+
}

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2015-2017 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package net.tascalate.concurrent;
217

318
import java.lang.reflect.Array;
@@ -159,16 +174,14 @@ public DependentPromise<T> orTimeout(Duration duration, boolean cancelOnTimeout,
159174
Promise<T> onTimeout = Promises.failAfter(duration);
160175
// Use *async to execute on default "this" executor
161176
return applyToEitherAsync(
162-
onTimeout,
163-
v -> {
164-
if (cancelOnTimeout) {
165-
cancel(true);
166-
}
167-
onTimeout.cancel(true);
168-
return v;
169-
},
177+
onTimeout, Function.identity(),
170178
enlistOrigin ? PromiseOrigin.ALL : PromiseOrigin.PARAM_ONLY
171-
);
179+
).whenComplete((v, e) -> {
180+
if (cancelOnTimeout) {
181+
cancel(true);
182+
}
183+
onTimeout.cancel(true);
184+
}, true);
172185
}
173186

174187
public DependentPromise<T> onTimeout(T value, long timeout, TimeUnit unit) {
@@ -225,17 +238,13 @@ public DependentPromise<T> onTimeout(Supplier<T> supplier, Duration duration, bo
225238
// resolved value converted to supplier
226239
.thenApply(valueToSupplier, enlistOrigin)
227240
// Use *async to execute on default "this" executor
228-
.applyToEitherAsync(
229-
onTimeout,
230-
s -> {
231-
if (cancelOnTimeout) {
232-
cancel(true);
233-
}
234-
onTimeout.cancel(true);
235-
return s.get();
236-
},
237-
PromiseOrigin.ALL
238-
);
241+
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
242+
.whenComplete((v, e) -> {
243+
if (cancelOnTimeout) {
244+
cancel(true);
245+
}
246+
onTimeout.cancel(true);
247+
}, true);
239248
}
240249

241250
public <U> DependentPromise<U> thenApply(Function<? super T, ? extends U> fn) {

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,15 @@ default Promise<T> orTimeout(Duration duration) {
7575
default Promise<T> orTimeout(Duration duration, boolean cancelOnTimeout) {
7676
Promise<T> onTimeout = Promises.failAfter(duration);
7777
// Use *async to execute on default "this" executor
78-
return Promises.dependent(this).applyToEitherAsync(
79-
onTimeout,
80-
v -> {
78+
return Promises.dependent(this)
79+
.applyToEitherAsync(onTimeout, Function.identity(), PromiseOrigin.PARAM_ONLY)
80+
.whenComplete((v, e) -> {
8181
if (cancelOnTimeout) {
8282
cancel(true);
8383
}
8484
onTimeout.cancel(true);
85-
return v;
8685
},
87-
PromiseOrigin.PARAM_ONLY
86+
true
8887
);
8988
}
9089

@@ -126,17 +125,13 @@ default Promise<T> onTimeout(Supplier<T> supplier, Duration duration, boolean ca
126125
// resolved value converted to supplier
127126
.thenApply(valueToSupplier, false)
128127
// Use *async to execute on default "this" executor
129-
.applyToEitherAsync(
130-
onTimeout,
131-
s -> {
132-
if (cancelOnTimeout) {
133-
cancel(true);
134-
}
135-
onTimeout.cancel(true);
136-
return s.get();
137-
},
138-
PromiseOrigin.ALL
139-
);
128+
.applyToEitherAsync(onTimeout, Supplier::get, PromiseOrigin.ALL)
129+
.whenComplete((v, e) -> {
130+
if (cancelOnTimeout) {
131+
cancel(true);
132+
}
133+
onTimeout.cancel(true);
134+
}, true);
140135
}
141136

142137
public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn);

src/main/java/net/tascalate/concurrent/PromiseOrigin.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2015-2017 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package net.tascalate.concurrent;
217

318
import java.util.EnumSet;

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,9 @@ public static <T> DependentPromise<T> dependent(Promise<T> stage) {
136136
return DependentPromise.from(stage);
137137
}
138138

139-
public static Promise<Void> task(Executor executor) {
140-
return CompletableTask.asyncOn(executor);
141-
}
142-
143139
public static <T> Promise<T> task(CompletionStage<T> stage, Executor executor) {
144-
return dependent(task(executor)).thenCombineAsync(stage, (u, v) -> v, PromiseOrigin.PARAM_ONLY);
140+
return dependent(CompletableTask.asyncOn(executor))
141+
.thenCombineAsync(stage, (u, v) -> v, PromiseOrigin.PARAM_ONLY);
145142
}
146143

147144
private static <T, R> CompletablePromise<R> createLinkedPromise(CompletionStage<T> stage) {
@@ -531,7 +528,11 @@ public static <T> Promise<T> failAfter(long delay, TimeUnit timeUnit) {
531528

532529

533530
public static <T> Promise<T> poll(Callable<? extends T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
534-
return pollOptional(() -> Optional.ofNullable(codeBlock.call()), executor, retryPolicy);
531+
Promise<ObjectRef<T>> wrappedResult = pollOptional(
532+
() -> Optional.of(new ObjectRef<T>( codeBlock.call() )),
533+
executor, retryPolicy
534+
);
535+
return dependent(wrappedResult).thenApply(ObjectRef::dereference, true);
535536
}
536537

537538
public static <T> Promise<T> pollOptional(Callable<Optional<? extends T>> codeBlock, Executor executor, RetryPolicy retryPolicy) {
@@ -560,8 +561,8 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
560561
return;
561562
}
562563

563-
long executionDelayMillis = ctx.executionDelayMillis();
564-
if (executionDelayMillis >= 0) {
564+
RetryPolicy.Outcome answer = ctx.shouldContinue();
565+
if (answer.shouldExecute()) {
565566
Runnable doCall = () -> {
566567
long startTime = System.currentTimeMillis();
567568
try {
@@ -570,35 +571,43 @@ private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
570571
resultPromise.onSuccess(result.get());
571572
} else {
572573
long finishTime = System.currentTimeMillis();
573-
RetryContext nextCtx = ctx.getNextRetry(finishTime - startTime);
574+
RetryContext nextCtx = ctx.nextRetry(finishTime - startTime);
574575
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
575576
}
576577
} catch (Exception ex) {
577578
long finishTime = System.currentTimeMillis();
578-
RetryContext nextCtx = ctx.getNextRetry(finishTime - startTime, ex);
579+
RetryContext nextCtx = ctx.nextRetry(finishTime - startTime, ex);
579580
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
580581
}
581582
};
582583

583584
Promise<?> callPromise;
584-
if (executionDelayMillis > 0) {
585+
Promise<?> finalPromise;
586+
long backoffDelayMillis = answer.backoffDelayMillis();
587+
if (backoffDelayMillis > 0) {
585588
// Timeout itself
586-
Promise<?> timeout = delay(Duration.ofMillis(executionDelayMillis));
587-
// Call should be done via CompletableTask to let it be interruptible
588-
callPromise = dependent(task(executor)).runAfterBothAsync(timeout, doCall, PromiseOrigin.PARAM_ONLY);
589+
Promise<?> backoff = delay(Duration.ofMillis(backoffDelayMillis));
590+
// Call should be done via CompletableTask to let it be interruptible
591+
finalPromise = CompletableTask.asyncOn(executor).runAfterBothAsync(backoff, doCall);
592+
callPromise = backoff; // Canceling timeout will cancel the chain above
589593
} else {
590594
// Immediately send to executor
591-
callPromise = CompletableTask.runAsync(doCall, executor);
595+
callPromise = finalPromise = CompletableTask.runAsync(doCall, executor);
592596
}
593597
callPromiseRef.set(callPromise);
594598
// If result promise is cancelled after callPromise was set need to stop;
595599
if (resultPromise.isDone()) {
596600
callPromise.cancel(true);
597-
return;
601+
} else if (answer.hasTimeout()) {
602+
// Restrict execution time of the final promise
603+
// Timeout should be a sum of policy timeout and policy backoff
604+
// while timer is set up immediately
605+
long totalTimeout = Math.max(0, answer.timeoutDelayMillis()) + Math.max(0, answer.backoffDelayMillis());
606+
finalPromise.orTimeout(Duration.ofMillis(totalTimeout), true);
598607
}
599608

600609
} else {
601-
resultPromise.onFailure(ctx.getLastThrowable());
610+
resultPromise.onFailure(ctx.asFailure());
602611
}
603612
}
604613

@@ -667,4 +676,16 @@ public Thread newThread(Runnable r) {
667676
return result;
668677
}
669678
});
679+
680+
private static class ObjectRef<T> {
681+
private final T reference;
682+
683+
ObjectRef(T reference) {
684+
this.reference = reference;
685+
}
686+
687+
T dereference() {
688+
return reference;
689+
}
690+
}
670691
}

0 commit comments

Comments
 (0)