Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Predicate;

Expand All @@ -55,29 +56,33 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
public static final ParseField DETAILS_FIELD = new ParseField("details");
public static final ParseField TOOK = new ParseField("took");

// map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
// the Map itself is immutable after construction - all Clusters will be accounted for at the start of the search
// updates to the Cluster occur with the updateCluster method that given the key to map transforms an
// Map key is the clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query.
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
// Updates to a Cluster occur via the swapCluster method which, given the map key, transforms the
// old Cluster object into a new Cluster object using the supplied remapping function.
public final Map<String, Cluster> clusterInfo;
// not Writeable since it is only needed on the primary CCS coordinator
private final transient Predicate<String> skipUnavailablePredicate;
private TimeValue overallTook;

// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
private final boolean includeCCSMetadata;

// fields that are not Writeable since they are only needed on the primary CCS coordinator
private final transient Predicate<String> skipUnavailablePredicate;
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute

/**
* Convenience constructor that delegates to the two-argument constructor, passing
* {@code Predicates.always()} as the skip_unavailable lookup.
*
* @param includeCCSMetadata whether the user has asked for CCS metadata to be in the JSON response
*/
public EsqlExecutionInfo(boolean includeCCSMetadata) {
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
}

/**
 * Creates execution info for a new query: starts with an empty concurrent cluster map and captures
 * the current {@link System#nanoTime()} reading as the relative start time, which is later used to
 * compute "took" durations.
 *
 * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
 * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
 */
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
    // user-supplied settings
    this.includeCCSMetadata = includeCCSMetadata;
    this.skipUnavailablePredicate = skipUnavailablePredicate;
    // per-query state
    this.clusterInfo = ConcurrentCollections.newConcurrentMap();
    this.relativeStartNanos = System.nanoTime();
}

/**
Expand All @@ -88,6 +93,7 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
this.clusterInfo = clusterInfo;
this.includeCCSMetadata = includeCCSMetadata;
this.skipUnavailablePredicate = Predicates.always();
this.relativeStartNanos = null;
}

public EsqlExecutionInfo(StreamInput in) throws IOException {
Expand All @@ -106,6 +112,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
this.includeCCSMetadata = false;
}
this.skipUnavailablePredicate = Predicates.always();
this.relativeStartNanos = null;
}

@Override
Expand All @@ -125,7 +132,35 @@ public boolean includeCCSMetadata() {
return includeCCSMetadata;
}

public void overallTook(TimeValue took) {
/**
* Returns the {@link System#nanoTime()} reading captured at construction time for a new query.
* It is intended for calculating took times (elapsed durations).
*
* @return the relative start time in nanoseconds, or {@code null} for instances built by a constructor
* that does not record a start time (for example the {@code StreamInput} constructor)
*/
public Long getRelativeStartNanos() {
return relativeStartNanos;
}

/**
 * Marks the end of the ES|QL "planning" phase, i.e. the point where query execution (in ComputeService)
 * is about to start, and stores the time elapsed since the relative start time as the planning took time.
 * <p>
 * This is currently only built for a single phase planning/execution model and should never be called
 * more than once; when INLINESTATS moves towards GA this model may need to be revisited.
 */
public void markEndPlanning() {
    assert planningTookTime == null : "markEndPlanning should only be called once";
    assert relativeStartNanos != null : "Relative start time must be set when markEndPlanning is called";
    final long elapsedNanos = System.nanoTime() - relativeStartNanos;
    planningTookTime = TimeValue.timeValueNanos(elapsedNanos);
}

/**
* Returns the planning duration recorded by {@code markEndPlanning}.
*
* @return time elapsed from the start of the query to the end of planning, or {@code null} if
* {@code markEndPlanning} has not been called on this instance
*/
public TimeValue planningTookTime() {
return planningTookTime;
}

/**
* Call when ES|QL execution is complete in order to set the overall took time for an ES|QL query.
* The overall took time is the difference between the current {@link System#nanoTime()} reading and
* {@code relativeStartNanos}, stored with nanosecond resolution. {@code relativeStartNanos} must be
* non-null when this is called (checked by assertion).
*/
public void markEndQuery() {
assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use timeValueNanos for brevity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean for relativeStartNanos? I prefer the term I used because it emphasizes that this is for relative time (duration) calculations only not related to wall clock time, which is the mental model block that resulted in this bug.

And I took the term from TransportSearchAction.SearchTimeProvider to indicate it has the same function as that field. I considered using SearchTimeProvider here as well, but I haven't found a need for the absoluteStartMillis field in ESQL yet, so I just went with the single field option here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I mean using the method TimeValue.timeValueNanos

}

// for testing only - use markEndQuery in production code
// Overwrites the overall took time with the supplied value instead of deriving it from the relative start time.
void overallTook(TimeValue took) {
this.overallTook = took;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ final class ComputeListener implements Releasable {
private final List<DriverProfile> collectedProfiles;
private final ResponseHeadersCollector responseHeaders;
private final EsqlExecutionInfo esqlExecutionInfo;
private final long queryStartTimeNanos;
// clusterAlias indicating where this ComputeListener is running
// used by the top level ComputeListener in ComputeService on both local and remote clusters
private final String whereRunning;
Expand All @@ -61,7 +60,7 @@ public static ComputeListener create(
CancellableTask task,
ActionListener<ComputeResponse> delegate
) {
return new ComputeListener(transportService, task, null, null, -1, delegate);
return new ComputeListener(transportService, task, null, null, delegate);
}

/**
Expand All @@ -75,34 +74,30 @@ public static ComputeListener create(
* @param transportService
* @param task
* @param executionInfo {@link EsqlExecutionInfo} to capture execution metadata
* @param queryStartTimeNanos Start time of the ES|QL query (stored in {@link org.elasticsearch.xpack.esql.session.Configuration})
* @param delegate
*/
public static ComputeListener create(
String clusterAlias,
TransportService transportService,
CancellableTask task,
EsqlExecutionInfo executionInfo,
long queryStartTimeNanos,
ActionListener<ComputeResponse> delegate
) {
return new ComputeListener(transportService, task, clusterAlias, executionInfo, queryStartTimeNanos, delegate);
return new ComputeListener(transportService, task, clusterAlias, executionInfo, delegate);
}

private ComputeListener(
TransportService transportService,
CancellableTask task,
String clusterAlias,
EsqlExecutionInfo executionInfo,
long queryStartTimeNanos,
ActionListener<ComputeResponse> delegate
) {
this.transportService = transportService;
this.task = task;
this.responseHeaders = new ResponseHeadersCollector(transportService.getThreadPool().getThreadContext());
this.collectedProfiles = Collections.synchronizedList(new ArrayList<>());
this.esqlExecutionInfo = executionInfo;
this.queryStartTimeNanos = queryStartTimeNanos;
this.whereRunning = clusterAlias;
// for the DataNodeHandler ComputeListener, clusterAlias and executionInfo will be null
// for the top level ComputeListener in ComputeService both will be non-null
Expand All @@ -129,11 +124,15 @@ private ComputeListener(
} else {
result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList());
if (coordinatingClusterIsSearchedInCCS()) {
// mark local cluster as finished once the coordinator and all data nodes have finished processing
executionInfo.swapCluster(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build()
);
// if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
// data nodes have finished processing
executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
} else {
return v;
}
});
}
}
delegate.onResponse(result);
Expand Down Expand Up @@ -196,8 +195,8 @@ ActionListener<Void> acquireAvoid() {
* info to be gathered (namely, the DataNodeRequestHandler ComputeListener) should pass in null.
*/
ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAlias) {
assert computeClusterAlias == null || (esqlExecutionInfo != null && queryStartTimeNanos > 0)
: "When clusterAlias is provided to acquireCompute, executionInfo must be non-null and queryStartTimeNanos must be positive";
assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null)
: "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null";

return acquireAvoid().map(resp -> {
responseHeaders.collect();
Expand All @@ -209,24 +208,17 @@ ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAl
return null;
}
if (isCCSListener(computeClusterAlias)) {
// this is the callback for the listener to the CCS compute
esqlExecutionInfo.swapCluster(
computeClusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
.setTook(resp.getTook())
.setTotalShards(resp.getTotalShards())
.setSuccessfulShards(resp.getSuccessfulShards())
.setSkippedShards(resp.getSkippedShards())
.setFailedShards(resp.getFailedShards())
.build()
);
// this is the callback for the listener on the primary coordinator that receives a remote ComputeResponse
updateExecutionInfoWithRemoteResponse(computeClusterAlias, resp);

} else if (shouldRecordTookTime()) {
Long relativeStartNanos = esqlExecutionInfo.getRelativeStartNanos();
// handler for this cluster's data node and coordinator completion (runs on "local" and remote clusters)
TimeValue tookTime = new TimeValue(System.nanoTime() - queryStartTimeNanos, TimeUnit.NANOSECONDS);
assert relativeStartNanos != null : "queryStartTimeNanos not set properly";
TimeValue tookTime = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> {
if (v.getTook() == null || v.getTook().nanos() < tookTime.nanos()) {
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED
&& (v.getTook() == null || v.getTook().nanos() < tookTime.nanos())) {
return new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime).build();
} else {
return v;
Expand All @@ -237,6 +229,40 @@ ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAl
});
}

/**
 * Records the outcome of a remote cluster's ComputeResponse in the {@link EsqlExecutionInfo} entry for
 * that cluster: the status is set to SUCCESSFUL and a took time is stored.
 * <ul>
 *   <li>If the response carries a took time, the stored value is the coordinator's planning time plus the
 *       remote execution time, and the shard counts from the response are copied over.</li>
 *   <li>Otherwise the took time is measured here as the time elapsed since the relative start time, and
 *       shard info is left unset.</li>
 * </ul>
 *
 * @param computeClusterAlias alias (map key) of the cluster whose entry is updated
 * @param resp                response received from that cluster
 */
private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, ComputeResponse resp) {
    final TimeValue remoteExecutionTime = resp.getTook();
    final boolean remoteReportedTook = remoteExecutionTime != null;

    final long tookNanos;
    if (remoteReportedTook) {
        tookNanos = esqlExecutionInfo.planningTookTime().nanos() + remoteExecutionTime.nanos();
    } else {
        // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
        tookNanos = System.nanoTime() - esqlExecutionInfo.getRelativeStartNanos();
    }
    final TimeValue tookOnCluster = new TimeValue(tookNanos, TimeUnit.NANOSECONDS);

    esqlExecutionInfo.swapCluster(computeClusterAlias, (alias, cluster) -> {
        var updated = new EsqlExecutionInfo.Cluster.Builder(cluster)
            // for now ESQL doesn't return partial results, so set status to SUCCESSFUL
            .setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
            .setTook(tookOnCluster);
        if (remoteReportedTook) {
            return updated.setTotalShards(resp.getTotalShards())
                .setSuccessfulShards(resp.getSuccessfulShards())
                .setSkippedShards(resp.getSkippedShards())
                .setFailedShards(resp.getFailedShards())
                .build();
        }
        // shard info stays unset, so it is not shown in the CCS metadata section of the JSON response
        return updated.build();
    });
}

/**
* Use this method when no execution metadata needs to be added to {@link EsqlExecutionInfo}
*/
Expand Down
Loading