From 079c419d408461cb8e70b036f74660bbbcf486eb Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 16 Oct 2024 12:40:29 -0400 Subject: [PATCH 1/7] Intmd commit - Added 'mark' methods to EsqlExecutionInfo and being used on coordinator but not yet on remote clusters --- .../esql/action/CrossClustersQueryIT.java | 74 +++++++++++------ .../xpack/esql/action/EsqlExecutionInfo.java | 37 ++++++++- .../xpack/esql/plugin/ComputeListener.java | 35 +++++--- .../xpack/esql/plugin/ComputeService.java | 79 +++++++++++-------- .../xpack/esql/session/EsqlSession.java | 24 ------ .../esql/plugin/ComputeListenerTests.java | 7 -- 6 files changed, 147 insertions(+), 109 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index adfa2fc7273cd..ddd5cff014ed2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -97,7 +97,8 @@ public void testSuccessfulPathways() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -106,6 +107,7 @@ public void testSuccessfulPathways() { assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -115,6 +117,7 @@ public void testSuccessfulPathways() { assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -133,7 +136,8 @@ public void testSuccessfulPathways() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -142,6 +146,7 @@ public void testSuccessfulPathways() { assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -151,6 +156,7 @@ public void testSuccessfulPathways() { assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -180,7 +186,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -189,6 +196,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(0)); // 0 since no matching index, thus no shards to search assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -198,6 +206,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -219,7 +228,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -228,6 +238,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(remoteNumShards)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -235,8 +246,9 @@ public void testSearchesWhereMissingIndicesAreSpecified() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertThat(localCluster.getIndexExpression(), equalTo("no_such_index")); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(0)); assertThat(localCluster.getSuccessfulShards(), equalTo(0)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -258,7 +270,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -267,6 +280,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index1,no_such_index2")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(0)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -276,6 +290,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(localCluster.getIndexExpression(), equalTo("no_such_index*,logs-1")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -291,7 +306,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); - assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -300,6 +316,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(remoteCluster.getIndexExpression(), equalTo("no_such_index*")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(0)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -309,6 +326,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); @@ -414,20 +432,20 @@ public void testCCSExecutionOnSearchesWithLimit0() { assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); - assertNull(remoteCluster.getTotalShards()); - assertNull(remoteCluster.getSuccessfulShards()); - assertNull(remoteCluster.getSkippedShards()); - assertNull(remoteCluster.getFailedShards()); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); - assertNull(localCluster.getTotalShards()); - assertNull(localCluster.getSuccessfulShards()); - assertNull(localCluster.getSkippedShards()); - assertNull(localCluster.getFailedShards()); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); } try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0", requestIncludeMeta)) { @@ -442,7 +460,8 @@ public void testCCSExecutionOnSearchesWithLimit0() { EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); assertThat(remoteCluster.getIndexExpression(), equalTo("nomatch*")); assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remoteCluster.getTook().millis(), equalTo(0L)); + assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(0)); assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); assertThat(remoteCluster.getSkippedShards(), equalTo(0)); @@ -453,10 +472,10 @@ public void testCCSExecutionOnSearchesWithLimit0() { assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); - assertNull(localCluster.getTotalShards()); - assertNull(localCluster.getSuccessfulShards()); - assertNull(localCluster.getSkippedShards()); - assertNull(localCluster.getFailedShards()); + assertThat(localCluster.getTotalShards(), equalTo(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(0)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); } try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0", requestIncludeMeta)) { @@ -473,17 +492,20 @@ public void testCCSExecutionOnSearchesWithLimit0() { assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); - assertNull(remoteCluster.getTotalShards()); - assertNull(remoteCluster.getSuccessfulShards()); - assertNull(remoteCluster.getSkippedShards()); - assertNull(remoteCluster.getFailedShards()); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertThat(localCluster.getIndexExpression(), equalTo("nomatch*")); - // TODO: in https://github.com/elastic/elasticsearch/issues/112886, this will be changed to be SKIPPED - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index dabccd4ffeb17..65fa96ccfb9c5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Predicate; @@ -60,24 +61,29 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final Map clusterInfo; - // not Writeable since it is only needed on the primary CCS coordinator - private final transient Predicate skipUnavailablePredicate; private TimeValue overallTook; - // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; + // fields that are not Writeable since it is only needed on the primary CCS coordinator + private final transient Predicate skipUnavailablePredicate; + private final transient Long relativeStartNanos; + // TODO: make this a SetOnce? + private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute + public EsqlExecutionInfo(boolean includeCCSMetadata) { this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true } /** * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false + * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response */ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean includeCCSMetadata) { this.clusterInfo = ConcurrentCollections.newConcurrentMap(); this.skipUnavailablePredicate = skipUnavailablePredicate; this.includeCCSMetadata = includeCCSMetadata; + this.relativeStartNanos = System.nanoTime(); } /** @@ -88,6 +94,7 @@ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean inc this.clusterInfo = clusterInfo; this.includeCCSMetadata = includeCCSMetadata; this.skipUnavailablePredicate = Predicates.always(); + this.relativeStartNanos = null; } public EsqlExecutionInfo(StreamInput in) throws IOException { @@ -106,6 +113,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { this.includeCCSMetadata = false; } this.skipUnavailablePredicate = Predicates.always(); + this.relativeStartNanos = null; } @Override @@ -125,7 +133,28 @@ public boolean includeCCSMetadata() { return includeCCSMetadata; } - public void overallTook(TimeValue took) { + public Long getRelativeStartNanos() { + return relativeStartNanos; + } + + public void markEndPlanning() { + assert planningTookTime == null : "markEndPlanning should only be called once"; + planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); + System.err.println(">>> >>> EEE ExecInfo: markEndPlanning: " + planningTookTime); + } + + public TimeValue planningTookTime() { + return planningTookTime; + } + + public void markEndQuery() { + assert overallTook == null : "markEndQuery should only be called once"; + overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); + } + + // TODO: remove this? + // for testing only - use markEndQuery in production code + void overallTook(TimeValue took) { this.overallTook = took; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index d8fc4da070767..1a5be4361e454 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -47,7 +47,6 @@ final class ComputeListener implements Releasable { private final List collectedProfiles; private final ResponseHeadersCollector responseHeaders; private final EsqlExecutionInfo esqlExecutionInfo; - private final long queryStartTimeNanos; // clusterAlias indicating where this ComputeListener is running // used by the top level ComputeListener in ComputeService on both local and remote clusters private final String whereRunning; @@ -61,7 +60,7 @@ public static ComputeListener create( CancellableTask task, ActionListener delegate ) { - return new ComputeListener(transportService, task, null, null, -1, delegate); + return new ComputeListener(transportService, task, null, null, delegate); } /** @@ -75,7 +74,6 @@ public static ComputeListener create( * @param transportService * @param task * @param executionInfo {@link EsqlExecutionInfo} to capture execution metadata - * @param queryStartTimeNanos Start time of the ES|QL query (stored in {@link org.elasticsearch.xpack.esql.session.Configuration}) * @param delegate */ public static ComputeListener create( @@ -83,10 +81,9 @@ public static ComputeListener create( TransportService transportService, CancellableTask task, EsqlExecutionInfo executionInfo, - long queryStartTimeNanos, ActionListener delegate ) { - return new ComputeListener(transportService, task, clusterAlias, executionInfo, queryStartTimeNanos, delegate); + return new ComputeListener(transportService, task, clusterAlias, executionInfo, delegate); } private ComputeListener( @@ -94,7 +91,6 @@ private ComputeListener( CancellableTask task, String clusterAlias, EsqlExecutionInfo executionInfo, - long queryStartTimeNanos, ActionListener delegate ) { this.transportService = transportService; @@ -102,7 +98,6 @@ private ComputeListener( this.responseHeaders = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext()); this.collectedProfiles = Collections.synchronizedList(new ArrayList<>()); this.esqlExecutionInfo = executionInfo; - this.queryStartTimeNanos = queryStartTimeNanos; this.whereRunning = clusterAlias; // for the DataNodeHandler ComputeListener, clusterAlias and executionInfo will be null // for the top level ComputeListener in ComputeService both will be non-null @@ -118,6 +113,7 @@ private ComputeListener( // for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and // received by the acquireCompute method callback on the coordinating cluster EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias); + System.err.println(">> >> >> RRR setting took in ComputeResponse to " + cluster.getTook()); result = new ComputeResponse( collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(), cluster.getTook(), @@ -128,7 +124,9 @@ private ComputeListener( ); } else { result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList()); - if (coordinatingClusterIsSearchedInCCS()) { + if (coordinatingClusterIsSearchedInCCS() + && esqlExecutionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) + .getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { // mark local cluster as finished once the coordinator and all data nodes have finished processing executionInfo.swapCluster( RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, @@ -196,8 +194,8 @@ ActionListener acquireAvoid() { * info to be gathered (namely, the DataNodeRequestHandler ComputeListener) should pass in null. */ ActionListener acquireCompute(@Nullable String computeClusterAlias) { - assert computeClusterAlias == null || (esqlExecutionInfo != null && queryStartTimeNanos > 0) - : "When clusterAlias is provided to acquireCompute, executionInfo must be non-null and queryStartTimeNanos must be positive"; + assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null) + : "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null"; return acquireAvoid().map(resp -> { responseHeaders.collect(); @@ -210,12 +208,20 @@ ActionListener acquireCompute(@Nullable String computeClusterAl } if (isCCSListener(computeClusterAlias)) { // this is the callback for the listener to the CCS compute + TimeValue tookOnCluster; + if (resp.getTook() != null) { + TimeValue remoteExecutionTime = resp.getTook(); + TimeValue planningTookTime = esqlExecutionInfo.planningTookTime(); + tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS); + } else { + tookOnCluster = null; + } esqlExecutionInfo.swapCluster( computeClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v) // for now ESQL doesn't return partial results, so set status to SUCCESSFUL .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) - .setTook(resp.getTook()) + .setTook(tookOnCluster) .setTotalShards(resp.getTotalShards()) .setSuccessfulShards(resp.getSuccessfulShards()) .setSkippedShards(resp.getSkippedShards()) @@ -223,10 +229,13 @@ ActionListener acquireCompute(@Nullable String computeClusterAl .build() ); } else if (shouldRecordTookTime()) { + Long relativeStartNanos = esqlExecutionInfo.getRelativeStartNanos(); // handler for this cluster's data node and coordinator completion (runs on "local" and remote clusters) - TimeValue tookTime = new TimeValue(System.nanoTime() - queryStartTimeNanos, TimeUnit.NANOSECONDS); + assert relativeStartNanos != null : "queryStartTimeNanos not set properly"; + TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos.longValue(), TimeUnit.NANOSECONDS); esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { - if (v.getTook() == null || v.getTook().nanos() < tookTime.nanos()) { + if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED + && (v.getTook() == null || v.getTook().nanos() < tookTime.nanos())) { return new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime).build(); } else { return v; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index f714695504a1d..a3e84d9bcac21 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -32,7 +32,6 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; @@ -81,7 +80,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -138,6 +136,7 @@ public void execute( EsqlExecutionInfo execInfo, ActionListener listener ) { + updateExecutionInfoAtStartOfQueryExecution(execInfo); Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( physicalPlan, configuration @@ -173,19 +172,10 @@ public void execute( null ); String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - try ( - var computeListener = ComputeListener.create( - local, - transportService, - rootTask, - execInfo, - configuration.getQueryStartTimeNanos(), - listener.map(r -> { - updateExecutionInfoAfterCoordinatorOnlyQuery(configuration.getQueryStartTimeNanos(), execInfo); - return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo); - }) - ) - ) { + try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> { + updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo); + return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo); + }))) { runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute(local)); return; } @@ -205,7 +195,6 @@ public void execute( queryPragmas.exchangeBufferSize(), transportService.getThreadPool().executor(ThreadPool.Names.SEARCH) ); - long start = configuration.getQueryStartTimeNanos(); String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; /* * Grab the output attributes here, so we can pass them to @@ -216,9 +205,8 @@ public void execute( try ( Releasable ignored = exchangeSource.addEmptySink(); // this is the top level ComputeListener called once at the end (e.g., once all clusters have finished for a CCS) - var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, start, listener.map(r -> { - long tookTimeNanos = System.nanoTime() - configuration.getQueryStartTimeNanos(); - execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS)); + var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> { + execInfo.markEndQuery(); return new Result(outputAttributes, collectedPages, r.getProfiles(), execInfo); })) ) { @@ -258,22 +246,44 @@ public void execute( } } - private static void updateExecutionInfoAfterCoordinatorOnlyQuery(long queryStartNanos, EsqlExecutionInfo execInfo) { - long tookTimeNanos = System.nanoTime() - queryStartNanos; - execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS)); + // visible for testing + static void updateExecutionInfoAtStartOfQueryExecution(EsqlExecutionInfo execInfo) { + execInfo.markEndPlanning(); + for (String clusterAlias : execInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + } + + // For queries like: FROM logs*,remote*:logs* | LIMIT 0 + private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { + assert execInfo.getRelativeStartNanos() != null : "Relative start time should be set on EsqlExecutionInfo but is null"; + assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null"; + + execInfo.markEndQuery(); if (execInfo.isCrossClusterSearch()) { for (String clusterAlias : execInfo.clusterAliases()) { - // The local cluster 'took' time gets updated as part of the acquireCompute(local) call in the coordinator, so - // here we only need to update status for remote clusters since there are no remote ComputeListeners in this case. - // This happens in cross cluster searches that use LIMIT 0, e.g, FROM logs*,remote*:logs* | LIMIT 0. - if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { - execInfo.swapCluster(clusterAlias, (k, v) -> { - if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) { - return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build(); - } else { - return v; - } - }); + if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook()) + .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); } } } @@ -837,8 +847,7 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true); execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices()))); CancellableTask cancellable = (CancellableTask) task; - long start = request.configuration().getQueryStartTimeNanos(); - try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, start, listener)) { + try (var computeListener = ComputeListener.create(clusterAlias, transportService, cancellable, execInfo, listener)) { runComputeOnRemoteCluster( clusterAlias, request.sessionId(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 96391c841856f..e1f4d4634c111 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -72,7 +72,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Predicate; @@ -246,7 +245,6 @@ private void preAnalyze( if (indexResolution.isValid()) { updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); - updateTookTimeForRemoteClusters(executionInfo); Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( indexResolution.get().concreteIndices().toArray(String[]::new) ).keySet(); @@ -307,28 +305,6 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } - private void updateTookTimeForRemoteClusters(EsqlExecutionInfo executionInfo) { - if (executionInfo.isCrossClusterSearch()) { - for (String clusterAlias : executionInfo.clusterAliases()) { - if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - if (v.getTook() == null && v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { - // set took time in case we are finished with the remote cluster (e.g., FROM foo | LIMIT 0). - // this will be overwritten later if ES|QL operations happen on the remote cluster (the typical scenario) - TimeValue took = new TimeValue( - System.nanoTime() - configuration.getQueryStartTimeNanos(), - TimeUnit.NANOSECONDS - ); - return new EsqlExecutionInfo.Cluster.Builder(v).setTook(took).build(); - } else { - return v; - } - }); - } - } - } - } - private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 8cfcb605a19d5..edefcf217fec0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -132,7 +132,6 @@ public void testEmpty() { transportService, newTask(), executionInfo, - System.nanoTime(), results ) ) { @@ -152,7 +151,6 @@ public void testCollectComputeResults() { transportService, newTask(), executionInfo, - System.nanoTime(), future ) ) { @@ -203,7 +201,6 @@ public void testAcquireComputeCCSListener() { transportService, newTask(), executionInfo, - System.nanoTime(), future ) ) { @@ -271,7 +268,6 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() { transportService, newTask(), executionInfo, - System.nanoTime(), future ) ) { @@ -331,7 +327,6 @@ public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() { transportService, newTask(), executionInfo, - System.nanoTime(), future ) ) { @@ -379,7 +374,6 @@ public void testCancelOnFailure() throws Exception { transportService, rootTask, execInfo, - System.nanoTime(), rootListener ) ) { @@ -443,7 +437,6 @@ public void onFailure(Exception e) { transportService, newTask(), executionInfo, - System.nanoTime(), ActionListener.runAfter(rootListener, latch::countDown) ) ) { From 041b002a162d362f72e47c8d6c7470dcbb38bae0 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 17 Oct 2024 09:53:14 -0400 Subject: [PATCH 2/7] Removed debugging lines --- .../org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java | 1 - .../org/elasticsearch/xpack/esql/plugin/ComputeListener.java | 1 - 2 files changed, 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 65fa96ccfb9c5..64f068fe60cd5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -140,7 +140,6 @@ public Long getRelativeStartNanos() { public void markEndPlanning() { assert planningTookTime == null : "markEndPlanning should only be called once"; planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); - System.err.println(">>> >>> EEE ExecInfo: markEndPlanning: " + planningTookTime); } public TimeValue planningTookTime() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 1a5be4361e454..e5c2f99505ea7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -113,7 +113,6 @@ private ComputeListener( // for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and // received by the acquireCompute method callback on the coordinating cluster EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias); - System.err.println(">> >> >> RRR setting took in ComputeResponse to " + cluster.getTook()); result = new ComputeResponse( collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(), cluster.getTook(), From 8a7a670cdf8cf5425483ce01f071379809c354ed Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 17 Oct 2024 12:34:00 -0400 Subject: [PATCH 3/7] Now handling cases where took time is not present in the ComputeRersponse from remote cluster (older version). In that case, we set took time on the coordinator. Other changes: - changed where markEndPlanning is called since it was not compatiable with 2 phase execution model for INLINESTATS - removed assert to oncally call markEndQuery once since it was not compatiable with 2 phase execution model for INLINESTATS --- .../xpack/esql/action/EsqlExecutionInfo.java | 3 +- .../xpack/esql/plugin/ComputeListener.java | 4 +- .../xpack/esql/plugin/ComputeService.java | 28 +---- .../xpack/esql/session/EsqlSession.java | 103 +++++++++++------- 4 files changed, 71 insertions(+), 67 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 64f068fe60cd5..20ff605866b81 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -139,6 +139,7 @@ public Long getRelativeStartNanos() { public void markEndPlanning() { assert planningTookTime == null : "markEndPlanning should only be called once"; + assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called"; planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); } @@ -147,7 +148,7 @@ public TimeValue planningTookTime() { } public void markEndQuery() { - assert overallTook == null : "markEndQuery should only be called once"; + assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called"; overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index e5c2f99505ea7..e336770df45f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -213,7 +213,9 @@ ActionListener acquireCompute(@Nullable String computeClusterAl TimeValue planningTookTime = esqlExecutionInfo.planningTookTime(); tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS); } else { - tookOnCluster = null; + // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator + long remoteTook = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos(); + tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS); } esqlExecutionInfo.swapCluster( computeClusterAlias, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index a3e84d9bcac21..3d654bfa66b50 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -136,7 +136,6 @@ public void execute( EsqlExecutionInfo execInfo, ActionListener listener ) { - updateExecutionInfoAtStartOfQueryExecution(execInfo); Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( physicalPlan, configuration @@ -206,7 +205,7 @@ public void execute( Releasable ignored = exchangeSource.addEmptySink(); // this is the top level ComputeListener called once at the end (e.g., once all clusters have finished for a CCS) var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> { - execInfo.markEndQuery(); + execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements return new Result(outputAttributes, collectedPages, r.getProfiles(), execInfo); })) ) { @@ -246,32 +245,11 @@ public void execute( } } - // visible for testing - static void updateExecutionInfoAtStartOfQueryExecution(EsqlExecutionInfo execInfo) { - execInfo.markEndPlanning(); - for (String clusterAlias : execInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); - if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - } - // For queries like: FROM logs*,remote*:logs* | LIMIT 0 private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { - assert execInfo.getRelativeStartNanos() != null : "Relative start time should be set on EsqlExecutionInfo but is null"; - assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null"; - - execInfo.markEndQuery(); + execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements if (execInfo.isCrossClusterSearch()) { + assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null"; for (String clusterAlias : execInfo.clusterAliases()) { if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { execInfo.swapCluster( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e1f4d4634c111..788b2827d7c8e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -163,6 +163,7 @@ public void executeOptimizedPlan( LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); if (firstPhase == null) { runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); + updateExecutionInfoAtEndOfPlanning(executionInfo); } else { executePhased(new ArrayList<>(), optimizedPlan, request, executionInfo, firstPhase, runPhase, listener); } @@ -265,46 +266,6 @@ private void preAnalyze( })); } - // visible for testing - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo executionInfo, Set unavailableClusters) { - for (String clusterAlias : unavailableClusters) { - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED).build() - ); - // TODO: follow-on PR will set SKIPPED status when skip_unavailable=true and throw an exception when skip_un=false - } - } - - // visible for testing - static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - Set clustersWithResolvedIndices = new HashSet<>(); - // determine missing clusters - for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { - clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); - } - Set clustersRequested = executionInfo.clusterAliases(); - Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); - clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters()); - /* - * These are clusters in the original request that are not present in the field-caps response. They were - * specified with an index or indices that do not exist, so the search on that cluster is done. - * Mark it as SKIPPED with 0 shards searched and took=0. - */ - for (String c : clustersWithNoMatchingIndices) { - executionInfo.swapCluster( - c, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .build() - ); - } - } - private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, @@ -484,4 +445,66 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } + + // visible for testing + static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo executionInfo, Set unavailableClusters) { + for (String clusterAlias : unavailableClusters) { + executionInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED).build() + ); + // TODO: follow-on PR will set SKIPPED status when skip_unavailable=true and throw an exception when skip_un=false + } + } + + // visible for testing + static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + Set clustersWithResolvedIndices = new HashSet<>(); + // determine missing clusters + for (String indexName : indexResolution.get().indexNameWithModes().keySet()) { + clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName)); + } + Set clustersRequested = executionInfo.clusterAliases(); + Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); + clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters()); + /* + * These are clusters in the original request that are not present in the field-caps response. They were + * specified with an index or indices that do not exist, so the search on that cluster is done. + * Mark it as SKIPPED with 0 shards searched and took=0. + */ + for (String c : clustersWithNoMatchingIndices) { + executionInfo.swapCluster( + c, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) + .setTook(new TimeValue(0)) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + + // visible for testing + static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { + // TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible + if (execInfo.isCrossClusterSearch()) { + execInfo.markEndPlanning(); + for (String clusterAlias : execInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .build() + ); + } + } + } + } } From e2b2283d5f72e88d893559013455ea24a17caa6a Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 17 Oct 2024 12:53:57 -0400 Subject: [PATCH 4/7] Added test for updateExecutionInfoAtEndOfPlanning --- .../xpack/esql/session/EsqlSessionTests.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java index 7e93213fcee21..32b31cf78650b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java @@ -21,6 +21,7 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class EsqlSessionTests extends ESTestCase { @@ -243,6 +244,50 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { } } + public void testUpdateExecutionInfoAtEndOfPlanning() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + assertNull(executionInfo.planningTookTime()); + assertNull(executionInfo.overallTook()); + try { + Thread.sleep(1); + } catch (InterruptedException e) {} + + EsqlSession.updateExecutionInfoAtEndOfPlanning(executionInfo); + + assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); + assertNull(executionInfo.overallTook()); + + // only remote1 should be altered, since it is the only one marked as SKIPPED when passed into updateExecutionInfoAtEndOfPlanning + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertNull(localCluster.getTotalShards()); + assertNull(localCluster.getTook()); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis())); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + assertNull(remote2Cluster.getTotalShards()); + assertNull(remote2Cluster.getTook()); + } + private void assertClusterStatusAndHasNullCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); assertNull(cluster.getTook()); From d192d8f0db2762a4b2cf0bc14b4e2cfff0341fd4 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 17 Oct 2024 13:41:48 -0400 Subject: [PATCH 5/7] Added testAcquireComputeCCSListenerWithComputeResponseFromOlderCluster Removed shard counts in EsqlExecutionInfo for remotes where cluster is older. --- .../xpack/esql/action/EsqlExecutionInfo.java | 18 +++-- .../xpack/esql/plugin/ComputeListener.java | 80 +++++++++++-------- .../esql/plugin/ComputeListenerTests.java | 55 +++++++++++++ 3 files changed, 116 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 20ff605866b81..a888c7807d950 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -56,18 +56,18 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { public static final ParseField DETAILS_FIELD = new ParseField("details"); public static final ParseField TOOK = new ParseField("took"); - // map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query - // the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search - // updates to the Cluster occur with the updateCluster method that given the key to map transforms an + // Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query + // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search. + // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final Map clusterInfo; private TimeValue overallTook; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; - // fields that are not Writeable since it is only needed on the primary CCS coordinator + // fields that are not Writeable since they are only needed on the primary CCS coordinator private final transient Predicate skipUnavailablePredicate; - private final transient Long relativeStartNanos; + private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times // TODO: make this a SetOnce? private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute @@ -137,6 +137,11 @@ public Long getRelativeStartNanos() { return relativeStartNanos; } + /** + * Call when ES|QL "planning" phase is complete and query execution (in ComputeService) is about to start. + * Note this is currently only built for a single phase planning/execution model. When INLINESTATS + * moves towards GA we may need to revisit this model. Currently, it should never be called more than once. + */ public void markEndPlanning() { assert planningTookTime == null : "markEndPlanning should only be called once"; assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called"; @@ -147,6 +152,9 @@ public TimeValue planningTookTime() { return planningTookTime; } + /** + * Call when ES|QL execution is complete in order to set the overall took time for an ES|QL query. + */ public void markEndQuery() { assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called"; overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index e336770df45f7..49af4a593e6e5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -123,14 +123,16 @@ private ComputeListener( ); } else { result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList()); - if (coordinatingClusterIsSearchedInCCS() - && esqlExecutionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) - .getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { - // mark local cluster as finished once the coordinator and all data nodes have finished processing - executionInfo.swapCluster( - RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build() - ); + if (coordinatingClusterIsSearchedInCCS()) { + // if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all + // data nodes have finished processing + executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> { + if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build(); + } else { + return v; + } + }); } } delegate.onResponse(result); @@ -206,34 +208,14 @@ ActionListener acquireCompute(@Nullable String computeClusterAl return null; } if (isCCSListener(computeClusterAlias)) { - // this is the callback for the listener to the CCS compute - TimeValue tookOnCluster; - if (resp.getTook() != null) { - TimeValue remoteExecutionTime = resp.getTook(); - TimeValue planningTookTime = esqlExecutionInfo.planningTookTime(); - tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS); - } else { - // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator - long remoteTook = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos(); - tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS); - } - esqlExecutionInfo.swapCluster( - computeClusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v) - // for now ESQL doesn't return partial results, so set status to SUCCESSFUL - .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) - .setTook(tookOnCluster) - .setTotalShards(resp.getTotalShards()) - .setSuccessfulShards(resp.getSuccessfulShards()) - .setSkippedShards(resp.getSkippedShards()) - .setFailedShards(resp.getFailedShards()) - .build() - ); + // this is the callback for the listener on the primary coordinator that receives a remote ComputeResponse + updateExecutionInfoWithRemoteResponse(computeClusterAlias, resp); + } else if (shouldRecordTookTime()) { Long relativeStartNanos = esqlExecutionInfo.getRelativeStartNanos(); // handler for this cluster's data node and coordinator completion (runs on "local" and remote clusters) assert relativeStartNanos != null : "queryStartTimeNanos not set properly"; - TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos.longValue(), TimeUnit.NANOSECONDS); + TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED && (v.getTook() == null || v.getTook().nanos() < tookTime.nanos())) { @@ -247,6 +229,40 @@ ActionListener acquireCompute(@Nullable String computeClusterAl }); } + private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, ComputeResponse resp) { + TimeValue tookOnCluster; + if (resp.getTook() != null) { + TimeValue remoteExecutionTime = resp.getTook(); + TimeValue planningTookTime = esqlExecutionInfo.planningTookTime(); + tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS); + esqlExecutionInfo.swapCluster( + computeClusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v) + // for now ESQL doesn't return partial results, so set status to SUCCESSFUL + .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) + .setTook(tookOnCluster) + .setTotalShards(resp.getTotalShards()) + .setSuccessfulShards(resp.getSuccessfulShards()) + .setSkippedShards(resp.getSkippedShards()) + .setFailedShards(resp.getFailedShards()) + .build() + ); + } else { + // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator + // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response + long remoteTook = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos(); + tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS); + esqlExecutionInfo.swapCluster( + computeClusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v) + // for now ESQL doesn't return partial results, so set status to SUCCESSFUL + .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) + .setTook(tookOnCluster) + .build() + ); + } + } + /** * Use this method when no execution metadata needs to be added to {@link EsqlExecutionInfo} */ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index edefcf217fec0..5fbd5dd28050f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -194,6 +194,7 @@ public void testAcquireComputeCCSListener() { String remoteAlias = "rc1"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false)); + executionInfo.markEndPlanning(); // set planning took time, so it can be used to calculate per-cluster took time try ( ComputeListener computeListener = ComputeListener.create( // 'whereRunning' for this test is the local cluster, waiting for a response from the remote cluster @@ -236,6 +237,60 @@ public void testAcquireComputeCCSListener() { Mockito.verifyNoInteractions(transportService.getTaskManager()); } + /** + * Tests the acquireCompute functionality running on the querying ("local") cluster, that is waiting upon + * a ComputeResponse from a remote cluster where we simulate connecting to a remote cluster running a version + * of ESQL that does not record and return CCS metadata. Ensure that the local cluster {@link EsqlExecutionInfo} + * is properly updated with took time and shard info is left unset. + */ + public void testAcquireComputeCCSListenerWithComputeResponseFromOlderCluster() { + PlainActionFuture future = new PlainActionFuture<>(); + List allProfiles = new ArrayList<>(); + String remoteAlias = "rc1"; + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false)); + executionInfo.markEndPlanning(); // set planning took time, so it can be used to calculate per-cluster took time + try ( + ComputeListener computeListener = ComputeListener.create( + // 'whereRunning' for this test is the local cluster, waiting for a response from the remote cluster + RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, + transportService, + newTask(), + executionInfo, + future + ) + ) { + int tasks = randomIntBetween(1, 5); + for (int t = 0; t < tasks; t++) { + ComputeResponse resp = randomResponse(false); // older clusters will not return CCS metadata in response + allProfiles.addAll(resp.getProfiles()); + // Use remoteAlias here to indicate what remote cluster alias the listener is waiting to hear back from + ActionListener subListener = computeListener.acquireCompute(remoteAlias); + threadPool.schedule( + ActionRunnable.wrap(subListener, l -> l.onResponse(resp)), + TimeValue.timeValueNanos(between(0, 100)), + threadPool.generic() + ); + } + } + ComputeResponse response = future.actionGet(10, TimeUnit.SECONDS); + assertThat( + response.getProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) + ); + + assertTrue(executionInfo.isCrossClusterSearch()); + EsqlExecutionInfo.Cluster rc1Cluster = executionInfo.getCluster(remoteAlias); + assertThat(rc1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertNull(rc1Cluster.getTotalShards()); + assertNull(rc1Cluster.getSuccessfulShards()); + assertNull(rc1Cluster.getSkippedShards()); + assertNull(rc1Cluster.getFailedShards()); + assertThat(rc1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + + Mockito.verifyNoInteractions(transportService.getTaskManager()); + } + /** * Run an acquireCompute cycle on the RemoteCluster. * AcquireCompute will fill in the took time on the EsqlExecutionInfo (the shard info is filled in before this, From c43060f45e4e6fe512559c9978ffea14b1b2dd0a Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 17 Oct 2024 14:48:07 -0400 Subject: [PATCH 6/7] Minor cleanup --- .../org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java | 2 -- .../org/elasticsearch/xpack/esql/plugin/ComputeService.java | 3 ++- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index a888c7807d950..aeac14091f378 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -68,7 +68,6 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // fields that are not Writeable since they are only needed on the primary CCS coordinator private final transient Predicate skipUnavailablePredicate; private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times - // TODO: make this a SetOnce? private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute public EsqlExecutionInfo(boolean includeCCSMetadata) { @@ -160,7 +159,6 @@ public void markEndQuery() { overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); } - // TODO: remove this? // for testing only - use markEndQuery in production code void overallTook(TimeValue took) { this.overallTook = took; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 3d654bfa66b50..108e70d7d3a50 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -245,12 +245,13 @@ public void execute( } } - // For queries like: FROM logs*,remote*:logs* | LIMIT 0 + // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements if (execInfo.isCrossClusterSearch()) { assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null"; for (String clusterAlias : execInfo.clusterAliases()) { + // took time and shard counts for SKIPPED clusters were added at end of planning, so only update other cases here if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { execInfo.swapCluster( clusterAlias, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 193930cdf711d..ab0e90a97254e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -164,7 +164,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener Date: Fri, 18 Oct 2024 09:32:01 -0400 Subject: [PATCH 7/7] PR feedback --- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index ab0e90a97254e..193930cdf711d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -164,7 +164,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener