Skip to content

Commit 9146c50

Browse files
committed
Better type bounds for Promises.retry(RetryCallable<..>...)
1 parent 5b1e7a2 commit 9146c50

File tree

2 files changed

+66
-45
lines changed

2 files changed

+66
-45
lines changed

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -449,45 +449,67 @@ public static <T> Promise<List<T>> atLeast(int minResultsCount, int maxErrorsCou
449449
}
450450
}
451451

452-
public static Promise<Void> retry(Runnable codeBlock, Executor executor, RetryPolicy<? super Void> retryPolicy) {
452+
public static Promise<Void> retry(Runnable codeBlock, Executor executor,
453+
RetryPolicy<? super Void> retryPolicy) {
454+
453455
return retry(ctx -> { codeBlock.run(); }, executor, retryPolicy);
454456
}
455457

456-
public static Promise<Void> retry(RetryRunnable codeBlock, Executor executor, RetryPolicy<? super Void> retryPolicy) {
458+
public static Promise<Void> retry(RetryRunnable codeBlock, Executor executor,
459+
RetryPolicy<? super Void> retryPolicy) {
460+
457461
return retry(ctx -> { codeBlock.run(ctx); return null; }, executor, retryPolicy.acceptNullResult());
458462
}
459463

460-
public static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
464+
public static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor,
465+
RetryPolicy<? super T> retryPolicy) {
466+
461467
return retry(toRetryCallable(codeBlock), executor, retryPolicy);
462468
}
463469

464-
public static <T> Promise<T> retry(RetryCallable<T, T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
465-
return invokePoller(
466-
retryPolicy,
467-
(ctx, result, cancellation) -> pollValueOnce(codeBlock, executor, ctx, result, cancellation)
470+
public static <C, T extends C> Promise<T> retry(RetryCallable<T, C> codeBlock, Executor executor,
471+
RetryPolicy<? super C> retryPolicy) {
472+
473+
// Need type on lambda param to please Eclipse compiler
474+
return invokePoller(retryPolicy, (RetryContext<C> ctx,
475+
CompletableFuture<T> result,
476+
Consumer<Promise<?>> cancellation)
477+
-> pollValueOnce(codeBlock, executor, ctx, result, cancellation)
468478
);
469479
}
470480

471-
public static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
481+
public static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor,
482+
RetryPolicy<? super T> retryPolicy) {
483+
472484
return retryOptional(toRetryCallable(codeBlock), executor, retryPolicy);
473485
}
474486

475-
public static <T> Promise<T> retryOptional(RetryCallable<Optional<T>, T> codeBlock, Executor executor, RetryPolicy<? super T> retryPolicy) {
476-
return retry(ctx -> codeBlock.call(ctx).orElse(null), executor, retryPolicy);
487+
public static <C, T extends C> Promise<T> retryOptional(RetryCallable<Optional<T>, C> codeBlock, Executor executor,
488+
RetryPolicy<? super C> retryPolicy) {
489+
490+
// Need type on lambda param to please Eclipse compiler
491+
return retry((RetryContext<C> ctx) -> codeBlock.call(ctx).orElse(null), executor, retryPolicy);
477492
}
478493

479-
public static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>> invoker, RetryPolicy<? super T> retryPolicy) {
494+
public static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>> invoker,
495+
RetryPolicy<? super T> retryPolicy) {
496+
480497
return retryFuture(toRetryCallable(invoker), retryPolicy);
481498
}
482499

483-
public static <T> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, T> invoker, RetryPolicy<? super T> retryPolicy) {
484-
return invokePoller(
485-
retryPolicy,
486-
(ctx, result, cancellation) -> pollFutureOnce(invoker, ctx, result, cancellation, null)
500+
public static <C, T extends C> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, C> futureFactory,
501+
RetryPolicy<? super C> retryPolicy) {
502+
503+
return invokePoller(retryPolicy, (RetryContext<C> ctx,
504+
CompletableFuture<T> result,
505+
Consumer<Promise<?>> cancellation)
506+
-> pollFutureOnce(futureFactory, ctx, result, cancellation, null)
487507
);
488508
}
489509

490-
private static <T> Promise<T> invokePoller(RetryPolicy<? super T> retryPolicy, F3<RetryContext<T>, CompletableFuture<T>, Consumer<Promise<?>>> initiator) {
510+
private static <C, T extends C> Promise<T> invokePoller(RetryPolicy<? super C> retryPolicy,
511+
F3<RetryContext<C>, CompletableFuture<T>, Consumer<Promise<?>>> initiator) {
512+
491513
final CompletableFuture<T> result = new CompletableFuture<>();
492514
final AtomicReference<Promise<?>> callPromiseRef = new AtomicReference<>();
493515
// Cleanup latest timeout on completion;
@@ -506,16 +528,16 @@ private static <T> Promise<T> invokePoller(RetryPolicy<? super T> retryPolicy, F
506528
}
507529
};
508530

509-
RetryContext<T> ctx = RetryContext.initial(retryPolicy);
531+
RetryContext<C> ctx = RetryContext.initial(retryPolicy);
510532
initiator.apply(ctx, result, cancellation);
511533
return new CompletableFutureWrapper<>(result);
512534
}
513535

514-
private static <T> void pollValueOnce(RetryCallable<T, T> codeBlock,
515-
Executor executor,
516-
RetryContext<T> ctx,
517-
CompletableFuture<T> result,
518-
Consumer<Promise<?>> cancellation) {
536+
private static <C, T extends C> void pollValueOnce(RetryCallable<T, C> codeBlock,
537+
Executor executor,
538+
RetryContext<C> ctx,
539+
CompletableFuture<T> result,
540+
Consumer<Promise<?>> cancellation) {
519541

520542
// Promise may be cancelled outside of polling
521543
if (result.isDone()) {
@@ -533,12 +555,12 @@ private static <T> void pollValueOnce(RetryCallable<T, T> codeBlock,
533555
result.complete(value);
534556
} else {
535557
long finishTime = System.nanoTime();
536-
RetryContext<T> nextCtx = ctx.nextRetry(duration(startTime, finishTime), value);
558+
RetryContext<C> nextCtx = ctx.nextRetry(duration(startTime, finishTime), value);
537559
pollValueOnce(codeBlock, executor, nextCtx, result, cancellation);
538560
}
539561
} catch (Exception ex) {
540562
long finishTime = System.nanoTime();
541-
RetryContext<T> nextCtx = ctx.nextRetry(duration(startTime, finishTime), ex);
563+
RetryContext<C> nextCtx = ctx.nextRetry(duration(startTime, finishTime), ex);
542564
pollValueOnce(codeBlock, executor, nextCtx, result, cancellation);
543565
}
544566
};
@@ -564,11 +586,11 @@ private static <T> void pollValueOnce(RetryCallable<T, T> codeBlock,
564586
}
565587
}
566588

567-
private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<T>, T> invoker,
568-
RetryContext<T> ctx,
569-
CompletableFuture<T> result,
570-
Consumer<Promise<?>> cancellation,
571-
Promise<?> prev) {
589+
private static <C, T extends C> void pollFutureOnce(RetryCallable<? extends CompletionStage<T>, C> futureFactory,
590+
RetryContext<C> ctx,
591+
CompletableFuture<T> result,
592+
Consumer<Promise<?>> cancellation,
593+
Promise<?> prev) {
572594
// Promise may be cancelled outside of polling
573595
if (result.isDone()) {
574596
return;
@@ -581,7 +603,7 @@ private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<T
581603

582604
Promise<? extends T> target;
583605
try {
584-
target = Promises.from(invoker.call(ctx));
606+
target = Promises.from(futureFactory.call(ctx));
585607
} catch (Exception ex) {
586608
target = Promises.failure(ex);
587609
}
@@ -595,18 +617,18 @@ private static <T> void pollFutureOnce(RetryCallable<? extends CompletionStage<T
595617
result.complete(value);
596618
} else {
597619
long finishTime = System.nanoTime();
598-
RetryContext<T> nextCtx = ctx.nextRetry(
620+
RetryContext<C> nextCtx = ctx.nextRetry(
599621
duration(startTime, finishTime), unwrapCompletionException(ex)
600622
);
601623
boolean callLater = isRecursive.get() && Thread.currentThread() == invokerThread;
602624
if (callLater) {
603625
// Call after minimal possible delay
604626
callLater(
605627
p, Duration.ofNanos(1), cancellation,
606-
() -> pollFutureOnce(invoker, nextCtx, result, cancellation, p)
628+
() -> pollFutureOnce(futureFactory, nextCtx, result, cancellation, p)
607629
);
608630
} else {
609-
pollFutureOnce(invoker, nextCtx, result, cancellation, p);
631+
pollFutureOnce(futureFactory, nextCtx, result, cancellation, p);
610632
}
611633
}
612634
});

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import java.math.BigInteger;
1819
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.concurrent.CompletionStage;
@@ -26,9 +27,19 @@
2627
import net.tascalate.concurrent.decorators.ExtendedPromiseDecorator;
2728

2829
public class J8Examples {
30+
31+
static BigInteger tryCalc(RetryContext<Number> ctx) {
32+
return BigInteger.ONE;
33+
}
2934

3035
public static void main(final String[] argv) throws InterruptedException, ExecutionException {
3136
final TaskExecutorService executorService = TaskExecutors.newFixedThreadPool(6);
37+
38+
Promise<BigInteger> tryTyping = Promises.retry(
39+
J8Examples::tryCalc, executorService,
40+
RetryPolicy.<Number>create().withResultValidator(v -> v.intValue() > 0).withMaxRetries(2)
41+
);
42+
System.out.println( tryTyping.get() );
3243

3344
Promise<Object> k = CompletableTask.supplyAsync(() -> produceStringSlow("-ABC"), executorService);
3445
//Promise<Object> k = CompletableTask.complete("ABC", executorService);
@@ -70,18 +81,6 @@ public static void main(final String[] argv) throws InterruptedException, Execut
7081
//p.cancel(true);
7182
p.get();
7283

73-
Promise<String> retry1 = Promises.retry(
74-
() -> "ABC",
75-
executorService, RetryPolicy.DEFAULT
76-
);
77-
78-
Promise<String> retry2 = Promises.retry(
79-
ctx -> "ABC",
80-
executorService, RetryPolicy.DEFAULT.rejectNullResult()
81-
);
82-
83-
System.out.println(retry1.join() + " vs " + retry2.join());
84-
8584
Promise<String> pollerFuture = Promises.retryFuture(
8685
ctx -> pollingFutureMethod(ctx, executorService),
8786
RetryPolicy.DEFAULT

0 commit comments

Comments
 (0)