Skip to content

Commit 3d5a6a9

Browse files
committed
GH-3001: Add consume batch support to RabbitAmqpListenerContainer
Fixes: #3001 * Expose batch related options for `RabbitAmqpListenerContainer` and `RabbitAmqpListenerContainerFactory`, respectively * `batchSize` - the indicator that `RabbitAmqpListenerContainer` (and `RabbitAmqpMessageListenerAdapter`) has to work in batch mode * `batchReceiveTimeout` - how long to wait for batch to be fulfilled or release whatever was gathered so far, even if just only one message * `taskScheduler` - schedule "force batch release" after `batchReceiveTimeout` * Make `MessagingMessageListenerAdapter` `final` properties as `protected` to avoid undesired copy-paste burden
1 parent 6402004 commit 3d5a6a9

File tree

6 files changed

+437
-48
lines changed

6 files changed

+437
-48
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,20 @@
4949
public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter
5050
implements ChannelAwareBatchMessageListener {
5151

52-
private final MessagingMessageConverterAdapter converterAdapter;
53-
5452
private final BatchingStrategy batchingStrategy;
5553

5654
@SuppressWarnings("this-escape")
5755
public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, boolean returnExceptions,
5856
@Nullable RabbitListenerErrorHandler errorHandler, @Nullable BatchingStrategy batchingStrategy) {
5957

6058
super(bean, method, returnExceptions, errorHandler, true);
61-
this.converterAdapter = (MessagingMessageConverterAdapter) getMessagingMessageConverter();
6259
this.batchingStrategy = batchingStrategy == null ? new SimpleBatchingStrategy(0, 0, 0L) : batchingStrategy;
6360
}
6461

6562
@Override
6663
public void onMessageBatch(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
6764
Message<?> converted;
68-
if (this.converterAdapter.isAmqpMessageList()) {
65+
if (this.messagingMessageConverter.isAmqpMessageList()) {
6966
converted = new GenericMessage<>(messages);
7067
}
7168
else {
@@ -87,7 +84,7 @@ public void onMessageBatch(List<org.springframework.amqp.core.Message> messages,
8784
}
8885
}
8986
}
90-
if (this.converterAdapter.isMessageList()) {
87+
if (this.messagingMessageConverter.isMessageList()) {
9188
converted = new GenericMessage<>(messagingMessages);
9289
}
9390
else {
@@ -178,17 +175,17 @@ private void asyncFailure(List<org.springframework.amqp.core.Message> requests,
178175
protected Message<?> toMessagingMessage(org.springframework.amqp.core.Message amqpMessage) {
179176
if (this.batchingStrategy.canDebatch(amqpMessage.getMessageProperties())) {
180177

181-
if (this.converterAdapter.isMessageList()) {
178+
if (this.messagingMessageConverter.isMessageList()) {
182179
List<Message<?>> messages = new ArrayList<>();
183180
this.batchingStrategy.deBatch(amqpMessage, fragment -> messages.add(super.toMessagingMessage(fragment)));
184181
return new GenericMessage<>(messages);
185182
}
186183
else {
187184
List<Object> list = new ArrayList<>();
188185
this.batchingStrategy.deBatch(amqpMessage, fragment ->
189-
list.add(this.converterAdapter.extractPayload(fragment)));
186+
list.add(this.messagingMessageConverter.extractPayload(fragment)));
190187
return MessageBuilder.withPayload(list)
191-
.copyHeaders(this.converterAdapter
188+
.copyHeaders(this.messagingMessageConverter
192189
.getHeaderMapper()
193190
.toHeaders(amqpMessage.getMessageProperties()))
194191
.build();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@
6969
*/
7070
public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
7171

72-
private final MessagingMessageConverterAdapter messagingMessageConverter;
72+
protected final MessagingMessageConverterAdapter messagingMessageConverter;
7373

74-
private final boolean returnExceptions;
74+
protected final boolean returnExceptions;
7575

76-
private final @Nullable RabbitListenerErrorHandler errorHandler;
76+
protected final @Nullable RabbitListenerErrorHandler errorHandler;
7777

7878
private @Nullable HandlerAdapter handlerAdapter;
7979

@@ -364,15 +364,15 @@ protected final class MessagingMessageConverterAdapter extends MessagingMessageC
364364
}
365365
}
366366

367-
protected boolean isMessageList() {
367+
public boolean isMessageList() {
368368
return this.isMessageList;
369369
}
370370

371-
protected boolean isAmqpMessageList() {
371+
public boolean isAmqpMessageList() {
372372
return this.isAmqpMessageList;
373373
}
374374

375-
protected @Nullable Method getMethod() {
375+
public @Nullable Method getMethod() {
376376
return this.method;
377377
}
378378

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/config/RabbitAmqpListenerContainerFactory.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package org.springframework.amqp.rabbitmq.client.config;
1818

19+
import java.util.Arrays;
20+
1921
import com.rabbitmq.client.amqp.Connection;
20-
import org.aopalliance.aop.Advice;
2122
import org.jspecify.annotations.Nullable;
2223

24+
import org.springframework.amqp.core.MessageListener;
25+
import org.springframework.amqp.core.MessagePostProcessor;
2326
import org.springframework.amqp.rabbit.config.BaseRabbitListenerContainerFactory;
2427
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
2528
import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
2629
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
2730
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpListenerContainer;
2831
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter;
2932
import org.springframework.amqp.utils.JavaUtils;
33+
import org.springframework.scheduling.TaskScheduler;
3034

3135
/**
3236
* Factory for {@link RabbitAmqpListenerContainer}.
@@ -45,6 +49,14 @@ public class RabbitAmqpListenerContainerFactory
4549

4650
private @Nullable ContainerCustomizer<RabbitAmqpListenerContainer> containerCustomizer;
4751

52+
private MessagePostProcessor @Nullable [] afterReceivePostProcessors;
53+
54+
private @Nullable Integer batchSize;
55+
56+
private @Nullable Long batchReceiveTimeout;
57+
58+
private @Nullable TaskScheduler taskScheduler;
59+
4860
/**
4961
* Construct an instance using the provided amqpConnection.
5062
* @param amqpConnection the connection.
@@ -62,18 +74,69 @@ public void setContainerCustomizer(ContainerCustomizer<RabbitAmqpListenerContain
6274
this.containerCustomizer = containerCustomizer;
6375
}
6476

77+
/**
78+
* Set {@link MessagePostProcessor}s that will be applied after message reception, before
79+
* invoking the {@link MessageListener}. Often used to decompress data. Processors are invoked in order,
80+
* depending on {@code PriorityOrder}, {@code Order} and finally unordered.
81+
* @param afterReceivePostProcessors the post processor.
82+
*/
83+
public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
84+
this.afterReceivePostProcessors = Arrays.copyOf(afterReceivePostProcessors, afterReceivePostProcessors.length);
85+
}
86+
87+
/**
88+
* The size of the batch of messages to process.
89+
* This is only option (if {@code batchSize > 1}) which turns the target listener container into a batch mode.
90+
* @param batchSize the batch size.
91+
* @see RabbitAmqpListenerContainer#setBatchSize
92+
* @see #setBatchReceiveTimeout(Long)
93+
*/
94+
public void setBatchSize(Integer batchSize) {
95+
this.batchSize = batchSize;
96+
}
97+
98+
/**
99+
* The number of milliseconds of timeout for gathering batch messages.
100+
* It limits the time to wait to fill batchSize.
101+
* Default is 30 seconds.
102+
* @param batchReceiveTimeout the timeout for gathering batch messages.
103+
* @see RabbitAmqpListenerContainer#setBatchReceiveTimeout
104+
* @see #setBatchSize(Integer)
105+
*/
106+
public void setBatchReceiveTimeout(Long batchReceiveTimeout) {
107+
this.batchReceiveTimeout = batchReceiveTimeout;
108+
}
109+
110+
/**
111+
* Configure a {@link TaskScheduler} to release not fulfilled batches after timeout.
112+
* @param taskScheduler the {@link TaskScheduler} to use.
113+
* @see RabbitAmqpListenerContainer#setTaskScheduler(TaskScheduler)
114+
* @see #setBatchReceiveTimeout(Long)
115+
*/
116+
public void setTaskScheduler(TaskScheduler taskScheduler) {
117+
this.taskScheduler = taskScheduler;
118+
}
119+
65120
@Override
66121
public RabbitAmqpListenerContainer createListenerContainer(@Nullable RabbitListenerEndpoint endpoint) {
67122
if (endpoint instanceof MethodRabbitListenerEndpoint methodRabbitListenerEndpoint) {
123+
JavaUtils.INSTANCE
124+
.acceptIfCondition(this.batchSize != null && this.batchSize > 1,
125+
true,
126+
methodRabbitListenerEndpoint::setBatchListener);
127+
68128
methodRabbitListenerEndpoint.setAdapterProvider(
69129
(batch, bean, method, returnExceptions, errorHandler, batchingStrategy) ->
70-
new RabbitAmqpMessageListenerAdapter(bean, method, returnExceptions, errorHandler));
130+
new RabbitAmqpMessageListenerAdapter(bean, method, returnExceptions, errorHandler, batch));
71131
}
72132
RabbitAmqpListenerContainer container = createContainerInstance();
73-
Advice[] adviceChain = getAdviceChain();
74133
JavaUtils.INSTANCE
75-
.acceptIfNotNull(adviceChain, container::setAdviceChain)
76-
.acceptIfNotNull(getDefaultRequeueRejected(), container::setDefaultRequeue);
134+
.acceptIfNotNull(getAdviceChain(), container::setAdviceChain)
135+
.acceptIfNotNull(getDefaultRequeueRejected(), container::setDefaultRequeue)
136+
.acceptIfNotNull(this.afterReceivePostProcessors, container::setAfterReceivePostProcessors)
137+
.acceptIfNotNull(this.batchSize, container::setBatchSize)
138+
.acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout)
139+
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler);
77140

78141
applyCommonOverrides(endpoint, container);
79142

0 commit comments

Comments
 (0)