Skip to content

Commit 9f67c40

Browse files
belljun3395olegz
authored andcommitted
GH-3147 Add consumer priority support to Rabbit binder properties
Signed-off-by: belljun3395 <[email protected]>
1 parent aa3b308 commit 9f67c40

File tree

5 files changed

+90
-0
lines changed

5 files changed

+90
-0
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaConsumerProperties.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,16 @@ public enum StandardHeaders {
210210
*/
211211
private boolean reactiveAtMostOnce;
212212

213+
/**
214+
* Consumer priority level. NOTE: Kafka does not natively support consumer priority.
215+
* This property is provided for consistency across binders but has no effect in Kafka.
216+
* For even message distribution across servers, use partition assignment strategies
217+
* or create separate bindings with concurrency=1.
218+
* Default: -1 (not supported)
219+
* @since 4.2
220+
*/
221+
private int consumerPriority = -1;
222+
213223
/**
214224
* @return Container's ack mode.
215225
*/
@@ -486,4 +496,12 @@ public void setReactiveAtMostOnce(boolean reactiveAtMostOnce) {
486496
this.reactiveAtMostOnce = reactiveAtMostOnce;
487497
}
488498

499+
public int getConsumerPriority() {
500+
return this.consumerPriority;
501+
}
502+
503+
public void setConsumerPriority(int consumerPriority) {
504+
this.consumerPriority = consumerPriority;
505+
}
506+
489507
}

binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/properties/RabbitConsumerProperties.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,19 @@ public class RabbitConsumerProperties extends RabbitCommonProperties {
145145
*/
146146
private boolean superStream;
147147

148+
/**
149+
* Consumer priority for this consumer. Higher values indicate higher priority.
150+
* Requires the queue to be declared with x-max-priority argument.
151+
* Valid range: 0-255. Default: -1 (no priority set).
152+
*/
153+
private int consumerPriority = -1;
154+
155+
/**
156+
* Maximum priority for the queue. When set, the queue will be declared with
157+
* x-max-priority argument. Valid range: 1-255. Default: -1 (not set).
158+
*/
159+
private int queueMaxPriority = -1;
160+
148161
public boolean isTransacted() {
149162
return transacted;
150163
}
@@ -325,6 +338,22 @@ public void setSuperStream(boolean superStream) {
325338
this.superStream = superStream;
326339
}
327340

341+
public int getConsumerPriority() {
342+
return this.consumerPriority;
343+
}
344+
345+
public void setConsumerPriority(int consumerPriority) {
346+
this.consumerPriority = consumerPriority;
347+
}
348+
349+
public int getQueueMaxPriority() {
350+
return this.queueMaxPriority;
351+
}
352+
353+
public void setQueueMaxPriority(int queueMaxPriority) {
354+
this.queueMaxPriority = queueMaxPriority;
355+
}
356+
328357
/**
329358
* Container type.
330359
* @author Gary Russell

binders/rabbit-binder/spring-cloud-stream-binder-rabbit-core/src/main/java/org/springframework/cloud/stream/binder/rabbit/provisioning/RabbitExchangeQueueProvisioner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,15 @@ private void additionalArgs(Map<String, Object> args, RabbitCommonProperties pro
608608
: properties.getMaxLengthBytes();
609609
Integer maxPriority = isDlq ? properties.getDlqMaxPriority()
610610
: properties.getMaxPriority();
611+
612+
// Add queue max priority for consumer priority support
613+
if (!isDlq && properties instanceof RabbitConsumerProperties) {
614+
RabbitConsumerProperties consumerProps = (RabbitConsumerProperties) properties;
615+
if (consumerProps.getQueueMaxPriority() > 0) {
616+
maxPriority = consumerProps.getQueueMaxPriority();
617+
}
618+
}
619+
611620
Integer ttl = isDlq ? properties.getDlqTtl() : properties.getTtl();
612621
boolean lazy = isDlq ? properties.isDlqLazy() : properties.isLazy();
613622
String overflow = isDlq ? properties.getDlqOverflowBehavior()

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.StringWriter;
2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.ExecutionException;
@@ -604,6 +605,12 @@ else if (getApplicationContext() != null) {
604605
q -> extension.getConsumerTagPrefix() + "#"
605606
+ index.getAndIncrement());
606607
}
608+
// Set consumer priority if configured
609+
if (extension.getConsumerPriority() >= 0) {
610+
Map<String, Object> consumerArgs = new HashMap<>();
611+
consumerArgs.put("x-priority", extension.getConsumerPriority());
612+
listenerContainer.setConsumerArguments(consumerArgs);
613+
}
607614
listenerContainer.setApplicationContext(getApplicationContext());
608615
return listenerContainer;
609616
}

binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/RabbitBinderTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2732,6 +2732,33 @@ public void setField(String field) {
27322732
}
27332733

27342734
}
2735+
2736+
@Test
2737+
void testConsumerPriority() throws Exception {
2738+
RabbitTestBinder binder = getBinder();
2739+
ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties = createConsumerProperties();
2740+
consumerProperties.getExtension().setConsumerPriority(10);
2741+
consumerProperties.getExtension().setQueueMaxPriority(10);
2742+
2743+
DirectChannel moduleInputChannel = createBindableChannel("input", new BindingProperties());
2744+
2745+
Binding<MessageChannel> consumerBinding = binder.bindConsumer("priority.test", "priorityGroup",
2746+
moduleInputChannel, consumerProperties);
2747+
2748+
AbstractMessageListenerContainer container =
2749+
TestUtils.getPropertyValue(consumerBinding, "lifecycle.messageListenerContainer",
2750+
AbstractMessageListenerContainer.class);
2751+
2752+
Map<String, Object> consumerArgs = container.getConsumerArguments();
2753+
2754+
assertThat(consumerArgs).isNotNull();
2755+
assertThat(consumerArgs.get("x-priority")).isEqualTo(10);
2756+
assertThat(consumerProperties.getExtension().getConsumerPriority()).isEqualTo(10);
2757+
assertThat(consumerProperties.getExtension().getQueueMaxPriority()).isEqualTo(10);
2758+
2759+
consumerBinding.unbind();
2760+
}
2761+
27352762
// @checkstyle:on
27362763

27372764
public static class TestBatchingStrategy implements BatchingStrategy {

0 commit comments

Comments
 (0)