From 40215fb095dd1aa2e2ba5cd2b8a8b92aa8db4892 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 12 Aug 2025 10:33:20 -0600 Subject: [PATCH 1/4] Don't try to serialize half-baked cluster info --- .../xpack/esql/action/EsqlExecutionInfo.java | 10 +++++++++- .../elasticsearch/xpack/esql/session/EsqlCCSUtils.java | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 9ac56ad433872..96d04c6adbb19 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -67,6 +67,8 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; + // Did we initialize the clusterInfo map? If not, then we will serialize it as empty. + private transient volatile boolean clusterInfoInitialized = false; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; @@ -104,11 +106,13 @@ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean inc this.includeCCSMetadata = includeCCSMetadata; this.skipUnavailablePredicate = Predicates.always(); this.relativeStart = null; + this.clusterInfoInitialized = true; } public EsqlExecutionInfo(StreamInput in) throws IOException { this.overallTook = in.readOptionalTimeValue(); this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new); + this.clusterInfoInitialized = true; this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false; this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false; this.skipUnavailablePredicate = Predicates.always(); @@ -123,7 +127,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(overallTook); - if (clusterInfo != null) { + if (clusterInfo != null && clusterInfoInitialized) { out.writeCollection(clusterInfo.values()); } else { out.writeCollection(Collections.emptyList()); @@ -350,6 +354,10 @@ public boolean isStopped() { return isStopped; } + public void clusterInfoInitialized() { + this.clusterInfoInitialized = true; + } + /** * Represents the search metadata about a particular cluster involved in a cross-cluster search. * The Cluster object can represent either the local cluster or a remote cluster. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 80c08d145d9bb..9bc8df5af31a7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -347,7 +347,7 @@ public static void initCrossClusterState( return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); }); } - + executionInfo.clusterInfoInitialized(); // check if it is a cross-cluster query if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { From 9ef4173f4652aa796967a39e00383830355bdc35 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 12 Aug 2025 15:04:54 -0600 Subject: [PATCH 2/4] Use shorter uninitialized period to not break tests --- .../xpack/esql/action/EsqlExecutionInfo.java | 8 +++----- .../elasticsearch/xpack/esql/session/EsqlCCSUtils.java | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 96d04c6adbb19..5dfc6c895f430 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -68,7 +68,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; // Did we initialize the clusterInfo map? If not, then we will serialize it as empty. - private transient volatile boolean clusterInfoInitialized = false; + private transient volatile boolean clusterInfoInitialized = true; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; @@ -106,13 +106,11 @@ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean inc this.includeCCSMetadata = includeCCSMetadata; this.skipUnavailablePredicate = Predicates.always(); this.relativeStart = null; - this.clusterInfoInitialized = true; } public EsqlExecutionInfo(StreamInput in) throws IOException { this.overallTook = in.readOptionalTimeValue(); this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new); - this.clusterInfoInitialized = true; this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false; this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false; this.skipUnavailablePredicate = Predicates.always(); @@ -354,8 +352,8 @@ public boolean isStopped() { return isStopped; } - public void clusterInfoInitialized() { - this.clusterInfoInitialized = true; + public void clusterInfoInitialized(boolean clusterInfoInitialized) { + this.clusterInfoInitialized = clusterInfoInitialized; } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 9bc8df5af31a7..1d013f9e1a72a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -337,6 +337,7 @@ public static void initCrossClusterState( patterns.getFirst().indexPattern() ); + executionInfo.clusterInfoInitialized(false); // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error // so that the CCS telemetry handler can recognize that this error is CCS-related for (var entry : groupedIndices.entrySet()) { @@ -347,7 +348,7 @@ public static void initCrossClusterState( return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); }); } - executionInfo.clusterInfoInitialized(); + executionInfo.clusterInfoInitialized(true); // check if it is a cross-cluster query if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { From 1100c84d410a6e204a5677c4cd1ba7579d273f41 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 13 Aug 2025 10:40:04 -0600 Subject: [PATCH 3/4] better wording --- .../xpack/esql/action/EsqlExecutionInfo.java | 10 ++++----- .../xpack/esql/session/EsqlCCSUtils.java | 21 +++++++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 5dfc6c895f430..62e8585700b18 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -67,8 +67,8 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. public final ConcurrentMap clusterInfo; - // Did we initialize the clusterInfo map? If not, then we will serialize it as empty. - private transient volatile boolean clusterInfoInitialized = true; + // Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it. + private transient volatile boolean clusterInfoInitializing; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; @@ -125,7 +125,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(overallTook); - if (clusterInfo != null && clusterInfoInitialized) { + if (clusterInfo != null && clusterInfoInitializing == false) { out.writeCollection(clusterInfo.values()); } else { out.writeCollection(Collections.emptyList()); @@ -352,8 +352,8 @@ public boolean isStopped() { return isStopped; } - public void clusterInfoInitialized(boolean clusterInfoInitialized) { - this.clusterInfoInitialized = clusterInfoInitialized; + public void clusterInfoInitializing(boolean clusterInfoInitializing) { + this.clusterInfoInitializing = clusterInfoInitializing; } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 1d013f9e1a72a..dc4a412560a13 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -337,18 +337,21 @@ public static void initCrossClusterState( patterns.getFirst().indexPattern() ); - executionInfo.clusterInfoInitialized(false); + executionInfo.clusterInfoInitializing(true); // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error // so that the CCS telemetry handler can recognize that this error is CCS-related - for (var entry : groupedIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); - }); + try { + for (var entry : groupedIndices.entrySet()) { + final String clusterAlias = entry.getKey(); + final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + }); + } + } finally { + executionInfo.clusterInfoInitializing(false); } - executionInfo.clusterInfoInitialized(true); // check if it is a cross-cluster query if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { From 98f0f6677ac5dc87b7dee21ce135bbd499d3c567 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 15 Aug 2025 10:22:58 -0600 Subject: [PATCH 4/4] Fix merge --- .../xpack/esql/session/EsqlCCSUtils.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 1d6349d2fa363..a4a8ed63a1a03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -342,16 +342,18 @@ public static void initCrossClusterState( // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error // so that the CCS telemetry handler can recognize that this error is CCS-related try { - for (var entry : groupedIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); - }); + for (var entry : groupedIndices.entrySet()) { + final String clusterAlias = entry.getKey(); + final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + }); + } } finally { executionInfo.clusterInfoInitializing(false); } + // check if it is a cross-cluster query if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {