Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand All @@ -25,17 +25,17 @@
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
Expand Down Expand Up @@ -63,46 +63,54 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
transportService.registerRequestHandler(ComputeService.CLUSTER_ACTION_NAME, esqlExecutor, ClusterComputeRequest::new, this);
}

void startComputeOnRemoteClusters(
void startComputeOnRemoteCluster(
String sessionId,
CancellableTask rootTask,
Configuration configuration,
PhysicalPlan plan,
ExchangeSourceHandler exchangeSource,
List<RemoteCluster> clusters,
ComputeListener computeListener
RemoteCluster cluster,
Runnable cancelQueryOnFailure,
ActionListener<ComputeResponse> listener
) {
var queryPragmas = configuration.pragmas();
var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(linkExchangeListeners)) {
for (RemoteCluster cluster : clusters) {
final var childSessionId = computeService.newChildSession(sessionId);
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
refs.acquire().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop());
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
var clusterListener = ActionListener.runBefore(
computeListener.acquireCompute(cluster.clusterAlias()),
() -> l.onResponse(null)
);
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
);
}
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
final var childSessionId = computeService.newChildSession(sessionId);
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
var resp = finalResponse.get();
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
}))) {
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
computeListener.acquireCompute().delegateFailureAndWrap((l, unused) -> {
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(
remoteSink,
true,
queryPragmas.concurrentExchangeClients(),
computeListener.acquireAvoid()
);
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
final ActionListener<ComputeResponse> clusterListener = l.map(r -> {
finalResponse.set(r);
return r.getProfiles();
});
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
);
}
}

Expand Down Expand Up @@ -141,28 +149,16 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + plan));
return;
}
String clusterAlias = request.clusterAlias();
/*
* This handler runs only on remote cluster coordinators, so it creates a new local EsqlExecutionInfo object to record
* execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
* ComputeResponse that is sent back to the primary coordinating cluster.
*/
EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
CancellableTask cancellable = (CancellableTask) task;
try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, listener)) {
runComputeOnRemoteCluster(
clusterAlias,
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
execInfo,
computeListener
);
}
runComputeOnRemoteCluster(
request.clusterAlias(),
request.sessionId(),
(CancellableTask) task,
request.configuration(),
(ExchangeSinkExec) plan,
Set.of(remoteClusterPlan.targetIndices()),
remoteClusterPlan.originalIndices(),
listener
);
}

/**
Expand All @@ -182,48 +178,59 @@ void runComputeOnRemoteCluster(
ExchangeSinkExec plan,
Set<String> concreteIndices,
OriginalIndices originalIndices,
EsqlExecutionInfo executionInfo,
ComputeListener computeListener
ActionListener<ComputeResponse> listener
) {
final var exchangeSink = exchangeService.getSinkHandler(globalSessionId);
parentTask.addListener(
() -> exchangeService.finishSinkHandler(globalSessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
);
final String localSessionId = clusterAlias + ":" + globalSessionId;
final PhysicalPlan coordinatorPlan = ComputeService.reductionPlan(plan, true);
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
final long startTimeInNanos = System.nanoTime();
final Runnable cancelQueryOnFailure = computeService.cancelQueryOnFailure(parentTask);
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
final ComputeResponse r = finalResponse.get();
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
}))) {
var exchangeSource = new ExchangeSourceHandler(
configuration.pragmas().exchangeBufferSize(),
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
computeListener.acquireAvoid()
);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
computeService.runCompute(
parentTask,
new ComputeContext(
localSessionId,
clusterAlias,
List.of(),
configuration,
configuration.newFoldContext(),
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute()
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
List.of(),
parentTask,
configuration,
configuration.newFoldContext(),
plan,
concreteIndices,
originalIndices,
exchangeSource,
exchangeSink
),
coordinatorPlan,
computeListener.acquireCompute(clusterAlias)
);
dataNodeComputeHandler.startComputeOnDataNodes(
localSessionId,
clusterAlias,
parentTask,
configuration,
plan,
concreteIndices,
originalIndices,
exchangeSource,
executionInfo,
computeListener
);
cancelQueryOnFailure,
computeListener.acquireCompute().map(r -> {
finalResponse.set(r);
return r.getProfiles();
})
);
}
}
}

Expand Down
Loading