Skip to content

Commit f94bcfb

Browse files
committed
Sender with retry
1 parent 6e5250e commit f94bcfb

File tree

4 files changed

+521
-74
lines changed

4 files changed

+521
-74
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public FailureCollector(int maxExceptions) {
4545
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
4646
}
4747

48-
private static Exception unwrapTransportException(TransportException te) {
48+
public static Exception unwrapTransportException(TransportException te) {
4949
final Throwable cause = te.getCause();
5050
if (cause == null) {
5151
return te;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.TransportVersions;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.ActionListenerResponseHandler;
1213
import org.elasticsearch.action.ActionRunnable;
1314
import org.elasticsearch.action.OriginalIndices;
1415
import org.elasticsearch.action.support.ChannelActionListener;
1516
import org.elasticsearch.action.support.RefCountingRunnable;
17+
import org.elasticsearch.cluster.node.DiscoveryNode;
1618
import org.elasticsearch.compute.operator.DriverProfile;
1719
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1820
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
@@ -21,7 +23,6 @@
2123
import org.elasticsearch.core.IOUtils;
2224
import org.elasticsearch.core.Releasable;
2325
import org.elasticsearch.index.Index;
24-
import org.elasticsearch.index.query.QueryBuilder;
2526
import org.elasticsearch.index.shard.IndexShard;
2627
import org.elasticsearch.index.shard.ShardId;
2728
import org.elasticsearch.search.SearchService;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.tasks.Task;
3334
import org.elasticsearch.tasks.TaskCancelledException;
3435
import org.elasticsearch.threadpool.ThreadPool;
36+
import org.elasticsearch.transport.Transport;
3537
import org.elasticsearch.transport.TransportChannel;
3638
import org.elasticsearch.transport.TransportRequestHandler;
3739
import org.elasticsearch.transport.TransportRequestOptions;
@@ -52,8 +54,6 @@
5254
import java.util.concurrent.atomic.AtomicInteger;
5355
import java.util.concurrent.atomic.AtomicLong;
5456
import java.util.concurrent.atomic.AtomicReference;
55-
import java.util.function.Function;
56-
import java.util.stream.Collectors;
5757

5858
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
5959

@@ -97,10 +97,23 @@ void startComputeOnDataNodes(
9797
Runnable runOnTaskFailure,
9898
ActionListener<ComputeResponse> outListener
9999
) {
100-
DataNodeRequestSender sender = new DataNodeRequestSender(
101-
transportService,
102-
esqlExecutor,
103-
(connection, shardIds, aliasFilters, listener) -> {
100+
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask) {
101+
@Override
102+
protected void sendRequest(
103+
DiscoveryNode node,
104+
List<ShardId> shardIds,
105+
Map<Index, AliasFilter> aliasFilters,
106+
NodeListener nodeListener
107+
) {
108+
final AtomicLong pagesFetched = new AtomicLong();
109+
var listener = ActionListener.wrap(nodeListener::onResponse, e -> nodeListener.onFailure(e, pagesFetched.get() > 0));
110+
final Transport.Connection connection;
111+
try {
112+
connection = transportService.getConnection(node);
113+
} catch (Exception e) {
114+
listener.onFailure(e);
115+
return;
116+
}
104117
var queryPragmas = configuration.pragmas();
105118
var childSessionId = computeService.newChildSession(sessionId);
106119
// For each target node, first open a remote exchange on the remote node, then link the exchange source to
@@ -112,18 +125,10 @@ void startComputeOnDataNodes(
112125
queryPragmas.exchangeBufferSize(),
113126
esqlExecutor,
114127
listener.delegateFailureAndWrap((l, unused) -> {
115-
final AtomicLong pagesFetched = new AtomicLong();
116128
final AtomicReference<DataNodeComputeResponse> nodeResponseRef = new AtomicReference<>();
117-
try (var computeListener = new ComputeListener(threadPool, runOnTaskFailure, ActionListener.wrap(r -> {
118-
l.onResponse(nodeResponseRef.get());
119-
}, e -> {
120-
if (pagesFetched.get() == 0 && false) {
121-
var shardFailures = shardIds.stream().collect(Collectors.toMap(Function.identity(), ignored -> e));
122-
l.onResponse(new DataNodeComputeResponse(List.of(), shardFailures));
123-
} else {
124-
l.onFailure(e);
125-
}
126-
}))) {
129+
try (
130+
var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get()))
131+
) {
127132
final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection);
128133
exchangeSource.addRemoteSink(
129134
remoteSink,
@@ -159,14 +164,12 @@ void startComputeOnDataNodes(
159164
})
160165
);
161166
}
162-
);
163-
QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(dataNodePlan);
167+
};
164168
sender.startComputeOnDataNodes(
165169
clusterAlias,
166-
parentTask,
167170
concreteIndices,
168171
originalIndices,
169-
requestFilter,
172+
PlannerUtils.requestTimestampFilter(dataNodePlan),
170173
runOnTaskFailure,
171174
ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close)
172175
);
@@ -445,7 +448,7 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
445448
request.runNodeLevelReduction()
446449
);
447450
// the sender doesn't support retry on shard failures, so we need to fail fast here.
448-
final boolean failFastOnShardFailures = true; // channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE);
451+
final boolean failFastOnShardFailures = channel.getVersion().before(TransportVersions.ESQL_RETRY_ON_SHARD_LEVEL_FAILURE);
449452
runComputeOnDataNode((CancellableTask) task, sessionId, reductionPlan, request, failFastOnShardFailures, listener);
450453
}
451454
}

0 commit comments

Comments
 (0)