Skip to content

Commit 8d9c7ca

Browse files
committed
Simplify retry* functionality
1 parent 51842ef commit 8d9c7ca

File tree

2 files changed

+68
-117
lines changed

2 files changed

+68
-117
lines changed

src/main/java/net/tascalate/concurrent/AsyncCloseable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
* resources are relinquished.
3131
*
3232
* <p>
33-
* May be used with the methods {@link Promises#tryApply(CompletionStage, Function)},
34-
* {@link Promises#tryCompose(AsyncCloseable, Function)} to emulate the behavior of a try
33+
* May be used with the methods {@link Promises#tryApplyEx(CompletionStage, Function)},
34+
* {@link Promises#tryComposeEx(Promise, Function)} to emulate the behavior of a try
3535
* with resources block.
3636
*
3737
*/

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 66 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ public static <K, T> Promise<Map<K, T>> all(boolean cancelRemaining,
504504
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
505505
Map<K, T> result = new ConcurrentHashMap<>();
506506
return
507-
all(cancelRemaining, groupToMap(result, promises))
507+
all(cancelRemaining, collectKeyedResults(result, promises))
508508
.dependent()
509509
.thenApply(__ -> Collections.unmodifiableMap(result), true)
510510
.unwrap();
@@ -582,7 +582,7 @@ public static <K, T> Promise<Map<K, T>> any(boolean cancelRemaining,
582582
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
583583
Map<K, T> result = new ConcurrentHashMap<>();
584584
return
585-
any(cancelRemaining, groupToMap(result, promises))
585+
any(cancelRemaining, collectKeyedResults(result, promises))
586586
.dependent()
587587
.thenApply(__ -> Collections.unmodifiableMap(result), true)
588588
.unwrap();
@@ -665,7 +665,7 @@ public static <K, T> Promise<Map<K, T>> anyStrict(boolean cancelRemaining,
665665
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
666666
Map<K, T> result = new ConcurrentHashMap<>();
667667
return
668-
anyStrict(cancelRemaining, groupToMap(result, promises))
668+
anyStrict(cancelRemaining, collectKeyedResults(result, promises))
669669
.dependent()
670670
.thenApply(__ -> Collections.unmodifiableMap(result), true)
671671
.unwrap();
@@ -743,7 +743,7 @@ public static <K, T> Promise<Map<K, T>> atLeast(int minResultsCount, boolean can
743743
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
744744
Map<K, T> result = new ConcurrentHashMap<>();
745745
return
746-
atLeast(minResultsCount, cancelRemaining, groupToMap(result, promises))
746+
atLeast(minResultsCount, cancelRemaining, collectKeyedResults(result, promises))
747747
.dependent()
748748
.thenApply(__ -> Collections.unmodifiableMap(result), true)
749749
.unwrap();
@@ -825,7 +825,7 @@ public static <K, T> Promise<Map<K, T>> atLeastStrict(int minResultsCount, boole
825825
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
826826
Map<K, T> result = new ConcurrentHashMap<>();
827827
return
828-
atLeastStrict(minResultsCount, cancelRemaining, groupToMap(result, promises))
828+
atLeastStrict(minResultsCount, cancelRemaining, collectKeyedResults(result, promises))
829829
.dependent()
830830
.thenApply(__ -> Collections.unmodifiableMap(result), true)
831831
.unwrap();
@@ -890,7 +890,7 @@ public static <K, T> Promise<Map<K, T>> atLeast(int minResultsCount, int maxErro
890890
Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
891891
Map<K, T> result = new ConcurrentHashMap<>();
892892
return
893-
atLeast(minResultsCount, maxErrorsCount, cancelRemaining, groupToMap(result, promises))
893+
atLeast(minResultsCount, maxErrorsCount, cancelRemaining, collectKeyedResults(result, promises))
894894
.dependent()
895895
.thenApply(__ -> Collections.unmodifiableMap(result), true)
896896
.unwrap();
@@ -916,7 +916,14 @@ public static <T> Promise<T> retry(Callable<T> codeBlock, Executor executor,
916916

917917
public static <T extends C, C> Promise<T> retry(RetryCallable<T, C> codeBlock, Executor executor,
918918
RetryPolicy<? super C> retryPolicy) {
919-
return tryValueOnce(codeBlock, executor, RetryContext.initial(retryPolicy));
919+
//
920+
return retryImpl((RetryContext<C> ctx) -> {
921+
try {
922+
return CompletableTask.submit(() -> codeBlock.call(ctx), executor);
923+
} catch (Throwable ex) {
924+
return failure(ex);
925+
}
926+
}, RetryContext.initial(retryPolicy), true);
920927
}
921928

922929
public static <T> Promise<T> retryOptional(Callable<Optional<T>> codeBlock, Executor executor,
@@ -940,61 +947,23 @@ public static <T> Promise<T> retryFuture(Callable<? extends CompletionStage<T>>
940947

941948
public static <T extends C, C> Promise<T> retryFuture(RetryCallable<? extends CompletionStage<T>, C> futureFactory,
942949
RetryPolicy<? super C> retryPolicy) {
943-
return tryFutureOnce(futureFactory, RetryContext.initial(retryPolicy));
944-
}
945-
946-
947-
private static <T extends C, C> Promise<T> tryValueOnce(RetryCallable<T, C> codeBlock,
948-
Executor executor,
949-
RetryContext<C> initialCtx) {
950-
951-
@SuppressWarnings("unchecked")
952-
RetryContext<C>[] ctxRef = new RetryContext[] {initialCtx};
953-
return loop(null, v -> null == v || !v.isSuccess(), (Try<T> v) -> {
954-
RetryContext<C> ctx = ctxRef[0];
955-
RetryPolicy.Verdict verdict = ctx.shouldContinue();
956-
if (!verdict.shouldExecute()) {
957-
return failure(ctx.asFailure());
950+
return retryImpl((RetryContext<C> ctx) -> {
951+
try {
952+
// Not sure how many time futureFactory.call will take
953+
return from(futureFactory.call(ctx));
954+
} catch (Throwable ex) {
955+
return failure(ex);
958956
}
959-
960-
// Use Try<T> to avoid stop on exception in loop
961-
Supplier<Promise<Try<T>>> callSupplier = () -> {
962-
long startTime = System.nanoTime();
963-
// Call should be done via CompletableTask to let it be interruptible
964-
Promise<T> p = CompletableTask.submit(() -> codeBlock.call(ctx), executor);
965-
return handleTryResult(p, ctxRef, verdict, startTime);
966-
};
967-
968-
Duration backoffDelay = verdict.backoffDelay();
969-
if (DelayPolicy.isValid(backoffDelay)) {
970-
// Invocation after timeout, change cancellation target
971-
DependentPromise<?> p = Timeouts.delay(backoffDelay).dependent();
972-
p.whenComplete((__, ex) -> {
973-
// Update ctx if timeout or cancel during back-off
974-
// It doesn't conflict with ctx modifications in actual call
975-
// while actual call is never happens in case of error
976-
if (null != ex) {
977-
ctxRef[0] = ctx.nextRetry(
978-
duration(0, 0), unwrapCompletionException(ex)
979-
);
980-
}
981-
});
982-
return p.thenCompose(__ -> callSupplier.get(), true);
983-
} else {
984-
return callSupplier.get();
985-
}
986-
})
987-
.dependent()
988-
.thenApply(Try::done, true);
989-
// Don't unwrap - internal use only
957+
}, RetryContext.initial(retryPolicy), true);
990958
}
991-
992-
private static <T extends C, C> Promise<T> tryFutureOnce(RetryCallable<? extends CompletionStage<T>, C> futureFactory,
993-
RetryContext<C> initialCtx) {
959+
960+
private static <T extends C, C> Promise<T> retryImpl(Function<? super RetryContext<C>, ? extends Promise<T>> futureFactory,
961+
RetryContext<C> initialCtx,
962+
boolean usePrevAsync) {
994963

995964
@SuppressWarnings("unchecked")
996965
RetryContext<C>[] ctxRef = new RetryContext[] {initialCtx};
997-
DependentPromise<?>[] prev = new DependentPromise[1];
966+
DependentPromise<?>[] prevRef = usePrevAsync ? new DependentPromise[1] : null;
998967

999968
return loop(null, v -> null == v || !v.isSuccess() , (Try<T> v) -> {
1000969
RetryContext<C> ctx = ctxRef[0];
@@ -1005,74 +974,54 @@ private static <T extends C, C> Promise<T> tryFutureOnce(RetryCallable<? extends
1005974

1006975
Supplier<Promise<Try<T>>> callSupplier = () -> {
1007976
long startTime = System.nanoTime();
977+
Promise<T> target = futureFactory.apply(ctx);
978+
979+
DependentPromise<T> withTimeout = target.dependent();
1008980

1009-
Promise<T> target;
1010-
try {
1011-
// Not sure how many time futureFactory.call will take
1012-
target = Promises.from(futureFactory.call(ctx));
1013-
} catch (Exception ex) {
1014-
target = Promises.failure(ex);
981+
Duration timeout = verdict.timeout();
982+
if (DelayPolicy.isValid(timeout)) {
983+
withTimeout = withTimeout.orTimeout(timeout, true, true);
1015984
}
1016985

1017-
DependentPromise<Try<T>> p = handleTryResult(target, ctxRef, verdict, startTime);
1018-
prev[0] = p;
1019-
return p;
1020-
};
1021-
Duration backoffDelay = verdict.backoffDelay();
1022-
if (null != prev[0] && DelayPolicy.isValid(backoffDelay)) {
1023-
DependentPromise<?> p = prev[0].delay(backoffDelay, /* Delay on error */ true, true);
1024-
p.whenComplete((__, ex) -> {
1025-
// Update ctx if timeout or cancel during back-off
1026-
// It doesn't conflict with ctx modifications in actual call
1027-
// while actual call is never happens in case of error
1028-
if (null != ex) {
986+
DependentPromise<Try<T>> result = withTimeout.handle((value, ex) -> {
987+
if (null == ex && ctx.isValidResult(value)) {
988+
return Try.success(value);
989+
} else {
990+
long finishTime = System.nanoTime();
1029991
ctxRef[0] = ctx.nextRetry(
1030-
duration(0, 0), unwrapCompletionException(ex)
992+
duration(startTime, finishTime), unwrapCompletionException(ex)
1031993
);
1032-
}
1033-
});
1034-
// async due to unknown performance characteristics of futureFactory.call
1035-
// - so switch to default executor of prev promise
1036-
return p.thenComposeAsync(__ -> callSupplier.get(), true);
994+
return Try.failure(ex != null ? ex : new IllegalAccessException("Result not accepted by policy"));
995+
}
996+
}, true);
997+
998+
if (null != prevRef) {
999+
prevRef[0] = result;
1000+
}
1001+
1002+
return result;
1003+
};
1004+
1005+
Duration backoffDelay = verdict.backoffDelay();
1006+
if (DelayPolicy.isValid(backoffDelay)) {
1007+
DependentPromise<?> delay = prevRef == null || prevRef[0] == null ?
1008+
Timeouts.delay(backoffDelay).dependent()
1009+
:
1010+
prevRef[0].delay(backoffDelay, true, false);
1011+
return delay.thenCompose(__ -> callSupplier.get(), true)
1012+
.exceptionally(ex -> {
1013+
// May be thrown when backoff delay is interrupted (canceled)
1014+
ctxRef[0] = ctxRef[0].nextRetry(duration(0, 0), ex);
1015+
return Try.failure(ex);
1016+
}, true);
10371017
} else {
1038-
// Immediately send to executor
1039-
return callSupplier.get();
1040-
}
1018+
return callSupplier.get();
1019+
}
10411020
})
10421021
.dependent()
10431022
.thenApply(Try::done, true);
10441023
// Don't unwrap - internal use only
10451024
}
1046-
1047-
private static <T extends C, C> DependentPromise<Try<T>> handleTryResult(Promise<T> origin,
1048-
RetryContext<C>[] ctxRef,
1049-
RetryPolicy.Verdict verdict, long startTime) {
1050-
DependentPromise<T> p;
1051-
Duration timeout = verdict.timeout();
1052-
if (DelayPolicy.isValid(timeout)) {
1053-
p = origin.dependent()
1054-
.orTimeout(timeout, true, true);
1055-
} else {
1056-
p = origin.dependent();
1057-
}
1058-
1059-
return p.handle((value, ex) -> {
1060-
RetryContext<C> ctx = ctxRef[0];
1061-
if (null == ex && ctx.isValidResult(value)) {
1062-
return Try.success(value);
1063-
} else {
1064-
long finishTime = System.nanoTime();
1065-
ctxRef[0] = ctx.nextRetry(
1066-
duration(startTime, finishTime), unwrapCompletionException(ex)
1067-
);
1068-
return Try.failure(
1069-
ex != null ? ex :
1070-
new IllegalAccessException("Result not accepted by policy")
1071-
);
1072-
}
1073-
}, true);
1074-
}
1075-
10761025

10771026
private static <T, U> Promise<T> transform(CompletionStage<U> original,
10781027
Function<? super U, ? extends T> resultMapper,
@@ -1122,7 +1071,9 @@ private static <T> Promise<T> insufficientNumberOfArguments(int minResultCount,
11221071
*/
11231072
}
11241073

1125-
private static <K, T> List<? extends CompletionStage<? extends T>> groupToMap(Map<K, T> result, Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
1074+
private static <K, T> List<? extends CompletionStage<? extends T>>
1075+
collectKeyedResults(Map<K, T> result, Map<? extends K, ? extends CompletionStage<? extends T>> promises) {
1076+
11261077
return
11271078
promises.entrySet()
11281079
.stream()

0 commit comments

Comments
 (0)