Skip to content

Commit c003a47

Browse files
GH-3437: Add MessageRecoverer to AMQP Inbounds (#3465)
* GH-3437: Add `MessageRecoverer` to AMQP Inbounds Fixes #3437 For better end-user experience with AMQP Inbound Endpoints, expose a `MessageRecoverer` option * Test the feature and document this new option * * Fix Checkstyle & update Copyright * Fix language in Docs Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent 5c73843 commit c003a47

File tree

7 files changed

+271
-16
lines changed

7 files changed

+271
-16
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundChannelAdapterSpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
1920
import org.springframework.amqp.support.converter.MessageConverter;
2021
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
2122
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
@@ -102,4 +103,16 @@ public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
102103
return _this();
103104
}
104105

106+
/**
107+
* Set a {@link MessageRecoverer} when using retry within the adapter.
108+
* @param messageRecoverer the callback.
109+
* @return the spec.
110+
* @since 5.5
111+
* @see AmqpInboundChannelAdapter#setMessageRecoverer(MessageRecoverer)
112+
*/
113+
public S messageRecoverer(MessageRecoverer messageRecoverer) {
114+
this.target.setMessageRecoverer(messageRecoverer);
115+
return _this();
116+
}
117+
105118
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.integration.amqp.dsl;
1818

1919
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
20+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
2021
import org.springframework.amqp.support.converter.MessageConverter;
22+
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
2123
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
2224
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
2325
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
@@ -176,4 +178,16 @@ public S replyHeadersMappedLast(boolean replyHeadersMappedLast) {
176178
return _this();
177179
}
178180

181+
/**
182+
* Set a {@link MessageRecoverer} when using retry within the adapter.
183+
* @param messageRecoverer the callback.
184+
* @return the spec.
185+
* @since 5.5
186+
* @see AmqpInboundChannelAdapter#setMessageRecoverer(MessageRecoverer)
187+
*/
188+
public S messageRecoverer(MessageRecoverer messageRecoverer) {
189+
this.target.setMessageRecoverer(messageRecoverer);
190+
return _this();
191+
}
192+
179193
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,8 @@
2929
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3030
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
3131
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
32+
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
33+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
3234
import org.springframework.amqp.support.AmqpHeaders;
3335
import org.springframework.amqp.support.converter.MessageConversionException;
3436
import org.springframework.amqp.support.converter.MessageConverter;
@@ -110,6 +112,8 @@ public enum BatchMode {
110112

111113
private RecoveryCallback<?> recoveryCallback;
112114

115+
private MessageRecoverer messageRecoverer;
116+
113117
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
114118

115119
private boolean bindSourceMessage;
@@ -154,6 +158,7 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
154158

155159
/**
156160
* Set a {@link RecoveryCallback} when using retry within the adapter.
161+
* Mutually exclusive with {@link #setMessageRecoverer(MessageRecoverer)}.
157162
* @param recoveryCallback the callback.
158163
* @since 4.3.10
159164
* @see #setRetryTemplate(RetryTemplate)
@@ -162,6 +167,16 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
162167
this.recoveryCallback = recoveryCallback;
163168
}
164169

170+
/**
171+
* Configure a {@link MessageRecoverer} for retry operations.
172+
* A more AMQP-specific convenience instead of {@link #setRecoveryCallback(RecoveryCallback)}.
173+
* @param messageRecoverer the {@link MessageRecoverer} to use.
174+
* @since 5.5
175+
*/
176+
public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
177+
this.messageRecoverer = messageRecoverer;
178+
}
179+
165180
/**
166181
* Set a batching strategy to use when de-batching messages created by a batching
167182
* producer (such as the BatchingRabbitTemplate).
@@ -206,6 +221,7 @@ protected void onInit() {
206221
Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
207222
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
208223
+ "send an error message when retries are exhausted");
224+
setupRecoveryCallbackIfAny();
209225
}
210226
Listener messageListener;
211227
if (this.messageListenerContainer.isConsumerBatchEnabled()) {
@@ -219,6 +235,38 @@ protected void onInit() {
219235
super.onInit();
220236
}
221237

238+
private void setupRecoveryCallbackIfAny() {
239+
Assert.state(this.recoveryCallback == null || this.messageRecoverer == null,
240+
"Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, but not both");
241+
if (this.messageRecoverer != null) {
242+
if (this.messageListenerContainer.isConsumerBatchEnabled()) {
243+
Assert.isInstanceOf(MessageBatchRecoverer.class, this.messageRecoverer,
244+
"The 'messageRecoverer' must be an instance of MessageBatchRecoverer " +
245+
"when consumer configured for batch mode");
246+
this.recoveryCallback =
247+
context -> {
248+
@SuppressWarnings("unchecked")
249+
List<Message> messagesToRecover =
250+
(List<Message>) RetrySynchronizationManager.getContext()
251+
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
252+
((MessageBatchRecoverer) this.messageRecoverer).recover(messagesToRecover,
253+
context.getLastThrowable());
254+
return null;
255+
};
256+
}
257+
else {
258+
this.recoveryCallback =
259+
context -> {
260+
Message messageToRecover =
261+
(Message) RetrySynchronizationManager.getContext()
262+
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
263+
this.messageRecoverer.recover(messageToRecover, context.getLastThrowable());
264+
return null;
265+
};
266+
}
267+
}
268+
}
269+
222270
@Override
223271
protected void doStart() {
224272
this.messageListenerContainer.start();

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3131
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3232
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
33+
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
3334
import org.springframework.amqp.support.AmqpHeaders;
3435
import org.springframework.amqp.support.converter.MessageConverter;
3536
import org.springframework.amqp.support.converter.SimpleMessageConverter;
@@ -83,7 +84,9 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
8384

8485
private RetryTemplate retryTemplate;
8586

86-
private RecoveryCallback<? extends Object> recoveryCallback;
87+
private RecoveryCallback<?> recoveryCallback;
88+
89+
private MessageRecoverer messageRecoverer;
8790

8891
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
8992

@@ -181,6 +184,7 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
181184

182185
/**
183186
* Set a {@link RecoveryCallback} when using retry within the gateway.
187+
* Mutually exclusive with {@link #setMessageRecoverer(MessageRecoverer)}.
184188
* @param recoveryCallback the callback.
185189
* @since 4.3.10
186190
* @see #setRetryTemplate(RetryTemplate)
@@ -189,6 +193,16 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
189193
this.recoveryCallback = recoveryCallback;
190194
}
191195

196+
/**
197+
* Configure a {@link MessageRecoverer} for retry operations.
198+
* A more AMQP-specific convenience instead of {@link #setRecoveryCallback(RecoveryCallback)}.
199+
* @param messageRecoverer the {@link MessageRecoverer} to use.
200+
* @since 5.5
201+
*/
202+
public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
203+
this.messageRecoverer = messageRecoverer;
204+
}
205+
192206
/**
193207
* Set a batching strategy to use when de-batching messages.
194208
* Default is {@link SimpleBatchingStrategy}.
@@ -239,6 +253,7 @@ protected void onInit() {
239253
Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
240254
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
241255
+ "send an error message when retries are exhausted");
256+
setupRecoveryCallbackIfAny();
242257
}
243258
Listener messageListener = new Listener();
244259
this.messageListenerContainer.setMessageListener(messageListener);
@@ -254,6 +269,21 @@ protected void onInit() {
254269
}
255270
}
256271

272+
private void setupRecoveryCallbackIfAny() {
273+
Assert.state(this.recoveryCallback == null || this.messageRecoverer == null,
274+
"Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, but not both");
275+
if (this.messageRecoverer != null) {
276+
this.recoveryCallback =
277+
context -> {
278+
Message messageToRecover =
279+
(Message) RetrySynchronizationManager.getContext()
280+
.getAttribute(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
281+
this.messageRecoverer.recover(messageToRecover, context.getLastThrowable());
282+
return null;
283+
};
284+
}
285+
}
286+
257287
@Override
258288
protected void doStart() {
259289
super.doStart();

0 commit comments

Comments
 (0)