2828
2929import java .io .IOException ;
3030import java .io .OutputStreamWriter ;
31+ import java .io .StringWriter ;
3132import java .io .Writer ;
3233import java .lang .management .ManagementFactory ;
3334import java .lang .management .ThreadInfo ;
@@ -105,6 +106,33 @@ public static void logLocalHotThreads(Logger logger, Level level, String prefix,
105106 }
106107 }
107108
109+ /**
110+ * Capture and log the current threads on the local node. Unlike hot threads this does not sample and captures current state only.
111+ * Useful for capturing stack traces for unexpectedly-slow operations in production. The resulting message might be large, so it is
112+ * split per thread and logged as multiple entries.
113+ *
114+ * @param logger The logger to use for the logging
115+ * @param level The log level to use for the logging.
116+ * @param prefix The prefix to emit on each chunk of the logging.
117+ */
118+ public static void logLocalCurrentThreads (Logger logger , Level level , String prefix ) {
119+ if (logger .isEnabled (level ) == false ) {
120+ return ;
121+ }
122+
123+ try (var writer = new StringWriter ()) {
124+ new HotThreads ().busiestThreads (500 ).threadElementsSnapshotCount (1 ).detect (writer , () -> {
125+ logger .log (level , "{}: {}" , prefix , writer .toString ());
126+ writer .getBuffer ().setLength (0 );
127+ });
128+ } catch (Exception e ) {
129+ logger .error (
130+ () -> org .elasticsearch .common .Strings .format ("failed to write local current threads with prefix [%s]" , prefix ),
131+ e
132+ );
133+ }
134+ }
135+
108136 public enum ReportType {
109137
110138 CPU ("cpu" ),
@@ -192,11 +220,12 @@ public HotThreads sortOrder(SortOrder order) {
192220 }
193221
194222 public void detect (Writer writer ) throws Exception {
223+ detect (writer , () -> {});
224+ }
225+
226+ public void detect (Writer writer , Runnable onNextThread ) throws Exception {
195227 synchronized (mutex ) {
196- innerDetect (ManagementFactory .getThreadMXBean (), SunThreadInfo .INSTANCE , Thread .currentThread ().getId (), (interval ) -> {
197- Thread .sleep (interval );
198- return null ;
199- }, writer );
228+ innerDetect (ManagementFactory .getThreadMXBean (), SunThreadInfo .INSTANCE , Thread .currentThread ().getId (), writer , onNextThread );
200229 }
201230 }
202231
@@ -245,13 +274,15 @@ Map<Long, ThreadTimeAccumulator> getAllValidThreadInfos(ThreadMXBean threadBean,
245274
246275 ThreadInfo [][] captureThreadStacks (ThreadMXBean threadBean , long [] threadIds ) throws InterruptedException {
247276 ThreadInfo [][] result = new ThreadInfo [threadElementsSnapshotCount ][];
248- for (int j = 0 ; j < threadElementsSnapshotCount ; j ++) {
249- // NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist,
250- // null will be set in the corresponding element in the returned array. A thread is alive if it has
251- // been started and has not yet died.
277+
278+ // NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist,
279+ // null will be set in the corresponding element in the returned array. A thread is alive if it has
280+ // been started and has not yet died.
281+ for (int j = 0 ; j < threadElementsSnapshotCount - 1 ; j ++) {
252282 result [j ] = threadBean .getThreadInfo (threadIds , Integer .MAX_VALUE );
253283 Thread .sleep (threadElementsSnapshotDelay .millis ());
254284 }
285+ result [threadElementsSnapshotCount - 1 ] = threadBean .getThreadInfo (threadIds , Integer .MAX_VALUE );
255286
256287 return result ;
257288 }
@@ -267,13 +298,8 @@ private double getTimeSharePercentage(long time) {
267298 return (((double ) time ) / interval .nanos ()) * 100 ;
268299 }
269300
270- void innerDetect (
271- ThreadMXBean threadBean ,
272- SunThreadInfo sunThreadInfo ,
273- long currentThreadId ,
274- SleepFunction <Long , Void > threadSleep ,
275- Writer writer
276- ) throws Exception {
301+ void innerDetect (ThreadMXBean threadBean , SunThreadInfo sunThreadInfo , long currentThreadId , Writer writer , Runnable onNextThread )
302+ throws Exception {
277303 if (threadBean .isThreadCpuTimeSupported () == false ) {
278304 throw new ElasticsearchException ("thread CPU time is not supported on this JDK" );
279305 }
@@ -297,10 +323,11 @@ void innerDetect(
297323 .append (", ignoreIdleThreads=" )
298324 .append (Boolean .toString (ignoreIdleThreads ))
299325 .append (":\n " );
326+ onNextThread .run ();
300327
301328 // Capture before and after thread state with timings
302329 Map <Long , ThreadTimeAccumulator > previousThreadInfos = getAllValidThreadInfos (threadBean , sunThreadInfo , currentThreadId );
303- threadSleep . apply (interval .millis ());
330+ Thread . sleep (interval .millis ());
304331 Map <Long , ThreadTimeAccumulator > latestThreadInfos = getAllValidThreadInfos (threadBean , sunThreadInfo , currentThreadId );
305332
306333 latestThreadInfos .forEach ((threadId , accumulator ) -> accumulator .subtractPrevious (previousThreadInfos .get (threadId )));
@@ -430,6 +457,7 @@ void innerDetect(
430457 }
431458 }
432459 }
460+ onNextThread .run ();
433461 }
434462 }
435463
0 commit comments