Skip to content

Commit d949baf

Browse files
authored
Provide more performant and reliable extraction of the executorName for EsThreads (#135195)
1 parent 3aa317e commit d949baf

File tree

27 files changed

+173
-125
lines changed

27 files changed

+173
-125
lines changed

qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
1818
import org.elasticsearch.core.TimeValue;
1919
import org.elasticsearch.test.ESTestCase;
20+
import org.elasticsearch.test.TestEsExecutors;
2021
import org.junit.After;
2122
import org.junit.Before;
2223

@@ -65,7 +66,7 @@ public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedEx
6566
"test",
6667
1,
6768
1,
68-
EsExecutors.daemonThreadFactory("test"),
69+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
6970
threadPool.getThreadContext(),
7071
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
7172
);
@@ -85,7 +86,7 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted
8586
10,
8687
TimeUnit.SECONDS,
8788
randomBoolean(),
88-
EsExecutors.daemonThreadFactory("test"),
89+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
8990
threadPool.getThreadContext()
9091
);
9192
try {
@@ -99,7 +100,7 @@ public void testExecutionErrorOnScalingESThreadPoolExecutor() throws Interrupted
99100
public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
100101
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing(
101102
"test",
102-
EsExecutors.daemonThreadFactory("test"),
103+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
103104
threadPool.getThreadContext(),
104105
threadPool.scheduler()
105106
);
@@ -174,7 +175,7 @@ public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws Interrupt
174175
"test",
175176
1,
176177
1,
177-
EsExecutors.daemonThreadFactory("test"),
178+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
178179
threadPool.getThreadContext(),
179180
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
180181
);
@@ -194,7 +195,7 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
194195
10,
195196
TimeUnit.SECONDS,
196197
randomBoolean(),
197-
EsExecutors.daemonThreadFactory("test"),
198+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
198199
threadPool.getThreadContext()
199200
);
200201
try {
@@ -208,7 +209,7 @@ public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws Interru
208209
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
209210
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing(
210211
"test",
211-
EsExecutors.daemonThreadFactory("test"),
212+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
212213
threadPool.getThreadContext(),
213214
threadPool.scheduler()
214215
);

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.common.util.concurrent;
1111

1212
import org.elasticsearch.ExceptionsHelper;
13+
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.settings.Setting;
1415
import org.elasticsearch.common.settings.Setting.Property;
1516
import org.elasticsearch.common.settings.Settings;
@@ -322,92 +323,79 @@ public void execute(Runnable command) {
322323
*/
323324
public static final ExecutorService DIRECT_EXECUTOR_SERVICE = new DirectExecutorService();
324325

325-
public static String threadName(Settings settings, String namePrefix) {
326-
if (Node.NODE_NAME_SETTING.exists(settings)) {
327-
return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix);
328-
} else {
329-
// TODO this should only be allowed in tests
330-
return threadName("", namePrefix);
331-
}
332-
}
333-
334-
public static String threadName(final String nodeName, final String namePrefix) {
335-
// TODO missing node names should only be allowed in tests
336-
return nodeName.isEmpty() == false ? "elasticsearch[" + nodeName + "][" + namePrefix + "]" : "elasticsearch[" + namePrefix + "]";
337-
}
338-
339-
public static String executorName(String threadName) {
340-
// subtract 2 to avoid the `]` of the thread number part.
341-
int executorNameEnd = threadName.lastIndexOf(']', threadName.length() - 2);
342-
int executorNameStart = threadName.lastIndexOf('[', executorNameEnd);
343-
if (executorNameStart == -1
344-
|| executorNameEnd - executorNameStart <= 1
345-
|| threadName.startsWith("TEST-")
346-
|| threadName.startsWith("LuceneTestCase")) {
347-
return null;
348-
}
349-
return threadName.substring(executorNameStart + 1, executorNameEnd);
326+
public static String threadName(Settings settings, String executorName) {
327+
// TODO require node name to be non empty unless in tests
328+
return EsThreadFactory.threadNamePrefix(Node.NODE_NAME_SETTING.get(settings), executorName);
350329
}
351330

352331
public static String executorName(Thread thread) {
353-
return executorName(thread.getName());
354-
}
355-
356-
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
357-
return createDaemonThreadFactory(threadName(settings, namePrefix), false);
358-
}
359-
360-
public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) {
361-
return daemonThreadFactory(nodeName, namePrefix, false);
332+
return EsThread.executorName(thread);
362333
}
363334

364-
public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) {
365-
assert nodeName != null && false == nodeName.isEmpty();
366-
return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread);
335+
public static ThreadFactory daemonThreadFactory(Settings settings, String executorName) {
336+
// TODO require node name to be non empty unless in tests
337+
return new EsThreadFactory(Node.NODE_NAME_SETTING.get(settings), executorName, false);
367338
}
368339

369-
public static ThreadFactory daemonThreadFactory(String name) {
370-
assert name != null && name.isEmpty() == false;
371-
return createDaemonThreadFactory(name, false);
340+
public static ThreadFactory daemonThreadFactory(String nodeName, String executorName) {
341+
return new EsThreadFactory(nodeName, executorName, false);
372342
}
373343

374-
private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) {
375-
return new EsThreadFactory(namePrefix, isSystemThread);
344+
public static ThreadFactory daemonThreadFactory(String nodeName, String executorName, boolean isSystemThread) {
345+
assert Strings.hasLength(nodeName);
346+
return new EsThreadFactory(nodeName, executorName, isSystemThread);
376347
}
377348

378-
static class EsThreadFactory implements ThreadFactory {
349+
private static class EsThreadFactory implements ThreadFactory {
379350

380351
final ThreadGroup group;
381352
final AtomicInteger threadNumber = new AtomicInteger(1);
382-
final String namePrefix;
353+
final String nodeName;
354+
final String executorName;
383355
final boolean isSystem;
384356

385-
EsThreadFactory(String namePrefix, boolean isSystem) {
386-
this.namePrefix = namePrefix;
387-
SecurityManager s = System.getSecurityManager();
388-
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
357+
EsThreadFactory(String nodeName, String executorName, boolean isSystem) {
358+
this.nodeName = nodeName;
359+
this.executorName = executorName;
360+
this.group = Thread.currentThread().getThreadGroup();
389361
this.isSystem = isSystem;
390362
}
391363

392364
@Override
393365
public Thread newThread(Runnable r) {
394-
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem);
395-
t.setDaemon(true);
396-
return t;
366+
String threadName = threadNamePrefix(nodeName, executorName) + "[T#" + threadNumber.getAndIncrement() + "]";
367+
Thread thread = new EsThread(group, r, threadName, 0, executorName, isSystem);
368+
thread.setDaemon(true);
369+
return thread;
370+
}
371+
372+
static String threadNamePrefix(String nodeName, String executorName) {
373+
return Strings.hasLength(nodeName)
374+
? "elasticsearch[" + nodeName + "][" + executorName + "]"
375+
: "elasticsearch[" + executorName + "]";
397376
}
398377
}
399378

400379
public static class EsThread extends Thread {
380+
private final String executorName;
401381
private final boolean isSystem;
402382

403-
EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) {
383+
EsThread(ThreadGroup group, Runnable target, String name, long stackSize, String executorName, boolean isSystem) {
404384
super(group, target, name, stackSize);
385+
this.executorName = executorName;
405386
this.isSystem = isSystem;
406387
}
407388

408389
public boolean isSystem() {
409390
return isSystem;
410391
}
392+
393+
private static String executorName(Thread thread) {
394+
if (thread instanceof EsThread esThread) {
395+
return esThread.executorName;
396+
}
397+
return null;
398+
}
411399
}
412400

413401
/**

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,11 +1107,11 @@ public static boolean assertNotScheduleThread(String reason) {
11071107
}
11081108

11091109
public static boolean assertCurrentThreadPool(String... permittedThreadPoolNames) {
1110-
final var threadName = Thread.currentThread().getName();
1111-
final var executorName = EsExecutors.executorName(threadName);
1110+
final Thread thread = Thread.currentThread();
1111+
final var threadName = thread.getName();
11121112
assert threadName.startsWith("TEST-")
11131113
|| threadName.startsWith("LuceneTestCase")
1114-
|| Arrays.asList(permittedThreadPoolNames).contains(executorName)
1114+
|| Arrays.asList(permittedThreadPoolNames).contains(EsExecutors.executorName(thread))
11151115
: threadName + " not in " + Arrays.toString(permittedThreadPoolNames) + " nor a test thread";
11161116
return true;
11171117
}

server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.search.builder.SearchSourceBuilder;
3737
import org.elasticsearch.search.query.QuerySearchResult;
3838
import org.elasticsearch.test.ESTestCase;
39+
import org.elasticsearch.test.TestEsExecutors;
3940
import org.elasticsearch.threadpool.TestThreadPool;
4041
import org.elasticsearch.threadpool.ThreadPool;
4142
import org.junit.After;
@@ -90,7 +91,7 @@ public AggregationReduceContext forFinalReduction() {
9091
"test",
9192
1,
9293
10,
93-
EsExecutors.daemonThreadFactory("test"),
94+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
9495
threadPool.getThreadContext(),
9596
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
9697
);

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.elasticsearch.search.suggest.term.TermSuggestion;
6969
import org.elasticsearch.tasks.TaskId;
7070
import org.elasticsearch.test.ESTestCase;
71+
import org.elasticsearch.test.TestEsExecutors;
7172
import org.elasticsearch.threadpool.TestThreadPool;
7273
import org.elasticsearch.threadpool.ThreadPool;
7374
import org.elasticsearch.xcontent.Text;
@@ -134,7 +135,7 @@ public AggregationReduceContext forFinalReduction() {
134135
"test",
135136
1,
136137
10,
137-
EsExecutors.daemonThreadFactory("test"),
138+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
138139
threadPool.getThreadContext(),
139140
randomFrom(TaskTrackingConfig.DEFAULT, TaskTrackingConfig.DO_NOT_TRACK)
140141
);

server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.core.AbstractRefCounted;
1818
import org.elasticsearch.core.Releasable;
1919
import org.elasticsearch.test.ESTestCase;
20+
import org.elasticsearch.test.TestEsExecutors;
2021

2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.Semaphore;
@@ -105,7 +106,7 @@ public void testAsyncAcquire() throws InterruptedException {
105106
10,
106107
TimeUnit.SECONDS,
107108
true,
108-
EsExecutors.daemonThreadFactory("test"),
109+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
109110
new ThreadContext(Settings.EMPTY)
110111
);
111112
final var asyncPermits = new Semaphore(between(0, 1000));

server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.core.Nullable;
2222
import org.elasticsearch.core.TimeValue;
2323
import org.elasticsearch.test.ESTestCase;
24+
import org.elasticsearch.test.TestEsExecutors;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526

2627
import java.io.IOException;
@@ -249,7 +250,7 @@ public void testForkingAndInterrupts() throws Exception {
249250
10,
250251
TimeUnit.SECONDS,
251252
true,
252-
EsExecutors.daemonThreadFactory(executorThreadPrefix),
253+
TestEsExecutors.testOnlyDaemonThreadFactory(executorThreadPrefix),
253254
threadContext
254255
);
255256

server/src/test/java/org/elasticsearch/action/support/UnsafePlainActionFutureTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ public void testOneArg() {
2222
UnsafePlainActionFuture<?> future = new UnsafePlainActionFuture<Void>(unsafeExecutorName);
2323
Thread other1 = getThread(otherExecutorName);
2424
Thread other2 = getThread(otherExecutorName);
25-
assertFalse(future.allowedExecutors(other1, other2));
2625
Thread unsafe1 = getThread(unsafeExecutorName);
2726
Thread unsafe2 = getThread(unsafeExecutorName);
27+
assertFalse(future.allowedExecutors(other1, other2));
2828
assertTrue(future.allowedExecutors(unsafe1, unsafe2));
29-
3029
assertTrue(future.allowedExecutors(unsafe1, other1));
3130
}
3231

@@ -51,8 +50,8 @@ public void testTwoArg() {
5150
}
5251

5352
private static Thread getThread(String executorName) {
54-
Thread t = new Thread("[" + executorName + "][]");
53+
Thread t = EsExecutors.daemonThreadFactory("node", executorName).newThread(() -> {});
5554
assertThat(EsExecutors.executorName(t), equalTo(executorName));
56-
return t;
55+
return t; // note: t is never started
5756
}
5857
}

server/src/test/java/org/elasticsearch/common/component/LifecycleTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.common.util.concurrent.ThreadContext;
1818
import org.elasticsearch.core.Releasable;
1919
import org.elasticsearch.test.ESTestCase;
20+
import org.elasticsearch.test.TestEsExecutors;
2021

2122
import java.util.concurrent.CyclicBarrier;
2223
import java.util.concurrent.ExecutorService;
@@ -93,7 +94,7 @@ private static class ThreadSafetyTestHarness implements Releasable {
9394
10,
9495
TimeUnit.SECONDS,
9596
true,
96-
EsExecutors.daemonThreadFactory("test"),
97+
TestEsExecutors.testOnlyDaemonThreadFactory("test"),
9798
new ThreadContext(Settings.EMPTY)
9899
);
99100
}

server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractThrottledTaskRunnerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
1515
import org.elasticsearch.core.Releasable;
1616
import org.elasticsearch.test.ESTestCase;
17+
import org.elasticsearch.test.TestEsExecutors;
1718

1819
import java.util.concurrent.BlockingQueue;
1920
import java.util.concurrent.CountDownLatch;
@@ -29,7 +30,7 @@
2930

3031
public class AbstractThrottledTaskRunnerTests extends ESTestCase {
3132

32-
private static final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("test");
33+
private static final ThreadFactory threadFactory = TestEsExecutors.testOnlyDaemonThreadFactory("test");
3334
private static final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
3435

3536
private ExecutorService executor;

0 commit comments

Comments
 (0)