Skip to content

Commit 51842ef

Browse files
committed
Fix regressions with Retry; refactoring Retry code
1 parent 38d24c9 commit 51842ef

File tree

2 files changed

+67
-61
lines changed

2 files changed

+67
-61
lines changed

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 66 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,7 @@ private static <T extends C, C> Promise<T> tryValueOnce(RetryCallable<T, C> code
950950

951951
@SuppressWarnings("unchecked")
952952
RetryContext<C>[] ctxRef = new RetryContext[] {initialCtx};
953-
return loop(null, v -> null == v || !v.isSuccess(), v -> {
953+
return loop(null, v -> null == v || !v.isSuccess(), (Try<T> v) -> {
954954
RetryContext<C> ctx = ctxRef[0];
955955
RetryPolicy.Verdict verdict = ctx.shouldContinue();
956956
if (!verdict.shouldExecute()) {
@@ -961,50 +961,42 @@ private static <T extends C, C> Promise<T> tryValueOnce(RetryCallable<T, C> code
961961
Supplier<Promise<Try<T>>> callSupplier = () -> {
962962
long startTime = System.nanoTime();
963963
// Call should be done via CompletableTask to let it be interruptible
964-
Promise<Try<T>> p = CompletableTask.supplyAsync(() -> {
965-
try {
966-
T value = codeBlock.call(ctx);
967-
if (ctx.isValidResult(value)) {
968-
return Try.success(value);
969-
} else {
970-
long finishTime = System.nanoTime();
971-
ctxRef[0] = ctx.nextRetry(duration(startTime, finishTime), value);
972-
return Try.failure(new IllegalAccessException("Result not accepted by policy"));
973-
}
974-
} catch (Exception ex) {
975-
long finishTime = System.nanoTime();
976-
ctxRef[0] = ctx.nextRetry(duration(startTime, finishTime), ex);
977-
return Try.failure(ex);
978-
}
979-
}, executor);
980-
return applyExecutionTimeout(p, verdict);
964+
Promise<T> p = CompletableTask.submit(() -> codeBlock.call(ctx), executor);
965+
return handleTryResult(p, ctxRef, verdict, startTime);
981966
};
982967

983968
Duration backoffDelay = verdict.backoffDelay();
984969
if (DelayPolicy.isValid(backoffDelay)) {
985970
// Invocation after timeout, change cancellation target
986-
return Timeouts
987-
.delay(backoffDelay)
988-
.dependent()
989-
.thenCompose(__ -> callSupplier.get(), true);
971+
DependentPromise<?> p = Timeouts.delay(backoffDelay).dependent();
972+
p.whenComplete((__, ex) -> {
973+
// Update ctx if timeout or cancel during back-off
974+
// It doesn't conflict with ctx modifications in actual call
975+
// while actual call is never happens in case of error
976+
if (null != ex) {
977+
ctxRef[0] = ctx.nextRetry(
978+
duration(0, 0), unwrapCompletionException(ex)
979+
);
980+
}
981+
});
982+
return p.thenCompose(__ -> callSupplier.get(), true);
990983
} else {
991984
return callSupplier.get();
992985
}
993986
})
994987
.dependent()
995-
.thenApply(Try::done, true)
996-
.unwrap();
997-
988+
.thenApply(Try::done, true);
989+
// Don't unwrap - internal use only
998990
}
999-
991+
1000992
private static <T extends C, C> Promise<T> tryFutureOnce(RetryCallable<? extends CompletionStage<T>, C> futureFactory,
1001993
RetryContext<C> initialCtx) {
1002994

1003995
@SuppressWarnings("unchecked")
1004996
RetryContext<C>[] ctxRef = new RetryContext[] {initialCtx};
1005-
Promise<?>[] prev = new Promise[1];
997+
DependentPromise<?>[] prev = new DependentPromise[1];
1006998

1007-
return loop(null, v -> null == v || !v.isSuccess() , v -> {
999+
return loop(null, v -> null == v || !v.isSuccess() , (Try<T> v) -> {
10081000
RetryContext<C> ctx = ctxRef[0];
10091001
RetryPolicy.Verdict verdict = ctx.shouldContinue();
10101002
if (!verdict.shouldExecute()) {
@@ -1022,51 +1014,65 @@ private static <T extends C, C> Promise<T> tryFutureOnce(RetryCallable<? extends
10221014
target = Promises.failure(ex);
10231015
}
10241016

1025-
prev[0] = target;
1026-
1027-
Promise<Try<T>> p =
1028-
target
1029-
.dependent()
1030-
.handle((value, ex) -> {
1031-
if (null == ex && ctx.isValidResult(value)) {
1032-
return Try.success(value);
1033-
} else {
1034-
long finishTime = System.nanoTime();
1035-
ctxRef[0] = ctx.nextRetry(
1036-
duration(startTime, finishTime), SharedFunctions.unwrapCompletionException(ex)
1037-
);
1038-
return Try.failure(
1039-
ex != null ? ex :
1040-
new IllegalAccessException("Result not accepted by policy")
1041-
);
1042-
}
1043-
}, true);
1044-
return applyExecutionTimeout(p, verdict);
1017+
DependentPromise<Try<T>> p = handleTryResult(target, ctxRef, verdict, startTime);
1018+
prev[0] = p;
1019+
return p;
10451020
};
10461021
Duration backoffDelay = verdict.backoffDelay();
10471022
if (null != prev[0] && DelayPolicy.isValid(backoffDelay)) {
1048-
return prev[0].delay(backoffDelay, /* Delay on error */ true)
1049-
.dependent()
1050-
// async due to unknown performance characteristics of futureFactory.call
1051-
// - so switch to default executor of prev promise
1052-
.thenComposeAsync(__ -> callSupplier.get(), true);
1023+
DependentPromise<?> p = prev[0].delay(backoffDelay, /* Delay on error */ true, true);
1024+
p.whenComplete((__, ex) -> {
1025+
// Update ctx if timeout or cancel during back-off
1026+
// It doesn't conflict with ctx modifications in actual call
1027+
// while actual call is never happens in case of error
1028+
if (null != ex) {
1029+
ctxRef[0] = ctx.nextRetry(
1030+
duration(0, 0), unwrapCompletionException(ex)
1031+
);
1032+
}
1033+
});
1034+
// async due to unknown performance characteristics of futureFactory.call
1035+
// - so switch to default executor of prev promise
1036+
return p.thenComposeAsync(__ -> callSupplier.get(), true);
10531037
} else {
10541038
// Immediately send to executor
10551039
return callSupplier.get();
1056-
}
1040+
}
10571041
})
10581042
.dependent()
1059-
.thenApply(Try::done, true)
1060-
.unwrap();
1043+
.thenApply(Try::done, true);
1044+
// Don't unwrap - internal use only
10611045
}
10621046

1063-
private static <T> Promise<T> applyExecutionTimeout(Promise<T> singleInvocationPromise, RetryPolicy.Verdict verdict) {
1047+
private static <T extends C, C> DependentPromise<Try<T>> handleTryResult(Promise<T> origin,
1048+
RetryContext<C>[] ctxRef,
1049+
RetryPolicy.Verdict verdict, long startTime) {
1050+
DependentPromise<T> p;
10641051
Duration timeout = verdict.timeout();
10651052
if (DelayPolicy.isValid(timeout)) {
1066-
singleInvocationPromise.dependent().orTimeout( timeout, true, true ).unwrap();
1053+
p = origin.dependent()
1054+
.orTimeout(timeout, true, true);
1055+
} else {
1056+
p = origin.dependent();
10671057
}
1068-
return singleInvocationPromise;
1058+
1059+
return p.handle((value, ex) -> {
1060+
RetryContext<C> ctx = ctxRef[0];
1061+
if (null == ex && ctx.isValidResult(value)) {
1062+
return Try.success(value);
1063+
} else {
1064+
long finishTime = System.nanoTime();
1065+
ctxRef[0] = ctx.nextRetry(
1066+
duration(startTime, finishTime), unwrapCompletionException(ex)
1067+
);
1068+
return Try.failure(
1069+
ex != null ? ex :
1070+
new IllegalAccessException("Result not accepted by policy")
1071+
);
1072+
}
1073+
}, true);
10691074
}
1075+
10701076

10711077
private static <T, U> Promise<T> transform(CompletionStage<U> original,
10721078
Function<? super U, ? extends T> resultMapper,
@@ -1087,7 +1093,7 @@ private static <T> T extractFirstNonNull(Collection<? extends T> collection) {
10871093
}
10881094

10891095
private static <E extends Throwable> Throwable unwrapMultitargetException(E exception) {
1090-
Throwable targetException = SharedFunctions.unwrapCompletionException(exception);
1096+
Throwable targetException = unwrapCompletionException(exception);
10911097
if (targetException instanceof MultitargetException) {
10921098
return ((MultitargetException)targetException).getFirstException().get();
10931099
} else {

src/test/java/net/tascalate/concurrent/J8Examples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ static String produceStringSlow(String suffix) {
301301
}
302302

303303
private static String pollingMethod(RetryContext<String> ctx) throws InterruptedException {
304-
System.out.println("Polling method, #" + ctx.getRetryCount());
304+
System.out.println("Enter Polling method #" + ctx.getRetryCount());
305305
try {
306306
if (ctx.getRetryCount() < 5) {
307307
Thread.sleep((5 - ctx.getRetryCount()) * 1000);

0 commit comments

Comments
 (0)