diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 3e3ab92f5d72f..47f61c21099cb 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -802,6 +802,28 @@ public Map readMapValues(final Writeable.Reader valueReader, fin return map; } + /** + * Reads a multiple {@code V}-values and then converts them to a {@code Map} using keyMapper. + * + * @param valueReader The value reader + * @param keyMapper function to create a key from a value + * @param constructor map constructor + * @return Never {@code null}. + */ + public > M readMapValues( + final Writeable.Reader valueReader, + final Function keyMapper, + final IntFunction constructor + ) throws IOException { + final int size = readArraySize(); + final M map = constructor.apply(size); + for (int i = 0; i < size; i++) { + V value = valueReader.read(this); + map.put(keyMapper.apply(value), value); + } + return map; + } + /** * If the returned map contains any entries it will be mutable. If it is empty it might be immutable. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 61d0d3b0e1026..9ac56ad433872 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.common.xcontent.ChunkedToXContentObject; import org.elasticsearch.core.Predicates; @@ -37,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; import java.util.function.Predicate; @@ -66,7 +66,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search. // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an // old Cluster Object to a new Cluster Object with the remapping function. - public final Map clusterInfo; + public final ConcurrentMap clusterInfo; // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) private final boolean includeCCSMetadata; @@ -90,7 +90,7 @@ public EsqlExecutionInfo(boolean includeCCSMetadata) { * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response */ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean includeCCSMetadata) { - this.clusterInfo = ConcurrentCollections.newConcurrentMap(); + this.clusterInfo = new ConcurrentHashMap<>(); this.skipUnavailablePredicate = skipUnavailablePredicate; this.includeCCSMetadata = includeCCSMetadata; this.relativeStart = TimeSpan.start(); @@ -108,26 +108,9 @@ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean inc public EsqlExecutionInfo(StreamInput in) throws IOException { this.overallTook = in.readOptionalTimeValue(); - List clusterList = in.readCollectionAsList(EsqlExecutionInfo.Cluster::new); - if (clusterList.isEmpty()) { - this.clusterInfo = ConcurrentCollections.newConcurrentMap(); - } else { - Map m = ConcurrentCollections.newConcurrentMap(); - clusterList.forEach(c -> m.put(c.getClusterAlias(), c)); - this.clusterInfo = m; - } - if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { - this.includeCCSMetadata = in.readBoolean(); - } else { - this.includeCCSMetadata = false; - } - - if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) { - this.isPartial = in.readBoolean(); - } else { - this.isPartial = false; - } - + this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new); + this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false; + this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false; this.skipUnavailablePredicate = Predicates.always(); this.relativeStart = null; if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION) @@ -141,7 +124,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(overallTook); if (clusterInfo != null) { - out.writeCollection(clusterInfo.values().stream().toList()); + out.writeCollection(clusterInfo.values()); } else { out.writeCollection(Collections.emptyList()); }