Skip to content

Commit dcaa696

Browse files
garyrussellartembilan
authored andcommitted
GH-1318: Support the Global Flag in basicQos
Resolves #1318 **cherry-pick to 2.2.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumerTests.java # src/reference/asciidoc/amqp.adoc # src/reference/asciidoc/whats-new.adoc
1 parent d06e75e commit dcaa696

File tree

15 files changed

+117
-24
lines changed

15 files changed

+117
-24
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,6 +87,8 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM
8787

8888
private Integer prefetchCount;
8989

90+
private Boolean globalQos;
91+
9092
private Boolean defaultRequeueRejected;
9193

9294
private Advice[] adviceChain;
@@ -400,6 +402,16 @@ public void setDeBatchingEnabled(final Boolean deBatchingEnabled) {
400402
this.deBatchingEnabled = deBatchingEnabled;
401403
}
402404

405+
/**
406+
* Apply prefetch to the entire channel.
407+
* @param globalQos true for a channel-wide prefetch.
408+
* @since 2.2.17
409+
* @see com.rabbitmq.client.Channel#basicQos(int, boolean)
410+
*/
411+
public void setGlobalQos(boolean globalQos) {
412+
this.globalQos = globalQos;
413+
}
414+
403415
@Override
404416
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
405417
C instance = createContainerInstance();
@@ -418,6 +430,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
418430
.acceptIfNotNull(this.taskExecutor, instance::setTaskExecutor)
419431
.acceptIfNotNull(this.transactionManager, instance::setTransactionManager)
420432
.acceptIfNotNull(this.prefetchCount, instance::setPrefetchCount)
433+
.acceptIfNotNull(this.globalQos, instance::setGlobalQos)
421434
.acceptIfNotNull(this.defaultRequeueRejected, instance::setDefaultRequeueRejected)
422435
.acceptIfNotNull(this.adviceChain, instance::setAdviceChain)
423436
.acceptIfNotNull(this.recoveryBackOff, instance::setRecoveryBackOff)

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,8 @@
4848
import org.springframework.util.ErrorHandler;
4949
import org.springframework.util.backoff.BackOff;
5050

51+
import com.rabbitmq.client.Channel;
52+
5153
/**
5254
* A Factory bean to create a listener container.
5355
*
@@ -111,6 +113,8 @@ public class ListenerContainerFactoryBean extends AbstractFactoryBean<AbstractMe
111113

112114
private Integer prefetchCount;
113115

116+
private Boolean globalQos;
117+
114118
private Long shutdownTimeout;
115119

116120
private Long idleEventInterval;
@@ -268,6 +272,16 @@ public void setPrefetchCount(int prefetchCount) {
268272
this.prefetchCount = prefetchCount;
269273
}
270274

275+
/**
276+
* Apply prefetch to the entire channel.
277+
* @param globalQos true for a channel-wide prefetch.
278+
* @since 2.2.17
279+
* @see Channel#basicQos(int, boolean)
280+
*/
281+
public void setGlobalQos(boolean globalQos) {
282+
this.globalQos = globalQos;
283+
}
284+
271285
public void setShutdownTimeout(long shutdownTimeout) {
272286
this.shutdownTimeout = shutdownTimeout;
273287
}
@@ -460,6 +474,7 @@ protected AbstractMessageListenerContainer createInstance() { // NOSONAR complex
460474
.acceptIfNotNull(this.exclusive, container::setExclusive)
461475
.acceptIfNotNull(this.defaultRequeueRejected, container::setDefaultRequeueRejected)
462476
.acceptIfNotNull(this.prefetchCount, container::setPrefetchCount)
477+
.acceptIfNotNull(this.globalQos, container::setGlobalQos)
463478
.acceptIfNotNull(this.shutdownTimeout, container::setShutdownTimeout)
464479
.acceptIfNotNull(this.idleEventInterval, container::setIdleEventInterval)
465480
.acceptIfNotNull(this.transactionManager, container::setTransactionManager)

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,6 +68,8 @@ public final class RabbitNamespaceUtils {
6868

6969
private static final String PREFETCH_ATTRIBUTE = "prefetch";
7070

71+
private static final String GLOBAL_QOS = "global-qos";
72+
7173
private static final String RECEIVE_TIMEOUT_ATTRIBUTE = "receive-timeout";
7274

7375
private static final String CHANNEL_TRANSACTED_ATTRIBUTE = "channel-transacted";
@@ -204,6 +206,11 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
204206
containerDef.getPropertyValues().add("prefetchCount", new TypedStringValue(prefetch));
205207
}
206208

209+
String globalQos = containerEle.getAttribute(GLOBAL_QOS);
210+
if (StringUtils.hasText(globalQos)) {
211+
containerDef.getPropertyValues().add("globalQos", new TypedStringValue(globalQos));
212+
}
213+
207214
String receiveTimeout = containerEle.getAttribute(RECEIVE_TIMEOUT_ATTRIBUTE);
208215
if (StringUtils.hasText(receiveTimeout)) {
209216
containerDef.getPropertyValues().add("receiveTimeout", new TypedStringValue(receiveTimeout));

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
220220

221221
private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
222222

223+
private boolean globalQos;
224+
223225
private long idleEventInterval;
224226

225227
private volatile long lastReceive = System.currentTimeMillis();
@@ -793,6 +795,7 @@ protected boolean isDefaultRequeueRejected() {
793795
* Tell the broker how many messages to send to each consumer in a single request.
794796
* Often this can be set quite high to improve throughput.
795797
* @param prefetchCount the prefetch count
798+
* @see com.rabbitmq.client.Channel#basicQos(int, boolean)
796799
*/
797800
public void setPrefetchCount(int prefetchCount) {
798801
this.prefetchCount = prefetchCount;
@@ -807,6 +810,20 @@ protected int getPrefetchCount() {
807810
return this.prefetchCount;
808811
}
809812

813+
/**
814+
* Apply prefetchCount to the entire channel.
815+
* @param globalQos true for a channel-wide prefetch.
816+
* @since 2.2.17
817+
* @see com.rabbitmq.client.Channel#basicQos(int, boolean)
818+
*/
819+
public void setGlobalQos(boolean globalQos) {
820+
this.globalQos = globalQos;
821+
}
822+
823+
protected boolean isGlobalQos() {
824+
return this.globalQos;
825+
}
826+
810827
/**
811828
* The time to wait for workers in milliseconds after the container is stopped. If any
812829
* workers are active when the shutdown signal comes they will be allowed to finish

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ public class BlockingQueueConsumer {
160160

161161
private java.util.function.Consumer<String> missingQueuePublisher = str -> { };
162162

163+
private boolean globalQos;
164+
163165
private volatile long abortStarted;
164166

165167
private volatile boolean normalCancel;
@@ -392,6 +394,16 @@ public void clearDeliveryTags() {
392394
this.deliveryTags.clear();
393395
}
394396

397+
/**
398+
* Apply prefetch to the entire channel.
399+
* @param globalQos true for a channel-wide prefetch.
400+
* @since 2.2.17
401+
* @see Channel#basicQos(int, boolean)
402+
*/
403+
public void setGlobalQos(boolean globalQos) {
404+
this.globalQos = globalQos;
405+
}
406+
395407
/**
396408
* Return true if cancellation is expected.
397409
* @return true if expected.
@@ -608,10 +620,8 @@ private void passiveDeclarations() {
608620

609621
private void setQosAndreateConsumers() {
610622
if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
611-
// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
612-
// will send blocks of 100 messages)
613623
try {
614-
this.channel.basicQos(this.prefetchCount);
624+
this.channel.basicQos(this.prefetchCount, this.globalQos);
615625
}
616626
catch (IOException e) {
617627
this.activeObjectCounter.release(this);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ private SimpleConsumer consume(String queue, int index, Connection connection) {
724724
SimpleConsumer consumer = null;
725725
try {
726726
channel = connection.createChannel(isChannelTransacted());
727-
channel.basicQos(getPrefetchCount());
727+
channel.basicQos(getPrefetchCount(), isGlobalQos());
728728
consumer = new SimpleConsumer(connection, channel, queue, index);
729729
channel.queueDeclarePassive(queue);
730730
consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(),

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() {
831831
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
832832
this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
833833
isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);
834+
consumer.setGlobalQos(isGlobalQos());
834835
consumer.setMissingQueuePublisher(this::publishMissingQueueEvent);
835836
if (this.declarationRetries != null) {
836837
consumer.setDeclarationRetries(this.declarationRetries);

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.2.xsd

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,11 +728,24 @@
728728
<xsd:attribute name="prefetch" type="xsd:string">
729729
<xsd:annotation>
730730
<xsd:documentation><![CDATA[
731-
Tells the broker how many messages to send to each consumer in a single request. Often this can be set quite high
732-
to improve throughput. It should be greater than or equal to the transaction size.
731+
Tells the broker how many messages to send to each consumer (or channel) in a single request.
732+
Often this can be set quite high
733+
to improve throughput, but it can cause starvation when you have multiple application instances
734+
and low message volume.
735+
It should be greater than or equal to the batch size. Also see 'global-qos'.
733736
]]></xsd:documentation>
734737
</xsd:annotation>
735738
</xsd:attribute>
739+
<xsd:attribute name="global-qos">
740+
<xsd:annotation>
741+
<xsd:documentation><![CDATA[
742+
When true, apply the 'prefetch' globally to the channel rather than to each consumer on the channel.
743+
]]></xsd:documentation>
744+
</xsd:annotation>
745+
<xsd:simpleType>
746+
<xsd:union memberTypes="xsd:boolean xsd:string" />
747+
</xsd:simpleType>
748+
</xsd:attribute>
736749
<xsd:attribute name="transaction-size" type="xsd:string">
737750
<xsd:annotation>
738751
<xsd:documentation><![CDATA[

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2020 the original author or authors.
2+
* Copyright 2010-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -104,6 +104,7 @@ public void testParseWithQueueNames() {
104104
assertThat(ReflectionTestUtils.getField(container, "idleEventInterval")).isEqualTo(1235L);
105105
assertThat(container.getListenerId()).isEqualTo("container1");
106106
assertThat(TestUtils.getPropertyValue(container, "mismatchedQueuesFatal", Boolean.class)).isTrue();
107+
assertThat(TestUtils.getPropertyValue(container, "globalQos", Boolean.class)).isFalse();
107108
}
108109

109110
@Test
@@ -149,6 +150,7 @@ public void testParseWithQueues() {
149150
assertThat(Arrays.asList(container.getQueueNames()).toString()).isEqualTo("[foo, " + queue.getName() + "]");
150151
assertThat(TestUtils.getPropertyValue(container, "missingQueuesFatal", Boolean.class)).isTrue();
151152
assertThat(TestUtils.getPropertyValue(container, "autoDeclare", Boolean.class)).isFalse();
153+
assertThat(TestUtils.getPropertyValue(container, "globalQos", Boolean.class)).isTrue();
152154
}
153155

154156
@Test

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -103,6 +103,7 @@ public void createContainerFullConfig() {
103103
this.factory.setRecoveryBackOff(recoveryBackOff);
104104
this.factory.setMissingQueuesFatal(true);
105105
this.factory.setAfterReceivePostProcessors(afterReceivePostProcessor);
106+
this.factory.setGlobalQos(true);
106107
this.factory.setContainerCustomizer(c -> c.setShutdownTimeout(10_000));
107108

108109
assertThat(this.factory.getAdviceChain()).isEqualTo(new Advice[]{advice});
@@ -138,6 +139,7 @@ public void createContainerFullConfig() {
138139
List<?> actualAfterReceivePostProcessors = (List<?>) fieldAccessor.getPropertyValue("afterReceivePostProcessors");
139140
assertThat(actualAfterReceivePostProcessors.size()).as("Wrong number of afterReceivePostProcessors").isEqualTo(1);
140141
assertThat(actualAfterReceivePostProcessors.get(0)).as("Wrong advice").isSameAs(afterReceivePostProcessor);
142+
assertThat(fieldAccessor.getPropertyValue("globalQos")).isEqualTo(true);
141143
}
142144

143145
@Test

0 commit comments

Comments
 (0)