Skip to content

Commit 6caf9c1

Browse files
Pass Executor into request handlers (#98854)
Pass an Executor into RequestHandleRegistry#registerRequestHandler calls instead of a String. Ultimately the String executor name passed through and stashed internally is converted by the ThreadPool into an Executor on which to execute tasks. Part of the changes for #97879.
1 parent 1301995 commit 6caf9c1

File tree

16 files changed

+86
-47
lines changed

16 files changed

+86
-47
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import java.util.List;
9999
import java.util.Map;
100100
import java.util.Set;
101+
import java.util.concurrent.Executor;
101102
import java.util.function.Function;
102103

103104
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -754,7 +755,7 @@ public List<TransportInterceptor> getTransportInterceptors(
754755
@Override
755756
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
756757
String action,
757-
String executor,
758+
Executor executor,
758759
boolean forceExecution,
759760
TransportRequestHandler<T> actualHandler
760761
) {

server/src/main/java/org/elasticsearch/common/network/NetworkModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Objects;
52+
import java.util.concurrent.Executor;
5253
import java.util.function.BiConsumer;
5354
import java.util.function.Supplier;
5455

@@ -262,7 +263,7 @@ private CompositeTransportInterceptor(List<TransportInterceptor> transportInterc
262263
@Override
263264
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
264265
String action,
265-
String executor,
266+
Executor executor,
266267
boolean forceExecution,
267268
TransportRequestHandler<T> actualHandler
268269
) {

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ private <T extends TransportRequest> void handleRequestForking(T request, Reques
295295
boolean success = false;
296296
request.incRef();
297297
try {
298-
threadPool.executor(reg.getExecutor()).execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
298+
reg.getExecutor().execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
299299
@Override
300300
protected void doRun() {
301301
doHandleRequest(reg, request, channel);

server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.tracing.Tracer;
1919

2020
import java.io.IOException;
21+
import java.util.concurrent.Executor;
2122

2223
import static org.elasticsearch.core.Releasables.assertOnce;
2324

@@ -27,7 +28,7 @@ public class RequestHandlerRegistry<Request extends TransportRequest> implements
2728
private final TransportRequestHandler<Request> handler;
2829
private final boolean forceExecution;
2930
private final boolean canTripCircuitBreaker;
30-
private final String executor;
31+
private final Executor executor;
3132
private final TaskManager taskManager;
3233
private final Tracer tracer;
3334
private final Writeable.Reader<Request> requestReader;
@@ -38,7 +39,7 @@ public RequestHandlerRegistry(
3839
Writeable.Reader<Request> requestReader,
3940
TaskManager taskManager,
4041
TransportRequestHandler<Request> handler,
41-
String executor,
42+
Executor executor,
4243
boolean forceExecution,
4344
boolean canTripCircuitBreaker,
4445
Tracer tracer
@@ -86,7 +87,7 @@ public boolean canTripCircuitBreaker() {
8687
return canTripCircuitBreaker;
8788
}
8889

89-
public String getExecutor() {
90+
public Executor getExecutor() {
9091
return executor;
9192
}
9293

server/src/main/java/org/elasticsearch/transport/TransportInterceptor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@
1111
import org.elasticsearch.cluster.node.DiscoveryNode;
1212
import org.elasticsearch.common.io.stream.Writeable.Reader;
1313

14+
import java.util.concurrent.Executor;
15+
1416
/**
1517
* This interface allows plugins to intercept requests on both the sender and the receiver side.
1618
*/
1719
public interface TransportInterceptor {
1820
/**
1921
* This is called for each handler that is registered via
20-
* {@link TransportService#registerRequestHandler(String, String, boolean, boolean, Reader, TransportRequestHandler)} or
21-
* {@link TransportService#registerRequestHandler(String, String, Reader, TransportRequestHandler)}. The returned handler is
22+
* {@link TransportService#registerRequestHandler(String, Executor, boolean, boolean, Reader, TransportRequestHandler)} or
23+
* {@link TransportService#registerRequestHandler(String, Executor, Reader, TransportRequestHandler)}. The returned handler is
2224
* used instead of the passed in handler. By default the provided handler is returned.
2325
*/
2426
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
2527
String action,
26-
String executor,
28+
Executor executor,
2729
boolean forceExecution,
2830
TransportRequestHandler<T> actualHandler
2931
) {

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public TransportService(
293293
}
294294
registerRequestHandler(
295295
HANDSHAKE_ACTION_NAME,
296-
ThreadPool.Names.SAME,
296+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
297297
false,
298298
false,
299299
HandshakeRequest::new,
@@ -1004,8 +1004,8 @@ private void sendLocalRequest(long requestId, final String action, final Transpo
10041004
assert false : action;
10051005
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
10061006
}
1007-
final String executor = reg.getExecutor();
1008-
if (ThreadPool.Names.SAME.equals(executor)) {
1007+
final Executor executor = reg.getExecutor();
1008+
if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
10091009
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
10101010
try {
10111011
reg.processMessageReceived(request, channel);
@@ -1017,7 +1017,7 @@ private void sendLocalRequest(long requestId, final String action, final Transpo
10171017
boolean success = false;
10181018
request.incRef();
10191019
try {
1020-
threadPool.executor(executor).execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
1020+
executor.execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
10211021
@Override
10221022
protected void doRun() throws Exception {
10231023
reg.processMessageReceived(request, channel);
@@ -1122,6 +1122,46 @@ public static boolean isValidActionName(String actionName) {
11221122
return false;
11231123
}
11241124

1125+
/**
1126+
* Temporary passthrough function that continues to take a String rather than Executor type.
1127+
*
1128+
* @param action
1129+
* @param executor
1130+
* @param requestReader
1131+
* @param handler
1132+
* @param <Request>
1133+
*/
1134+
public <Request extends TransportRequest> void registerRequestHandler(
1135+
String action,
1136+
String executor,
1137+
Writeable.Reader<Request> requestReader,
1138+
TransportRequestHandler<Request> handler
1139+
) {
1140+
registerRequestHandler(action, threadPool.executor(executor), requestReader, handler);
1141+
}
1142+
1143+
/**
1144+
* Temporary passthrough function that continues to take a String rather than Executor type.
1145+
*
1146+
* @param action
1147+
* @param executor
1148+
* @param forceExecution
1149+
* @param canTripCircuitBreaker
1150+
* @param requestReader
1151+
* @param handler
1152+
* @param <Request>
1153+
*/
1154+
public <Request extends TransportRequest> void registerRequestHandler(
1155+
String action,
1156+
String executor,
1157+
boolean forceExecution,
1158+
boolean canTripCircuitBreaker,
1159+
Writeable.Reader<Request> requestReader,
1160+
TransportRequestHandler<Request> handler
1161+
) {
1162+
registerRequestHandler(action, threadPool.executor(executor), forceExecution, canTripCircuitBreaker, requestReader, handler);
1163+
}
1164+
11251165
/**
11261166
* Registers a new request handler
11271167
*
@@ -1132,7 +1172,7 @@ public static boolean isValidActionName(String actionName) {
11321172
*/
11331173
public <Request extends TransportRequest> void registerRequestHandler(
11341174
String action,
1135-
String executor,
1175+
Executor executor,
11361176
Writeable.Reader<Request> requestReader,
11371177
TransportRequestHandler<Request> handler
11381178
) {
@@ -1163,7 +1203,7 @@ public <Request extends TransportRequest> void registerRequestHandler(
11631203
*/
11641204
public <Request extends TransportRequest> void registerRequestHandler(
11651205
String action,
1166-
String executor,
1206+
Executor executor,
11671207
boolean forceExecution,
11681208
boolean canTripCircuitBreaker,
11691209
Writeable.Reader<Request> requestReader,

server/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Map;
41+
import java.util.concurrent.Executor;
4142
import java.util.concurrent.TimeUnit;
4243
import java.util.concurrent.atomic.AtomicInteger;
4344
import java.util.function.BiConsumer;
@@ -237,7 +238,7 @@ public void testRegisterInterceptor() {
237238
@Override
238239
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
239240
String action,
240-
String executor,
241+
Executor executor,
241242
boolean forceExecution,
242243
TransportRequestHandler<T> actualHandler
243244
) {

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.network.HandlingTimeTracker;
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.util.PageCacheRecycler;
29+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.core.Tuple;
3132
import org.elasticsearch.tasks.TaskId;
@@ -111,7 +112,7 @@ public void testPing() throws Exception {
111112
TestRequest::new,
112113
taskManager,
113114
(request, channel, task) -> channelCaptor.set(channel),
114-
ThreadPool.Names.SAME,
115+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
115116
false,
116117
true,
117118
Tracer.NOOP
@@ -164,7 +165,7 @@ public TestResponse read(StreamInput in) throws IOException {
164165
channelCaptor.set(channel);
165166
requestCaptor.set(request);
166167
},
167-
ThreadPool.Names.SAME,
168+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
168169
false,
169170
true,
170171
Tracer.NOOP

server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.ArrayList;
4141
import java.util.Collections;
4242
import java.util.List;
43+
import java.util.concurrent.Executor;
4344
import java.util.concurrent.TimeUnit;
4445

4546
import static java.util.Collections.emptyMap;
@@ -426,7 +427,7 @@ public void setModifyBuildHash(boolean modifyBuildHash) {
426427
@Override
427428
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
428429
String action,
429-
String executor,
430+
Executor executor,
430431
boolean forceExecution,
431432
TransportRequestHandler<T> actualHandler
432433
) {

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@
174174
import java.util.concurrent.Callable;
175175
import java.util.concurrent.CopyOnWriteArrayList;
176176
import java.util.concurrent.CountDownLatch;
177+
import java.util.concurrent.Executor;
177178
import java.util.concurrent.TimeUnit;
178179
import java.util.concurrent.atomic.AtomicInteger;
179180
import java.util.function.Function;
@@ -2174,7 +2175,7 @@ public List<TransportInterceptor> getTransportInterceptors(
21742175
@Override
21752176
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
21762177
String action,
2177-
String executor,
2178+
Executor executor,
21782179
boolean forceExecution,
21792180
TransportRequestHandler<T> actualHandler
21802181
) {

0 commit comments

Comments
 (0)