Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122708.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122708
summary: Support partial results in CCS in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange";
public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange";

private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange";
private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/open_exchange";

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
Expand All @@ -34,10 +35,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
Expand Down Expand Up @@ -74,69 +76,107 @@ void startComputeOnRemoteCluster(
RemoteCluster cluster,
Runnable cancelQueryOnFailure,
EsqlExecutionInfo executionInfo,
ActionListener<ComputeResponse> listener
ActionListener<List<DriverProfile>> listener
) {
var queryPragmas = configuration.pragmas();
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
final var childSessionId = computeService.newChildSession(sessionId);
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
final String clusterAlias = cluster.clusterAlias();
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
var resp = finalResponse.get();
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
}))) {
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
EsqlCCSUtils.skipUnavailableListener(
computeListener.acquireAvoid(),
executionInfo,
clusterAlias,
EsqlExecutionInfo.Cluster.Status.SKIPPED
).delegateFailureAndWrap((l, unused) -> {
var listenerGroup = new RemoteListenerGroup(
transportService,
rootTask,
computeListener,
clusterAlias,
executionInfo,
l
);

var remoteSink = exchangeService.newRemoteSink(
listenerGroup.getGroupTask(),
childSessionId,
transportService,
cluster.connection
);
final AtomicInteger pagesFetched = new AtomicInteger();
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
listener = listener.delegateResponse((l, e) -> {
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0;
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(List.of());
} else if (configuration.allowPartialResults()) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(List.of());
} else {
l.onFailure(e);
}
});
ExchangeService.openExchange(
transportService,
cluster.connection,
childSessionId,
queryPragmas.exchangeBufferSize(),
esqlExecutor,
listener.delegateFailure((l, unused) -> {
final CancellableTask groupTask;
final Runnable onGroupFailure;
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
if (failFast) {
groupTask = rootTask;
onGroupFailure = cancelQueryOnFailure;
} else {
groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]");
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
}
try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
return profiles;
}))) {
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
exchangeSource.addRemoteSink(
remoteSink,
executionInfo.isSkipUnavailable(clusterAlias) == false,
() -> {},
failFast,
pagesFetched::incrementAndGet,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're only using pagesFetched as a boolean flag, and in fact we only ever use onPageFetched to essentially trigger this flag. Is deeper usage planned in the future? If not, maybe it can be simplified somehow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a boolean is good enough. I pushed 9283823. Thanks!

queryPragmas.concurrentExchangeClients(),
listenerGroup.getExchangeRequestListener()
computeListener.acquireAvoid()
);
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
finalResponse.set(r);
return r.getProfiles();
});
transportService.sendChildRequest(
cluster.connection,
ComputeService.CLUSTER_ACTION_NAME,
clusterRequest,
listenerGroup.getGroupTask(),
groupTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
);
})
}
})
);
}

/**
 * Records the outcome of a remote cluster's compute in {@code executionInfo} for {@code clusterAlias}.
 * <p>
 * A cluster whose status is still {@code RUNNING} becomes {@code PARTIAL} when the query was stopped or the
 * response reports failed shards, and {@code SUCCESSFUL} otherwise; any other status is left as it is.
 * When the response carries a took time, the took time and shard counts are copied from the response;
 * otherwise only the status and a coordinator-side took time are set.
 *
 * @param executionInfo the execution info whose cluster entry is swapped
 * @param clusterAlias  the alias of the cluster entry to update
 * @param resp          the response received from that cluster
 */
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
    // Evaluated lazily, inside the swap, so it sees the stopped flag at the moment the entry is replaced.
    Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> finalStatus = current -> {
        if (current == EsqlExecutionInfo.Cluster.Status.RUNNING) {
            final boolean incomplete = executionInfo.isStopped() || resp.failedShards > 0;
            return incomplete ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
        }
        return current;
    };
    if (resp.getTook() == null) {
        // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
        // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
        executionInfo.swapCluster(clusterAlias, (alias, cluster) -> {
            return new EsqlExecutionInfo.Cluster.Builder(cluster).setStatus(finalStatus.apply(cluster.getStatus()))
                .setTook(executionInfo.tookSoFar())
                .build();
        });
        return;
    }
    // Total time for this cluster = planning time on the coordinator + the time reported by the remote.
    final long totalNanos = executionInfo.planningTookTime().nanos() + resp.getTook().nanos();
    final var tookTime = TimeValue.timeValueNanos(totalNanos);
    executionInfo.swapCluster(clusterAlias, (alias, cluster) -> {
        return new EsqlExecutionInfo.Cluster.Builder(cluster).setStatus(finalStatus.apply(cluster.getStatus()))
            .setTook(tookTime)
            .setTotalShards(resp.getTotalShards())
            .setSuccessfulShards(resp.getSuccessfulShards())
            .setSkippedShards(resp.getSkippedShards())
            .setFailedShards(resp.getFailedShards())
            .build();
    });
}

List<RemoteCluster> getRemoteClusters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
Expand All @@ -57,7 +61,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
Expand Down Expand Up @@ -211,7 +214,8 @@ public void execute(
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, here I am not 100% sure of the intended logic - for a remote, it looks like if configuration.allowPartialResults() is true and any result is delivered, any error is converted to partial. Is that also the case for local results? If yes, I don't see what checks it in the code. If no, then it looks like local and remote behave differently with regard to partial failure logic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the search_shards can fail when a node shuts down, and I've pushed d3be47f to handle this case. We also need to handle partial results for the final pipeline, but that's for a follow-up.

if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
var status = localClusterWasInterrupted.get()
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0)
? EsqlExecutionInfo.Cluster.Status.PARTIAL
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
execInfo.swapCluster(
Expand Down Expand Up @@ -277,48 +281,13 @@ public void execute(
cluster,
cancelQueryOnFailure,
execInfo,
computeListener.acquireCompute().map(r -> {
updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
return r.getProfiles();
})
computeListener.acquireCompute()
);
}
}
}
}

/**
 * Writes the result of a remote cluster's compute into {@code executionInfo} under {@code clusterAlias}.
 * <p>
 * A {@code RUNNING} cluster is moved to {@code PARTIAL} if the query was stopped, otherwise to
 * {@code SUCCESSFUL}; every other status is preserved. Took time and shard counts come from the
 * response when it carries a took time; otherwise only status and a coordinator-side took time are set.
 *
 * @param executionInfo the execution info whose cluster entry is swapped
 * @param clusterAlias  the alias of the cluster entry to update
 * @param resp          the response received from that cluster
 */
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
    Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> resolveStatus = current -> {
        // Statuses other than RUNNING were decided elsewhere and must not be overwritten.
        if (current != EsqlExecutionInfo.Cluster.Status.RUNNING) {
            return current;
        }
        if (executionInfo.isStopped()) {
            return EsqlExecutionInfo.Cluster.Status.PARTIAL;
        }
        return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
    };
    if (resp.getTook() == null) {
        // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
        // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
        executionInfo.swapCluster(
            clusterAlias,
            (key, existing) -> new EsqlExecutionInfo.Cluster.Builder(existing).setStatus(resolveStatus.apply(existing.getStatus()))
                .setTook(executionInfo.tookSoFar())
                .build()
        );
    } else {
        // Coordinator planning time plus the time the remote cluster reported.
        final long combinedNanos = executionInfo.planningTookTime().nanos() + resp.getTook().nanos();
        final var tookTime = TimeValue.timeValueNanos(combinedNanos);
        executionInfo.swapCluster(
            clusterAlias,
            (key, existing) -> new EsqlExecutionInfo.Cluster.Builder(existing).setStatus(resolveStatus.apply(existing.getStatus()))
                .setTook(tookTime)
                .setTotalShards(resp.getTotalShards())
                .setSuccessfulShards(resp.getSuccessfulShards())
                .setSkippedShards(resp.getSkippedShards())
                .setFailedShards(resp.getFailedShards())
                .build()
        );
    }
}

// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
if (execInfo.isCrossClusterSearch()) {
Expand Down Expand Up @@ -444,4 +413,33 @@ Runnable cancelQueryOnFailure(CancellableTask task) {
transportService.getTaskManager().cancelTaskAndDescendants(task, "cancelled on failure", false, ActionListener.noop());
});
}

/**
 * Registers a new task with the transport service's task manager, under type {@code "transport"} and
 * action {@code "esql_compute_group"}, with {@code parentTask} (identified on the local node) as its parent.
 *
 * @param parentTask  the task that becomes the parent of the registered task
 * @param description supplies the text embedded in the registering request's description
 * @return the registered task, cast to {@link CancellableTask}
 */
CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
    final TaskManager manager = transportService.getTaskManager();
    // The parent id is resolved against the local node; the "false" argument asks for a non-detailed task info.
    final String localNodeId = transportService.getLocalNode().getId();
    final TaskId parentId = parentTask.taskInfo(localNodeId, false).taskId();
    final Task registered = manager.register("transport", "esql_compute_group", new ComputeGroupTaskRequest(parentId, description));
    return (CancellableTask) registered;
}

/**
 * Request object handed to the task manager to register a group task. It carries the parent task id and a
 * lazily evaluated description; the task it creates is a {@link CancellableTask} with an empty description.
 */
private static class ComputeGroupTaskRequest extends TransportRequest {
    /** Produces the text that {@link #getDescription()} wraps; only invoked when the description is requested. */
    private final Supplier<String> descriptionSupplier;

    ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
        setParentTask(parentTask);
        this.descriptionSupplier = description;
    }

    /**
     * Creates the cancellable task for this request. The parent task id is expected to be set.
     */
    @Override
    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
        assert parentTaskId.isSet();
        // The task itself is created with an empty description string.
        final String emptyDescription = "";
        return new CancellableTask(id, type, action, emptyDescription, parentTaskId, headers);
    }

    /**
     * Returns the supplied description wrapped as {@code group [...]}.
     */
    @Override
    public String getDescription() {
        final String inner = descriptionSupplier.get();
        return "group [" + inner + "]";
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ protected void sendRequest(
final Runnable onGroupFailure;
final CancellableTask groupTask;
if (allowPartialResults) {
groupTask = RemoteListenerGroup.createGroupTask(
transportService,
groupTask = computeService.createGroupTask(
parentTask,
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
);
Expand Down
Loading