Skip to content

Commit 9b432fe

Browse files
committed
Simplify EsqlExecution info serialization
1 parent 75ca874 commit 9b432fe

File tree

1 file changed

+7
-24
lines changed

1 file changed

+7
-24
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
16-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1716
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
1817
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
1918
import org.elasticsearch.core.Predicates;
@@ -37,6 +36,7 @@
3736
import java.util.Map;
3837
import java.util.Objects;
3938
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
4040
import java.util.concurrent.ConcurrentMap;
4141
import java.util.function.BiFunction;
4242
import java.util.function.Predicate;
@@ -66,7 +66,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
6666
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
6767
// Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
6868
// old Cluster Object to a new Cluster Object with the remapping function.
69-
public final Map<String, Cluster> clusterInfo;
69+
public final ConcurrentMap<String, Cluster> clusterInfo;
7070
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
7171
private final boolean includeCCSMetadata;
7272

@@ -90,7 +90,7 @@ public EsqlExecutionInfo(boolean includeCCSMetadata) {
9090
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
9191
*/
9292
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
93-
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
93+
this.clusterInfo = new ConcurrentHashMap<>();
9494
this.skipUnavailablePredicate = skipUnavailablePredicate;
9595
this.includeCCSMetadata = includeCCSMetadata;
9696
this.relativeStart = TimeSpan.start();
@@ -108,26 +108,9 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
108108

109109
public EsqlExecutionInfo(StreamInput in) throws IOException {
110110
this.overallTook = in.readOptionalTimeValue();
111-
List<EsqlExecutionInfo.Cluster> clusterList = in.readCollectionAsList(EsqlExecutionInfo.Cluster::new);
112-
if (clusterList.isEmpty()) {
113-
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
114-
} else {
115-
Map<String, EsqlExecutionInfo.Cluster> m = ConcurrentCollections.newConcurrentMap();
116-
clusterList.forEach(c -> m.put(c.getClusterAlias(), c));
117-
this.clusterInfo = m;
118-
}
119-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
120-
this.includeCCSMetadata = in.readBoolean();
121-
} else {
122-
this.includeCCSMetadata = false;
123-
}
124-
125-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) {
126-
this.isPartial = in.readBoolean();
127-
} else {
128-
this.isPartial = false;
129-
}
130-
111+
this.clusterInfo = new ConcurrentHashMap<>(in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias));
112+
this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) && in.readBoolean();
113+
this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) && in.readBoolean();
131114
this.skipUnavailablePredicate = Predicates.always();
132115
this.relativeStart = null;
133116
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION)
@@ -141,7 +124,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
141124
public void writeTo(StreamOutput out) throws IOException {
142125
out.writeOptionalTimeValue(overallTook);
143126
if (clusterInfo != null) {
144-
out.writeCollection(clusterInfo.values().stream().toList());
127+
out.writeCollection(clusterInfo.values());
145128
} else {
146129
out.writeCollection(Collections.emptyList());
147130
}

0 commit comments

Comments
 (0)