Skip to content

Commit b4948f0

Browse files
committed
Improve logging in cluster recovery test
1 parent 0f1f5e8 commit b4948f0

File tree

3 files changed

+55
-22
lines changed

3 files changed

+55
-22
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,11 @@ private void handleException(Exception ex, String operation) {
533533
}
534534
}
535535

536+
@Override
537+
public String toString() {
538+
return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';
539+
}
540+
536541
private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
537542
return ExceptionUtils.maybeCloseConsumerOnException(consumer::close, ex);
538543
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,9 @@ Long id() {
252252
String address() {
253253
return this.address;
254254
}
255+
256+
@Override
257+
public String toString() {
258+
return "AmqpPublisher{" + "id=" + id + ", address='" + address + '\'' + '}';
259+
}
255260
}

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ private QueueConfiguration(
106106

107107
@Test
108108
void clusterRestart() {
109+
LOGGER.info("Cluster restart test...");
110+
LOGGER.info("Available processors: {}", Runtime.getRuntime().availableProcessors());
111+
LOGGER.info("Java version: {}", System.getProperty("java.version"));
109112
int queueCount = 10;
110113
List<Management.QueueType> queueTypes =
111114
List.of(
@@ -234,41 +237,61 @@ void clusterRestart() {
234237

235238
assertThat(publisherStates).allMatch(s -> s.state() == OPEN);
236239
assertThat(consumerStates).allMatch(s -> s.state() == OPEN);
237-
238-
System.out.println("Queues:");
239-
queueNames.forEach(
240-
q -> {
241-
Management.QueueInfo queueInfo = management.queueInfo(q);
242-
System.out.printf(
243-
"Queue '%s': leader '%s', followers '%s'%n",
244-
q,
245-
queueInfo.leader(),
246-
queueInfo.members().stream()
247-
.filter(n -> !n.equals(queueInfo.leader()))
248-
.collect(toList()));
249-
});
250-
251-
System.out.println("Publishers:");
252-
publisherStates.forEach(
253-
p -> System.out.printf(" queue %s, is on leader? %s%n", p.queue, p.isOnLeader()));
254-
255-
System.out.println("Consumers:");
256-
consumerStates.forEach(
257-
p -> System.out.printf(" queue %s, is on member? %s%n", p.queue, p.isOnMember()));
258240
} catch (Throwable e) {
259241
LOGGER.info("Test failed with {}", e.getMessage(), e);
260242
BiConsumer<AmqpConnection, ResourceBase> log =
261243
(c, r) -> {
262244
LOGGER.info("Connection {}: {}", c.name(), c.state());
263245
if (r != null) {
264-
LOGGER.info("Resource: {}", r.state());
246+
LOGGER.info("{}: {}", r, r.state());
265247
}
266248
};
267249
log.accept(this.connection, null);
268250
publisherStates.forEach(s -> log.accept(s.connection, s.publisher));
269251
consumerStates.forEach(s -> log.accept(s.connection, s.consumer));
252+
270253
throw e;
271254
} finally {
255+
System.out.println("Queues:");
256+
queueNames.forEach(
257+
q -> {
258+
try {
259+
Management.QueueInfo queueInfo = management.queueInfo(q);
260+
System.out.printf(
261+
"Queue '%s': leader '%s', followers '%s'%n",
262+
q,
263+
queueInfo.leader(),
264+
queueInfo.members().stream()
265+
.filter(n -> !n.equals(queueInfo.leader()))
266+
.collect(toList()));
267+
} catch (Exception ex) {
268+
LOGGER.info(
269+
"Error while retrieving queue information for '{}': {}", q, ex.getMessage());
270+
}
271+
});
272+
273+
System.out.println("Publishers:");
274+
publisherStates.forEach(
275+
p -> {
276+
try {
277+
System.out.printf(" queue %s, is on leader? %s%n", p.queue, p.isOnLeader());
278+
} catch (Exception ex) {
279+
LOGGER.info(
280+
"Error while checking publisher '{}' is on leader node: {}", p, ex.getMessage());
281+
}
282+
});
283+
284+
System.out.println("Consumers:");
285+
consumerStates.forEach(
286+
p -> {
287+
try {
288+
System.out.printf(" queue %s, is on member? %s%n", p.queue, p.isOnMember());
289+
} catch (Exception ex) {
290+
LOGGER.info(
291+
"Error while checking consumer '{}' is on a member node: {}", p, ex.getMessage());
292+
}
293+
});
294+
272295
publisherStates.forEach(PublisherState::close);
273296
consumerStates.forEach(ConsumerState::close);
274297
queueConfigurations.stream()

0 commit comments

Comments
 (0)