1616
1717package com .rabbitmq .client ;
1818
19+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
20+ import static java .util .concurrent .TimeUnit .MINUTES ;
1921import static java .util .concurrent .TimeUnit .SECONDS ;
2022import static org .junit .jupiter .api .extension .ConditionEvaluationResult .disabled ;
2123import static org .junit .jupiter .api .extension .ConditionEvaluationResult .enabled ;
2830import io .netty .channel .EventLoopGroup ;
2931import io .netty .channel .MultiThreadIoEventLoopGroup ;
3032import io .netty .channel .nio .NioIoHandler ;
33+
34+ import java .io .PrintWriter ;
35+ import java .io .StringWriter ;
36+ import java .lang .management .LockInfo ;
37+ import java .lang .management .ManagementFactory ;
38+ import java .lang .management .MonitorInfo ;
39+ import java .lang .management .RuntimeMXBean ;
40+ import java .lang .management .ThreadInfo ;
3141import java .net .Socket ;
42+ import java .nio .charset .StandardCharsets ;
43+ import java .time .Duration ;
44+ import java .time .LocalDateTime ;
45+ import java .time .format .DateTimeFormatter ;
46+ import java .util .List ;
3247import java .util .concurrent .ExecutorService ;
3348import java .util .concurrent .Executors ;
49+ import java .util .concurrent .ScheduledExecutorService ;
50+ import java .util .concurrent .ScheduledFuture ;
3451import java .util .concurrent .ThreadFactory ;
3552import java .util .concurrent .atomic .AtomicLong ;
53+ import java .util .concurrent .atomic .AtomicReference ;
54+ import java .util .stream .Collectors ;
55+ import java .util .stream .Stream ;
3656
3757import org .junit .jupiter .api .extension .AfterAllCallback ;
3858import org .junit .jupiter .api .extension .AfterEachCallback ;
@@ -59,6 +79,8 @@ public class AmqpClientTestExtension
5979 private static final ExtensionContext .Namespace NAMESPACE =
6080 ExtensionContext .Namespace .create (AmqpClientTestExtension .class );
6181
82+ private static final AtomicReference <String > CURRENT_TEST = new AtomicReference <>();
83+
6284 private static ExtensionContext .Store store (ExtensionContext extensionContext ) {
6385 return extensionContext .getRoot ().getStore (NAMESPACE );
6486 }
@@ -108,6 +130,16 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
108130 @ Override
109131 public void beforeAll (ExtensionContext context ) {
110132 if (TestUtils .isNetty ()) {
133+ Duration timeout = Duration .ofMinutes (20 );
134+ ScheduledFuture <?> task = executor (context ).schedule (() -> {
135+ try {
136+ LOGGER .warn ("Test {} has been running for {}" , CURRENT_TEST .get (), timeout );
137+ logThreadDump ();
138+ } catch (Exception e ) {
139+ LOGGER .warn ("Error during test timeout task" , e );
140+ }
141+ }, timeout .toMillis (), MILLISECONDS );
142+ store (context ).put ("threadDumpTask" , task );
111143 ThreadFactory tf = new NamedThreadFactory (context .getTestClass ().get ().getSimpleName () + "-" );
112144 EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup (tf , NioIoHandler .newFactory ());
113145 store (context )
@@ -118,11 +150,10 @@ public void beforeAll(ExtensionContext context) {
118150
119151 @ Override
120152 public void beforeEach (ExtensionContext context ) {
121- LOGGER .info (
122- "Starting test: {}.{} (IO layer: {})" ,
123- context .getTestClass ().get ().getSimpleName (),
124- context .getTestMethod ().get ().getName (),
125- TestUtils .IO_LAYER );
153+ String test = String .format ("%s.%s" , context .getTestClass ().get ().getSimpleName (),
154+ context .getTestMethod ().get ().getName ());
155+ CURRENT_TEST .set (test );
156+ LOGGER .info ("Starting test: {} (IO layer: {})" , test , TestUtils .IO_LAYER );
126157 }
127158
128159 @ Override
@@ -137,34 +168,44 @@ public void afterEach(ExtensionContext context) {
137168 public void afterAll (ExtensionContext context ) {
138169 if (TestUtils .isNetty ()) {
139170 TestUtils .resetEventLoopGroup ();
171+ ScheduledFuture <?> threadDumpTask = store (context ).get ("threadDumpTask" , ScheduledFuture .class );
172+ if (threadDumpTask != null ) {
173+ threadDumpTask .cancel (true );
174+ }
140175 EventLoopGroup eventLoopGroup = eventLoopGroup (context );
141- ExecutorServiceCloseableResourceWrapper wrapper =
142- context
143- .getRoot ()
144- .getStore (ExtensionContext .Namespace .GLOBAL )
145- .getOrComputeIfAbsent (ExecutorServiceCloseableResourceWrapper .class );
146-
147- wrapper
148- .executorService
149- .submit (
150- () -> {
151- try {
152- eventLoopGroup .shutdownGracefully (0 , 0 , SECONDS ).get (10 , SECONDS );
153- } catch (InterruptedException e ) {
154- Thread .currentThread ().interrupt ();
155- } catch (Exception e ) {
156- LOGGER .warn ("Error while asynchronously closing Netty event loop group" , e );
157- }
158- });
176+ ExecutorService executor = executor (context );
177+
178+ executor .submit (() -> {
179+ try {
180+ eventLoopGroup .shutdownGracefully (0 , 0 , SECONDS ).get (10 , SECONDS );
181+ } catch (InterruptedException e ) {
182+ Thread .currentThread ().interrupt ();
183+ } catch (Exception e ) {
184+ LOGGER .warn ("Error while asynchronously closing Netty event loop group" , e );
185+ }
186+ });
159187 }
160188 }
161189
190+ private static ScheduledExecutorService executor (ExtensionContext context ) {
191+ ExecutorServiceCloseableResourceWrapper wrapper =
192+ context
193+ .getRoot ()
194+ .getStore (ExtensionContext .Namespace .GLOBAL )
195+ .getOrComputeIfAbsent (ExecutorServiceCloseableResourceWrapper .class );
196+ return wrapper .executor ();
197+ }
198+
162199 private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable {
163200
164- private final ExecutorService executorService ;
201+ private final ScheduledExecutorService executorService ;
165202
166203 private ExecutorServiceCloseableResourceWrapper () {
167- this .executorService = Executors .newCachedThreadPool ();
204+ this .executorService = Executors .newScheduledThreadPool (2 );
205+ }
206+
207+ private ScheduledExecutorService executor () {
208+ return this .executorService ;
168209 }
169210
170211 @ Override
@@ -197,4 +238,109 @@ public Thread newThread(Runnable r) {
197238 return thread ;
198239 }
199240 }
241+
242+ private static void logThreadDump () {
243+ PlainTextThreadDumpFormatter formatter = new PlainTextThreadDumpFormatter ();
244+ ThreadInfo [] threadInfos =
245+ ManagementFactory .getThreadMXBean ().dumpAllThreads (true , true );
246+ String threadDump = formatter .format (threadInfos );
247+ LOGGER .warn (threadDump );
248+ }
249+
250+ // from Spring Boot's PlainTextThreadDumpFormatter
251+ private static class PlainTextThreadDumpFormatter {
252+
253+ String format (ThreadInfo [] threads ) {
254+ StringWriter dump = new StringWriter ();
255+ PrintWriter writer = new PrintWriter (dump );
256+ writePreamble (writer );
257+ for (ThreadInfo info : threads ) {
258+ writeThread (writer , info );
259+ }
260+ return dump .toString ();
261+ }
262+
263+ private void writePreamble (PrintWriter writer ) {
264+ DateTimeFormatter dateFormat = DateTimeFormatter .ofPattern ("yyyy-MM-dd HH:mm:ss" );
265+ writer .println (dateFormat .format (LocalDateTime .now ()));
266+ RuntimeMXBean runtime = ManagementFactory .getRuntimeMXBean ();
267+ writer .printf (
268+ "Full thread dump %s (%s %s):%n" ,
269+ runtime .getVmName (), runtime .getVmVersion (), System .getProperty ("java.vm.info" ));
270+ writer .println ();
271+ }
272+
273+ private void writeThread (PrintWriter writer , ThreadInfo info ) {
274+ writer .printf ("\" %s\" - Thread t@%d%n" , info .getThreadName (), info .getThreadId ());
275+ writer .printf (" %s: %s%n" , Thread .State .class .getCanonicalName (), info .getThreadState ());
276+ writeStackTrace (writer , info , info .getLockedMonitors ());
277+ writer .println ();
278+ writeLockedOwnableSynchronizers (writer , info );
279+ writer .println ();
280+ }
281+
282+ private void writeStackTrace (
283+ PrintWriter writer , ThreadInfo info , MonitorInfo [] lockedMonitors ) {
284+ int depth = 0 ;
285+ for (StackTraceElement element : info .getStackTrace ()) {
286+ writeStackTraceElement (
287+ writer , element , info , lockedMonitorsForDepth (lockedMonitors , depth ), depth == 0 );
288+ depth ++;
289+ }
290+ }
291+
292+ private List <MonitorInfo > lockedMonitorsForDepth (MonitorInfo [] lockedMonitors , int depth ) {
293+ return Stream .of (lockedMonitors )
294+ .filter ((lockedMonitor ) -> lockedMonitor .getLockedStackDepth () == depth )
295+ .collect (Collectors .toList ());
296+ }
297+
298+ private void writeStackTraceElement (
299+ PrintWriter writer ,
300+ StackTraceElement element ,
301+ ThreadInfo info ,
302+ List <MonitorInfo > lockedMonitors ,
303+ boolean firstElement ) {
304+ writer .printf ("\t at %s%n" , element .toString ());
305+ LockInfo lockInfo = info .getLockInfo ();
306+ if (firstElement && lockInfo != null ) {
307+ if (element .getClassName ().equals (Object .class .getName ())
308+ && element .getMethodName ().equals ("wait" )) {
309+ writer .printf ("\t - waiting on %s%n" , format (lockInfo ));
310+ } else {
311+ String lockOwner = info .getLockOwnerName ();
312+ if (lockOwner != null ) {
313+ writer .printf (
314+ "\t - waiting to lock %s owned by \" %s\" t@%d%n" ,
315+ format (lockInfo ), lockOwner , info .getLockOwnerId ());
316+ } else {
317+ writer .printf ("\t - parking to wait for %s%n" , format (lockInfo ));
318+ }
319+ }
320+ }
321+ writeMonitors (writer , lockedMonitors );
322+ }
323+
324+ private String format (LockInfo lockInfo ) {
325+ return String .format ("<%x> (a %s)" , lockInfo .getIdentityHashCode (), lockInfo .getClassName ());
326+ }
327+
328+ private void writeMonitors (PrintWriter writer , List <MonitorInfo > lockedMonitorsAtCurrentDepth ) {
329+ for (MonitorInfo lockedMonitor : lockedMonitorsAtCurrentDepth ) {
330+ writer .printf ("\t - locked %s%n" , format (lockedMonitor ));
331+ }
332+ }
333+
334+ private void writeLockedOwnableSynchronizers (PrintWriter writer , ThreadInfo info ) {
335+ writer .println (" Locked ownable synchronizers:" );
336+ LockInfo [] lockedSynchronizers = info .getLockedSynchronizers ();
337+ if (lockedSynchronizers == null || lockedSynchronizers .length == 0 ) {
338+ writer .println ("\t - None" );
339+ } else {
340+ for (LockInfo lockedSynchronizer : lockedSynchronizers ) {
341+ writer .printf ("\t - Locked %s%n" , format (lockedSynchronizer ));
342+ }
343+ }
344+ }
345+ }
200346}
0 commit comments