Skip to content

Commit bc7db6c

Browse files
authored
Don't try to serialize half-baked cluster info (#132756)
* Don't try to serialize half-baked cluster info
1 parent b46e0ce commit bc7db6c

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
6767
// Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
6868
// old Cluster Object to a new Cluster Object with the remapping function.
6969
public final ConcurrentMap<String, Cluster> clusterInfo;
70+
// Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it.
71+
private transient volatile boolean clusterInfoInitializing;
7072
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
7173
private final boolean includeCCSMetadata;
7274

@@ -124,10 +126,8 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
124126
@Override
125127
public void writeTo(StreamOutput out) throws IOException {
126128
out.writeOptionalTimeValue(overallTook);
127-
if (clusterInfo != null) {
128-
// .stream().toList() creates an immutable copy of the cluster info entries
129-
// as today they might be still changing while serialization is happening
130-
out.writeCollection(clusterInfo.values().stream().toList());
129+
if (clusterInfo != null && clusterInfoInitializing == false) {
130+
out.writeCollection(clusterInfo.values());
131131
} else {
132132
out.writeCollection(Collections.emptyList());
133133
}
@@ -354,6 +354,10 @@ public boolean isStopped() {
354354
return isStopped;
355355
}
356356

357+
public void clusterInfoInitializing(boolean clusterInfoInitializing) {
358+
this.clusterInfoInitializing = clusterInfoInitializing;
359+
}
360+
357361
/**
358362
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
359363
* The Cluster object can represent either the local cluster or a remote cluster.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -338,15 +338,20 @@ public static void initCrossClusterState(
338338
patterns.getFirst().indexPattern()
339339
);
340340

341+
executionInfo.clusterInfoInitializing(true);
341342
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
342343
// so that the CCS telemetry handler can recognize that this error is CCS-related
343-
for (var entry : groupedIndices.entrySet()) {
344-
final String clusterAlias = entry.getKey();
345-
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
346-
executionInfo.swapCluster(clusterAlias, (k, v) -> {
347-
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
348-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
349-
});
344+
try {
345+
for (var entry : groupedIndices.entrySet()) {
346+
final String clusterAlias = entry.getKey();
347+
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
348+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
349+
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
350+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
351+
});
352+
}
353+
} finally {
354+
executionInfo.clusterInfoInitializing(false);
350355
}
351356

352357
// check if it is a cross-cluster query

0 commit comments

Comments
 (0)