Skip to content

Commit b8b4a95

Browse files
committed
move logic to DataNodeComputeHandler
1 parent 333ed60 commit b8b4a95

File tree

9 files changed

+58
-141
lines changed

9 files changed

+58
-141
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.cluster.RemoteException;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -383,6 +384,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
383384
mock(SearchService.class),
384385
null,
385386
mock(ClusterService.class),
387+
mock(ProjectResolver.class),
386388
mock(IndexNameExpressionResolver.class),
387389
null,
388390
mockInferenceRunner()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveNodesAction.java

Lines changed: 0 additions & 126 deletions
This file was deleted.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.ShardSearchFailure;
1414
import org.elasticsearch.cluster.RemoteException;
15+
import org.elasticsearch.cluster.project.ProjectResolver;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -128,6 +129,7 @@ public class ComputeService {
128129
private final LookupFromIndexService lookupFromIndexService;
129130
private final InferenceRunner inferenceRunner;
130131
private final ClusterService clusterService;
132+
private final ProjectResolver projectResolver;
131133
private final AtomicLong childSessionIdGenerator = new AtomicLong();
132134
private final DataNodeComputeHandler dataNodeComputeHandler;
133135
private final ClusterComputeHandler clusterComputeHandler;
@@ -155,7 +157,16 @@ public ComputeService(
155157
this.lookupFromIndexService = lookupFromIndexService;
156158
this.inferenceRunner = transportActionServices.inferenceRunner();
157159
this.clusterService = transportActionServices.clusterService();
158-
this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor);
160+
this.projectResolver = transportActionServices.projectResolver();
161+
this.dataNodeComputeHandler = new DataNodeComputeHandler(
162+
this,
163+
clusterService,
164+
projectResolver,
165+
searchService,
166+
transportService,
167+
exchangeService,
168+
esqlExecutor
169+
);
159170
this.clusterComputeHandler = new ClusterComputeHandler(
160171
this,
161172
exchangeService,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.action.support.ChannelActionListener;
1717
import org.elasticsearch.action.support.RefCountingRunnable;
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
20+
import org.elasticsearch.cluster.service.ClusterService;
1921
import org.elasticsearch.compute.operator.DriverProfile;
2022
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2123
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
@@ -66,19 +68,25 @@
6668
final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
6769
private final ComputeService computeService;
6870
private final SearchService searchService;
71+
private final ClusterService clusterService;
72+
private final ProjectResolver projectResolver;
6973
private final TransportService transportService;
7074
private final ExchangeService exchangeService;
7175
private final Executor esqlExecutor;
7276
private final ThreadPool threadPool;
7377

7478
DataNodeComputeHandler(
7579
ComputeService computeService,
80+
ClusterService clusterService,
81+
ProjectResolver projectResolver,
7682
SearchService searchService,
7783
TransportService transportService,
7884
ExchangeService exchangeService,
7985
Executor esqlExecutor
8086
) {
8187
this.computeService = computeService;
88+
this.clusterService = clusterService;
89+
this.projectResolver = projectResolver;
8290
this.searchService = searchService;
8391
this.transportService = transportService;
8492
this.exchangeService = exchangeService;
@@ -102,6 +110,8 @@ void startComputeOnDataNodes(
102110
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
103111

104112
new DataNodeRequestSender(
113+
clusterService,
114+
projectResolver,
105115
transportService,
106116
esqlExecutor,
107117
parentTask,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import org.elasticsearch.action.support.TransportActions;
2020
import org.elasticsearch.cluster.node.DiscoveryNode;
2121
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
22+
import org.elasticsearch.cluster.project.ProjectResolver;
23+
import org.elasticsearch.cluster.service.ClusterService;
2224
import org.elasticsearch.common.breaker.CircuitBreakingException;
25+
import org.elasticsearch.common.util.Maps;
2326
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2427
import org.elasticsearch.compute.operator.DriverProfile;
2528
import org.elasticsearch.compute.operator.FailureCollector;
@@ -33,9 +36,6 @@
3336
import org.elasticsearch.transport.TransportException;
3437
import org.elasticsearch.transport.TransportRequestOptions;
3538
import org.elasticsearch.transport.TransportService;
36-
import org.elasticsearch.xpack.esql.action.EsqlResolveNodesAction;
37-
import org.elasticsearch.xpack.esql.action.EsqlResolveNodesAction.ResolveNodesRequest;
38-
import org.elasticsearch.xpack.esql.action.EsqlResolveNodesAction.ResolveNodesResponse;
3939
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
4040

4141
import java.util.ArrayList;
@@ -78,6 +78,8 @@ abstract class DataNodeRequestSender {
7878
DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE.roleName()
7979
);
8080

81+
private final ClusterService clusterService;
82+
private final ProjectResolver projectResolver;
8183
private final TransportService transportService;
8284
private final Executor esqlExecutor;
8385
private final CancellableTask rootTask;
@@ -100,6 +102,8 @@ abstract class DataNodeRequestSender {
100102
private final AtomicInteger remainingTargetShardSearchAttempts = new AtomicInteger(10);
101103

102104
DataNodeRequestSender(
105+
ClusterService clusterService,
106+
ProjectResolver projectResolver,
103107
TransportService transportService,
104108
Executor esqlExecutor,
105109
CancellableTask rootTask,
@@ -109,6 +113,8 @@ abstract class DataNodeRequestSender {
109113
boolean allowPartialResults,
110114
int concurrentRequests
111115
) {
116+
this.clusterService = clusterService;
117+
this.projectResolver = projectResolver;
112118
this.transportService = transportService;
113119
this.esqlExecutor = esqlExecutor;
114120
this.rootTask = rootTask;
@@ -490,13 +496,23 @@ void searchShards(Set<String> concreteIndices, ActionListener<TargetShards> list
490496
}
491497

492498
void resolveShards(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener) {
493-
transportService.sendChildRequest(
494-
transportService.getLocalNode(),
495-
EsqlResolveNodesAction.TYPE.name(),
496-
new ResolveNodesRequest(shardIds),
497-
rootTask,
498-
TransportRequestOptions.EMPTY,
499-
new ActionListenerResponseHandler<>(listener.map(ResolveNodesResponse::nodes), ResolveNodesResponse::new, esqlExecutor)
500-
);
499+
ActionListener.completeWith(listener, () -> doResolveShards(shardIds));
500+
}
501+
502+
private Map<ShardId, List<DiscoveryNode>> doResolveShards(Set<ShardId> shardIds) {
503+
var project = projectResolver.getProjectState(clusterService.state());
504+
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
505+
for (var shardId : shardIds) {
506+
nodes.put(
507+
shardId,
508+
project.routingTable()
509+
.shardRoutingTable(shardId)
510+
.allShards()
511+
.filter(shard -> shard.active() && shard.isSearchable())
512+
.map(shard -> project.cluster().nodes().get(shard.currentNodeId()))
513+
.toList()
514+
);
515+
}
516+
return nodes;
501517
}
502518
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
5959
import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
6060
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
61-
import org.elasticsearch.xpack.esql.action.EsqlResolveNodesAction;
6261
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
6362
import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
6463
import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
@@ -240,7 +239,6 @@ public List<ActionHandler> getActions() {
240239
new ActionHandler(XPackUsageFeatureAction.ESQL, EsqlUsageTransportAction.class),
241240
new ActionHandler(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class),
242241
new ActionHandler(EsqlResolveFieldsAction.TYPE, EsqlResolveFieldsAction.class),
243-
new ActionHandler(EsqlResolveNodesAction.TYPE, EsqlResolveNodesAction.class),
244242
new ActionHandler(EsqlSearchShardsAction.TYPE, EsqlSearchShardsAction.class),
245243
new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class),
246244
new ActionHandler(EsqlListQueriesAction.INSTANCE, TransportEsqlListQueriesAction.class),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
11+
import org.elasticsearch.cluster.project.ProjectResolver;
1112
import org.elasticsearch.cluster.service.ClusterService;
1213
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1314
import org.elasticsearch.search.SearchService;
@@ -20,6 +21,7 @@ public record TransportActionServices(
2021
SearchService searchService,
2122
ExchangeService exchangeService,
2223
ClusterService clusterService,
24+
ProjectResolver projectResolver,
2325
IndexNameExpressionResolver indexNameExpressionResolver,
2426
UsageService usageService,
2527
InferenceRunner inferenceRunner

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.support.HandledTransportAction;
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2021
import org.elasticsearch.common.io.stream.StreamInput;
@@ -94,6 +95,7 @@ public TransportEsqlQueryAction(
9495
SearchService searchService,
9596
ExchangeService exchangeService,
9697
ClusterService clusterService,
98+
ProjectResolver projectResolver,
9799
ThreadPool threadPool,
98100
BigArrays bigArrays,
99101
BlockFactoryProvider blockFactoryProvider,
@@ -149,6 +151,7 @@ public TransportEsqlQueryAction(
149151
searchService,
150152
exchangeService,
151153
clusterService,
154+
projectResolver,
152155
indexNameExpressionResolver,
153156
usageService,
154157
new InferenceRunner(client)

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.threadpool.TestThreadPool;
3535
import org.elasticsearch.transport.TransportService;
3636
import org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeListener;
37-
import org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.TargetShards;
3837
import org.junit.After;
3938
import org.junit.Before;
4039

@@ -569,6 +568,8 @@ PlainActionFuture<ComputeResponse> sendRequests(
569568
Collections.emptyMap()
570569
);
571570
new DataNodeRequestSender(
571+
null,
572+
null,
572573
transportService,
573574
executor,
574575
task,

0 commit comments

Comments
 (0)