Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public static Object adaptReactiveResult(

Publisher<?> publisher = adapter.toPublisher(result);
Retry retry = Retry.backoff(spec.maxAttempts(), spec.delay())
.jitter((double) spec.jitter().toMillis() / spec.delay().toMillis())
.jitter(
spec.delay().isZero() ? 0.0 :
Math.max(0.0, Math.min(1.0, spec.jitter().toNanos() / (double) spec.delay().toNanos()))
)
.multiplier(spec.multiplier())
.maxBackoff(spec.maxDelay())
.filter(spec.combinedPredicate().forMethod(method));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,97 @@ void withPostProcessorForClass() {
assertThat(target.counter.get()).isEqualTo(6);
}

@Test
void adaptReactiveResultWithMinimalRetrySpec() {
// Test minimal retry configuration: maxAttempts=1, delay=0, jitter=0, multiplier=1.0, maxDelay=0
MinimalRetryBean target = new MinimalRetryBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 1, Duration.ZERO, Duration.ZERO, 1.0, Duration.ZERO)));
MinimalRetryBean proxy = (MinimalRetryBean) pf.getProxy();

// Should execute only 2 times, because maxAttempts=1 means 1 call + 1 retry
assertThatIllegalStateException().isThrownBy(() -> proxy.retryOperation().block())
.withCauseInstanceOf(IOException.class).havingCause().withMessage("2");
assertThat(target.counter.get()).isEqualTo(2);
}

@Test
void adaptReactiveResultWithZeroDelayAndJitter() {
// Test case where delay=0 and jitter>0
ZeroDelayJitterBean target = new ZeroDelayJitterBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 3, Duration.ZERO, Duration.ofMillis(10), 2.0, Duration.ofMillis(100))));
ZeroDelayJitterBean proxy = (ZeroDelayJitterBean) pf.getProxy();

assertThatIllegalStateException().isThrownBy(() -> proxy.retryOperation().block())
.withCauseInstanceOf(IOException.class).havingCause().withMessage("4");
assertThat(target.counter.get()).isEqualTo(4);
}

@Test
void adaptReactiveResultWithJitterGreaterThanDelay() {
// Test case where jitter > delay
JitterGreaterThanDelayBean target = new JitterGreaterThanDelayBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 3, Duration.ofMillis(5), Duration.ofMillis(20), 1.5, Duration.ofMillis(50))));
JitterGreaterThanDelayBean proxy = (JitterGreaterThanDelayBean) pf.getProxy();

assertThatIllegalStateException().isThrownBy(() -> proxy.retryOperation().block())
.withCauseInstanceOf(IOException.class).havingCause().withMessage("4");
assertThat(target.counter.get()).isEqualTo(4);
}

@Test
void adaptReactiveResultWithFluxMultiValue() {
// Test Flux multi-value stream case
FluxMultiValueBean target = new FluxMultiValueBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 3, Duration.ofMillis(10), Duration.ofMillis(5), 2.0, Duration.ofMillis(100))));
FluxMultiValueBean proxy = (FluxMultiValueBean) pf.getProxy();

assertThatIllegalStateException().isThrownBy(() -> proxy.retryOperation().blockFirst())
.withCauseInstanceOf(IOException.class).havingCause().withMessage("4");
assertThat(target.counter.get()).isEqualTo(4);
}

@Test
void adaptReactiveResultWithSuccessfulOperation() {
// Test successful return case, ensuring retry mechanism doesn't activate
SuccessfulOperationBean target = new SuccessfulOperationBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 5, Duration.ofMillis(10), Duration.ofMillis(5), 2.0, Duration.ofMillis(100))));
SuccessfulOperationBean proxy = (SuccessfulOperationBean) pf.getProxy();

String result = proxy.retryOperation().block();
assertThat(result).isEqualTo("success");
// Should execute only once because of successful return
assertThat(target.counter.get()).isEqualTo(1);
}

@Test
void adaptReactiveResultWithImmediateFailure() {
// Test immediate failure case
ImmediateFailureBean target = new ImmediateFailureBean();
ProxyFactory pf = new ProxyFactory();
pf.setTarget(target);
pf.addAdvice(new SimpleRetryInterceptor(
new MethodRetrySpec((m, t) -> true, 3, Duration.ofMillis(10), Duration.ofMillis(5), 1.5, Duration.ofMillis(50))));
ImmediateFailureBean proxy = (ImmediateFailureBean) pf.getProxy();

assertThatIllegalStateException().isThrownBy(() -> proxy.retryOperation().block())
.withCauseInstanceOf(RuntimeException.class).havingCause().withMessage("immediate failure");
assertThat(target.counter.get()).isEqualTo(4);
}

public static class NonAnnotatedBean {

Expand Down Expand Up @@ -193,4 +284,71 @@ public boolean shouldRetry(Method method, Throwable throwable) {
}
}

// Bean classes for boundary testing
public static class MinimalRetryBean {
AtomicInteger counter = new AtomicInteger();

public Mono<Object> retryOperation() {
return Mono.fromCallable(() -> {
counter.incrementAndGet();
throw new IOException(counter.toString());
});
}
}

public static class ZeroDelayJitterBean {
AtomicInteger counter = new AtomicInteger();

public Mono<Object> retryOperation() {
return Mono.fromCallable(() -> {
counter.incrementAndGet();
throw new IOException(counter.toString());
});
}
}

public static class JitterGreaterThanDelayBean {
AtomicInteger counter = new AtomicInteger();

public Mono<Object> retryOperation() {
return Mono.fromCallable(() -> {
counter.incrementAndGet();
throw new IOException(counter.toString());
});
}
}

public static class FluxMultiValueBean {
AtomicInteger counter = new AtomicInteger();

public Flux<Object> retryOperation() {
return Flux.from(Mono.fromCallable(() -> {
counter.incrementAndGet();
throw new IOException(counter.toString());
}));
}
}

public static class SuccessfulOperationBean {
AtomicInteger counter = new AtomicInteger();

public Mono<String> retryOperation() {
return Mono.fromCallable(() -> {
counter.incrementAndGet();
return "success";
});
}
}

public static class ImmediateFailureBean {
AtomicInteger counter = new AtomicInteger();

public Mono<Object> retryOperation() {
return Mono.fromCallable(() -> {
counter.incrementAndGet();
throw new RuntimeException("immediate failure");
});
}
}

}
Loading