Skip to content

Commit f486a30

Browse files
committed
Introduce spring-rabbitmq-client for RabbitMQ AMQP 1.0
Fixes: #2991 Fixes: #2992 Fixes: #2993 * Add `QueueInformation.type` since RabbitMQ AMQP Client exposes such an info
1 parent fbd93a9 commit f486a30

File tree

13 files changed

+1745
-12
lines changed

13 files changed

+1745
-12
lines changed

build.gradle

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ext {
5959
micrometerVersion = '1.15.0-SNAPSHOT'
6060
micrometerTracingVersion = '1.5.0-SNAPSHOT'
6161
mockitoVersion = '5.15.2'
62+
rabbitmqAmqpClientVersion = '0.4.0'
6263
rabbitmqStreamVersion = '0.22.0'
6364
rabbitmqVersion = '5.24.0'
6465
reactorVersion = '2024.0.3'
@@ -472,6 +473,28 @@ project('spring-rabbit-stream') {
472473
}
473474
}
474475

476+
project('spring-rabbitmq-client') {
477+
description = 'Spring RabbitMQ Client for AMQP 1.0'
478+
479+
dependencies {
480+
api project(':spring-rabbit')
481+
api "com.rabbitmq.client:amqp-client:$rabbitmqAmqpClientVersion"
482+
api 'io.micrometer:micrometer-observation'
483+
484+
testApi project(':spring-rabbit-junit')
485+
486+
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
487+
488+
testImplementation 'org.testcontainers:rabbitmq'
489+
testImplementation 'org.testcontainers:junit-jupiter'
490+
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl'
491+
testImplementation 'io.micrometer:micrometer-observation-test'
492+
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
493+
testImplementation 'io.micrometer:micrometer-tracing-test'
494+
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
495+
}
496+
}
497+
475498
project('spring-rabbit-junit') {
476499
description = 'Spring Rabbit JUnit Support'
477500

spring-amqp/src/main/java/org/springframework/amqp/core/QueueInformation.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,18 +21,22 @@
2121
*
2222
* @author Gary Russell
2323
* @author Ngoc Nhan
24+
* @author Artem Bilan
25+
*
2426
* @since 2.2
2527
*
2628
*/
2729
public class QueueInformation {
2830

2931
private final String name;
3032

31-
private final int messageCount;
33+
private final long messageCount;
3234

3335
private final int consumerCount;
3436

35-
public QueueInformation(String name, int messageCount, int consumerCount) {
37+
private String type = "classic";
38+
39+
public QueueInformation(String name, long messageCount, int consumerCount) {
3640
this.name = name;
3741
this.messageCount = messageCount;
3842
this.consumerCount = consumerCount;
@@ -42,19 +46,38 @@ public String getName() {
4246
return this.name;
4347
}
4448

45-
public int getMessageCount() {
49+
public long getMessageCount() {
4650
return this.messageCount;
4751
}
4852

4953
public int getConsumerCount() {
5054
return this.consumerCount;
5155
}
5256

57+
/**
58+
* Return a queue type.
59+
* {@code classic} by default since AMQP 0.9.1 protocol does not return this info in {@code DeclareOk} reply.
60+
* @return a queue type
61+
* @since 4.0
62+
*/
63+
public String getType() {
64+
return this.type;
65+
}
66+
67+
/**
68+
* Set a queue type.
69+
* @param type the queue type: {@code quorum}, {@code classic} or {@code stream}
70+
* @since 4.0
71+
*/
72+
public void setType(String type) {
73+
this.type = type;
74+
}
75+
5376
@Override
5477
public int hashCode() {
5578
final int prime = 31;
5679
int result = 1;
57-
result = prime * result + ((this.name == null) ? 0 : this.name.hashCode());
80+
result = prime * result + this.name.hashCode();
5881
return result;
5982
}
6083

@@ -70,9 +93,6 @@ public boolean equals(Object obj) {
7093
return false;
7194
}
7295
QueueInformation other = (QueueInformation) obj;
73-
if (this.name == null) {
74-
return other.name == null;
75-
}
7696
return this.name.equals(other.name);
7797
}
7898

spring-amqp/src/main/java/org/springframework/amqp/utils/JavaUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ private JavaUtils() {
4444
}
4545

4646
/**
47-
* Invoke {@link Consumer#accept(Object)} with the value if the condition is true.
47+
* Invoke {@link Consumer#accept(Object)} with the value if it is not null and the condition is true.
4848
* @param condition the condition.
49-
* @param value the value.
49+
* @param value the value. Skipped if null.
5050
* @param consumer the consumer.
5151
* @param <T> the value type.
5252
* @return this.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat
119119
*/
120120
public static final Object QUEUE_CONSUMER_COUNT = "QUEUE_CONSUMER_COUNT";
121121

122-
private static final String DELAYED_MESSAGE_EXCHANGE = "x-delayed-message";
122+
public static final String DELAYED_MESSAGE_EXCHANGE = "x-delayed-message";
123123

124124
/** Logger available to subclasses. */
125125
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void testProperties() throws Exception {
163163
}
164164
}
165165

166-
private int messageCount(RabbitAdmin rabbitAdmin, String queueName) {
166+
private long messageCount(RabbitAdmin rabbitAdmin, String queueName) {
167167
QueueInformation info = rabbitAdmin.getQueueInfo(queueName);
168168
assertThat(info).isNotNull();
169169
return info.getMessageCount();

0 commit comments

Comments
 (0)