|
19 | 19 |
|
20 | 20 | import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME; |
21 | 21 | import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH; |
| 22 | +import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM; |
22 | 23 | import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; |
23 | 24 | import static com.rabbitmq.client.amqp.impl.ExceptionUtils.noRunningStreamMemberOnNode; |
24 | 25 | import static com.rabbitmq.client.amqp.impl.TestUtils.name; |
@@ -111,7 +112,7 @@ void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) { |
111 | 112 | @Test |
112 | 113 | void connectionShouldRecoverToNewQuorumQueueLeaderAfterItHasMoved() { |
113 | 114 | try { |
114 | | - management.queue(q).type(Management.QueueType.QUORUM).declare(); |
| 115 | + management.queue(q).type(QUORUM).declare(); |
115 | 116 | Management.QueueInfo info = queueInfo(); |
116 | 117 | String initialLeader = info.leader(); |
117 | 118 |
|
@@ -139,7 +140,7 @@ void connectionShouldRecoverToNewQuorumQueueLeaderAfterItHasMoved() { |
139 | 140 | @Test |
140 | 141 | void publishToMovingQq() { |
141 | 142 | try { |
142 | | - management.queue(q).type(Management.QueueType.QUORUM).declare(); |
| 143 | + management.queue(q).type(QUORUM).declare(); |
143 | 144 |
|
144 | 145 | AmqpConnection publishConnection = connection(b -> b.affinity().queue(q).operation(PUBLISH)); |
145 | 146 | assertThat(publishConnection).hasNodename(queueInfo().leader()); |
@@ -183,7 +184,7 @@ void publishToMovingQq() { |
183 | 184 | @Test |
184 | 185 | void consumeFromMovingQq() { |
185 | 186 | try { |
186 | | - management.queue(q).type(Management.QueueType.QUORUM).declare(); |
| 187 | + management.queue(q).type(QUORUM).declare(); |
187 | 188 |
|
188 | 189 | AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME)); |
189 | 190 | assertThat(consumeConnection).isOnFollower(queueInfo()); |
@@ -240,7 +241,7 @@ void consumeFromMovingQq() { |
240 | 241 | @Test |
241 | 242 | void publishConsumeQuorumQueueWhenLeaderChanges() { |
242 | 243 | try { |
243 | | - management.queue(q).type(Management.QueueType.QUORUM).declare(); |
| 244 | + management.queue(q).type(QUORUM).declare(); |
244 | 245 |
|
245 | 246 | AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME)); |
246 | 247 | assertThat(consumeConnection).isOnFollower(queueInfo()); |
@@ -298,8 +299,8 @@ void publishConsumeQuorumQueueWhenLeaderChanges() { |
298 | 299 | } |
299 | 300 |
|
300 | 301 | @Test |
301 | | - void consumeFromQuorumQueueWhenLeaderIsPaused() { |
302 | | - management.queue(q).type(Management.QueueType.QUORUM).declare(); |
| 302 | + void consumeFromQuorumQueueWhenLeaderIsPaused() throws InterruptedException { |
| 303 | + management.queue(q).type(QUORUM).declare(); |
303 | 304 | Management.QueueInfo queueInfo = queueInfo(); |
304 | 305 | String initialLeader = queueInfo.leader(); |
305 | 306 | boolean nodePaused = false; |
@@ -350,7 +351,14 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() { |
350 | 351 | assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L); |
351 | 352 | consumeSync.reset(); |
352 | 353 |
|
353 | | - waitAtMost(() -> initialFollowers.contains(mgmt.queueInfo(q).leader())); |
| 354 | + waitAtMost( |
| 355 | + () -> initialFollowers.contains(mgmt.queueInfo(q).leader()), |
| 356 | + () -> |
| 357 | + "Current leader is not in initial followers, initial followers " |
| 358 | + + initialFollowers |
| 359 | + + ", " |
| 360 | + + "queue info " |
| 361 | + + mgmt.queueInfo(q)); |
354 | 362 | assertThat(initialFollowers).contains(mgmt.queueInfo(q).leader()); |
355 | 363 |
|
356 | 364 | Cli.unpauseNode(initialLeader); |
|
0 commit comments