Skip to content

Commit 2c5e548

Browse files
committed
Support partial_results in CCS
1 parent 54eab9d commit 2c5e548

File tree

8 files changed

+280
-258
lines changed

8 files changed

+280
-258
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 148 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
1616
import org.elasticsearch.common.Priority;
1717
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.common.UUIDs;
1819
import org.elasticsearch.common.settings.Settings;
1920
import org.elasticsearch.compute.lucene.DataPartitioning;
2021
import org.elasticsearch.compute.operator.DriverProfile;
@@ -33,14 +34,17 @@
3334
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
3435

3536
import java.io.IOException;
37+
import java.util.Collection;
3638
import java.util.HashMap;
39+
import java.util.HashSet;
3740
import java.util.List;
3841
import java.util.Locale;
3942
import java.util.Map;
4043
import java.util.Set;
4144
import java.util.concurrent.CountDownLatch;
4245
import java.util.concurrent.TimeUnit;
4346
import java.util.stream.Collectors;
47+
import java.util.stream.Stream;
4448

4549
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4650
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
@@ -50,6 +54,7 @@
5054
import static org.hamcrest.Matchers.greaterThan;
5155
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5256
import static org.hamcrest.Matchers.hasSize;
57+
import static org.hamcrest.Matchers.in;
5358
import static org.hamcrest.Matchers.instanceOf;
5459
import static org.hamcrest.Matchers.is;
5560
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -838,6 +843,108 @@ private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bool
838843
}
839844
}
840845

846+
public void testPartialResults() throws Exception {
847+
class Cluster {
848+
final int okShards = randomIntBetween(1, 5);
849+
final int failingShards = randomIntBetween(1, 5);
850+
Set<String> okIds;
851+
}
852+
Cluster local = new Cluster();
853+
Cluster remote1 = new Cluster();
854+
Cluster remote2 = new Cluster();
855+
856+
local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards);
857+
populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards);
858+
859+
remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards);
860+
populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards);
861+
862+
remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards);
863+
populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards);
864+
// allow_partial_results = false
865+
{
866+
EsqlQueryRequest request = new EsqlQueryRequest();
867+
request.query("FROM ok*,fail*,*:ok*,*:fail* | KEEP id, fail_me");
868+
request.allowPartialResults(false);
869+
IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
870+
assertThat(error.getMessage(), containsString("Accessing failing field"));
871+
}
872+
// allow_partial_results = true
873+
{
874+
EsqlQueryRequest request = new EsqlQueryRequest();
875+
request.query("FROM ok*,fail*,*:ok*,*:fail* | KEEP id, fail_me");
876+
request.allowPartialResults(true);
877+
request.includeCCSMetadata(randomBoolean());
878+
try (var resp = runQuery(request)) {
879+
assertTrue(resp.isPartial());
880+
Set<String> allIds = Stream.of(local.okIds, remote1.okIds, remote2.okIds)
881+
.flatMap(Collection::stream)
882+
.collect(Collectors.toSet());
883+
List<List<Object>> rows = getValuesList(resp);
884+
assertThat(rows.size(), lessThanOrEqualTo(allIds.size()));
885+
Set<String> returnedIds = new HashSet<>();
886+
for (List<Object> row : rows) {
887+
assertThat(row.size(), equalTo(2));
888+
String id = (String) row.get(0);
889+
assertTrue(returnedIds.add(id));
890+
assertThat(id, is(in(allIds)));
891+
}
892+
if (request.includeCCSMetadata()) {
893+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
894+
assertThat(localInfo.getTotalShards(), equalTo(local.okShards + local.failingShards));
895+
assertThat(localInfo.getSuccessfulShards(), lessThanOrEqualTo(local.okShards));
896+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
897+
898+
EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
899+
assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards + remote1.failingShards));
900+
assertThat(remote1Info.getSuccessfulShards(), lessThanOrEqualTo(remote1.okShards));
901+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
902+
903+
EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2);
904+
assertThat(remote2Info.getTotalShards(), equalTo(remote2.okShards + remote1.failingShards));
905+
assertThat(remote2Info.getSuccessfulShards(), lessThanOrEqualTo(remote2.okShards));
906+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
907+
}
908+
}
909+
}
910+
// allow_partial_results = true
911+
{
912+
EsqlQueryRequest request = new EsqlQueryRequest();
913+
request.query("FROM ok*,cluster-a:ok*,remote-b:fail* | KEEP id, fail_me");
914+
request.allowPartialResults(true);
915+
request.includeCCSMetadata(randomBoolean());
916+
try (var resp = runQuery(request)) {
917+
assertTrue(resp.isPartial());
918+
Set<String> allIds = Stream.of(local.okIds, remote1.okIds).flatMap(Collection::stream).collect(Collectors.toSet());
919+
List<List<Object>> rows = getValuesList(resp);
920+
assertThat(rows.size(), equalTo(allIds.size()));
921+
Set<String> returnedIds = new HashSet<>();
922+
for (List<Object> row : rows) {
923+
assertThat(row.size(), equalTo(2));
924+
String id = (String) row.get(0);
925+
assertTrue(returnedIds.add(id));
926+
}
927+
assertThat(returnedIds, equalTo(allIds));
928+
if (request.includeCCSMetadata()) {
929+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
930+
assertThat(localInfo.getTotalShards(), equalTo(local.okShards));
931+
assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards));
932+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
933+
934+
EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
935+
assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards));
936+
assertThat(remote1Info.getSuccessfulShards(), equalTo(remote1.okShards));
937+
assertThat(remote1Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
938+
939+
EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2);
940+
assertThat(remote2Info.getTotalShards(), equalTo(remote1.failingShards));
941+
assertThat(remote2Info.getSuccessfulShards(), equalTo(0));
942+
assertThat(remote1Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
943+
}
944+
}
945+
}
946+
}
947+
841948
void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) {
842949
ClusterHealthResponse resp = client.admin()
843950
.cluster()
@@ -951,7 +1058,7 @@ Map<String, Object> setupFailClusters() throws IOException {
9511058
populateLocalIndices(LOCAL_INDEX, numShardsLocal);
9521059

9531060
int numShardsRemote = randomIntBetween(1, 3);
954-
populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
1061+
populateIndexWithFailingFields(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
9551062

9561063
Map<String, Object> clusterInfo = new HashMap<>();
9571064
clusterInfo.put("local.num_shards", numShardsLocal);
@@ -962,8 +1069,29 @@ Map<String, Object> setupFailClusters() throws IOException {
9621069
return clusterInfo;
9631070
}
9641071

965-
void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException {
966-
Client remoteClient = client(clusterAlias);
1072+
protected Set<String> populateIndex(String clusterAlias, String indexName, int numShards) {
1073+
Client localClient = client(clusterAlias);
1074+
assertAcked(
1075+
localClient.admin()
1076+
.indices()
1077+
.prepareCreate(indexName)
1078+
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
1079+
.setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
1080+
);
1081+
Set<String> ids = new HashSet<>();
1082+
int numDocs = between(1, 100);
1083+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
1084+
for (int i = 0; i < numDocs; i++) {
1085+
String id = UUIDs.randomBase64UUID();
1086+
localClient.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get();
1087+
ids.add(id);
1088+
}
1089+
localClient.admin().indices().prepareRefresh(indexName).get();
1090+
return ids;
1091+
}
1092+
1093+
Set<String> populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException {
1094+
Client client = client(clusterAlias);
9671095
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
9681096
mapping.startObject("runtime");
9691097
{
@@ -975,16 +1103,29 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha
9751103
mapping.endObject();
9761104
}
9771105
mapping.endObject();
1106+
mapping.startObject("properties");
1107+
{
1108+
mapping.startObject("id").field("type", "keyword").endObject();
1109+
mapping.startObject("tag").field("type", "keyword").endObject();
1110+
}
1111+
mapping.endObject();
9781112
assertAcked(
979-
remoteClient.admin()
1113+
client.admin()
9801114
.indices()
9811115
.prepareCreate(indexName)
9821116
.setSettings(Settings.builder().put("index.number_of_shards", numShards))
9831117
.setMapping(mapping.endObject())
9841118
);
985-
986-
remoteClient.prepareIndex(indexName).setSource("id", 0).get();
987-
remoteClient.admin().indices().prepareRefresh(indexName).get();
1119+
Set<String> ids = new HashSet<>();
1120+
String tag = clusterAlias.isEmpty() ? "local" : clusterAlias;
1121+
int numDocs = between(1, 100);
1122+
for (int i = 0; i < numDocs; i++) {
1123+
String id = UUIDs.base64UUID();
1124+
client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get();
1125+
ids.add(id);
1126+
}
1127+
client.admin().indices().prepareRefresh(indexName).get();
1128+
return ids;
9881129
}
9891130

9901131
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 81 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionListenerResponseHandler;
1212
import org.elasticsearch.action.OriginalIndices;
1313
import org.elasticsearch.action.support.ChannelActionListener;
14+
import org.elasticsearch.compute.operator.DriverProfile;
1415
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1516
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
1617
import org.elasticsearch.core.Releasable;
@@ -34,10 +35,11 @@
3435
import java.util.ArrayList;
3536
import java.util.List;
3637
import java.util.Map;
37-
import java.util.Objects;
3838
import java.util.Set;
3939
import java.util.concurrent.Executor;
40+
import java.util.concurrent.atomic.AtomicInteger;
4041
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.function.Function;
4143

4244
/**
4345
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
@@ -74,69 +76,108 @@ void startComputeOnRemoteCluster(
7476
RemoteCluster cluster,
7577
Runnable cancelQueryOnFailure,
7678
EsqlExecutionInfo executionInfo,
77-
ActionListener<ComputeResponse> listener
79+
ActionListener<List<DriverProfile>> listener
7880
) {
7981
var queryPragmas = configuration.pragmas();
8082
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
8183
final var childSessionId = computeService.newChildSession(sessionId);
82-
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
8384
final String clusterAlias = cluster.clusterAlias();
84-
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
85-
var resp = finalResponse.get();
86-
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
87-
}))) {
88-
ExchangeService.openExchange(
89-
transportService,
90-
cluster.connection,
91-
childSessionId,
92-
queryPragmas.exchangeBufferSize(),
93-
esqlExecutor,
94-
EsqlCCSUtils.skipUnavailableListener(
95-
computeListener.acquireAvoid(),
96-
executionInfo,
97-
clusterAlias,
98-
EsqlExecutionInfo.Cluster.Status.SKIPPED
99-
).delegateFailureAndWrap((l, unused) -> {
100-
var listenerGroup = new RemoteListenerGroup(
101-
transportService,
102-
rootTask,
103-
computeListener,
104-
clusterAlias,
105-
executionInfo,
106-
l
107-
);
108-
109-
var remoteSink = exchangeService.newRemoteSink(
110-
listenerGroup.getGroupTask(),
111-
childSessionId,
112-
transportService,
113-
cluster.connection
114-
);
85+
final boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
86+
final AtomicInteger pagesFetched = new AtomicInteger();
87+
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
88+
listener = listener.delegateResponse((l, e) -> {
89+
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0;
90+
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
91+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
92+
l.onResponse(List.of());
93+
} else if (configuration.allowPartialResults()) {
94+
final var status = receivedResults ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.FAILED;
95+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e);
96+
l.onResponse(List.of());
97+
} else {
98+
l.onFailure(e);
99+
}
100+
});
101+
ExchangeService.openExchange(
102+
transportService,
103+
cluster.connection,
104+
childSessionId,
105+
queryPragmas.exchangeBufferSize(),
106+
esqlExecutor,
107+
listener.delegateFailure((l, unused) -> {
108+
final CancellableTask groupTask;
109+
final Runnable onGroupFailure;
110+
if (failFast) {
111+
groupTask = rootTask;
112+
onGroupFailure = cancelQueryOnFailure;
113+
} else {
114+
groupTask = ComputeGroupTask.createGroupTask(transportService, rootTask, rootTask::getDescription);
115+
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
116+
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
117+
}
118+
try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> {
119+
updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
120+
return profiles;
121+
}))) {
122+
var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
115123
exchangeSource.addRemoteSink(
116124
remoteSink,
117125
executionInfo.isSkipUnavailable(clusterAlias) == false,
118-
() -> {},
126+
pagesFetched::incrementAndGet,
119127
queryPragmas.concurrentExchangeClients(),
120-
listenerGroup.getExchangeRequestListener()
128+
computeListener.acquireAvoid()
121129
);
122130
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
123131
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
124-
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
132+
final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
125133
finalResponse.set(r);
126134
return r.getProfiles();
127135
});
128136
transportService.sendChildRequest(
129137
cluster.connection,
130138
ComputeService.CLUSTER_ACTION_NAME,
131139
clusterRequest,
132-
listenerGroup.getGroupTask(),
140+
groupTask,
133141
TransportRequestOptions.EMPTY,
134142
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
135143
);
136-
})
144+
}
145+
})
146+
);
147+
}
148+
149+
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
150+
Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
151+
if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) {
152+
return status;
153+
} else if (executionInfo.isStopped() || resp.failedShards > 0) {
154+
return EsqlExecutionInfo.Cluster.Status.PARTIAL;
155+
} else {
156+
return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
157+
}
158+
};
159+
if (resp.getTook() != null) {
160+
var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos());
161+
executionInfo.swapCluster(
162+
clusterAlias,
163+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
164+
.setTook(tookTime)
165+
.setTotalShards(resp.getTotalShards())
166+
.setSuccessfulShards(resp.getSuccessfulShards())
167+
.setSkippedShards(resp.getSkippedShards())
168+
.setFailedShards(resp.getFailedShards())
169+
.build()
170+
);
171+
} else {
172+
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
173+
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
174+
executionInfo.swapCluster(
175+
clusterAlias,
176+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
177+
.setTook(executionInfo.tookSoFar())
178+
.build()
137179
);
138180
}
139-
140181
}
141182

142183
List<RemoteCluster> getRemoteClusters(

0 commit comments

Comments
 (0)