Skip to content

Commit cb875bc

Browse files
committed
Sort of working
1 parent a11556b commit cb875bc

File tree

7 files changed

+308
-32
lines changed

7 files changed

+308
-32
lines changed

server/src/main/java/org/elasticsearch/common/Randomness.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@ public static Random get(Settings settings, Setting<Long> setting) {
8787
* RandomizedContext or tests are
8888
* running but tests.seed is not set
8989
*/
90+
@SuppressForbidden(reason = "Need to hack something for virtual threads")
9091
public static Random get() {
92+
if (Thread.currentThread().isVirtual()) {
93+
// Need to work out how to propagate context to virtual threads
94+
return ThreadLocalRandom.current();
95+
}
9196
if (currentMethod != null && getRandomMethod != null) {
9297
try {
9398
Object randomizedContext = currentMethod.invoke(null);

server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
3333
// https://github.com/elastic/elasticsearch/issues/124667
3434
// note, this is intentionally not a lambda to avoid this ever be turned into a compile time constant
3535
// matching similar lambdas coming from other places
36-
static final Runnable WORKER_PROBE = new Runnable() {
36+
public static final Runnable WORKER_PROBE = new Runnable() {
3737
@Override
3838
public void run() {}
3939
};

server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -34,35 +34,13 @@ public Map<String, ExecutorBuilder> getBuilders(Settings settings, int allocated
3434
final double indexAutoscalingEWMA = WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING.get(settings);
3535

3636
Map<String, ExecutorBuilder> result = new HashMap<>();
37-
result.put(
38-
ThreadPool.Names.GENERIC,
39-
new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
40-
);
41-
result.put(
42-
ThreadPool.Names.WRITE_COORDINATION,
43-
new FixedExecutorBuilder(
44-
settings,
45-
ThreadPool.Names.WRITE_COORDINATION,
46-
allocatedProcessors,
47-
10000,
48-
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
49-
)
50-
);
51-
result.put(
52-
ThreadPool.Names.WRITE,
53-
new FixedExecutorBuilder(
54-
settings,
55-
ThreadPool.Names.WRITE,
56-
allocatedProcessors,
57-
// 10,000 for all nodes with 8 cores or fewer. Scale up once we have more than 8 cores.
58-
Math.max(allocatedProcessors * 750, 10000),
59-
EsExecutors.TaskTrackingConfig.builder()
60-
.trackOngoingTasks()
61-
.trackMaxQueueLatency()
62-
.trackExecutionTime(indexAutoscalingEWMA)
63-
.build()
64-
)
65-
);
37+
// result.put(
38+
// ThreadPool.Names.GENERIC,
39+
// new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
40+
// );
41+
result.put(ThreadPool.Names.GENERIC, new VirtualThreadsExecutorBuilder(ThreadPool.Names.GENERIC, false));
42+
result.put(ThreadPool.Names.WRITE_COORDINATION, new VirtualThreadsExecutorBuilder(ThreadPool.Names.WRITE_COORDINATION, true));
43+
result.put(ThreadPool.Names.WRITE, new VirtualThreadsExecutorBuilder(ThreadPool.Names.WRITE, true));
6644
int searchOrGetThreadPoolSize = ThreadPool.searchOrGetThreadPoolSize(allocatedProcessors);
6745
result.put(
6846
ThreadPool.Names.GET,
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.threadpool;
11+
12+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
13+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
14+
import org.elasticsearch.common.util.concurrent.ThreadContext;
15+
import org.elasticsearch.logging.LogManager;
16+
import org.elasticsearch.logging.Logger;
17+
18+
import java.util.Collection;
19+
import java.util.List;
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Future;
24+
import java.util.concurrent.RejectedExecutionException;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
import static org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.WORKER_PROBE;
31+
import static org.elasticsearch.core.Strings.format;
32+
33+
/**
34+
* There is a subtle interaction between a scaling {@link org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor} with
35+
* {@link org.elasticsearch.threadpool.ScalingExecutorBuilder.ScalingExecutorSettings#rejectAfterShutdown} set to false, and
36+
* the {@link org.elasticsearch.common.util.concurrent.ThrottledTaskRunner}.
37+
* <p>
38+
* When a {@link org.elasticsearch.common.util.concurrent.ThrottledTaskRunner} is feeding into a scaling executor that doesn't
39+
* reject after shutdown, it will always be fully processed in the event of a shutdown. This is because whenever a throttled
40+
* task finishes, it checks if there are more queued and forces them onto the end of the thread-pool queue even though they
41+
* are rejected by {@link java.util.concurrent.ThreadPoolExecutor#execute(Runnable)}. The executor won't terminate until all
42+
* workers are finished and the queue is empty, so the fact each worker adds a task to the end of the queue before it terminates
43+
* means the {@link org.elasticsearch.common.util.concurrent.ThrottledTaskRunner} queue will be drained before the thread pool
44+
* executor terminates.
45+
* <p>
46+
* This decorator attempts to emulate that behavior in the absence of an explicit queue, and also ensures that {@link ThreadContext}
47+
* is propagated to tasks that are dispatched.
48+
*/
49+
public class EsExecutorServiceDecorator implements ExecutorService {
50+
51+
private static final Logger logger = LogManager.getLogger(EsExecutorServiceDecorator.class);
52+
53+
private final String name;
54+
private final ExecutorService delegate;
55+
private final ThreadContext contextHolder;
56+
private final boolean rejectAfterShutdown;
57+
private final AtomicInteger runningTasks = new AtomicInteger();
58+
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
59+
60+
public EsExecutorServiceDecorator(String name, ExecutorService delegate, ThreadContext contextHolder, boolean rejectAfterShutdown) {
61+
this.name = name;
62+
this.delegate = delegate;
63+
this.contextHolder = contextHolder;
64+
this.rejectAfterShutdown = rejectAfterShutdown;
65+
}
66+
67+
@Override
68+
public void shutdown() {
69+
shutdownRequested.set(true);
70+
tryShutdownDelegate();
71+
}
72+
73+
private void tryShutdownDelegate() {
74+
if (shutdownRequested.get() && runningTasks.compareAndSet(0, -1)) {
75+
delegate.shutdown();
76+
}
77+
}
78+
79+
@Override
80+
public List<Runnable> shutdownNow() {
81+
return delegate.shutdownNow();
82+
}
83+
84+
@Override
85+
public boolean isShutdown() {
86+
return delegate.isShutdown();
87+
}
88+
89+
@Override
90+
public boolean isTerminated() {
91+
return delegate.isTerminated();
92+
}
93+
94+
@Override
95+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
96+
long endTime = System.nanoTime() + unit.toNanos(timeout);
97+
shutdownRequested.set(true);
98+
while (delegate.isShutdown() == false) {
99+
if (System.nanoTime() > endTime) {
100+
return false;
101+
}
102+
Thread.sleep(1);
103+
tryShutdownDelegate();
104+
}
105+
logger.info("Falling through");
106+
return delegate.awaitTermination(timeout, unit);
107+
}
108+
109+
@Override
110+
public <T> Future<T> submit(Callable<T> task) {
111+
return delegate.submit(task);
112+
}
113+
114+
@Override
115+
public <T> Future<T> submit(Runnable task, T result) {
116+
return delegate.submit(task, result);
117+
}
118+
119+
@Override
120+
public Future<?> submit(Runnable task) {
121+
return delegate.submit(task);
122+
}
123+
124+
@Override
125+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
126+
return delegate.invokeAll(tasks);
127+
}
128+
129+
@Override
130+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
131+
return delegate.invokeAll(tasks, timeout, unit);
132+
}
133+
134+
@Override
135+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
136+
return delegate.invokeAny(tasks);
137+
}
138+
139+
@Override
140+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException,
141+
ExecutionException, TimeoutException {
142+
return delegate.invokeAny(tasks, timeout, unit);
143+
}
144+
145+
@Override
146+
public void execute(Runnable command) {
147+
final Runnable wrappedRunnable = command != WORKER_PROBE ? wrapRunnable(command) : WORKER_PROBE;
148+
try {
149+
if (rejectAfterShutdown && shutdownRequested.get() || delegate.isShutdown()) {
150+
throw new EsRejectedExecutionException("executor has been shutdown", delegate.isShutdown());
151+
}
152+
// Increment outstanding task count
153+
runningTasks.getAndUpdate(currentValue -> {
154+
if (currentValue == -1) {
155+
throw new EsRejectedExecutionException("executor has been shutdown", true);
156+
} else {
157+
return currentValue + 1;
158+
}
159+
});
160+
try {
161+
delegate.execute(() -> {
162+
try {
163+
wrappedRunnable.run();
164+
} finally {
165+
// Decrement outstanding
166+
runningTasks.decrementAndGet();
167+
tryShutdownDelegate();
168+
}
169+
});
170+
} catch (RejectedExecutionException e) {
171+
if (command == WORKER_PROBE) {
172+
return;
173+
}
174+
throw new EsRejectedExecutionException("delegate rejected execution", delegate.isShutdown());
175+
}
176+
} catch (Exception e) {
177+
if (wrappedRunnable instanceof AbstractRunnable abstractRunnable) {
178+
try {
179+
// If we are an abstract runnable we can handle the exception
180+
// directly and don't need to rethrow it, but we log and assert
181+
// any unexpected exception first.
182+
if (e instanceof EsRejectedExecutionException == false) {
183+
logException(abstractRunnable, e);
184+
}
185+
abstractRunnable.onRejection(e);
186+
} finally {
187+
abstractRunnable.onAfter();
188+
}
189+
} else {
190+
throw e;
191+
}
192+
}
193+
}
194+
195+
// package-visible for testing
196+
void logException(AbstractRunnable r, Exception e) {
197+
logger.error(() -> format("[%s] unexpected exception when submitting task [%s] for execution", name, r), e);
198+
assert false : "executor throws an exception (not a rejected execution exception) before the task has been submitted " + e;
199+
}
200+
201+
protected Runnable wrapRunnable(Runnable command) {
202+
return contextHolder.preserveContext(command);
203+
}
204+
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ public static class Names {
159159

160160
public enum ThreadPoolType {
161161
FIXED("fixed"),
162-
SCALING("scaling");
162+
SCALING("scaling"),
163+
VIRTUAL("virtual");
163164

164165
private final String type;
165166

@@ -616,6 +617,12 @@ public void shutdown() {
616617
if (executor.executor() instanceof ThreadPoolExecutor) {
617618
closeMetrics(executor);
618619
executor.executor().shutdown();
620+
logger.info("Shutting down pool [{}]", executor.info.getName());
621+
} else if (executor.executor() instanceof EsExecutorServiceDecorator decorator) {
622+
logger.info("Shutting down virtual pool [{}]", executor.info.getName());
623+
decorator.shutdown();
624+
} else {
625+
logger.warn("unknown executor type [{}]", executor.executor().getClass().getName());
619626
}
620627
}
621628
}
@@ -627,6 +634,8 @@ public void shutdownNow() {
627634
if (executor.executor() instanceof ThreadPoolExecutor) {
628635
closeMetrics(executor);
629636
executor.executor().shutdownNow();
637+
} else if (executor.executor() instanceof EsExecutorServiceDecorator decorator) {
638+
decorator.shutdownNow();
630639
}
631640
}
632641
}
@@ -637,6 +646,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
637646
if (executor.executor() instanceof ThreadPoolExecutor) {
638647
closeMetrics(executor);
639648
result &= executor.executor().awaitTermination(timeout, unit);
649+
} else if (executor.executor() instanceof EsExecutorServiceDecorator decorator) {
650+
result &= decorator.awaitTermination(timeout, unit);
640651
}
641652
}
642653
cachedTimeThread.join(unit.toMillis(timeout));
@@ -914,7 +925,9 @@ static class ExecutorHolder {
914925
public final Info info;
915926

916927
ExecutorHolder(ExecutorService executor, Info info) {
917-
assert executor instanceof EsThreadPoolExecutor || executor == EsExecutors.DIRECT_EXECUTOR_SERVICE;
928+
assert executor instanceof EsThreadPoolExecutor
929+
|| executor instanceof EsExecutorServiceDecorator
930+
|| executor == EsExecutors.DIRECT_EXECUTOR_SERVICE;
918931
this.executor = executor;
919932
this.info = info;
920933
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.threadpool;
11+
12+
import org.elasticsearch.common.settings.Setting;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.unit.SizeValue;
15+
import org.elasticsearch.common.util.concurrent.EsExecutors;
16+
import org.elasticsearch.common.util.concurrent.ThreadContext;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.node.Node;
19+
20+
import java.util.List;
21+
import java.util.Locale;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
25+
public class VirtualThreadsExecutorBuilder extends ExecutorBuilder<VirtualThreadsExecutorSettings> {
26+
27+
private final boolean rejectAfterShutdown;
28+
29+
public VirtualThreadsExecutorBuilder(String name, boolean rejectAfterShutdown) {
30+
super(name, false);
31+
this.rejectAfterShutdown = rejectAfterShutdown;
32+
}
33+
34+
@Override
35+
public List<Setting<?>> getRegisteredSettings() {
36+
return List.of();
37+
}
38+
39+
@Override
40+
VirtualThreadsExecutorSettings getSettings(Settings settings) {
41+
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
42+
return new VirtualThreadsExecutorSettings(nodeName);
43+
}
44+
45+
@Override
46+
ThreadPool.ExecutorHolder build(VirtualThreadsExecutorSettings settings, ThreadContext threadContext) {
47+
final String threadName = EsExecutors.threadName(settings.nodeName, name());
48+
final ExecutorService executorService = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name(threadName + "[v]").factory());
49+
return new ThreadPool.ExecutorHolder(
50+
new EsExecutorServiceDecorator(name(), executorService, threadContext, rejectAfterShutdown),
51+
new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.VIRTUAL, 999, 999, TimeValue.ZERO, SizeValue.parseSizeValue("999"))
52+
);
53+
}
54+
55+
@Override
56+
String formatInfo(ThreadPool.Info info) {
57+
return String.format(Locale.ROOT, "name [%s], virtual", info.getName());
58+
}
59+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.threadpool;
11+
12+
public class VirtualThreadsExecutorSettings extends ExecutorBuilder.ExecutorSettings {
13+
14+
VirtualThreadsExecutorSettings(String nodeName) {
15+
super(nodeName);
16+
}
17+
}

0 commit comments

Comments
 (0)