diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index f115d0ac8bf2c..b71b8a15e72fa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -67,6 +67,8 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; + // Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it. + private transient volatile boolean clusterInfoInitializing; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; @@ -124,10 +126,8 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(overallTook); - if (clusterInfo != null) { - // .stream().toList() creates an immutable copy of the cluster info entries - // as today they might be still changing while serialization is happening - out.writeCollection(clusterInfo.values().stream().toList()); + if (clusterInfo != null && clusterInfoInitializing == false) { + out.writeCollection(clusterInfo.values()); } else { out.writeCollection(Collections.emptyList()); } @@ -354,6 +354,10 @@ public boolean isStopped() { return isStopped; } + public void clusterInfoInitializing(boolean clusterInfoInitializing) { + this.clusterInfoInitializing = clusterInfoInitializing; + } + /** * Represents the search metadata about a particular cluster involved in a cross-cluster search. * The Cluster object can represent either the local cluster or a remote cluster. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index ce8915af0fc69..a4a8ed63a1a03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -338,15 +338,20 @@ public static void initCrossClusterState( patterns.getFirst().indexPattern() ); + executionInfo.clusterInfoInitializing(true); // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error // so that the CCS telemetry handler can recognize that this error is CCS-related - for (var entry : groupedIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); - }); + try { + for (var entry : groupedIndices.entrySet()) { + final String clusterAlias = entry.getKey(); + final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + }); + } + } finally { + executionInfo.clusterInfoInitializing(false); } // check if it is a cross-cluster query