3131import com .rabbitmq .client .amqp .*;
3232import java .nio .charset .StandardCharsets ;
3333import java .time .Duration ;
34+ import java .time .Instant ;
35+ import java .time .format .DateTimeFormatter ;
3436import java .util .Collections ;
3537import java .util .List ;
3638import java .util .concurrent .ThreadFactory ;
@@ -221,7 +223,7 @@ void clusterRestart() {
221223 syncs .forEach (
222224 s -> {
223225 LOGGER .info ("Publishing messages ('{}')" , s );
224- assertThat (s ).completes ();
226+ assertThat (s ).completes (TIMEOUT );
225227 LOGGER .info ("Messages published and settled ('{}')" , s );
226228 });
227229 LOGGER .info ("Checked publishers have recovered." );
@@ -230,7 +232,7 @@ void clusterRestart() {
230232 syncs .forEach (
231233 s -> {
232234 LOGGER .info ("Waiting for new messages ('{}')" , s );
233- assertThat (s ).completes (Duration . ofSeconds ( 20 ) );
235+ assertThat (s ).completes (TIMEOUT );
234236 LOGGER .info ("Expected messages received ('{}')" , s );
235237 });
236238 LOGGER .info ("Checked consumers have recovered." );
@@ -276,7 +278,9 @@ void clusterRestart() {
276278 publisherStates .forEach (
277279 p -> {
278280 try {
279- System .out .printf (" queue %s, is on leader? %s%n" , p .queue , p .isOnLeader ());
281+ System .out .printf (
282+ " queue %s, is on leader? %s, last exception '%s', last failed status '%s'%n" ,
283+ p .queue , p .isOnLeader (), p .lastException (), p .lastFailedStatus ());
280284 } catch (Exception ex ) {
281285 LOGGER .info (
282286 "Error while checking publisher '{}' is on leader node: {}" , p , ex .getMessage ());
@@ -346,6 +350,10 @@ private static class PublisherState implements AutoCloseable {
346350 volatile Thread task ;
347351 final RateLimiter limiter = RateLimiter .create (10 );
348352 final AtomicReference <Runnable > postAccepted = new AtomicReference <>(() -> {});
353+ final AtomicReference <Throwable > lastException = new AtomicReference <>();
354+ final AtomicReference <Instant > lastExceptionInstant = new AtomicReference <>();
355+ final AtomicReference <Publisher .Status > lastFailedStatus = new AtomicReference <>();
356+ final AtomicReference <Instant > lastFailedStatusInstant = new AtomicReference <>();
349357
350358 private PublisherState (String queue , boolean exclusive , AmqpConnection connection ) {
351359 this .queue = queue ;
@@ -362,6 +370,9 @@ void start() {
362370 if (ctx .status () == Publisher .Status .ACCEPTED ) {
363371 acceptedCount .incrementAndGet ();
364372 postAccepted .get ().run ();
373+ } else {
374+ lastFailedStatus .set (ctx .status ());
375+ lastFailedStatusInstant .set (Instant .now ());
365376 }
366377 };
367378 this .task =
@@ -373,7 +384,8 @@ void start() {
373384 this .limiter .acquire (1 );
374385 this .publisher .publish (publisher .message (BODY ), callback );
375386 } catch (Exception e ) {
376-
387+ this .lastException .set (e );
388+ this .lastExceptionInstant .set (Instant .now ());
377389 }
378390 }
379391 }
@@ -401,6 +413,26 @@ boolean isOnLeader() {
401413 .equals (this .connection .connectionNodename ());
402414 }
403415
416+ String lastException () {
417+ if (this .lastException .get () == null ) {
418+ return "no exception" ;
419+ } else {
420+ return this .lastException .get ().getMessage ()
421+ + " at "
422+ + DateTimeFormatter .ISO_INSTANT .format (lastExceptionInstant .get ());
423+ }
424+ }
425+
426+ String lastFailedStatus () {
427+ if (this .lastFailedStatus .get () == null ) {
428+ return "no failed status" ;
429+ } else {
430+ return this .lastFailedStatus .get ().name ()
431+ + " at "
432+ + DateTimeFormatter .ISO_INSTANT .format (lastFailedStatusInstant .get ());
433+ }
434+ }
435+
404436 @ Override
405437 public void close () {
406438 this .task .interrupt ();
0 commit comments