Skip to content

Commit 0f76653

Browse files
authored
Merge pull request #90 from rabbitmq/queue-info-members
Deprecate QueueInfo#replicas() in favor of QueueInfo#members()
2 parents 94da41a + 98e7288 commit 0f76653

File tree

7 files changed

+51
-31
lines changed

7 files changed

+51
-31
lines changed

src/main/java/com/rabbitmq/client/amqp/Management.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -861,12 +861,20 @@ interface QueueInfo {
861861
String leader();
862862

863863
/**
864-
* The nodes the queue has replicas (members) on.
864+
* Deprecated, use {@link #members()} instead.
865865
*
866-
* @return the nodes of the queue replicas (members)
866+
* @return the nodes of the queue members
867867
*/
868+
@Deprecated(forRemoval = true)
868869
List<String> replicas();
869870

871+
/**
872+
* The nodes the queue has members on.
873+
*
874+
* @return the nodes of the queue members
875+
*/
876+
List<String> members();
877+
870878
/**
871879
* The number of messages in the queue.
872880
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ private static class DefaultQueueInfo implements QueueInfo {
611611
private final QueueType type;
612612
private final Map<String, Object> arguments;
613613
private final String leader;
614-
private final List<String> replicas;
614+
private final List<String> members;
615615
private final long messageCount;
616616
private final int consumerCount;
617617

@@ -624,11 +624,11 @@ private DefaultQueueInfo(Map<String, Object> response) {
624624
this.type = QueueType.valueOf(((String) response.get("type")).toUpperCase(Locale.ENGLISH));
625625
this.arguments = Map.copyOf((Map<String, Object>) response.get("arguments"));
626626
this.leader = (String) response.get("leader");
627-
String[] replicas = (String[]) response.get("replicas");
628-
if (replicas == null || replicas.length == 0) {
629-
this.replicas = Collections.emptyList();
627+
String[] members = (String[]) response.get("replicas");
628+
if (members == null || members.length == 0) {
629+
this.members = Collections.emptyList();
630630
} else {
631-
this.replicas = List.of(replicas);
631+
this.members = List.of(members);
632632
}
633633
this.messageCount = ((Number) response.get("message_count")).longValue();
634634
this.consumerCount = ((Number) response.get("consumer_count")).intValue();
@@ -670,8 +670,14 @@ public String leader() {
670670
}
671671

672672
@Override
673+
@SuppressWarnings("removal")
673674
public List<String> replicas() {
674-
return this.replicas;
675+
return this.members();
676+
}
677+
678+
@Override
679+
public List<String> members() {
680+
return this.members;
675681
}
676682

677683
@Override
@@ -704,7 +710,7 @@ public String toString() {
704710
+ leader
705711
+ '\''
706712
+ ", replicas="
707-
+ replicas
713+
+ members
708714
+ ", messageCount="
709715
+ messageCount
710716
+ ", consumerCount="

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
8383
info.name(),
8484
info.type(),
8585
info.leader(),
86-
info.replicas(),
86+
info.members(),
8787
connectionName);
8888
if (nodesWithAffinity == null) {
8989
nodesWithAffinity = strategy.nodesWithAffinity(context, info);
@@ -255,9 +255,9 @@ static class LeaderForPublishingMembersForConsumingStrategy
255255
public List<String> nodesWithAffinity(
256256
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
257257
List<String> nodesWithAffinity =
258-
(info.replicas() == null || info.replicas().isEmpty())
258+
(info.members() == null || info.members().isEmpty())
259259
? Collections.emptyList()
260-
: List.copyOf(info.replicas());
260+
: List.copyOf(info.members());
261261
if (context.operation() == ConnectionSettings.Affinity.Operation.PUBLISH) {
262262
if (info.leader() != null && !info.leader().isBlank()) {
263263
nodesWithAffinity = List.of(info.leader());
@@ -277,7 +277,7 @@ public List<String> nodesWithAffinity(
277277
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
278278
ConnectionSettings.Affinity.Operation operation = context.operation();
279279
String leader = info.leader();
280-
List<String> replicas = info.replicas() == null ? Collections.emptyList() : info.replicas();
280+
List<String> replicas = info.members() == null ? Collections.emptyList() : info.members();
281281
List<String> nodesWithAffinity;
282282
LOGGER.debug(
283283
"Trying to find affinity {} with leader = {}, replicas = {}", context, leader, replicas);

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -340,14 +340,14 @@ private static class TestQueueInfo implements Management.QueueInfo {
340340

341341
private final String name, leader;
342342
private final Management.QueueType type;
343-
private final List<String> replicas;
343+
private final List<String> members;
344344

345345
private TestQueueInfo(
346-
String name, Management.QueueType type, String leader, List<String> replicas) {
346+
String name, Management.QueueType type, String leader, List<String> members) {
347347
this.name = name;
348348
this.type = type;
349349
this.leader = leader;
350-
this.replicas = replicas;
350+
this.members = members;
351351
}
352352

353353
@Override
@@ -386,8 +386,14 @@ public String leader() {
386386
}
387387

388388
@Override
389+
@SuppressWarnings("removal")
389390
public List<String> replicas() {
390-
return this.replicas;
391+
return this.members();
392+
}
393+
394+
@Override
395+
public List<String> members() {
396+
return this.members;
391397
}
392398

393399
@Override
@@ -408,12 +414,12 @@ public boolean equals(Object o) {
408414
return Objects.equals(name, that.name)
409415
&& Objects.equals(leader, that.leader)
410416
&& type == that.type
411-
&& Objects.equals(replicas, that.replicas);
417+
&& Objects.equals(members, that.members);
412418
}
413419

414420
@Override
415421
public int hashCode() {
416-
return Objects.hash(name, leader, type, replicas);
422+
return Objects.hash(name, leader, type, members);
417423
}
418424

419425
@Override
@@ -428,7 +434,7 @@ public String toString() {
428434
+ leader
429435
+ '\''
430436
+ ", replicas="
431-
+ replicas
437+
+ members
432438
+ '}';
433439
}
434440
}

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ ConnectionAssert isOnLeader(Management.QueueInfo info) {
392392
ConnectionAssert isOnFollower(Management.QueueInfo info) {
393393
Assert.notNull(info, "Queue info cannot be null");
394394
List<String> followers =
395-
info.replicas().stream()
395+
info.members().stream()
396396
.filter(n -> !n.equals(info.leader()))
397397
.collect(Collectors.toList());
398398
if (!followers.contains(actual.connectionNodename())) {

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) {
8181
Management.QueueInfo info = connection.management().queueInfo(q);
8282
assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader());
8383
assertThat(consumeConnection.connectionNodename())
84-
.isIn(info.replicas())
84+
.isIn(info.members())
8585
.isNotEqualTo(info.leader());
8686
assertThat(Cli.listConnections()).hasSize(3);
8787
} finally {
@@ -315,7 +315,7 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
315315
consumeSync.reset();
316316

317317
List<String> initialFollowers =
318-
queueInfo.replicas().stream().filter(n -> !n.equals(initialLeader)).collect(toList());
318+
queueInfo.members().stream().filter(n -> !n.equals(initialLeader)).collect(toList());
319319
assertThat(initialFollowers).isNotEmpty();
320320

321321
Cli.pauseNode(initialLeader);
@@ -498,10 +498,10 @@ String deleteStreamLeader() {
498498
String deleteLeader(Consumer<String> deleteMemberOperation) {
499499
Management.QueueInfo info = queueInfo();
500500
String initialLeader = info.leader();
501-
int initialReplicaCount = info.replicas().size();
501+
int initialReplicaCount = info.members().size();
502502
deleteMemberOperation.accept(initialLeader);
503503
TestUtils.waitAtMost(() -> !initialLeader.equals(queueInfo().leader()));
504-
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
504+
assertThat(queueInfo().members()).hasSize(initialReplicaCount - 1);
505505
return initialLeader;
506506
}
507507

@@ -527,9 +527,9 @@ void addStreamMember(String newMember) {
527527

528528
void addMember(Runnable addMemberOperation) {
529529
Management.QueueInfo info = queueInfo();
530-
int initialReplicaCount = info.replicas().size();
530+
int initialReplicaCount = info.members().size();
531531
addMemberOperation.run();
532-
TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount + 1);
532+
TestUtils.waitAtMost(() -> queueInfo().members().size() == initialReplicaCount + 1);
533533
}
534534

535535
Management.QueueInfo queueInfo() {

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,11 @@ void clusterRestart() {
199199
queueConfigurations.forEach(
200200
c -> {
201201
if (c.type == Management.QueueType.QUORUM || c.type == Management.QueueType.STREAM) {
202-
assertThat(management.queueInfo(c.name).replicas())
202+
assertThat(management.queueInfo(c.name).members())
203203
.hasSameSizeAs(nodes)
204204
.containsExactlyInAnyOrderElementsOf(nodes);
205205
} else {
206-
assertThat(management.queueInfo(c.name).replicas())
206+
assertThat(management.queueInfo(c.name).members())
207207
.hasSize(1)
208208
.containsAnyElementsOf(nodes);
209209
}
@@ -240,7 +240,7 @@ void clusterRestart() {
240240
"Queue '%s': leader '%s', followers '%s'%n",
241241
q,
242242
queueInfo.leader(),
243-
queueInfo.replicas().stream()
243+
queueInfo.members().stream()
244244
.filter(n -> !n.equals(queueInfo.leader()))
245245
.collect(toList()));
246246
});
@@ -426,7 +426,7 @@ boolean isOnMember() {
426426
return this.connection
427427
.management()
428428
.queueInfo(this.queue)
429-
.replicas()
429+
.members()
430430
.contains(this.connection.connectionNodename());
431431
}
432432

0 commit comments

Comments
 (0)