Skip to content

Commit be2fcc7

Browse files
authored
[8.16] ES|QL per-cluster took time is incorrectly calculated and causes fatal exceptions (#115017) (#115124)
* ES|QL per-cluster took time is incorrectly calculated and causes fatal exceptions (#115017) The model for calculating per-cluster `took` times from remote clusters in #112595 was flawed. It attempted to use Java's System.nanoTime between the local and remote clusters, which is not safe. This results in per-cluster took times that have arbitrary (invalid) values including negative values which cause exceptions to be thrown by the `TimeValue` constructor. (Note: the overall took time calculation was done correctly, so it was the remote per-cluster took times that were flawed.) In this PR, I've done a redesign to address this. A key decision of this re-design was whether to always calculate took times only on the querying cluster (bypassing this whole problem) or to continue to allow the remote clusters to calculate their own took times for the remote processing and report that back to the querying cluster via the `ComputeResponse`. I decided in favor of having remote clusters compute their own took times for the remote processing and to additionally track "planning" time (encompassing field-caps and policy enrich remote calls), so that total per-cluster took time is a combination of the two. In _search, remote cluster took times are calculated entirely on the remote cluster, so network time is not included in the per-cluster took times. This has been helpful in diagnosing issues on user environments because if you see an overall took time that is significantly larger than the per cluster took times, that may indicate a network issue, which has happened in diagnosing cross-cluster issues in _search. I moved relative time tracking into `EsqlExecutionInfo`. The "planning time" marker is currently only used in cross-cluster searches, so it will conflict with the INLINESTATS 2 phase model (where planning can be done twice). We will improve this design to handle a 2 phase model in a later ticket, as part of the INLINESTATS work. I tested the current overall took time calculation model with local-only INLINESTATS queries and they work correctly. I also fixed another secondary bug in this PR. If the remote cluster is an older version that does not return took time (and shard info) in the ComputeResponse, the per-cluster took time is then calculated on the querying cluster as a fallback. Finally, I fixed some minor inconsistencies about whether the `_shards` info is shown in the response. The rule now is that `_shards` is always shown with 0 shards for SKIPPED clusters, with actual counts for SUCCESSFUL clusters and for remotes running an older version that doesn't report shard stats, the `_shards` field is left out of the XContent response. Fixes #115022 * Added fix for #115127 into this since can't get the build to pass
1 parent 1890fdf commit be2fcc7

File tree

7 files changed

+331
-168
lines changed

7 files changed

+331
-168
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 48 additions & 26 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Objects;
3434
import java.util.Set;
3535
import java.util.concurrent.ConcurrentMap;
36+
import java.util.concurrent.TimeUnit;
3637
import java.util.function.BiFunction;
3738
import java.util.function.Predicate;
3839

@@ -55,29 +56,33 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
5556
public static final ParseField DETAILS_FIELD = new ParseField("details");
5657
public static final ParseField TOOK = new ParseField("took");
5758

58-
// map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
59-
// the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search
60-
// updates to the Cluster occur with the updateCluster method that given the key to map transforms an
59+
// Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
60+
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
61+
// Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
6162
// old Cluster Object to a new Cluster Object with the remapping function.
6263
public final Map<String, Cluster> clusterInfo;
63-
// not Writeable since it is only needed on the primary CCS coordinator
64-
private final transient Predicate<String> skipUnavailablePredicate;
6564
private TimeValue overallTook;
66-
6765
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
6866
private final boolean includeCCSMetadata;
6967

68+
// fields that are not Writeable since they are only needed on the primary CCS coordinator
69+
private final transient Predicate<String> skipUnavailablePredicate;
70+
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
71+
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
72+
7073
public EsqlExecutionInfo(boolean includeCCSMetadata) {
7174
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
7275
}
7376

7477
/**
7578
* @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
79+
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
7680
*/
7781
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
7882
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
7983
this.skipUnavailablePredicate = skipUnavailablePredicate;
8084
this.includeCCSMetadata = includeCCSMetadata;
85+
this.relativeStartNanos = System.nanoTime();
8186
}
8287

8388
/**
@@ -88,6 +93,7 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
8893
this.clusterInfo = clusterInfo;
8994
this.includeCCSMetadata = includeCCSMetadata;
9095
this.skipUnavailablePredicate = Predicates.always();
96+
this.relativeStartNanos = null;
9197
}
9298

9399
public EsqlExecutionInfo(StreamInput in) throws IOException {
@@ -106,6 +112,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
106112
this.includeCCSMetadata = false;
107113
}
108114
this.skipUnavailablePredicate = Predicates.always();
115+
this.relativeStartNanos = null;
109116
}
110117

111118
@Override
@@ -125,7 +132,35 @@ public boolean includeCCSMetadata() {
125132
return includeCCSMetadata;
126133
}
127134

128-
public void overallTook(TimeValue took) {
135+
public Long getRelativeStartNanos() {
136+
return relativeStartNanos;
137+
}
138+
139+
/**
140+
* Call when ES|QL "planning" phase is complete and query execution (in ComputeService) is about to start.
141+
* Note this is currently only built for a single phase planning/execution model. When INLINESTATS
142+
* moves towards GA we may need to revisit this model. Currently, it should never be called more than once.
143+
*/
144+
public void markEndPlanning() {
145+
assert planningTookTime == null : "markEndPlanning should only be called once";
146+
assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called";
147+
planningTookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
148+
}
149+
150+
public TimeValue planningTookTime() {
151+
return planningTookTime;
152+
}
153+
154+
/**
155+
* Call when ES|QL execution is complete in order to set the overall took time for an ES|QL query.
156+
*/
157+
public void markEndQuery() {
158+
assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
159+
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
160+
}
161+
162+
// for testing only - use markEndQuery in production code
163+
void overallTook(TimeValue took) {
129164
this.overallTook = took;
130165
}
131166

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ final class ComputeListener implements Releasable {
4747
private final List<DriverProfile> collectedProfiles;
4848
private final ResponseHeadersCollector responseHeaders;
4949
private final EsqlExecutionInfo esqlExecutionInfo;
50-
private final long queryStartTimeNanos;
5150
// clusterAlias indicating where this ComputeListener is running
5251
// used by the top level ComputeListener in ComputeService on both local and remote clusters
5352
private final String whereRunning;
@@ -61,7 +60,7 @@ public static ComputeListener create(
6160
CancellableTask task,
6261
ActionListener<ComputeResponse> delegate
6362
) {
64-
return new ComputeListener(transportService, task, null, null, -1, delegate);
63+
return new ComputeListener(transportService, task, null, null, delegate);
6564
}
6665

6766
/**
@@ -75,34 +74,30 @@ public static ComputeListener create(
7574
* @param transportService
7675
* @param task
7776
* @param executionInfo {@link EsqlExecutionInfo} to capture execution metadata
78-
* @param queryStartTimeNanos Start time of the ES|QL query (stored in {@link org.elasticsearch.xpack.esql.session.Configuration})
7977
* @param delegate
8078
*/
8179
public static ComputeListener create(
8280
String clusterAlias,
8381
TransportService transportService,
8482
CancellableTask task,
8583
EsqlExecutionInfo executionInfo,
86-
long queryStartTimeNanos,
8784
ActionListener<ComputeResponse> delegate
8885
) {
89-
return new ComputeListener(transportService, task, clusterAlias, executionInfo, queryStartTimeNanos, delegate);
86+
return new ComputeListener(transportService, task, clusterAlias, executionInfo, delegate);
9087
}
9188

9289
private ComputeListener(
9390
TransportService transportService,
9491
CancellableTask task,
9592
String clusterAlias,
9693
EsqlExecutionInfo executionInfo,
97-
long queryStartTimeNanos,
9894
ActionListener<ComputeResponse> delegate
9995
) {
10096
this.transportService = transportService;
10197
this.task = task;
10298
this.responseHeaders = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext());
10399
this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
104100
this.esqlExecutionInfo = executionInfo;
105-
this.queryStartTimeNanos = queryStartTimeNanos;
106101
this.whereRunning = clusterAlias;
107102
// for the DataNodeHandler ComputeListener, clusterAlias and executionInfo will be null
108103
// for the top level ComputeListener in ComputeService both will be non-null
@@ -129,11 +124,15 @@ private ComputeListener(
129124
} else {
130125
result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList());
131126
if (coordinatingClusterIsSearchedInCCS()) {
132-
// mark local cluster as finished once the coordinator and all data nodes have finished processing
133-
executionInfo.swapCluster(
134-
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
135-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build()
136-
);
127+
// if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
128+
// data nodes have finished processing
129+
executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
130+
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
131+
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
132+
} else {
133+
return v;
134+
}
135+
});
137136
}
138137
}
139138
delegate.onResponse(result);
@@ -196,8 +195,8 @@ ActionListener<Void> acquireAvoid() {
196195
* info to be gathered (namely, the DataNodeRequestHandler ComputeListener) should pass in null.
197196
*/
198197
ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAlias) {
199-
assert computeClusterAlias == null || (esqlExecutionInfo != null && queryStartTimeNanos > 0)
200-
: "When clusterAlias is provided to acquireCompute, executionInfo must be non-null and queryStartTimeNanos must be positive";
198+
assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null)
199+
: "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null";
201200

202201
return acquireAvoid().map(resp -> {
203202
responseHeaders.collect();
@@ -209,24 +208,17 @@ ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAl
209208
return null;
210209
}
211210
if (isCCSListener(computeClusterAlias)) {
212-
// this is the callback for the listener to the CCS compute
213-
esqlExecutionInfo.swapCluster(
214-
computeClusterAlias,
215-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
216-
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
217-
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
218-
.setTook(resp.getTook())
219-
.setTotalShards(resp.getTotalShards())
220-
.setSuccessfulShards(resp.getSuccessfulShards())
221-
.setSkippedShards(resp.getSkippedShards())
222-
.setFailedShards(resp.getFailedShards())
223-
.build()
224-
);
211+
// this is the callback for the listener on the primary coordinator that receives a remote ComputeResponse
212+
updateExecutionInfoWithRemoteResponse(computeClusterAlias, resp);
213+
225214
} else if (shouldRecordTookTime()) {
215+
Long relativeStartNanos = esqlExecutionInfo.getRelativeStartNanos();
226216
// handler for this cluster's data node and coordinator completion (runs on "local" and remote clusters)
227-
TimeValue tookTime = new TimeValue(System.nanoTime() - queryStartTimeNanos, TimeUnit.NANOSECONDS);
217+
assert relativeStartNanos != null : "queryStartTimeNanos not set properly";
218+
TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
228219
esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
229-
if (v.getTook() == null || v.getTook().nanos() < tookTime.nanos()) {
220+
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
221+
&& (v.getTook() == null || v.getTook().nanos() < tookTime.nanos())) {
230222
return new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime).build();
231223
} else {
232224
return v;
@@ -237,6 +229,40 @@ ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAl
237229
});
238230
}
239231

232+
private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, ComputeResponse resp) {
233+
TimeValue tookOnCluster;
234+
if (resp.getTook() != null) {
235+
TimeValue remoteExecutionTime = resp.getTook();
236+
TimeValue planningTookTime = esqlExecutionInfo.planningTookTime();
237+
tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS);
238+
esqlExecutionInfo.swapCluster(
239+
computeClusterAlias,
240+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
241+
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
242+
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
243+
.setTook(tookOnCluster)
244+
.setTotalShards(resp.getTotalShards())
245+
.setSuccessfulShards(resp.getSuccessfulShards())
246+
.setSkippedShards(resp.getSkippedShards())
247+
.setFailedShards(resp.getFailedShards())
248+
.build()
249+
);
250+
} else {
251+
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
252+
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
253+
long remoteTook = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos();
254+
tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS);
255+
esqlExecutionInfo.swapCluster(
256+
computeClusterAlias,
257+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
258+
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
259+
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
260+
.setTook(tookOnCluster)
261+
.build()
262+
);
263+
}
264+
}
265+
240266
/**
241267
* Use this method when no execution metadata needs to be added to {@link EsqlExecutionInfo}
242268
*/

0 commit comments

Comments
 (0)