Skip to content

Commit e578533

Browse files
authored
Identify system threads using a Thread subclass (#113562) (#115618)
(cherry picked from commit bbd887a)
1 parent 353bf94 commit e578533

File tree

6 files changed

+98
-17
lines changed

6 files changed

+98
-17
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,39 +326,62 @@ public static String executorName(Thread thread) {
326326
}
327327

328328
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
329-
return daemonThreadFactory(threadName(settings, namePrefix));
329+
return createDaemonThreadFactory(threadName(settings, namePrefix), false);
330330
}
331331

332332
public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) {
333+
return daemonThreadFactory(nodeName, namePrefix, false);
334+
}
335+
336+
public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) {
333337
assert nodeName != null && false == nodeName.isEmpty();
334-
return daemonThreadFactory(threadName(nodeName, namePrefix));
338+
return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread);
335339
}
336340

337-
public static ThreadFactory daemonThreadFactory(String namePrefix) {
338-
return new EsThreadFactory(namePrefix);
341+
public static ThreadFactory daemonThreadFactory(String name) {
342+
assert name != null && name.isEmpty() == false;
343+
return createDaemonThreadFactory(name, false);
344+
}
345+
346+
private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) {
347+
return new EsThreadFactory(namePrefix, isSystemThread);
339348
}
340349

341350
static class EsThreadFactory implements ThreadFactory {
342351

343352
final ThreadGroup group;
344353
final AtomicInteger threadNumber = new AtomicInteger(1);
345354
final String namePrefix;
355+
final boolean isSystem;
346356

347-
EsThreadFactory(String namePrefix) {
357+
EsThreadFactory(String namePrefix, boolean isSystem) {
348358
this.namePrefix = namePrefix;
349359
SecurityManager s = System.getSecurityManager();
350360
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
361+
this.isSystem = isSystem;
351362
}
352363

353364
@Override
354365
public Thread newThread(Runnable r) {
355366
return AccessController.doPrivileged((PrivilegedAction<Thread>) () -> {
356-
Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0);
367+
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem);
357368
t.setDaemon(true);
358369
return t;
359370
});
360371
}
372+
}
361373

374+
public static class EsThread extends Thread {
375+
private final boolean isSystem;
376+
377+
EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) {
378+
super(group, target, name, stackSize);
379+
this.isSystem = isSystem;
380+
}
381+
382+
public boolean isSystem() {
383+
return isSystem;
384+
}
362385
}
363386

364387
/**

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
170170
ThreadPool.Names.SYSTEM_READ,
171171
halfProcMaxAt5,
172172
2000,
173-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
173+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
174+
true
174175
)
175176
);
176177
result.put(
@@ -180,7 +181,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
180181
ThreadPool.Names.SYSTEM_WRITE,
181182
halfProcMaxAt5,
182183
1000,
183-
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
184+
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
185+
true
184186
)
185187
);
186188
result.put(
@@ -190,7 +192,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
190192
ThreadPool.Names.SYSTEM_CRITICAL_READ,
191193
halfProcMaxAt5,
192194
2000,
193-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
195+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK,
196+
true
194197
)
195198
);
196199
result.put(
@@ -200,7 +203,8 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
200203
ThreadPool.Names.SYSTEM_CRITICAL_WRITE,
201204
halfProcMaxAt5,
202205
1500,
203-
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
206+
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
207+
true
204208
)
205209
);
206210
return unmodifiableMap(result);

server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
2525

2626
private final String name;
27+
private final boolean isSystemThread;
2728

28-
public ExecutorBuilder(String name) {
29+
public ExecutorBuilder(String name, boolean isSystemThread) {
2930
this.name = name;
31+
this.isSystemThread = isSystemThread;
3032
}
3133

3234
protected String name() {
@@ -90,4 +92,7 @@ abstract static class ExecutorSettings {
9092

9193
}
9294

95+
public boolean isSystemThread() {
96+
return isSystemThread;
97+
}
9398
}

server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,28 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
5151
final int queueSize,
5252
final TaskTrackingConfig taskTrackingConfig
5353
) {
54-
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig);
54+
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, false);
55+
}
56+
57+
/**
58+
* Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
59+
*
60+
* @param settings the node-level settings
61+
* @param name the name of the executor
62+
* @param size the fixed number of threads
63+
* @param queueSize the size of the backing queue, -1 for unbounded
64+
* @param taskTrackingConfig whether to track statics about task execution time
65+
* @param isSystemThread whether the threads are system threads
66+
*/
67+
FixedExecutorBuilder(
68+
final Settings settings,
69+
final String name,
70+
final int size,
71+
final int queueSize,
72+
final TaskTrackingConfig taskTrackingConfig,
73+
boolean isSystemThread
74+
) {
75+
this(settings, name, size, queueSize, "thread_pool." + name, taskTrackingConfig, isSystemThread);
5576
}
5677

5778
/**
@@ -72,7 +93,29 @@ public FixedExecutorBuilder(
7293
final String prefix,
7394
final TaskTrackingConfig taskTrackingConfig
7495
) {
75-
super(name);
96+
this(settings, name, size, queueSize, prefix, taskTrackingConfig, false);
97+
}
98+
99+
/**
100+
* Construct a fixed executor builder.
101+
*
102+
* @param settings the node-level settings
103+
* @param name the name of the executor
104+
* @param size the fixed number of threads
105+
* @param queueSize the size of the backing queue, -1 for unbounded
106+
* @param prefix the prefix for the settings keys
107+
* @param taskTrackingConfig whether to track statics about task execution time
108+
*/
109+
public FixedExecutorBuilder(
110+
final Settings settings,
111+
final String name,
112+
final int size,
113+
final int queueSize,
114+
final String prefix,
115+
final TaskTrackingConfig taskTrackingConfig,
116+
final boolean isSystemThread
117+
) {
118+
super(name, isSystemThread);
76119
final String sizeKey = settingsKey(prefix, "size");
77120
this.sizeSetting = new Setting<>(
78121
sizeKey,
@@ -102,7 +145,7 @@ FixedExecutorSettings getSettings(Settings settings) {
102145
ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
103146
int size = settings.size;
104147
int queueSize = settings.queueSize;
105-
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
148+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name(), isSystemThread());
106149
final ExecutorService executor = EsExecutors.newFixed(
107150
settings.nodeName + "/" + name(),
108151
size,

server/src/main/java/org/elasticsearch/threadpool/ScalingExecutorBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public ScalingExecutorBuilder(
104104
final String prefix,
105105
final EsExecutors.TaskTrackingConfig trackingConfig
106106
) {
107-
super(name);
107+
super(name, false);
108108
this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
109109
this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
110110
this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
@@ -131,7 +131,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
131131
int core = settings.core;
132132
int max = settings.max;
133133
final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.SCALING, core, max, keepAlive, null);
134-
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
134+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings.nodeName, name());
135135
ExecutorService executor;
136136
executor = EsExecutors.newScaling(
137137
settings.nodeName + "/" + name(),

server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,15 +635,19 @@ public void testParseExecutorName() throws InterruptedException {
635635
final var executorName = randomAlphaOfLength(10);
636636
final String nodeName = rarely() ? null : randomIdentifier();
637637
final ThreadFactory threadFactory;
638+
final boolean isSystem;
638639
if (nodeName == null) {
640+
isSystem = false;
639641
threadFactory = EsExecutors.daemonThreadFactory(Settings.EMPTY, executorName);
640642
} else if (randomBoolean()) {
643+
isSystem = false;
641644
threadFactory = EsExecutors.daemonThreadFactory(
642645
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
643646
executorName
644647
);
645648
} else {
646-
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName);
649+
isSystem = randomBoolean();
650+
threadFactory = EsExecutors.daemonThreadFactory(nodeName, executorName, isSystem);
647651
}
648652

649653
final var thread = threadFactory.newThread(() -> {});
@@ -652,6 +656,8 @@ public void testParseExecutorName() throws InterruptedException {
652656
assertThat(EsExecutors.executorName(thread), equalTo(executorName));
653657
assertThat(EsExecutors.executorName("TEST-" + thread.getName()), is(nullValue()));
654658
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
659+
assertThat(EsExecutors.executorName("LuceneTestCase" + thread.getName()), is(nullValue()));
660+
assertThat(((EsExecutors.EsThread) thread).isSystem(), equalTo(isSystem));
655661
} finally {
656662
thread.join();
657663
}

0 commit comments

Comments
 (0)