Skip to content

Commit 6e5250e

Browse files
committed
Refactor sender
1 parent 6c67d6a commit 6e5250e

File tree

2 files changed

+231
-128
lines changed

2 files changed

+231
-128
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 52 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,23 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10-
import org.elasticsearch.TransportVersions;
1110
import org.elasticsearch.action.ActionListener;
1211
import org.elasticsearch.action.ActionListenerResponseHandler;
1312
import org.elasticsearch.action.ActionRunnable;
1413
import org.elasticsearch.action.OriginalIndices;
15-
import org.elasticsearch.action.search.SearchShardsGroup;
16-
import org.elasticsearch.action.search.SearchShardsRequest;
17-
import org.elasticsearch.action.search.SearchShardsResponse;
1814
import org.elasticsearch.action.support.ChannelActionListener;
1915
import org.elasticsearch.action.support.RefCountingRunnable;
20-
import org.elasticsearch.cluster.node.DiscoveryNode;
2116
import org.elasticsearch.compute.operator.DriverProfile;
2217
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2318
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
2419
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
2520
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
2621
import org.elasticsearch.core.IOUtils;
2722
import org.elasticsearch.core.Releasable;
28-
import org.elasticsearch.core.TimeValue;
2923
import org.elasticsearch.index.Index;
3024
import org.elasticsearch.index.query.QueryBuilder;
3125
import org.elasticsearch.index.shard.IndexShard;
3226
import org.elasticsearch.index.shard.ShardId;
33-
import org.elasticsearch.index.shard.ShardNotFoundException;
3427
import org.elasticsearch.search.SearchService;
3528
import org.elasticsearch.search.internal.AliasFilter;
3629
import org.elasticsearch.search.internal.SearchContext;
@@ -39,12 +32,10 @@
3932
import org.elasticsearch.tasks.Task;
4033
import org.elasticsearch.tasks.TaskCancelledException;
4134
import org.elasticsearch.threadpool.ThreadPool;
42-
import org.elasticsearch.transport.Transport;
4335
import org.elasticsearch.transport.TransportChannel;
4436
import org.elasticsearch.transport.TransportRequestHandler;
4537
import org.elasticsearch.transport.TransportRequestOptions;
4638
import org.elasticsearch.transport.TransportService;
47-
import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
4839
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4940
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
5041
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -59,6 +50,10 @@
5950
import java.util.concurrent.Executor;
6051
import java.util.concurrent.atomic.AtomicBoolean;
6152
import java.util.concurrent.atomic.AtomicInteger;
53+
import java.util.concurrent.atomic.AtomicLong;
54+
import java.util.concurrent.atomic.AtomicReference;
55+
import java.util.function.Function;
56+
import java.util.stream.Collectors;
6257

6358
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
6459

@@ -72,6 +67,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
7267
private final TransportService transportService;
7368
private final ExchangeService exchangeService;
7469
private final Executor esqlExecutor;
70+
private final ThreadPool threadPool;
7571

7672
DataNodeComputeHandler(
7773
ComputeService computeService,
@@ -85,6 +81,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
8581
this.transportService = transportService;
8682
this.exchangeService = exchangeService;
8783
this.esqlExecutor = esqlExecutor;
84+
this.threadPool = transportService.getThreadPool();
8885
transportService.registerRequestHandler(ComputeService.DATA_ACTION_NAME, esqlExecutor, DataNodeRequest::new, this);
8986
}
9087

@@ -100,151 +97,78 @@ void startComputeOnDataNodes(
10097
Runnable runOnTaskFailure,
10198
ActionListener<ComputeResponse> outListener
10299
) {
103-
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
104-
var listener = ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close);
105-
final long startTimeInNanos = System.nanoTime();
106-
lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {
107-
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
108-
TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
109-
return new ComputeResponse(
110-
profiles,
111-
took,
112-
dataNodeResult.totalShards(),
113-
dataNodeResult.totalShards(),
114-
dataNodeResult.skippedShards(),
115-
0
116-
);
117-
}))) {
100+
DataNodeRequestSender sender = new DataNodeRequestSender(
101+
transportService,
102+
esqlExecutor,
103+
(connection, shardIds, aliasFilters, listener) -> {
104+
var queryPragmas = configuration.pragmas();
105+
var childSessionId = computeService.newChildSession(sessionId);
118106
// For each target node, first open a remote exchange on the remote node, then link the exchange source to
119107
// the new remote exchange sink, and initialize the computation on the target node via data-node-request.
120-
for (DataNode node : dataNodeResult.dataNodes()) {
121-
var queryPragmas = configuration.pragmas();
122-
var childSessionId = computeService.newChildSession(sessionId);
123-
ActionListener<DataNodeComputeResponse> nodeListener = computeListener.acquireCompute()
124-
.map(DataNodeComputeResponse::profiles);
125-
ExchangeService.openExchange(
126-
transportService,
127-
node.connection,
128-
childSessionId,
129-
queryPragmas.exchangeBufferSize(),
130-
esqlExecutor,
131-
nodeListener.delegateFailureAndWrap((l, unused) -> {
132-
var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, node.connection);
108+
ExchangeService.openExchange(
109+
transportService,
110+
connection,
111+
childSessionId,
112+
queryPragmas.exchangeBufferSize(),
113+
esqlExecutor,
114+
listener.delegateFailureAndWrap((l, unused) -> {
115+
final AtomicLong pagesFetched = new AtomicLong();
116+
final AtomicReference<DataNodeComputeResponse> nodeResponseRef = new AtomicReference<>();
117+
try (var computeListener = new ComputeListener(threadPool, runOnTaskFailure, ActionListener.wrap(r -> {
118+
l.onResponse(nodeResponseRef.get());
119+
}, e -> {
120+
if (pagesFetched.get() == 0 && false) {
121+
var shardFailures = shardIds.stream().collect(Collectors.toMap(Function.identity(), ignored -> e));
122+
l.onResponse(new DataNodeComputeResponse(List.of(), shardFailures));
123+
} else {
124+
l.onFailure(e);
125+
}
126+
}))) {
127+
final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection);
133128
exchangeSource.addRemoteSink(
134129
remoteSink,
135130
true,
136-
() -> {},
131+
pagesFetched::incrementAndGet,
137132
queryPragmas.concurrentExchangeClients(),
138133
computeListener.acquireAvoid()
139134
);
140-
final boolean sameNode = transportService.getLocalNode().getId().equals(node.connection.getNode().getId());
135+
final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId());
141136
var dataNodeRequest = new DataNodeRequest(
142137
childSessionId,
143138
configuration,
144139
clusterAlias,
145-
node.shardIds,
146-
node.aliasFilters,
140+
shardIds,
141+
aliasFilters,
147142
dataNodePlan,
148143
originalIndices.indices(),
149144
originalIndices.indicesOptions(),
150145
sameNode == false && queryPragmas.nodeLevelReduction()
151146
);
152147
transportService.sendChildRequest(
153-
node.connection,
148+
connection,
154149
ComputeService.DATA_ACTION_NAME,
155150
dataNodeRequest,
156151
parentTask,
157152
TransportRequestOptions.EMPTY,
158-
new ActionListenerResponseHandler<>(nodeListener, DataNodeComputeResponse::new, esqlExecutor)
153+
new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> {
154+
nodeResponseRef.set(r);
155+
return r.profiles();
156+
}), DataNodeComputeResponse::new, esqlExecutor)
159157
);
160-
})
161-
);
162-
}
163-
}
164-
}, listener::onFailure));
165-
}
166-
167-
record DataNode(Transport.Connection connection, List<ShardId> shardIds, Map<Index, AliasFilter> aliasFilters) {
168-
169-
}
170-
171-
/**
172-
* Result from lookupDataNodes where can_match is performed to determine what shards can be skipped
173-
* and which target nodes are needed for running the ES|QL query
174-
*
175-
* @param dataNodes list of DataNode to perform the ES|QL query on
176-
* @param totalShards Total number of shards (from can_match phase), including skipped shards
177-
* @param skippedShards Number of skipped shards (from can_match phase)
178-
*/
179-
record DataNodeResult(List<DataNode> dataNodes, int totalShards, int skippedShards) {}
180-
181-
/**
182-
* Performs can_match and find the target nodes for the given target indices and filter.
183-
* <p>
184-
* Ideally, the search_shards API should be called before the field-caps API; however, this can lead
185-
* to a situation where the column structure (i.e., matched data types) differs depending on the query.
186-
*/
187-
private void lookupDataNodes(
188-
Task parentTask,
189-
String clusterAlias,
190-
QueryBuilder filter,
191-
Set<String> concreteIndices,
192-
OriginalIndices originalIndices,
193-
ActionListener<DataNodeResult> listener
194-
) {
195-
ActionListener<SearchShardsResponse> searchShardsListener = listener.map(resp -> {
196-
Map<String, DiscoveryNode> nodes = new HashMap<>();
197-
for (DiscoveryNode node : resp.getNodes()) {
198-
nodes.put(node.getId(), node);
199-
}
200-
Map<String, List<ShardId>> nodeToShards = new HashMap<>();
201-
Map<String, Map<Index, AliasFilter>> nodeToAliasFilters = new HashMap<>();
202-
int totalShards = 0;
203-
int skippedShards = 0;
204-
for (SearchShardsGroup group : resp.getGroups()) {
205-
var shardId = group.shardId();
206-
if (group.allocatedNodes().isEmpty()) {
207-
throw new ShardNotFoundException(group.shardId(), "no shard copies found {}", group.shardId());
208-
}
209-
if (concreteIndices.contains(shardId.getIndexName()) == false) {
210-
continue;
211-
}
212-
totalShards++;
213-
if (group.skipped()) {
214-
skippedShards++;
215-
continue;
216-
}
217-
String targetNode = group.allocatedNodes().get(0);
218-
nodeToShards.computeIfAbsent(targetNode, k -> new ArrayList<>()).add(shardId);
219-
AliasFilter aliasFilter = resp.getAliasFilters().get(shardId.getIndex().getUUID());
220-
if (aliasFilter != null) {
221-
nodeToAliasFilters.computeIfAbsent(targetNode, k -> new HashMap<>()).put(shardId.getIndex(), aliasFilter);
222-
}
223-
}
224-
List<DataNode> dataNodes = new ArrayList<>(nodeToShards.size());
225-
for (Map.Entry<String, List<ShardId>> e : nodeToShards.entrySet()) {
226-
DiscoveryNode node = nodes.get(e.getKey());
227-
Map<Index, AliasFilter> aliasFilters = nodeToAliasFilters.getOrDefault(e.getKey(), Map.of());
228-
dataNodes.add(new DataNode(transportService.getConnection(node), e.getValue(), aliasFilters));
158+
}
159+
})
160+
);
229161
}
230-
return new DataNodeResult(dataNodes, totalShards, skippedShards);
231-
});
232-
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
233-
originalIndices.indices(),
234-
originalIndices.indicesOptions(),
235-
filter,
236-
null,
237-
null,
238-
false,
239-
clusterAlias
240162
);
241-
transportService.sendChildRequest(
242-
transportService.getLocalNode(),
243-
EsqlSearchShardsAction.TYPE.name(),
244-
searchShardsRequest,
163+
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
164+
sender.startComputeOnDataNodes(
165+
clusterAlias,
245166
parentTask,
246-
TransportRequestOptions.EMPTY,
247-
new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor)
167+
concreteIndices,
168+
originalIndices,
169+
requestFilter,
170+
runOnTaskFailure,
171+
ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close)
248172
);
249173
}
250174

0 commit comments

Comments
 (0)