Skip to content

Commit 1cc0ebe

Browse files
committed
The Retry API cleanup
* Improve tests performance using `delay(Duration.ZERO)` where we are not interested in the delay between attempts * Make `StatefulRetryOperationsInterceptor` to use a `LinkedHashMap` with `removeEldestEntry` removal. Essentially, keeping the cache clean from those stale entries.
1 parent 78d94ff commit 1cc0ebe

File tree

7 files changed

+148
-125
lines changed

7 files changed

+148
-125
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptor.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package org.springframework.amqp.rabbit.config;
1818

19+
import java.time.Duration;
1920
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.LinkedHashMap;
2023
import java.util.List;
2124
import java.util.Map;
22-
import java.util.concurrent.ConcurrentHashMap;
2325

2426
import org.aopalliance.intercept.MethodInterceptor;
2527
import org.aopalliance.intercept.MethodInvocation;
@@ -42,6 +44,7 @@
4244
*
4345
* @author Juergen Hoeller
4446
* @author Stephane Nicoll
47+
* @author Artem Bilan
4548
*
4649
* @since 4.0
4750
*/
@@ -57,15 +60,26 @@ public final class StatefulRetryOperationsInterceptor implements MethodIntercept
5760

5861
private final @Nullable MessageRecoverer messageRecoverer;
5962

60-
private final Map<Object, RetryState> cache = new ConcurrentHashMap<>();
63+
private final int stateCacheSize;
64+
65+
private final Map<Object, RetryState> retryStateCache =
66+
Collections.synchronizedMap(new LinkedHashMap<>(100, 0.75f, true) {
67+
68+
@Override
69+
protected boolean removeEldestEntry(Map.Entry<Object, RetryState> eldest) {
70+
return size() > StatefulRetryOperationsInterceptor.this.stateCacheSize;
71+
}
72+
73+
});
6174

6275
StatefulRetryOperationsInterceptor(MessageKeyGenerator messageKeyGenerator,
6376
NewMessageIdentifier newMessageIdentifier, @Nullable RetryPolicy retryPolicy,
64-
@Nullable MessageRecoverer messageRecoverer) {
77+
@Nullable MessageRecoverer messageRecoverer, @Nullable Integer stateCacheSize) {
6578

6679
this.messageKeyGenerator = messageKeyGenerator;
6780
this.newMessageIdentifier = newMessageIdentifier;
68-
this.retryPolicy = (retryPolicy != null) ? retryPolicy : RetryPolicy.builder().build();
81+
this.retryPolicy = retryPolicy != null ? retryPolicy : RetryPolicy.builder().delay(Duration.ZERO).build();
82+
this.stateCacheSize = stateCacheSize != null ? stateCacheSize : 1000;
6983
this.messageRecoverer = messageRecoverer;
7084
}
7185

@@ -77,20 +91,20 @@ public final class StatefulRetryOperationsInterceptor implements MethodIntercept
7791
if (key == null) { // Means no retry for this message anymore.
7892
return invocation.proceed();
7993
}
80-
RetryState retryState = this.cache.get(key);
94+
RetryState retryState = this.retryStateCache.get(key);
8195
if (retryState == null || this.newMessageIdentifier.isNew(message)) {
8296
try {
8397
return invocation.proceed();
8498
}
8599
catch (Throwable ex) {
86-
this.cache.put(key, new RetryState(this.retryPolicy.getBackOff().start(), ex));
100+
this.retryStateCache.put(key, new RetryState(this.retryPolicy.getBackOff().start(), ex));
87101
throw ex;
88102
}
89103
}
90104
else {
91105
long time = retryState.backOffExecution().nextBackOff();
92106
if (time == BackOffExecution.STOP || !this.retryPolicy.shouldRetry(retryState.lastException())) {
93-
this.cache.remove(key);
107+
this.retryStateCache.remove(key);
94108
recover(args[1], retryState.lastException());
95109
// This is actually a normal outcome. It means the recovery was successful, but we don't want to consume
96110
// any more messages until the acks and commits are sent for this (problematic) message...
@@ -101,11 +115,11 @@ public final class StatefulRetryOperationsInterceptor implements MethodIntercept
101115
Thread.sleep(time);
102116
try {
103117
Object result = invocation.proceed();
104-
this.cache.remove(key);
118+
this.retryStateCache.remove(key);
105119
return result;
106120
}
107121
catch (Throwable ex) {
108-
this.cache.put(key, new RetryState(retryState.backOffExecution(), ex));
122+
this.retryStateCache.put(key, new RetryState(retryState.backOffExecution(), ex));
109123
throw ex;
110124
}
111125
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818

1919
import org.jspecify.annotations.Nullable;
2020

21+
import org.springframework.amqp.core.MessageProperties;
2122
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
2223
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
2324
import org.springframework.amqp.rabbit.retry.NewMessageIdentifier;
2425
import org.springframework.core.retry.RetryPolicy;
2526

2627
/**
2728
* Convenient factory bean for creating a stateful retry interceptor for use in a message listener container, giving you
28-
* a large amount of control over the behaviour of a container when a listener fails. To control the number of retry
29+
* a large amount of control over the behavior of a container when a listener fails. To control the number of retry
2930
* attempts or the backoff in between attempts, supply a customized {@link RetryPolicy}. Stateful retry is appropriate
30-
* if your listener is using a transactional resource that needs to be rolled back on an exception (e.g. a stateful
31+
* if your listener is using a transactional resource that needs to be rolled back on an exception (e.g., a stateful
3132
* connection to a back end server). JPA is the canonical example. The semantics of stateful retry mean that a listener
3233
* exception is propagated to the container so that it can force a rollback. When the message is redelivered it has to
3334
* be recognised (hence the {@link MessageKeyGenerator} strategy), and when the retry attempts are exhausted it will be
@@ -44,9 +45,20 @@
4445
*/
4546
public class StatefulRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean {
4647

47-
private @Nullable MessageKeyGenerator messageKeyGenerator;
48+
private MessageKeyGenerator messageKeyGenerator =
49+
(message) -> {
50+
MessageProperties messageProperties = message.getMessageProperties();
51+
String messageId = messageProperties.getMessageId();
52+
if (messageId == null && Boolean.TRUE.equals(messageProperties.isRedelivered())) {
53+
messageProperties.setFinalRetryForMessageWithNoId(true);
54+
}
55+
return messageId;
56+
};
4857

49-
private @Nullable NewMessageIdentifier newMessageIdentifier;
58+
private NewMessageIdentifier newMessageIdentifier =
59+
(message) -> Boolean.FALSE.equals(message.getMessageProperties().isRedelivered());
60+
61+
private @Nullable Integer stateCacheSize;
5062

5163
public void setMessageKeyGenerator(MessageKeyGenerator messageKeyGenerator) {
5264
this.messageKeyGenerator = messageKeyGenerator;
@@ -56,30 +68,14 @@ public void setNewMessageIdentifier(NewMessageIdentifier newMessageIdentifier) {
5668
this.newMessageIdentifier = newMessageIdentifier;
5769
}
5870

59-
@Override
60-
public StatefulRetryOperationsInterceptor getObject() {
61-
return new StatefulRetryOperationsInterceptor(getMessageKeyGenerator(),
62-
getNewMessageIdentifier(), getRetryPolicy(), getMessageRecoverer());
71+
public void setStateCacheSize(int stateCacheSize) {
72+
this.stateCacheSize = stateCacheSize;
6373
}
6474

65-
private NewMessageIdentifier getNewMessageIdentifier() {
66-
if (this.newMessageIdentifier != null) {
67-
return this.newMessageIdentifier;
68-
}
69-
return (message) -> Boolean.FALSE.equals(message.getMessageProperties().isRedelivered());
70-
}
71-
72-
private MessageKeyGenerator getMessageKeyGenerator() {
73-
if (this.messageKeyGenerator != null) {
74-
return this.messageKeyGenerator;
75-
}
76-
return (message) -> {
77-
String messageId = message.getMessageProperties().getMessageId();
78-
if (messageId == null && Boolean.TRUE.equals(message.getMessageProperties().isRedelivered())) {
79-
message.getMessageProperties().setFinalRetryForMessageWithNoId(true);
80-
}
81-
return messageId;
82-
};
75+
@Override
76+
public StatefulRetryOperationsInterceptor getObject() {
77+
return new StatefulRetryOperationsInterceptor(this.messageKeyGenerator,
78+
this.newMessageIdentifier, getRetryPolicy(), getMessageRecoverer(), this.stateCacheSize);
8379
}
8480

8581
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.amqp.rabbit.config;
1818

19+
import java.time.Duration;
1920
import java.util.function.BiFunction;
2021

2122
import org.aopalliance.intercept.MethodInterceptor;
@@ -33,6 +34,7 @@
3334
*
3435
* @author Juergen Hoeller
3536
* @author Stephane Nicoll
37+
* @author Artem Bilan
3638
*
3739
* @since 4.0
3840
*/
@@ -44,7 +46,8 @@ public final class StatelessRetryOperationsInterceptor implements MethodIntercep
4446

4547
StatelessRetryOperationsInterceptor(@Nullable RetryPolicy retryPolicy,
4648
@Nullable BiFunction<@Nullable Object[], Throwable, @Nullable Object> recoverer) {
47-
this.retryOperations = new RetryTemplate((retryPolicy != null) ? retryPolicy : RetryPolicy.builder().build());
49+
this.retryOperations =
50+
new RetryTemplate(retryPolicy != null ? retryPolicy : RetryPolicy.builder().delay(Duration.ZERO).build());
4851
this.recoverer = recoverer;
4952
}
5053

0 commit comments

Comments
 (0)