Skip to content

Commit ee4e9d5

Browse files
committed
Call internal executor method to ensure at least 1 worker
1 parent 83d45ed commit ee4e9d5

File tree

5 files changed

+35
-50
lines changed

5 files changed

+35
-50
lines changed

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public void execute(Task t) {
112112
"-Xms" + System.getProperty("tests.heap.size", "512m"),
113113
"-Dtests.testfeatures.enabled=true",
114114
"--add-opens=java.base/java.util=ALL-UNNAMED",
115+
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
115116
// TODO: only open these for mockito when it is modularized
116117
"--add-opens=java.base/java.security.cert=ALL-UNNAMED",
117118
"--add-opens=java.base/java.nio.channels=ALL-UNNAMED",

distribution/tools/server-cli/src/main/java/org/elasticsearch/server/cli/SystemJvmOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ static List<String> systemJvmOptions(Settings nodeSettings, final Map<String, St
6868
"-Dlog4j2.formatMsgNoLookups=true",
6969
"-Djava.locale.providers=CLDR",
7070
// Pass through distribution type
71-
"-Des.distribution.type=" + distroType
71+
"-Des.distribution.type=" + distroType,
72+
"--add-opens=java.base/java.util.concurrent=org.elasticsearch.server"
7273
),
7374
maybeEnableNativeAccess(useEntitlements),
7475
maybeOverrideDockerCgroup(distroType),

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
import org.elasticsearch.core.SuppressForbidden;
1818
import org.elasticsearch.node.Node;
1919

20+
import java.lang.invoke.MethodHandle;
21+
import java.lang.invoke.MethodHandles;
2022
import java.security.AccessController;
2123
import java.security.PrivilegedAction;
24+
import java.security.PrivilegedExceptionAction;
2225
import java.util.List;
2326
import java.util.Optional;
2427
import java.util.concurrent.AbstractExecutorService;
@@ -109,34 +112,12 @@ public static EsThreadPoolExecutor newScaling(
109112
) {
110113
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
111114
EsThreadPoolExecutor executor;
112-
113-
final int corePoolSize;
114-
final long adjustedKeepAliveTime;
115-
final boolean allowCoreThreadTimeOut;
116-
if (min == 0) {
117-
// If a min of 0 is requested we set the thread core count to 1, but we allow the thread cores to timeout.
118-
// This way we still achieve the goal of having 0 threads when idle, but we do not run into the risk of running into a case
119-
// where we have task(s) in the queue and no worker to consume them.
120-
corePoolSize = 1;
121-
allowCoreThreadTimeOut = true;
122-
if (keepAliveTime == 0) {
123-
// The JDK does not allow a timeout of 0 with allowCoreThreadTimeOut
124-
adjustedKeepAliveTime = 1;
125-
} else {
126-
adjustedKeepAliveTime = keepAliveTime;
127-
}
128-
} else {
129-
corePoolSize = min;
130-
adjustedKeepAliveTime = keepAliveTime;
131-
allowCoreThreadTimeOut = false;
132-
}
133-
134115
if (config.trackExecutionTime()) {
135116
executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
136117
name,
137-
corePoolSize,
118+
min,
138119
max,
139-
adjustedKeepAliveTime,
120+
keepAliveTime,
140121
unit,
141122
queue,
142123
TimedRunnable::new,
@@ -148,17 +129,16 @@ public static EsThreadPoolExecutor newScaling(
148129
} else {
149130
executor = new EsThreadPoolExecutor(
150131
name,
151-
corePoolSize,
132+
min,
152133
max,
153-
adjustedKeepAliveTime,
134+
keepAliveTime,
154135
unit,
155136
queue,
156137
threadFactory,
157138
new ForceQueuePolicy(rejectAfterShutdown),
158139
contextHolder
159140
);
160141
}
161-
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
162142
queue.executor = executor;
163143
return executor;
164144
}
@@ -479,6 +459,22 @@ static class ForceQueuePolicy extends EsRejectedExecutionHandler {
479459
*/
480460
private final boolean rejectAfterShutdown;
481461

462+
private static final MethodHandle ensurePrestart$mh = lookupEnsurePrestart();
463+
464+
@SuppressForbidden(reason = "reflective access")
465+
static MethodHandle lookupEnsurePrestart() {
466+
try {
467+
return AccessController.doPrivileged((PrivilegedExceptionAction<MethodHandle>) () -> {
468+
var ensurePrestartMethod = ThreadPoolExecutor.class.getDeclaredMethod("ensurePrestart");
469+
ensurePrestartMethod.setAccessible(true);
470+
471+
return MethodHandles.lookup().unreflect(ensurePrestartMethod);
472+
});
473+
} catch (Exception e) {
474+
throw new RuntimeException(e);
475+
}
476+
}
477+
482478
/**
483479
* @param rejectAfterShutdown indicates if {@link Runnable} should be rejected once the thread pool is shutting down
484480
*/
@@ -506,6 +502,11 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
506502
// b) var t = executor.getQueue().poll(); if (t != null) executor.execute(t); (re-enqueue) but this will likely
507503
// have undesired side effects (it's a recursive call in disguise)
508504
// c) executor.ensurePrestart(); which is not public nor protected, so we would have to call it reflectively
505+
try {
506+
ensurePrestart$mh.invokeExact(executor);
507+
} catch (Throwable e) {
508+
throw new RuntimeException(e);
509+
}
509510
}
510511
}
511512
} else {

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,28 +66,6 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
6666
this.contextHolder = contextHolder;
6767
}
6868

69-
@Override
70-
public void setCorePoolSize(int corePoolSize) {
71-
if (corePoolSize == 0) {
72-
corePoolSize = 1;
73-
// Enforce a minimum timeout > 0 so we can allow core threads timeout
74-
if (getKeepAliveTime(TimeUnit.MILLISECONDS) == 0) {
75-
setKeepAliveTime(1, TimeUnit.MILLISECONDS);
76-
}
77-
super.allowCoreThreadTimeOut(true);
78-
}
79-
super.setCorePoolSize(corePoolSize);
80-
}
81-
82-
@Override
83-
public void allowCoreThreadTimeOut(boolean value) {
84-
if (value == false && getCorePoolSize() == 0) {
85-
// Nope
86-
return;
87-
}
88-
super.allowsCoreThreadTimeOut();
89-
}
90-
9169
@Override
9270
public void execute(Runnable command) {
9371
final Runnable wrappedRunnable = wrapRunnable(command);

server/src/main/resources/org/elasticsearch/bootstrap/security.policy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ grant codeBase "${codebase.elasticsearch}" {
3232

3333
// for plugin api dynamic settings instances
3434
permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.reflect";
35+
36+
// for ExExecutors internal function patch
37+
permission java.lang.RuntimePermission "accessDeclaredMembers";
38+
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
3539
};
3640

3741
//// Very special jar permissions:

0 commit comments

Comments
 (0)