Skip to content

Commit 91c7c82

Browse files
committed
Expose latest result with RetryContext, RetryPolicy now responsible to accept/reject result, Promises.poll* is removed in favor of Poll.retry*
1 parent 6858108 commit 91c7c82

16 files changed

+304
-249
lines changed

src/main/java/net/tascalate/concurrent/DelayPolicy.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,102 +33,102 @@
3333
import net.tascalate.concurrent.delays.ProportionalRandomDelayPolicy;
3434
import net.tascalate.concurrent.delays.UniformRandomDelayPolicy;
3535

36-
public interface DelayPolicy {
37-
public static final DelayPolicy DEFAULT = new FirstRetryNoDelayPolicy(new FixedIntervalDelayPolicy());
38-
public static final DelayPolicy INVALID = ctx -> Timeouts.NEGATIVE_DURATION;
36+
public interface DelayPolicy<T> {
37+
public static final DelayPolicy<Object> DEFAULT = new FirstRetryNoDelayPolicy<Object>(new FixedIntervalDelayPolicy<>());
38+
public static final DelayPolicy<Object> INVALID = ctx -> Timeouts.NEGATIVE_DURATION;
3939

40-
Duration delay(RetryContext context);
40+
Duration delay(RetryContext<? extends T> retryContext);
4141

42-
public static DelayPolicy fixedInterval() {
43-
return new FixedIntervalDelayPolicy();
42+
public static <T> DelayPolicy<T> fixedInterval() {
43+
return new FixedIntervalDelayPolicy<>();
4444
}
4545

46-
public static DelayPolicy fixedInterval(Duration interval) {
47-
return new FixedIntervalDelayPolicy(interval);
46+
public static <T> DelayPolicy<T> fixedInterval(Duration interval) {
47+
return new FixedIntervalDelayPolicy<>(interval);
4848
}
4949

50-
public static DelayPolicy fixedInterval(long interval, TimeUnit timeUnit) {
50+
public static <T> DelayPolicy<T> fixedInterval(long interval, TimeUnit timeUnit) {
5151
return fixedInterval(Timeouts.toDuration(interval, timeUnit));
5252
}
5353

54-
public static DelayPolicy fixedInterval(long intervalMillis) {
54+
public static <T> DelayPolicy<T> fixedInterval(long intervalMillis) {
5555
return fixedInterval(Duration.ofMillis(intervalMillis));
5656
}
5757

58-
public static DelayPolicy exponential(double multiplier) {
59-
return new ExponentialDelayPolicy(multiplier);
58+
public static <T> DelayPolicy<T> exponential(double multiplier) {
59+
return new ExponentialDelayPolicy<>(multiplier);
6060
}
6161

62-
public static DelayPolicy exponential(Duration initialDelay, double multiplier) {
63-
return new ExponentialDelayPolicy(initialDelay, multiplier);
62+
public static <T> DelayPolicy<T> exponential(Duration initialDelay, double multiplier) {
63+
return new ExponentialDelayPolicy<>(initialDelay, multiplier);
6464
}
6565

66-
public static DelayPolicy exponential(long initialDelay, TimeUnit timeUnit, double multiplier) {
66+
public static <T> DelayPolicy<T> exponential(long initialDelay, TimeUnit timeUnit, double multiplier) {
6767
return exponential(Timeouts.toDuration(initialDelay, timeUnit), multiplier);
6868
}
6969

70-
public static DelayPolicy exponential(long initialDelayMillis, double multiplier) {
70+
public static <T> DelayPolicy<T> exponential(long initialDelayMillis, double multiplier) {
7171
return exponential(Duration.ofMillis(initialDelayMillis), multiplier);
7272
}
7373

74-
default DelayPolicy withUniformJitter() {
75-
return new UniformRandomDelayPolicy(this);
74+
default DelayPolicy<T> withUniformJitter() {
75+
return new UniformRandomDelayPolicy<>(this);
7676
}
7777

78-
default DelayPolicy withUniformJitter(long range) {
78+
default DelayPolicy<T> withUniformJitter(long range) {
7979
return withUniformJitter(range, TimeUnit.MILLISECONDS);
8080
}
8181

82-
default DelayPolicy withUniformJitter(long range, TimeUnit timeUnit) {
82+
default DelayPolicy<T> withUniformJitter(long range, TimeUnit timeUnit) {
8383
return withUniformJitter(Timeouts.toDuration(range, timeUnit));
8484
}
8585

86-
default DelayPolicy withUniformJitter(Duration range) {
87-
return new UniformRandomDelayPolicy(this, range);
86+
default DelayPolicy<T> withUniformJitter(Duration range) {
87+
return new UniformRandomDelayPolicy<>(this, range);
8888
}
8989

90-
default DelayPolicy withProportionalJitter() {
91-
return new ProportionalRandomDelayPolicy(this);
90+
default DelayPolicy<T> withProportionalJitter() {
91+
return new ProportionalRandomDelayPolicy<>(this);
9292
}
9393

94-
default DelayPolicy withProportionalJitter(double multiplier) {
95-
return new ProportionalRandomDelayPolicy(this, multiplier);
94+
default DelayPolicy<T> withProportionalJitter(double multiplier) {
95+
return new ProportionalRandomDelayPolicy<>(this, multiplier);
9696
}
9797

98-
default DelayPolicy withMinDelay() {
99-
return new BoundedMinDelayPolicy(this);
98+
default DelayPolicy<T> withMinDelay() {
99+
return new BoundedMinDelayPolicy<>(this);
100100
}
101101

102-
default DelayPolicy withMinDelay(Duration minDelay) {
103-
return new BoundedMinDelayPolicy(this, minDelay);
102+
default DelayPolicy<T> withMinDelay(Duration minDelay) {
103+
return new BoundedMinDelayPolicy<>(this, minDelay);
104104
}
105105

106-
default DelayPolicy withMinDelay(long minDelay, TimeUnit timeUnit) {
106+
default DelayPolicy<T> withMinDelay(long minDelay, TimeUnit timeUnit) {
107107
return withMaxDelay(Timeouts.toDuration(minDelay, timeUnit));
108108
}
109109

110-
default DelayPolicy withMinDelay(long minDelayMillis) {
110+
default DelayPolicy<T> withMinDelay(long minDelayMillis) {
111111
return withMinDelay(Duration.ofMillis(minDelayMillis));
112112
}
113113

114-
default DelayPolicy withMaxDelay() {
115-
return new BoundedMaxDelayPolicy(this);
114+
default DelayPolicy<T> withMaxDelay() {
115+
return new BoundedMaxDelayPolicy<>(this);
116116
}
117117

118-
default DelayPolicy withMaxDelay(Duration maxDelay) {
119-
return new BoundedMaxDelayPolicy(this, maxDelay);
118+
default DelayPolicy<T> withMaxDelay(Duration maxDelay) {
119+
return new BoundedMaxDelayPolicy<>(this, maxDelay);
120120
}
121121

122-
default DelayPolicy withMaxDelay(long maxDelay, TimeUnit timeUnit) {
122+
default DelayPolicy<T> withMaxDelay(long maxDelay, TimeUnit timeUnit) {
123123
return withMaxDelay(Timeouts.toDuration(maxDelay, timeUnit));
124124
}
125125

126-
default DelayPolicy withMaxDelay(long maxDelayMillis) {
126+
default DelayPolicy<T> withMaxDelay(long maxDelayMillis) {
127127
return withMaxDelay(Duration.ofMillis(maxDelayMillis));
128128
}
129129

130-
default DelayPolicy withFirstRetryNoDelay() {
131-
return new FirstRetryNoDelayPolicy(this);
130+
default DelayPolicy<T> withFirstRetryNoDelay() {
131+
return new FirstRetryNoDelayPolicy<>(this);
132132
}
133133

134134
public static boolean isValid(Duration d) {

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 37 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -449,83 +449,45 @@ public static <T> Promise<List<T>> atLeast(int minResultsCount, int maxErrorsCou
449449
}
450450
}
451451

452-
public static Promise<Void> retry(Runnable codeBlock, Executor executor, RetryPolicy retryPolicy) {
452+
public static Promise<Void> retry(Runnable codeBlock, Executor executor, RetryPolicy<? super Void> retryPolicy) {
453453
return retry(ctx -> { codeBlock.run(); }, executor, retryPolicy);
454454
}
455455

456-
public static Promise<Void> retry(RetryRunnable codeBlock, Executor executor, RetryPolicy retryPolicy) {
457-
Promise<Object> wrappedResult = poll(
458-
ctx -> { codeBlock.run(ctx); return IGNORE; },
459-
executor, retryPolicy
460-
);
461-
return wrappedResult.dependent()
462-
.thenApply(v -> (Void)null, true)
463-
.raw();
456+
public static Promise<Void> retry(RetryRunnable codeBlock, Executor executor, RetryPolicy<? super Void> retryPolicy) {
457+
return retry(ctx -> { codeBlock.run(ctx); return null; }, executor, retryPolicy.acceptNullResult());
464458
}
465-
466-
public static <T> Promise<T> retry(Callable<? extends T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
459+
460+
public static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
467461
return retry(toRetryCallable(codeBlock), executor, retryPolicy);
468462
}
469-
470-
public static <T> Promise<T> retry(RetryCallable<? extends T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
471-
Promise<Reference<T>> wrappedResult = poll(
472-
ctx -> new Reference<>( codeBlock.call(ctx) ),
473-
executor, retryPolicy
474-
);
475-
return wrappedResult.dependent()
476-
.thenApply(Reference::get, true)
477-
.raw();
478-
}
479-
480-
public static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<? extends T>> invoker, RetryPolicy retryPolicy) {
481-
return retryFuture(toRetryCallable(invoker), retryPolicy);
482-
}
483-
484-
public static <T> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<? extends T>> invoker, RetryPolicy retryPolicy) {
485-
Promise<Reference<T>> wrappedResult = pollFuture(
486-
ctx -> Promises.from(invoker.call(ctx))
487-
.dependent()
488-
.thenApply(r -> new Reference<T>(r), true)
489-
.raw()
490-
,
491-
retryPolicy
492-
);
493-
return wrappedResult.dependent()
494-
.thenApply(Reference::get, true)
495-
.raw();
496-
}
497-
498-
public static <T> Promise<T> poll(Callable<T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
499-
return poll(toRetryCallable(codeBlock), executor, retryPolicy);
500-
}
501463

502-
public static <T> Promise<T> poll(RetryCallable<T> codeBlock, Executor executor, RetryPolicy retryPolicy) {
464+
public static <T> Promise<T> retry(RetryCallable<T, T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
503465
return invokePoller(
504466
retryPolicy,
505467
(ctx, result, cancellation) -> pollValueOnce(codeBlock, executor, ctx, result, cancellation)
506468
);
507469
}
508470

509-
public static <T> Promise<T> pollOptional(Callable<Optional<? extends T>> codeBlock, Executor executor, RetryPolicy retryPolicy) {
510-
return pollOptional(toRetryCallable(codeBlock), executor, retryPolicy);
471+
public static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
472+
return retryOptional(toRetryCallable(codeBlock), executor, retryPolicy);
511473
}
512474

513-
public static <T> Promise<T> pollOptional(RetryCallable<Optional<? extends T>> codeBlock, Executor executor, RetryPolicy retryPolicy) {
514-
return poll(ctx -> codeBlock.call(ctx).orElse(null), executor, retryPolicy);
475+
public static <T> Promise<T> retryOptional(RetryCallable<Optional<T>, T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
476+
return retry(ctx -> codeBlock.call(ctx).orElse(null), executor, retryPolicy);
515477
}
516478

517-
public static <T> Promise<T> pollFuture(Callable<? extends CompletionStage<? extends T>> invoker, RetryPolicy retryPolicy) {
518-
return pollFuture(toRetryCallable(invoker), retryPolicy);
479+
public static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>> invoker, RetryPolicy<? super T> retryPolicy) {
480+
return retryFuture(toRetryCallable(invoker), retryPolicy);
519481
}
520482

521-
public static <T> Promise<T> pollFuture(RetryCallable<? extends CompletionStage<? extends T>> invoker, RetryPolicy retryPolicy) {
483+
public static <T> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, T> invoker, RetryPolicy<? super T> retryPolicy) {
522484
return invokePoller(
523485
retryPolicy,
524486
(ctx, result, cancellation) -> pollFutureOnce(invoker, ctx, result, cancellation, null)
525487
);
526488
}
527489

528-
private static <T> Promise<T> invokePoller(RetryPolicy retryPolicy, F3<RetryContext, CompletableFuture<T>, Consumer<Promise<?>>> initiator) {
490+
private static <T> Promise<T> invokePoller(RetryPolicy<? super T> retryPolicy, F3<RetryContext<T>, CompletableFuture<T>, Consumer<Promise<?>>> initiator) {
529491
final CompletableFuture<T> result = new CompletableFuture<>();
530492
final AtomicReference<Promise<?>> callPromiseRef = new AtomicReference<>();
531493
// Cleanup latest timeout on completion;
@@ -544,14 +506,14 @@ private static <T> Promise<T> invokePoller(RetryPolicy retryPolicy, F3<RetryCont
544506
}
545507
};
546508

547-
RetryContext ctx = RetryContext.initial(retryPolicy);
509+
RetryContext<T> ctx = RetryContext.initial(retryPolicy);
548510
initiator.apply(ctx, result, cancellation);
549511
return new CompletableFutureWrapper<>(result);
550512
}
551513

552-
private static <T> void pollValueOnce(RetryCallable<? extends T> codeBlock,
514+
private static <T> void pollValueOnce(RetryCallable<T, T> codeBlock,
553515
Executor executor,
554-
RetryContext ctx,
516+
RetryContext<T> ctx,
555517
CompletableFuture<T> result,
556518
Consumer<Promise<?>> cancellation) {
557519

@@ -560,35 +522,36 @@ private static <T> void pollValueOnce(RetryCallable<? extends T> codeBlock,
560522
return;
561523
}
562524

563-
RetryPolicy.Outcome answer = ctx.shouldContinue();
564-
if (answer.shouldExecute()) {
525+
RetryPolicy.Verdict verdict = ctx.shouldContinue();
526+
if (verdict.shouldExecute()) {
565527
Supplier<Promise<?>> callSupplier = () -> {
566528
long startTime = System.nanoTime();
567529
Runnable call = () -> {
568530
try {
569531
T value = codeBlock.call(ctx);
570-
if (value != null) {
532+
if (ctx.isValidResult(value)) {
571533
result.complete(value);
572534
} else {
573535
long finishTime = System.nanoTime();
574-
RetryContext nextCtx = ctx.nextRetry(duration(startTime, finishTime));
536+
RetryContext<T> nextCtx = ctx.nextRetry(duration(startTime, finishTime), value);
575537
pollValueOnce(codeBlock, executor, nextCtx, result, cancellation);
576538
}
577539
} catch (Exception ex) {
578540
long finishTime = System.nanoTime();
579-
RetryContext nextCtx = ctx.nextRetry(duration(startTime, finishTime), ex);
541+
RetryContext<T> nextCtx = ctx.nextRetry(duration(startTime, finishTime), ex);
580542
pollValueOnce(codeBlock, executor, nextCtx, result, cancellation);
581543
}
582544
};
583545

584546
// Call should be done via CompletableTask to let it be interruptible
585547
Promise<?> p = CompletableTask.runAsync(call, executor);
586-
return applyExecutionTimeout(p, answer);
548+
return applyExecutionTimeout(p, verdict);
587549
};
588-
Duration backoffDelay = answer.backoffDelay();
550+
Duration backoffDelay = verdict.backoffDelay();
589551
if (DelayPolicy.isValid(backoffDelay)) {
590552
// Invocation after timeout, change cancellation target
591-
Promise<?> later = Timeouts.delay( backoffDelay )
553+
Promise<?> later = Timeouts
554+
.delay(backoffDelay)
592555
.dependent()
593556
.thenRun(() -> cancellation.accept( callSupplier.get() ), true);
594557
cancellation.accept( later );
@@ -601,8 +564,8 @@ private static <T> void pollValueOnce(RetryCallable<? extends T> codeBlock,
601564
}
602565
}
603566

604-
private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<? extends T>> invoker,
605-
RetryContext ctx,
567+
private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<T>, T> invoker,
568+
RetryContext<T> ctx,
606569
CompletableFuture<T> result,
607570
Consumer<Promise<?>> cancellation,
608571
Promise<?> prev) {
@@ -611,8 +574,8 @@ private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<?
611574
return;
612575
}
613576

614-
RetryPolicy.Outcome answer = ctx.shouldContinue();
615-
if (answer.shouldExecute()) {
577+
RetryPolicy.Verdict verdict = ctx.shouldContinue();
578+
if (verdict.shouldExecute()) {
616579
Supplier<Promise<?>> callSupplier = () -> {
617580
long startTime = System.nanoTime();
618581

@@ -628,11 +591,11 @@ private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<?
628591

629592
Promise<? extends T> p = target;
630593
p.whenComplete((value, ex) -> {
631-
if (null == ex && null != value) {
594+
if (null == ex && ctx.isValidResult(value)) {
632595
result.complete(value);
633596
} else {
634597
long finishTime = System.nanoTime();
635-
RetryContext nextCtx = ctx.nextRetry(
598+
RetryContext<T> nextCtx = ctx.nextRetry(
636599
duration(startTime, finishTime), unwrapCompletionException(ex)
637600
);
638601
boolean callLater = isRecursive.get() && Thread.currentThread() == invokerThread;
@@ -648,9 +611,9 @@ private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<?
648611
}
649612
});
650613
isRecursive.set(false);
651-
return applyExecutionTimeout(p, answer);
614+
return applyExecutionTimeout(p, verdict);
652615
};
653-
Duration backoffDelay = answer.backoffDelay();
616+
Duration backoffDelay = verdict.backoffDelay();
654617
if (null != prev && DelayPolicy.isValid(backoffDelay)) {
655618
callLater(prev, backoffDelay, cancellation, () -> cancellation.accept( callSupplier.get() ));
656619
} else {
@@ -670,8 +633,8 @@ private static <T> void callLater(Promise<T> completedPromise, Duration delay, C
670633
cancellation.accept(later);
671634
}
672635

673-
private static <T> Promise<T> applyExecutionTimeout(Promise<T> singleInvocationPromise, RetryPolicy.Outcome answer) {
674-
Duration timeout = answer.timeout();
636+
private static <T> Promise<T> applyExecutionTimeout(Promise<T> singleInvocationPromise, RetryPolicy.Verdict verdict) {
637+
Duration timeout = verdict.timeout();
675638
if (DelayPolicy.isValid(timeout)) {
676639
singleInvocationPromise.orTimeout( timeout );
677640
}
@@ -721,7 +684,7 @@ private static <T> Promise<T> insufficientNumberOfArguments(int minResultCount,
721684
throw new IllegalArgumentException(message);
722685
}
723686

724-
private static <V> RetryCallable<V> toRetryCallable(Callable<? extends V> callable) {
687+
private static <V, T> RetryCallable<V, T> toRetryCallable(Callable<? extends V> callable) {
725688
return ctx -> callable.call();
726689
}
727690

src/main/java/net/tascalate/concurrent/RetryCallable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@
1616
package net.tascalate.concurrent;
1717

1818
@FunctionalInterface
19-
public interface RetryCallable<V> {
20-
V call(RetryContext ctx) throws Exception;
19+
public interface RetryCallable<V, T> {
20+
V call(RetryContext<T> ctx) throws Exception;
2121
}

0 commit comments

Comments
 (0)