Skip to content

Commit 2858954

Browse files
garyrussellartembilan
authored andcommitted
GH-3584: Support spring-amqp 2.3.x and 2.4.x
# Fix deprecation warning for Reactor's `limitRequest()`
1 parent ddab28a commit 2858954

File tree

4 files changed

+37
-16
lines changed

4 files changed

+37
-16
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ ext {
100100
servletApiVersion = '4.0.1'
101101
smackVersion = '4.3.5'
102102
soapVersion = '1.4.0'
103-
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.3.9'
103+
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.3.10'
104104
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2020.0.10'
105105
springKafkaVersion = '2.6.9'
106106
springRetryVersion = '1.3.1'

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
2828
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2929
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
30+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
3031
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
3132
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
3233
import org.springframework.amqp.support.AmqpHeaders;
@@ -100,7 +101,9 @@ public enum BatchMode {
100101

101102
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
102103

103-
private final AbstractMessageListenerContainer messageListenerContainer;
104+
private final MessageListenerContainer messageListenerContainer;
105+
106+
private final AbstractMessageListenerContainer abstractListenerContainer;
104107

105108
private MessageConverter messageConverter = new SimpleMessageConverter();
106109

@@ -116,7 +119,11 @@ public enum BatchMode {
116119

117120
private BatchMode batchMode = BatchMode.MESSAGES;
118121

119-
public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
122+
/**
123+
* Construct an instance using the provided container.
124+
* @param listenerContainer the container.
125+
*/
126+
public AmqpInboundChannelAdapter(MessageListenerContainer listenerContainer) {
120127
Assert.notNull(listenerContainer, "listenerContainer must not be null");
121128
Assert.isNull(listenerContainer.getMessageListener(),
122129
"The listenerContainer provided to an AMQP inbound Channel Adapter " +
@@ -125,6 +132,9 @@ public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContai
125132
this.messageListenerContainer = listenerContainer;
126133
this.messageListenerContainer.setAutoStartup(false);
127134
setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
135+
this.abstractListenerContainer = listenerContainer instanceof AbstractMessageListenerContainer
136+
? (AbstractMessageListenerContainer) listenerContainer
137+
: null;
128138
}
129139

130140

@@ -214,7 +224,7 @@ protected void onInit() {
214224
else {
215225
messageListener = new Listener();
216226
}
217-
this.messageListenerContainer.setMessageListener(messageListener);
227+
this.messageListenerContainer.setupMessageListener(messageListener);
218228
this.messageListenerContainer.afterPropertiesSet();
219229
super.onInit();
220230
}
@@ -282,8 +292,11 @@ protected class Listener implements ChannelAwareMessageListener {
282292

283293
protected final MessageConverter converter = AmqpInboundChannelAdapter.this.messageConverter; // NOSONAR
284294

285-
protected final boolean manualAcks = // NNOSONAR
286-
AcknowledgeMode.MANUAL == AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode();
295+
protected final boolean manualAcks = // NOSONAR
296+
AmqpInboundChannelAdapter.this.abstractListenerContainer == null
297+
? false
298+
: AcknowledgeMode.MANUAL == AmqpInboundChannelAdapter.this.abstractListenerContainer
299+
.getAcknowledgeMode();
287300

288301
protected final RetryOperations retryOps = AmqpInboundChannelAdapter.this.retryTemplate; // NOSONAR
289302

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
3030
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3131
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
32+
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
3233
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
3334
import org.springframework.amqp.support.AmqpHeaders;
3435
import org.springframework.amqp.support.converter.MessageConverter;
@@ -67,7 +68,9 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
6768

6869
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
6970

70-
private final AbstractMessageListenerContainer messageListenerContainer;
71+
private final MessageListenerContainer messageListenerContainer;
72+
73+
private final AbstractMessageListenerContainer abstractListenerContainer;
7174

7275
private final AmqpTemplate amqpTemplate;
7376

@@ -96,17 +99,17 @@ public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
9699
}
97100

98101
/**
99-
* Construct {@link AmqpInboundGateway} based on the provided {@link AbstractMessageListenerContainer}
102+
* Construct {@link AmqpInboundGateway} based on the provided {@link MessageListenerContainer}
100103
* to receive request messages and {@link AmqpTemplate} to send replies.
101-
* @param listenerContainer the {@link AbstractMessageListenerContainer} to receive AMQP messages.
104+
* @param listenerContainer the {@link MessageListenerContainer} to receive AMQP messages.
102105
* @param amqpTemplate the {@link AmqpTemplate} to send reply messages.
103106
* @since 4.2
104107
*/
105-
public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer, AmqpTemplate amqpTemplate) {
108+
public AmqpInboundGateway(MessageListenerContainer listenerContainer, AmqpTemplate amqpTemplate) {
106109
this(listenerContainer, amqpTemplate, true);
107110
}
108111

109-
private AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer, AmqpTemplate amqpTemplate,
112+
private AmqpInboundGateway(MessageListenerContainer listenerContainer, AmqpTemplate amqpTemplate,
110113
boolean amqpTemplateExplicitlySet) {
111114
Assert.notNull(listenerContainer, "listenerContainer must not be null");
112115
Assert.notNull(amqpTemplate, "'amqpTemplate' must not be null");
@@ -122,6 +125,9 @@ private AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer, A
122125
this.templateMessageConverter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
123126
}
124127
setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
128+
this.abstractListenerContainer = listenerContainer instanceof AbstractMessageListenerContainer
129+
? (AbstractMessageListenerContainer) listenerContainer
130+
: null;
125131
}
126132

127133

@@ -241,7 +247,7 @@ protected void onInit() {
241247
+ "send an error message when retries are exhausted");
242248
}
243249
Listener messageListener = new Listener();
244-
this.messageListenerContainer.setMessageListener(messageListener);
250+
this.messageListenerContainer.setupMessageListener(messageListener);
245251
this.messageListenerContainer.afterPropertiesSet();
246252
if (!this.amqpTemplateExplicitlySet) {
247253
((RabbitTemplate) this.amqpTemplate).afterPropertiesSet();
@@ -336,8 +342,9 @@ public void onMessage(final Message message, final Channel channel) {
336342
private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
337343
Map<String, Object> headers;
338344
Object payload;
339-
boolean isManualAck =
340-
AmqpInboundGateway.this.messageListenerContainer.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
345+
boolean isManualAck = AmqpInboundGateway.this.abstractListenerContainer == null
346+
? false
347+
: AcknowledgeMode.MANUAL == AmqpInboundGateway.this.abstractListenerContainer.getAcknowledgeMode();
341348
try {
342349
if (AmqpInboundGateway.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
343350
List<Object> payloads = new ArrayList<>();

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,10 +367,11 @@ private Flux<Message<?>> createFluxGenerator() {
367367
fluxSink.complete();
368368
}
369369
})
370-
.limitRequest(
370+
.take(
371371
this.maxMessagesPerPoll < 0
372372
? Long.MAX_VALUE
373-
: this.maxMessagesPerPoll)
373+
: this.maxMessagesPerPoll,
374+
true)
374375
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))
375376
.doOnComplete(() ->
376377
triggerContext

0 commit comments

Comments
 (0)