3434import io .netty .channel .EventLoopGroup ;
3535import java .nio .charset .StandardCharsets ;
3636import java .time .Duration ;
37+ import java .time .Instant ;
38+ import java .time .ZoneId ;
39+ import java .time .format .DateTimeFormatter ;
3740import java .util .Collections ;
3841import java .util .LinkedHashMap ;
3942import java .util .List ;
@@ -251,6 +254,11 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
251254 LOGGER .info ("Environment information:" );
252255 System .out .println (TestUtils .jsonPrettyPrint (environment .toString ()));
253256
257+ LOGGER .info ("Producer information:" );
258+ producers .forEach (p -> {
259+ LOGGER .info ("Producer to '{}' (last exception: '{}')" , p .stream (), p .lastException );
260+ });
261+
254262 LOGGER .info ("Closing producers" );
255263 producers .forEach (
256264 p -> {
@@ -293,6 +301,8 @@ private static class ProducerState implements AutoCloseable {
293301 final AtomicBoolean stopped = new AtomicBoolean (false );
294302 final AtomicInteger acceptedCount = new AtomicInteger ();
295303 final AtomicReference <Runnable > postConfirmed = new AtomicReference <>(() -> {});
304+ final AtomicReference <Throwable > lastException = new AtomicReference <>();
305+ final AtomicReference <Instant > lastExceptionInstant = new AtomicReference <>();
296306
297307 private ProducerState (String stream , boolean dynamicBatch , Environment environment ) {
298308 this .stream = stream ;
@@ -317,8 +327,9 @@ void start() {
317327 this .limiter .acquire (1 );
318328 this .producer .send (
319329 producer .messageBuilder ().addData (BODY ).build (), confirmationHandler );
320- } catch (Exception e ) {
321-
330+ } catch (Throwable e ) {
331+ this .lastException .set (e );
332+ this .lastExceptionInstant .set (Instant .now ());
322333 }
323334 }
324335 });
@@ -342,6 +353,14 @@ String stream() {
342353 return this .stream ;
343354 }
344355
356+ String lastException () {
357+ if (this .lastException .get () == null ) {
358+ return "no exception" ;
359+ } else {
360+ return this .lastException .get ().getMessage () + " at " + DateTimeFormatter .ISO_LOCAL_TIME .format (lastExceptionInstant .get ());
361+ }
362+ }
363+
345364 @ Override
346365 public void close () {
347366 stopped .set (true );
0 commit comments