Skip to content

Commit 83d45ed

Browse files
committed
Do not allow core count of 0, use 1 and allow core thread timeout instead
1 parent 2a94ced commit 83d45ed

File tree

2 files changed

+58
-4
lines changed

2 files changed

+58
-4
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,34 @@ public static EsThreadPoolExecutor newScaling(
109109
) {
110110
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
111111
EsThreadPoolExecutor executor;
112+
113+
final int corePoolSize;
114+
final long adjustedKeepAliveTime;
115+
final boolean allowCoreThreadTimeOut;
116+
if (min == 0) {
117+
// If a min of 0 is requested we set the thread core count to 1, but we allow the thread cores to timeout.
118+
// This way we still achieve the goal of having 0 threads when idle, but we do not run into the risk of running into a case
119+
// where we have task(s) in the queue and no worker to consume them.
120+
corePoolSize = 1;
121+
allowCoreThreadTimeOut = true;
122+
if (keepAliveTime == 0) {
123+
// The JDK does not allow a timeout of 0 with allowCoreThreadTimeOut
124+
adjustedKeepAliveTime = 1;
125+
} else {
126+
adjustedKeepAliveTime = keepAliveTime;
127+
}
128+
} else {
129+
corePoolSize = min;
130+
adjustedKeepAliveTime = keepAliveTime;
131+
allowCoreThreadTimeOut = false;
132+
}
133+
112134
if (config.trackExecutionTime()) {
113135
executor = new TaskExecutionTimeTrackingEsThreadPoolExecutor(
114136
name,
115-
min,
137+
corePoolSize,
116138
max,
117-
keepAliveTime,
139+
adjustedKeepAliveTime,
118140
unit,
119141
queue,
120142
TimedRunnable::new,
@@ -126,16 +148,17 @@ public static EsThreadPoolExecutor newScaling(
126148
} else {
127149
executor = new EsThreadPoolExecutor(
128150
name,
129-
min,
151+
corePoolSize,
130152
max,
131-
keepAliveTime,
153+
adjustedKeepAliveTime,
132154
unit,
133155
queue,
134156
threadFactory,
135157
new ForceQueuePolicy(rejectAfterShutdown),
136158
contextHolder
137159
);
138160
}
161+
executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
139162
queue.executor = executor;
140163
return executor;
141164
}
@@ -475,6 +498,15 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
475498
if (executor.isShutdown() && executor.remove(task)) {
476499
reject(executor, task);
477500
}
501+
// The equivalent code in the JDK here would check if the worker count (aka pool size) is 0, and if so it would
502+
// add a worker. We cannot reproduce that exact behaviour.
503+
if (executor.getPoolSize() == 0) {
504+
// add a worker. We could use:
505+
// a) executor.setCorePoolSize(1); (but then we would need to bring it back to 0 "later", and "later" is not trivial
506+
// b) var t = executor.getQueue().poll(); if (t != null) executor.execute(t); (re-enqueue) but this will likely
507+
// have undesired side effects (it's a recursive call in disguise)
508+
// c) executor.ensurePrestart(); which is not public nor protected, so we would have to call it reflectively
509+
}
478510
}
479511
} else {
480512
put(executor, task);

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,28 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
6666
this.contextHolder = contextHolder;
6767
}
6868

69+
@Override
70+
public void setCorePoolSize(int corePoolSize) {
71+
if (corePoolSize == 0) {
72+
corePoolSize = 1;
73+
// Enforce a minimum timeout > 0 so we can allow core threads timeout
74+
if (getKeepAliveTime(TimeUnit.MILLISECONDS) == 0) {
75+
setKeepAliveTime(1, TimeUnit.MILLISECONDS);
76+
}
77+
super.allowCoreThreadTimeOut(true);
78+
}
79+
super.setCorePoolSize(corePoolSize);
80+
}
81+
82+
@Override
83+
public void allowCoreThreadTimeOut(boolean value) {
84+
if (value == false && getCorePoolSize() == 0) {
85+
// Nope
86+
return;
87+
}
88+
super.allowsCoreThreadTimeOut();
89+
}
90+
6991
@Override
7092
public void execute(Runnable command) {
7193
final Runnable wrappedRunnable = wrapRunnable(command);

0 commit comments

Comments
 (0)