Skip to content

Commit dd6afa8

Browse files
garyrussellartembilan
authored andcommitted
Use RetrySynchronizationManager
Use the `RetrySynchronizationManager` instead of a `RetryListener`. Fix `setAttributesIfNecessary` for gateway conversion errors (this, at least, should be cherry-picked).
1 parent 5852583 commit dd6afa8

File tree

3 files changed

+17
-33
lines changed

3 files changed

+17
-33
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@
3939
import org.springframework.integration.support.ErrorMessageStrategy;
4040
import org.springframework.integration.support.ErrorMessageUtils;
4141
import org.springframework.retry.RecoveryCallback;
42-
import org.springframework.retry.RetryCallback;
43-
import org.springframework.retry.RetryContext;
44-
import org.springframework.retry.RetryListener;
42+
import org.springframework.retry.support.RetrySynchronizationManager;
4543
import org.springframework.retry.support.RetryTemplate;
4644
import org.springframework.util.Assert;
4745

@@ -132,9 +130,6 @@ protected void onInit() {
132130
+ "send an error message when retries are exhausted");
133131
}
134132
Listener messageListener = new Listener();
135-
if (this.retryTemplate != null) {
136-
this.retryTemplate.registerListener(messageListener);
137-
}
138133
this.messageListenerContainer.setMessageListener(messageListener);
139134
this.messageListenerContainer.afterPropertiesSet();
140135
super.onInit();
@@ -177,7 +172,9 @@ private void setAttributesIfNecessary(Message amqpMessage, org.springframework.m
177172
attributesHolder.set(ErrorMessageUtils.getAttributeAccessor(null, null));
178173
}
179174
if (needAttributes) {
180-
AttributeAccessor attributes = attributesHolder.get();
175+
AttributeAccessor attributes = this.retryTemplate != null
176+
? RetrySynchronizationManager.getContext()
177+
: attributesHolder.get();
181178
if (attributes != null) {
182179
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
183180
attributes.setAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage);
@@ -196,7 +193,7 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag
196193
}
197194
}
198195

199-
protected class Listener implements ChannelAwareMessageListener, RetryListener {
196+
protected class Listener implements ChannelAwareMessageListener {
200197

201198
@SuppressWarnings("unchecked")
202199
@Override
@@ -259,26 +256,6 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
259256
return messagingMessage;
260257
}
261258

262-
@Override
263-
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
264-
if (AmqpInboundChannelAdapter.this.recoveryCallback != null) {
265-
attributesHolder.set(context);
266-
}
267-
return true;
268-
}
269-
270-
@Override
271-
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
272-
Throwable throwable) {
273-
attributesHolder.remove();
274-
}
275-
276-
@Override
277-
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
278-
Throwable throwable) {
279-
// Empty
280-
}
281-
282259
}
283260

284261
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
296296
}
297297
catch (RuntimeException e) {
298298
if (getErrorChannel() != null) {
299+
setAttributesIfNecessary(message, null);
299300
AmqpInboundGateway.this.messagingTemplate.send(getErrorChannel(), buildErrorMessage(null,
300301
new ListenerExecutionFailedException("Message conversion failed", e, message)));
301302
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,12 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
248248

249249
});
250250
adapter.afterPropertiesSet();
251-
((ChannelAwareMessageListener) container.getMessageListener()).onMessage(null, null);
251+
((ChannelAwareMessageListener) container.getMessageListener())
252+
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
252253
assertNull(outputChannel.receive(0));
253-
assertNotNull(errorChannel.receive(0));
254+
Message<?> received = errorChannel.receive(0);
255+
assertNotNull(received);
256+
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
254257
}
255258

256259
@Test
@@ -276,14 +279,17 @@ public org.springframework.amqp.core.Message toMessage(Object object, MessagePro
276279

277280
@Override
278281
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
279-
return null;
282+
throw new MessageConversionException("intended");
280283
}
281284

282285
});
283286
adapter.afterPropertiesSet();
284-
((ChannelAwareMessageListener) container.getMessageListener()).onMessage(null, null);
287+
((ChannelAwareMessageListener) container.getMessageListener())
288+
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
285289
assertNull(outputChannel.receive(0));
286-
assertNotNull(errorChannel.receive(0));
290+
Message<?> received = errorChannel.receive(0);
291+
assertNotNull(received);
292+
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
287293
}
288294

289295
@Test

0 commit comments

Comments
 (0)