Skip to content

Commit f89bb0f

Browse files
committed
Polling functionality (no backoff implementation yet)
1 parent b3fb6a8 commit f89bb0f

File tree

4 files changed

+248
-0
lines changed

4 files changed

+248
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package net.tascalate.concurrent;
2+
3+
public interface Backoff {
4+
long delayMillis(RetryContext context);
5+
}

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Objects;
24+
import java.util.Optional;
25+
import java.util.concurrent.Callable;
2426
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.CompletionStage;
28+
import java.util.concurrent.Executor;
2629
import java.util.concurrent.Executors;
2730
import java.util.concurrent.Future;
2831
import java.util.concurrent.ScheduledExecutorService;
2932
import java.util.concurrent.ThreadFactory;
3033
import java.util.concurrent.TimeUnit;
3134
import java.util.concurrent.TimeoutException;
35+
import java.util.concurrent.atomic.AtomicReference;
3236
import java.util.function.BiConsumer;
3337
import java.util.function.Consumer;
3438
import java.util.function.Function;
@@ -131,6 +135,14 @@ public static <T> DependentPromise<T> dependent(CompletionStage<T> stage) {
131135
public static <T> DependentPromise<T> dependent(Promise<T> stage) {
132136
return DependentPromise.from(stage);
133137
}
138+
139+
public static Promise<Void> task(Executor executor) {
140+
return CompletableTask.asyncOn(executor);
141+
}
142+
143+
public static <T> Promise<T> task(CompletionStage<T> stage, Executor executor) {
144+
return dependent(task(executor)).thenCombineAsync(stage, (u, v) -> v, PromiseOrigin.PARAM_ONLY);
145+
}
134146

135147
private static <T, R> CompletablePromise<R> createLinkedPromise(CompletionStage<T> stage) {
136148
return new CompletablePromise<R>() {
@@ -517,6 +529,78 @@ public static <T> Promise<T> failAfter(long delay, TimeUnit timeUnit) {
517529
return failAfter( toDuration(delay, timeUnit) );
518530
}
519531

532+
533+
public static <T> Promise<T> poll(Callable<? extends T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
534+
return pollOptional(() -> Optional.ofNullable(codeBlock.call()), executor, retryPolicy);
535+
}
536+
537+
public static <T> Promise<T> pollOptional(Callable<Optional<? extends T>> codeBlock, Executor executor, RetryPolicy retryPolicy) {
538+
final CompletablePromise<T> promise = new CompletablePromise<>();
539+
final AtomicReference<Promise<?>> callPromiseRef = new AtomicReference<>();
540+
// Cleanup latest timeout on completion;
541+
promise.whenComplete(
542+
(r, e) ->
543+
Optional
544+
.of(callPromiseRef)
545+
.map(AtomicReference::get)
546+
.ifPresent( p -> p.cancel(true) )
547+
);
548+
RetryContext ctx = RetryContext.initial(retryPolicy);
549+
pollOnce(codeBlock, executor, ctx, promise, callPromiseRef);
550+
return promise;
551+
}
552+
553+
private static <T> void pollOnce(Callable<Optional<? extends T>> codeBlock,
554+
Executor executor, RetryContext ctx,
555+
CompletablePromise<T> resultPromise,
556+
AtomicReference<Promise<?>> callPromiseRef) {
557+
558+
// Promise may be cancelled outside of polling
559+
if (resultPromise.isDone()) {
560+
return;
561+
}
562+
563+
long executionDelayMillis = ctx.executionDelayMillis();
564+
if (executionDelayMillis >= 0) {
565+
Runnable doCall = () -> {
566+
long startTime = System.currentTimeMillis();
567+
try {
568+
Optional<? extends T> result = codeBlock.call();
569+
if (result.isPresent()) {
570+
resultPromise.onSuccess(result.get());
571+
} else {
572+
long finishTime = System.currentTimeMillis();
573+
RetryContext nextCtx = ctx.getNextRetry(finishTime - startTime);
574+
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
575+
}
576+
} catch (Exception ex) {
577+
long finishTime = System.currentTimeMillis();
578+
RetryContext nextCtx = ctx.getNextRetry(finishTime - startTime, ex);
579+
pollOnce(codeBlock, executor, nextCtx, resultPromise, callPromiseRef);
580+
}
581+
};
582+
583+
Promise<?> callPromise;
584+
if (executionDelayMillis > 0) {
585+
// Timeout itself
586+
Promise<?> timeout = delay(Duration.ofMillis(executionDelayMillis));
587+
// Call should be done via CompletableTask to let it be interruptible
588+
callPromise = dependent(task(executor)).runAfterBothAsync(timeout, doCall, PromiseOrigin.PARAM_ONLY);
589+
} else {
590+
// Immediately send to executor
591+
callPromise = CompletableTask.runAsync(doCall, executor);
592+
}
593+
callPromiseRef.set(callPromise);
594+
// If result promise is cancelled after callPromise was set need to stop;
595+
if (resultPromise.isDone()) {
596+
callPromise.cancel(true);
597+
return;
598+
}
599+
600+
} else {
601+
resultPromise.onFailure(ctx.getLastThrowable());
602+
}
603+
}
520604

521605
private static <T> Promise<T> unwrap(CompletionStage<List<T>> original, boolean unwrapException) {
522606
return from(
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package net.tascalate.concurrent;
2+
3+
public class RetryContext {
4+
private final RetryPolicy policy;
5+
private final int retry;
6+
private final long lastCallDuartion;
7+
private final Throwable lastThrowable;
8+
9+
public RetryContext(RetryPolicy policy, int retry, long lastCallDuration, Throwable lastThrowable) {
10+
this.policy = policy;
11+
this.retry = retry;
12+
this.lastCallDuartion = lastCallDuration;
13+
this.lastThrowable = lastThrowable;
14+
}
15+
16+
17+
public static RetryContext initial(RetryPolicy policy) {
18+
return new RetryContext(policy, 0, 0, null);
19+
}
20+
21+
public long executionDelayMillis() {
22+
if (policy.shouldContinue(this) ) {
23+
return policy.delayInMillis(this);
24+
} else {
25+
return -1;
26+
}
27+
}
28+
29+
public int getRetryCount() {
30+
return retry;
31+
}
32+
33+
public long getLastCallDuration() {
34+
return lastCallDuartion;
35+
}
36+
37+
public Throwable getLastThrowable() {
38+
return lastThrowable;
39+
}
40+
41+
public RetryContext getNextRetry(long callDuration) {
42+
return new RetryContext(policy, retry + 1, callDuration, null);
43+
}
44+
45+
public RetryContext getNextRetry(long callDuration, Throwable throwable) {
46+
return new RetryContext(policy, retry + 1, callDuration, throwable);
47+
}
48+
49+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package net.tascalate.concurrent;
2+
3+
import java.util.Arrays;
4+
import java.util.Collections;
5+
import java.util.HashSet;
6+
import java.util.Set;
7+
import java.util.function.Predicate;
8+
9+
public class RetryPolicy {
10+
public static final RetryPolicy DEFAULT = new RetryPolicy().retryOn(Exception.class);
11+
12+
private final int maxRetries;
13+
private final Set<Class<? extends Throwable>> retryOn;
14+
private final Set<Class<? extends Throwable>> abortOn;
15+
private final Predicate<RetryContext> retryPredicate;
16+
private final Predicate<RetryContext> abortPredicate;
17+
private final Backoff backoff;
18+
19+
@SafeVarargs
20+
public final RetryPolicy retryOn(Class<? extends Throwable>... retryOnThrowables) {
21+
return new RetryPolicy(maxRetries, setPlusElems(retryOn, retryOnThrowables), abortOn, retryPredicate, abortPredicate, backoff);
22+
}
23+
24+
@SafeVarargs
25+
public final RetryPolicy abortOn(Class<? extends Throwable>... abortOnThrowables) {
26+
return new RetryPolicy(maxRetries, retryOn, setPlusElems(abortOn, abortOnThrowables), retryPredicate, abortPredicate, backoff);
27+
}
28+
29+
public RetryPolicy abortIf(Predicate<RetryContext> abortPredicate) {
30+
return new RetryPolicy(maxRetries, retryOn, abortOn, retryPredicate, abortPredicate.or(abortPredicate), backoff);
31+
}
32+
33+
public RetryPolicy retryIf(Predicate<RetryContext> retryPredicate) {
34+
return new RetryPolicy(maxRetries, retryOn, abortOn, this.retryPredicate.or(retryPredicate), abortPredicate, backoff);
35+
}
36+
37+
public RetryPolicy dontRetry() {
38+
return new RetryPolicy(0, retryOn, abortOn, retryPredicate, abortPredicate, backoff);
39+
}
40+
41+
public RetryPolicy withMaxRetries(int maxRetries) {
42+
return new RetryPolicy(maxRetries, retryOn, abortOn, retryPredicate, abortPredicate, backoff);
43+
}
44+
45+
public RetryPolicy withBackoff(Backoff backoff) {
46+
return new RetryPolicy(maxRetries, retryOn, abortOn, retryPredicate, abortPredicate, backoff);
47+
}
48+
49+
public RetryPolicy(int maxRetries, Set<Class<? extends Throwable>> retryOn, Set<Class<? extends Throwable>> abortOn, Predicate<RetryContext> retryPredicate, Predicate<RetryContext> abortPredicate, Backoff backoff) {
50+
this.maxRetries = maxRetries;
51+
this.retryOn = retryOn;
52+
this.abortOn = abortOn;
53+
this.retryPredicate = retryPredicate;
54+
this.abortPredicate = abortPredicate;
55+
this.backoff = backoff;
56+
}
57+
58+
public RetryPolicy() {
59+
this(1000);
60+
}
61+
62+
public RetryPolicy(long defaultDelay) {
63+
this(Integer.MAX_VALUE,
64+
Collections.emptySet(), Collections.emptySet(),
65+
ctx -> true, ctx -> false,
66+
ctx -> ctx.getRetryCount() == 0 ? 0 : defaultDelay
67+
);
68+
}
69+
70+
public boolean shouldContinue(RetryContext context) {
71+
if (tooManyRetries(context)) {
72+
return false;
73+
}
74+
if (abortPredicate.test(context)) {
75+
return false;
76+
}
77+
if (retryPredicate.test(context)) {
78+
return true;
79+
}
80+
return exceptionClassRetryable(context);
81+
}
82+
83+
public long delayInMillis(RetryContext context) {
84+
return backoff.delayMillis(context);
85+
}
86+
87+
private boolean tooManyRetries(RetryContext context) {
88+
return context.getRetryCount() > maxRetries;
89+
}
90+
91+
private boolean exceptionClassRetryable(RetryContext context) {
92+
if (context.getLastThrowable() == null) {
93+
return true;
94+
}
95+
final Class<? extends Throwable> e = context.getLastThrowable().getClass();
96+
return !matches(e, abortOn) && matches(e, retryOn);
97+
}
98+
99+
private static boolean matches(Class<? extends Throwable> throwable, Set<Class<? extends Throwable>> set) {
100+
return set.stream().anyMatch(c -> c.isAssignableFrom(throwable));
101+
}
102+
103+
@SafeVarargs
104+
private static <T> Set<T> setPlusElems(Set<T> initial, T... newElement) {
105+
final HashSet<T> copy = new HashSet<>(initial);
106+
copy.addAll(Arrays.asList(newElement));
107+
return Collections.unmodifiableSet(copy);
108+
}
109+
110+
}

0 commit comments

Comments
 (0)