From b88b2083828bedaaa78ef7ce7861e64a09929ffb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 16 Feb 2025 23:37:33 -0800 Subject: [PATCH 01/10] Support partial results in CCS in ES|QL --- .../operator/exchange/ExchangeService.java | 2 +- ...CrossClusterQueryWithPartialResultsIT.java | 322 ++++++++++++++++++ .../esql/plugin/ClusterComputeHandler.java | 122 ++++--- .../xpack/esql/plugin/ComputeService.java | 77 ++--- .../esql/plugin/DataNodeComputeHandler.java | 6 +- .../esql/plugin/RemoteListenerGroup.java | 123 ------- .../xpack/esql/session/EsqlCCSUtils.java | 19 -- .../xpack/esql/session/EsqlCCSUtilsTests.java | 31 -- 8 files changed, 444 insertions(+), 258 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java delete mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index dd36a6f455e8b..eddcbd97153ef 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -61,7 +61,7 @@ public final class ExchangeService extends AbstractLifecycleComponent { public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange"; public static final String EXCHANGE_ACTION_NAME_FOR_CCS = "cluster:internal:data/read/esql/exchange"; - private static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange"; + public static final String OPEN_EXCHANGE_ACTION_NAME = "internal:data/read/esql/open_exchange"; private static final String OPEN_EXCHANGE_ACTION_NAME_FOR_CCS = 
"cluster:internal:data/read/esql/open_exchange"; /** diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java new file mode 100644 index 0000000000000..d582128fd026d --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -0,0 +1,322 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.test.FailingFieldPlugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.plugin.ComputeService; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase { + private static final AtomicLong nextDocId = new AtomicLong(0); + + private static class ClusterSetup { + final int okShards = randomIntBetween(1, 5); + final int failingShards = randomIntBetween(1, 5); + Set okIds; + } + + private final ClusterSetup local = new ClusterSetup(); + private final ClusterSetup remote1 = new ClusterSetup(); + private final ClusterSetup remote2 = new ClusterSetup(); + + void populateIndices() throws Exception { + local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards); + populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards); + + remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards); + populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards); + + remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards); + populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards); + } + + public void testPartialResults() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,fail*,*:ok*,*:fail* | KEEP id, fail_me | LIMIT 1000"); + request.includeCCSMetadata(randomBoolean()); + { + // allow_partial_results = false + request.includeCCSMetadata(randomBoolean()); + IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); + assertThat(error.getMessage(), containsString("Accessing failing field")); + } + // allow_partial_results = 
true + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + Set allIds = Stream.of(local.okIds, remote1.okIds, remote2.okIds) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + List> rows = getValuesList(resp); + assertThat(rows.size(), lessThanOrEqualTo(allIds.size())); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + assertThat(id, is(in(allIds))); + } + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards + local.failingShards)); + assertThat(localInfo.getSuccessfulShards(), lessThanOrEqualTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards + remote1.failingShards)); + assertThat(remote1Info.getSuccessfulShards(), lessThanOrEqualTo(remote1.okShards)); + assertThat(remote1Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Info.getTotalShards(), equalTo(remote2.okShards + remote2.failingShards)); + assertThat(remote2Info.getSuccessfulShards(), lessThanOrEqualTo(remote2.okShards)); + assertThat(remote2Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } + + public void testOneRemoteClusterPartial() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,cluster-a:ok*,*-b:fail* | KEEP id, fail_me"); + request.allowPartialResults(true); + request.includeCCSMetadata(randomBoolean()); + try (var resp = 
runQuery(request)) { + assertTrue(resp.isPartial()); + Set allIds = Stream.of(local.okIds, remote1.okIds).flatMap(Collection::stream).collect(Collectors.toSet()); + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(allIds.size())); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(allIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards)); + assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remote1Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Info.getTotalShards(), equalTo(remote1.okShards)); + assertThat(remote1Info.getSuccessfulShards(), equalTo(remote1.okShards)); + assertThat(remote1Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remote2Info = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Info.getTotalShards(), equalTo(remote2.failingShards)); + assertThat(remote2Info.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Info.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } + + public void testFailToReceiveClusterResponse() throws Exception { + populateIndices(); + // fetched pages, but failed to receive the cluster response + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + ComputeService.CLUSTER_ACTION_NAME, + (handler, request, channel, task) -> handler.messageReceived(request, 
new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + sendResponse(new CircuitBreakingException("simulated", CircuitBreaker.Durability.PERMANENT)); + } + + @Override + public void sendResponse(Exception exception) { + channel.sendResponse(exception); + } + }, task) + ); + } + try { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,cluster-a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + var error = expectThrows(CircuitBreakingException.class, () -> runQuery(request).close()); + assertThat(error.getMessage(), equalTo("simulated")); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds))); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards)); + assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } finally { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + public void 
testFailToStartRequestOnRemoteCluster() throws Exception { + populateIndices(); + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + String actionToFail = randomFrom( + ExchangeService.EXCHANGE_ACTION_NAME, + ExchangeService.OPEN_EXCHANGE_ACTION_NAME, + ComputeService.CLUSTER_ACTION_NAME + ); + ts.addRequestHandlingBehavior(actionToFail, (handler, request, channel, task) -> { + channel.sendResponse(new IllegalStateException("simulated")); + }); + } + try { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,*a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + var error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); + assertThat(error.getMessage(), containsString("simulated")); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(local.okIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getTotalShards(), equalTo(local.okShards)); + assertThat(localInfo.getSuccessfulShards(), equalTo(local.okShards)); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + } + } + } finally { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + 
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + + private Set populateIndex(String clusterAlias, String indexName, int numShards) { + Client client = client(clusterAlias); + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long") + ); + Set ids = new HashSet<>(); + int numDocs = between(1, 100); + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(nextDocId.incrementAndGet()); + client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); + ids.add(id); + } + client.admin().indices().prepareRefresh(indexName).get(); + return ids; + } + + private Set populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException { + Client client = client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.startObject("properties"); + { + mapping.startObject("id").field("type", "keyword").endObject(); + mapping.startObject("tag").field("type", "keyword").endObject(); + } + mapping.endObject(); + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + Set ids = new HashSet<>(); + String tag = clusterAlias.isEmpty() ? 
"local" : clusterAlias; + int numDocs = between(50, 100); // large enough to have failing documents in every shard + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(nextDocId.incrementAndGet()); + client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); + ids.add(id); + } + client.admin().indices().prepareRefresh(indexName).get(); + return ids; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 3b599a87afaff..61d2f581471b1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; @@ -34,10 +35,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; /** * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. 
@@ -74,54 +76,59 @@ void startComputeOnRemoteCluster( RemoteCluster cluster, Runnable cancelQueryOnFailure, EsqlExecutionInfo executionInfo, - ActionListener listener + ActionListener> listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); final var childSessionId = computeService.newChildSession(sessionId); - final AtomicReference finalResponse = new AtomicReference<>(); final String clusterAlias = cluster.clusterAlias(); - try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { - var resp = finalResponse.get(); - return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles)); - }))) { - ExchangeService.openExchange( - transportService, - cluster.connection, - childSessionId, - queryPragmas.exchangeBufferSize(), - esqlExecutor, - EsqlCCSUtils.skipUnavailableListener( - computeListener.acquireAvoid(), - executionInfo, - clusterAlias, - EsqlExecutionInfo.Cluster.Status.SKIPPED - ).delegateFailureAndWrap((l, unused) -> { - var listenerGroup = new RemoteListenerGroup( - transportService, - rootTask, - computeListener, - clusterAlias, - executionInfo, - l - ); - - var remoteSink = exchangeService.newRemoteSink( - listenerGroup.getGroupTask(), - childSessionId, - transportService, - cluster.connection - ); + final AtomicInteger pagesFetched = new AtomicInteger(); + final AtomicReference finalResponse = new AtomicReference<>(); + listener = listener.delegateResponse((l, e) -> { + final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0; + if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); + l.onResponse(List.of()); + } else if (configuration.allowPartialResults()) { + 
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + l.onResponse(List.of()); + } else { + l.onFailure(e); + } + }); + ExchangeService.openExchange( + transportService, + cluster.connection, + childSessionId, + queryPragmas.exchangeBufferSize(), + esqlExecutor, + listener.delegateFailure((l, unused) -> { + final CancellableTask groupTask; + final Runnable onGroupFailure; + boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false; + if (failFast) { + groupTask = rootTask; + onGroupFailure = cancelQueryOnFailure; + } else { + groupTask = computeService.createGroupTask(rootTask); + onGroupFailure = computeService.cancelQueryOnFailure(groupTask); + l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); + } + try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> { + updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get()); + return profiles; + }))) { + var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); exchangeSource.addRemoteSink( remoteSink, - executionInfo.isSkipUnavailable(clusterAlias) == false, - () -> {}, + failFast, + pagesFetched::incrementAndGet, queryPragmas.concurrentExchangeClients(), - listenerGroup.getExchangeRequestListener() + computeListener.acquireAvoid() ); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); - final ActionListener clusterListener = listenerGroup.getClusterRequestListener().map(r -> { + final ActionListener clusterListener = computeListener.acquireCompute().map(r -> { finalResponse.set(r); return r.getProfiles(); }); @@ -129,14 +136,47 @@ void startComputeOnRemoteCluster( 
cluster.connection, ComputeService.CLUSTER_ACTION_NAME, clusterRequest, - listenerGroup.getGroupTask(), + groupTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); - }) + } + }) + ); + } + + private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { + Function runningToSuccess = status -> { + if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) { + return status; + } else if (executionInfo.isStopped() || resp.failedShards > 0) { + return EsqlExecutionInfo.Cluster.Status.PARTIAL; + } else { + return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; + } + }; + if (resp.getTook() != null) { + var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); + executionInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) + .setTook(tookTime) + .setTotalShards(resp.getTotalShards()) + .setSuccessfulShards(resp.getSuccessfulShards()) + .setSkippedShards(resp.getSkippedShards()) + .setFailedShards(resp.getFailedShards()) + .build() + ); + } else { + // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator + // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response + executionInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) + .setTook(executionInfo.tookSoFar()) + .build() ); } - } List getRemoteClusters( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index a9a3be7ecab1c..a97838d2cd651 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -31,8 +31,12 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SourceProvider; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; @@ -57,7 +61,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -211,7 +214,8 @@ public void execute( computeListener.acquireCompute().delegateFailure((l, profiles) -> { if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); - var status = localClusterWasInterrupted.get() + final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards(); + var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0) ? 
EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; execInfo.swapCluster( @@ -277,48 +281,13 @@ public void execute( cluster, cancelQueryOnFailure, execInfo, - computeListener.acquireCompute().map(r -> { - updateExecutionInfo(execInfo, cluster.clusterAlias(), r); - return r.getProfiles(); - }) + computeListener.acquireCompute() ); } } } } - private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { - Function runningToSuccess = status -> { - if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { - return executionInfo.isStopped() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; - } else { - return status; - } - }; - if (resp.getTook() != null) { - var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(tookTime) - .setTotalShards(resp.getTotalShards()) - .setSuccessfulShards(resp.getSuccessfulShards()) - .setSkippedShards(resp.getSkippedShards()) - .setFailedShards(resp.getFailedShards()) - .build() - ); - } else { - // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator - // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(executionInfo.tookSoFar()) - .build() - ); - } - } - // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { if (execInfo.isCrossClusterSearch()) { @@ -444,4 +413,36 @@ Runnable cancelQueryOnFailure(CancellableTask task) { 
transportService.getTaskManager().cancelTaskAndDescendants(task, "cancelled on failure", false, ActionListener.noop()); }); } + + CancellableTask createGroupTask(Task parentTask) { + final TaskManager taskManager = transportService.getTaskManager(); + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest( + parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), + parentTask::getDescription + ) + ); + } + + private static class ComputeGroupTaskRequest extends TransportRequest { + private final Supplier parentDescription; + + ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { + this.parentDescription = description; + setParentTask(parentTask); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + assert parentTaskId.isSet(); + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public String getDescription() { + return "group [" + parentDescription.get() + "]"; + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 3106a7b5a43cd..2560ffa9c71af 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -130,11 +130,7 @@ protected void sendRequest( final Runnable onGroupFailure; final CancellableTask groupTask; if (allowPartialResults) { - groupTask = RemoteListenerGroup.createGroupTask( - transportService, - parentTask, - () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]" - ); + groupTask = computeService.createGroupTask(parentTask); onGroupFailure = computeService.cancelQueryOnFailure(groupTask); l = 
ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java deleted file mode 100644 index 3c6c13993520b..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.plugin; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; -import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; - -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - -/** - * Create group task for this cluster. This group task ensures that two branches of the computation: - * the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. - * runAfter listeners below ensure that the group is finalized when both branches are done. - * The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. 
- */ -class RemoteListenerGroup { - private final CancellableTask groupTask; - private final ActionListener exchangeRequestListener; - private final ActionListener> clusterRequestListener; - private final TaskManager taskManager; - private final String clusterAlias; - private final EsqlExecutionInfo executionInfo; - private final TransportService transportService; - - RemoteListenerGroup( - TransportService transportService, - Task rootTask, - ComputeListener computeListener, - String clusterAlias, - EsqlExecutionInfo executionInfo, - ActionListener delegate - ) { - this.transportService = transportService; - this.taskManager = transportService.getTaskManager(); - this.clusterAlias = clusterAlias; - this.executionInfo = executionInfo; - groupTask = createGroupTask(transportService, rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); - CountDown countDown = new CountDown(2); - // The group is done when both the sink and the cluster request are done - Runnable finishGroup = () -> { - if (countDown.countDown()) { - taskManager.unregister(groupTask); - delegate.onResponse(null); - } - }; - // Cancel the group on sink failure - exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup); - - // Cancel the group on cluster request failure - clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup); - } - - /** - * Create a listener that: - * 1. Cancels the group task on failure - * 2. 
Marks the cluster as partial if the error is ignorable, otherwise propagates the error - */ - private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { - return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { - taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { - EsqlCCSUtils.skipUnavailableListener(inner, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL) - .onFailure(e); - })); - }), finishGroup); - } - - public CancellableTask getGroupTask() { - return groupTask; - } - - public ActionListener getExchangeRequestListener() { - return exchangeRequestListener; - } - - public ActionListener> getClusterRequestListener() { - return clusterRequestListener; - } - - public static CancellableTask createGroupTask(TransportService transportService, Task parentTask, Supplier description) { - final TaskManager taskManager = transportService.getTaskManager(); - return (CancellableTask) taskManager.register( - "transport", - "esql_compute_group", - new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) - ); - } - - private static class ComputeGroupTaskRequest extends TransportRequest { - private final Supplier parentDescription; - - ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { - this.parentDescription = description; - setParentTask(parentTask); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - assert parentTaskId.isSet(); - return new CancellableTask(id, type, action, "", parentTaskId, headers); - } - - @Override - public String getDescription() { - return "group [" + parentDescription.get() + "]"; - } - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java 
index 64e5c6647e9ca..89cd4b3d4d7cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -369,23 +369,4 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, return ExceptionsHelper.isRemoteUnavailableException(e); } - - /** - * Wrap a listener so that it will skip errors that are ignorable - */ - public static ActionListener skipUnavailableListener( - ActionListener delegate, - EsqlExecutionInfo executionInfo, - String clusterAlias, - EsqlExecutionInfo.Cluster.Status status - ) { - return delegate.delegateResponse((l, e) -> { - if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e); - l.onResponse(null); - } else { - l.onFailure(e); - } - }); - } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 49cfbba5c7610..7df3851224498 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -10,7 +10,6 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; @@ -52,7 +51,6 @@ import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense; import static 
org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError; -import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.skipUnavailableListener; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -797,35 +795,6 @@ public void testShouldIgnoreRuntimeError() { assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false)); } - public void testSkipUnavailableListener() { - Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); - - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); - executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); - executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); - executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); - - ActionListener expectResult = ActionListener.wrap(unused -> {}, (e) -> fail("Listener should not have failed")); - ActionListener expectFailure = ActionListener.wrap(unused -> fail("Listener should have failed"), (e) -> {}); - - // snip_unavailable=true but not connect exception, so should fail - skipUnavailableListener(expectFailure, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( - new ElasticsearchException("something is wrong") - ); - assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - - // snip_unavailable=true, so should not fail - skipUnavailableListener(expectResult, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( - new IllegalStateException("Unable to open any connections") - ); - assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), 
equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - // snip_unavailable=false, so should fail - skipUnavailableListener(expectFailure, executionInfo, REMOTE2_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( - new IllegalStateException("Unable to open any connections") - ); - - } - private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, true, null); } From c6159adbda09637ccfbd3ca002a1b91684c5c1bf Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 17 Feb 2025 10:31:29 -0800 Subject: [PATCH 02/10] Update docs/changelog/122708.yaml --- docs/changelog/122708.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/122708.yaml diff --git a/docs/changelog/122708.yaml b/docs/changelog/122708.yaml new file mode 100644 index 0000000000000..ec4e75798d473 --- /dev/null +++ b/docs/changelog/122708.yaml @@ -0,0 +1,5 @@ +pr: 122708 +summary: Support partial results in CCS in ES|QL +area: ES|QL +type: enhancement +issues: [] From 1252fba4fb8ac2a742789090fd722c00c3fc6983 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 17 Feb 2025 11:15:27 -0800 Subject: [PATCH 03/10] assert doc stats --- .../action/CrossClusterQueryWithPartialResultsIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index d582128fd026d..cad28937eab04 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -36,6 +36,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import 
static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -310,13 +311,18 @@ private Set populateIndexWithFailingFields(String clusterAlias, String i ); Set ids = new HashSet<>(); String tag = clusterAlias.isEmpty() ? "local" : clusterAlias; - int numDocs = between(50, 100); // large enough to have failing documents in every shard + int numDocs = between(1, 100); // large enough to have failing documents in every shard for (int i = 0; i < numDocs; i++) { String id = Long.toString(nextDocId.incrementAndGet()); client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); ids.add(id); } client.admin().indices().prepareRefresh(indexName).get(); + for (var shardStats : client.admin().indices().prepareStats(indexName).clear().setDocs(true).get().getShards()) { + var docsStats = shardStats.getStats().docs; + assertNotNull(docsStats); + assertThat("no doc for shard " + shardStats.getShardRouting().shardId(), docsStats.getCount(), greaterThan(0L)); + } return ids; } } From 6c4ec5afdabdb1ae9712fca8e03e083d7cef0044 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 09:39:46 -0800 Subject: [PATCH 04/10] includeMetadata --- .../esql/action/CrossClusterQueryWithPartialResultsIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index cad28937eab04..8becf8e25d880 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ 
b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -71,12 +71,10 @@ public void testPartialResults() throws Exception { request.query("FROM ok*,fail*,*:ok*,*:fail* | KEEP id, fail_me | LIMIT 1000"); request.includeCCSMetadata(randomBoolean()); { - // allow_partial_results = false - request.includeCCSMetadata(randomBoolean()); + request.allowPartialResults(false); IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); assertThat(error.getMessage(), containsString("Accessing failing field")); } - // allow_partial_results = true request.allowPartialResults(true); try (var resp = runQuery(request)) { assertTrue(resp.isPartial()); From 261ed18651a5edb21b35fb477a856bc1185199bc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 09:53:57 -0800 Subject: [PATCH 05/10] random failure --- ...CrossClusterQueryWithPartialResultsIT.java | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 8becf8e25d880..04c5cd57ffb01 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -21,6 +23,7 @@ import 
org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.ComputeService; import java.io.IOException; @@ -148,6 +151,7 @@ public void testOneRemoteClusterPartial() throws Exception { public void testFailToReceiveClusterResponse() throws Exception { populateIndices(); + Exception simulatedFailure = randomFailure(); // fetched pages, but failed to receive the cluster response for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); @@ -161,7 +165,7 @@ public String getProfileName() { @Override public void sendResponse(TransportResponse response) { - sendResponse(new CircuitBreakingException("simulated", CircuitBreaker.Durability.PERMANENT)); + sendResponse(simulatedFailure); } @Override @@ -177,8 +181,10 @@ public void sendResponse(Exception exception) { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - var error = expectThrows(CircuitBreakingException.class, () -> runQuery(request).close()); - assertThat(error.getMessage(), equalTo("simulated")); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); } request.allowPartialResults(true); try (var resp = runQuery(request)) { @@ -211,6 +217,7 @@ public void sendResponse(Exception exception) { public void testFailToStartRequestOnRemoteCluster() throws Exception { populateIndices(); + Exception simulatedFailure = randomFailure(); for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { MockTransportService ts = 
asInstanceOf(MockTransportService.class, transportService); String actionToFail = randomFrom( @@ -218,9 +225,7 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { ExchangeService.OPEN_EXCHANGE_ACTION_NAME, ComputeService.CLUSTER_ACTION_NAME ); - ts.addRequestHandlingBehavior(actionToFail, (handler, request, channel, task) -> { - channel.sendResponse(new IllegalStateException("simulated")); - }); + ts.addRequestHandlingBehavior(actionToFail, (handler, request, channel, task) -> { channel.sendResponse(simulatedFailure); }); } try { EsqlQueryRequest request = new EsqlQueryRequest(); @@ -228,8 +233,11 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { request.includeCCSMetadata(randomBoolean()); { request.allowPartialResults(false); - var error = expectThrows(IllegalStateException.class, () -> runQuery(request).close()); - assertThat(error.getMessage(), containsString("simulated")); + var error = expectThrows(Exception.class, () -> runQuery(request).close()); + EsqlTestUtils.assertEsqlFailure(error); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); } request.allowPartialResults(true); try (var resp = runQuery(request)) { @@ -260,6 +268,15 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { } } + private static Exception randomFailure() { + return randomFrom( + new IllegalStateException("driver was closed already"), + new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT), + new IOException("broken disk"), + new ResourceNotFoundException("exchange sink was not found") + ); + } + private Set populateIndex(String clusterAlias, String indexName, int numShards) { Client client = client(clusterAlias); assertAcked( @@ -309,7 +326,7 @@ private Set populateIndexWithFailingFields(String clusterAlias, String i ); Set ids = new HashSet<>(); String tag = 
clusterAlias.isEmpty() ? "local" : clusterAlias; - int numDocs = between(1, 100); // large enough to have failing documents in every shard + int numDocs = between(50, 100); // large enough to have failing documents in every shard for (int i = 0; i < numDocs; i++) { String id = Long.toString(nextDocId.incrementAndGet()); client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); From badef87052eb987b8f8d5f687cc746e7479b5c3d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 09:56:24 -0800 Subject: [PATCH 06/10] restore compute-group description --- .../xpack/esql/plugin/ClusterComputeHandler.java | 2 +- .../elasticsearch/xpack/esql/plugin/ComputeService.java | 7 ++----- .../xpack/esql/plugin/DataNodeComputeHandler.java | 5 ++++- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 61d2f581471b1..db62b02214ac6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -110,7 +110,7 @@ void startComputeOnRemoteCluster( groupTask = rootTask; onGroupFailure = cancelQueryOnFailure; } else { - groupTask = computeService.createGroupTask(rootTask); + groupTask = computeService.createGroupTask(rootTask, () -> "compute group: cluster [" + clusterAlias + "]"); onGroupFailure = computeService.cancelQueryOnFailure(groupTask); l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index a97838d2cd651..ca0f409e36b98 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -414,15 +414,12 @@ Runnable cancelQueryOnFailure(CancellableTask task) { }); } - CancellableTask createGroupTask(Task parentTask) { + CancellableTask createGroupTask(Task parentTask, Supplier description) { final TaskManager taskManager = transportService.getTaskManager(); return (CancellableTask) taskManager.register( "transport", "esql_compute_group", - new ComputeGroupTaskRequest( - parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), - parentTask::getDescription - ) + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 2560ffa9c71af..e4ba1678722cc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -130,7 +130,10 @@ protected void sendRequest( final Runnable onGroupFailure; final CancellableTask groupTask; if (allowPartialResults) { - groupTask = computeService.createGroupTask(parentTask); + groupTask = computeService.createGroupTask( + parentTask, + () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]" + ); onGroupFailure = computeService.cancelQueryOnFailure(groupTask); l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); } else { From 9283823a8e82026186b321343fd1f1f8ccf04733 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 12:08:54 -0800 Subject: [PATCH 07/10] use bool --- 
.../xpack/esql/plugin/ClusterComputeHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index db62b02214ac6..de5b67802e15a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -37,7 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -82,10 +82,10 @@ void startComputeOnRemoteCluster( listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); final var childSessionId = computeService.newChildSession(sessionId); final String clusterAlias = cluster.clusterAlias(); - final AtomicInteger pagesFetched = new AtomicInteger(); + final AtomicBoolean pagesFetched = new AtomicBoolean(); final AtomicReference finalResponse = new AtomicReference<>(); listener = listener.delegateResponse((l, e) -> { - final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0; + final boolean receivedResults = finalResponse.get() != null || pagesFetched.get(); if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); l.onResponse(List.of()); @@ -122,7 +122,7 @@ void startComputeOnRemoteCluster( exchangeSource.addRemoteSink( remoteSink, failFast, - pagesFetched::incrementAndGet, + () -> pagesFetched.set(true), queryPragmas.concurrentExchangeClients(), 
computeListener.acquireAvoid() ); From d3be47f51828e6d03ae3951434c37b8535cee479 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 14:01:32 -0800 Subject: [PATCH 08/10] local cluster --- ...CrossClusterQueryWithPartialResultsIT.java | 53 ++++++++++++++++++- .../xpack/esql/plugin/ComputeService.java | 39 ++++++++++---- 2 files changed, 80 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 04c5cd57ffb01..72a075b7ff971 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.test.FailingFieldPlugin; @@ -268,12 +269,62 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception { } } + public void testFailSearchShardsOnLocalCluster() throws Exception { + populateIndices(); + Exception simulatedFailure = randomFailure(); + for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + EsqlSearchShardsAction.NAME, + (handler, request, channel, task) -> { channel.sendResponse(simulatedFailure); } + ); + } + try { + 
EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,*a:ok* | KEEP id"); + request.includeCCSMetadata(randomBoolean()); + { + request.allowPartialResults(false); + var error = expectThrows(Exception.class, () -> runQuery(request).close()); + EsqlTestUtils.assertEsqlFailure(error); + var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass()); + assertNotNull(unwrapped); + assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage())); + } + request.allowPartialResults(true); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + List> rows = getValuesList(resp); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(1)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(remote1.okIds)); + if (request.includeCCSMetadata()) { + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1); + assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + } + } + } finally { + for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + } + private static Exception randomFailure() { return randomFrom( new IllegalStateException("driver was closed already"), new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT), new IOException("broken disk"), - new ResourceNotFoundException("exchange sink was not found") + new ResourceNotFoundException("exchange sink was not found"), + new EsRejectedExecutionException("node is shutting down") ); } diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ca0f409e36b98..e69de5a20a888 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.RunOnce; @@ -213,15 +214,18 @@ public void execute( cancelQueryOnFailure, computeListener.acquireCompute().delegateFailure((l, profiles) -> { if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { - var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); - final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards(); - var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0) - ? EsqlExecutionInfo.Cluster.Status.PARTIAL - : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; - execInfo.swapCluster( - LOCAL_CLUSTER, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status).setTook(tookTime).build() - ); + execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> { + var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); + var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime); + if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { + final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards(); + var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0) + ? 
EsqlExecutionInfo.Cluster.Status.PARTIAL + : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; + builder.setStatus(status); + } + return builder.build(); + }); } l.onResponse(profiles); }) @@ -244,6 +248,7 @@ public void execute( ); // starts computes on data nodes on the main cluster if (localConcreteIndices != null && localConcreteIndices.indices().length > 0) { + final var dataNodesListener = localListener.acquireCompute(); dataNodeComputeHandler.startComputeOnDataNodes( sessionId, LOCAL_CLUSTER, @@ -254,7 +259,7 @@ public void execute( localOriginalIndices, exchangeSource, cancelQueryOnFailure, - localListener.acquireCompute().map(r -> { + ActionListener.wrap(r -> { localClusterWasInterrupted.set(execInfo.isStopped()); execInfo.swapCluster( LOCAL_CLUSTER, @@ -264,7 +269,19 @@ public void execute( .setFailedShards(r.getFailedShards()) .build() ); - return r.getProfiles(); + dataNodesListener.onResponse(r.getProfiles()); + }, e -> { + if (configuration.allowPartialResults()) { + execInfo.swapCluster( + LOCAL_CLUSTER, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus( + EsqlExecutionInfo.Cluster.Status.PARTIAL + ).setFailures(List.of(new ShardSearchFailure(e))).build() + ); + dataNodesListener.onResponse(List.of()); + } else { + dataNodesListener.onFailure(e); + } }) ); } From 9e474016f9b4fc4821b4e0b9fdae3633844433a1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2025 14:32:25 -0800 Subject: [PATCH 09/10] update shards info --- .../esql/plugin/ClusterComputeHandler.java | 49 +++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index de5b67802e15a..36650f5ca2c4e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -39,7 +39,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; /** * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. @@ -146,37 +145,27 @@ void startComputeOnRemoteCluster( } private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { - Function runningToSuccess = status -> { - if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) { - return status; - } else if (executionInfo.isStopped() || resp.failedShards > 0) { - return EsqlExecutionInfo.Cluster.Status.PARTIAL; + executionInfo.swapCluster(clusterAlias, (k, v) -> { + var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(resp.getTotalShards()) + .setSuccessfulShards(resp.getSuccessfulShards()) + .setSkippedShards(resp.getSkippedShards()) + .setFailedShards(resp.getFailedShards()); + if (resp.getTook() != null) { + builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos())); } else { - return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; + // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator + // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response + builder.setTook(executionInfo.tookSoFar()); } - }; - if (resp.getTook() != null) { - var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(tookTime) - .setTotalShards(resp.getTotalShards()) - 
.setSuccessfulShards(resp.getSuccessfulShards()) - .setSkippedShards(resp.getSkippedShards()) - .setFailedShards(resp.getFailedShards()) - .build() - ); - } else { - // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator - // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(executionInfo.tookSoFar()) - .build() - ); - } + if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { + if (executionInfo.isStopped() || resp.failedShards > 0) { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL); + } else { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + } + } + return builder.build(); + }); } List getRemoteClusters( From e00080e94d9f6da3965a7e30b0a17ced50266b43 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 19 Feb 2025 22:40:31 -0800 Subject: [PATCH 10/10] Combine method --- .../action/AbstractCrossClusterTestCase.java | 22 +++++++++---- .../esql/action/CrossClusterQueryIT.java | 2 +- ...CrossClusterQueryWithPartialResultsIT.java | 32 +++---------------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index 510f5945f745a..d31315d7cd0ef 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -32,8 +32,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import 
java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -48,6 +50,7 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClusters protected static final String REMOTE_INDEX = "logs-2"; protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking"; protected static final String INDEX_WITH_FAIL_MAPPING = "failing"; + protected static final AtomicLong NEXT_DOC_ID = new AtomicLong(0); @Override protected List remoteClusterAlias() { @@ -150,7 +153,7 @@ protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bo protected Map setupClusters(int numClusters) throws IOException { assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters; int numShardsLocal = randomIntBetween(1, 5); - populateLocalIndices(LOCAL_INDEX, numShardsLocal); + populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10); int numShardsRemote = randomIntBetween(1, 5); populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); @@ -180,19 +183,24 @@ protected Map setupClusters(int numClusters) throws IOException return clusterInfo; } - protected void populateLocalIndices(String indexName, int numShards) { - Client localClient = client(LOCAL_CLUSTER); + protected Set populateIndex(String clusterAlias, String indexName, int numShards, int numDocs) { + Client client = client(clusterAlias); assertAcked( - localClient.admin() + client.admin() .indices() .prepareCreate(indexName) .setSettings(Settings.builder().put("index.number_of_shards", numShards)) .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long") ); - for (int i = 0; i < 10; i++) { - localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); + Set ids = new HashSet<>(); + String tag = Strings.isEmpty(clusterAlias) ? 
"local" : clusterAlias; + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(NEXT_DOC_ID.incrementAndGet()); + client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); + ids.add(id); } - localClient.admin().indices().prepareRefresh(indexName).get(); + client.admin().indices().prepareRefresh(indexName).get(); + return ids; } protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index c1976c9fa2ad8..52d03c483332c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -948,7 +948,7 @@ Map createEmptyIndicesWithNoMappings(int numClusters) { Map setupFailClusters() throws IOException { int numShardsLocal = randomIntBetween(1, 3); - populateLocalIndices(LOCAL_INDEX, numShardsLocal); + populateIndex(LOCAL_CLUSTER, LOCAL_INDEX, numShardsLocal, 10); int numShardsRemote = randomIntBetween(1, 3); populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 72a075b7ff971..ac67c1458dd64 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -10,7 +10,6 @@ import 
org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; @@ -32,7 +31,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,7 +44,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase { - private static final AtomicLong nextDocId = new AtomicLong(0); private static class ClusterSetup { final int okShards = randomIntBetween(1, 5); @@ -59,13 +56,13 @@ private static class ClusterSetup { private final ClusterSetup remote2 = new ClusterSetup(); void populateIndices() throws Exception { - local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards); + local.okIds = populateIndex(LOCAL_CLUSTER, "ok-local", local.okShards, between(1, 100)); populateIndexWithFailingFields(LOCAL_CLUSTER, "fail-local", local.failingShards); - remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards); + remote1.okIds = populateIndex(REMOTE_CLUSTER_1, "ok-cluster1", remote1.okShards, between(1, 100)); populateIndexWithFailingFields(REMOTE_CLUSTER_1, "fail-cluster1", remote1.failingShards); - remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards); + remote2.okIds = populateIndex(REMOTE_CLUSTER_2, "ok-cluster2", remote2.okShards, between(1, 100)); populateIndexWithFailingFields(REMOTE_CLUSTER_2, "fail-cluster2", remote2.failingShards); } @@ -328,27 +325,6 @@ private static Exception randomFailure() { ); } - private Set populateIndex(String clusterAlias, String indexName, int numShards) { - Client client = 
client(clusterAlias); - assertAcked( - client.admin() - .indices() - .prepareCreate(indexName) - .setSettings(Settings.builder().put("index.number_of_shards", numShards)) - .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long") - ); - Set ids = new HashSet<>(); - int numDocs = between(1, 100); - String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; - for (int i = 0; i < numDocs; i++) { - String id = Long.toString(nextDocId.incrementAndGet()); - client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); - ids.add(id); - } - client.admin().indices().prepareRefresh(indexName).get(); - return ids; - } - private Set populateIndexWithFailingFields(String clusterAlias, String indexName, int numShards) throws IOException { Client client = client(clusterAlias); XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); @@ -379,7 +355,7 @@ private Set populateIndexWithFailingFields(String clusterAlias, String i String tag = clusterAlias.isEmpty() ? "local" : clusterAlias; int numDocs = between(50, 100); // large enough to have failing documents in every shard for (int i = 0; i < numDocs; i++) { - String id = Long.toString(nextDocId.incrementAndGet()); + String id = Long.toString(NEXT_DOC_ID.incrementAndGet()); client.prepareIndex(indexName).setSource("id", id, "tag", tag, "v", i).get(); ids.add(id); }