Skip to content

Commit f294f81

Browse files
authored
Simplify EsqlExecution info serialization (#131823)
1 parent 7673059 commit f294f81

File tree

2 files changed

+29
-24
lines changed

2 files changed

+29
-24
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,28 @@ public <K, V> Map<K, V> readMapValues(final Writeable.Reader<V> valueReader, fin
802802
return map;
803803
}
804804

805+
/**
806+
* Reads a multiple {@code V}-values and then converts them to a {@code Map} using keyMapper.
807+
*
808+
* @param valueReader The value reader
809+
* @param keyMapper function to create a key from a value
810+
* @param constructor map constructor
811+
* @return Never {@code null}.
812+
*/
813+
public <K, V, M extends Map<K, V>> M readMapValues(
814+
final Writeable.Reader<V> valueReader,
815+
final Function<V, K> keyMapper,
816+
final IntFunction<M> constructor
817+
) throws IOException {
818+
final int size = readArraySize();
819+
final M map = constructor.apply(size);
820+
for (int i = 0; i < size; i++) {
821+
V value = valueReader.read(this);
822+
map.put(keyMapper.apply(value), value);
823+
}
824+
return map;
825+
}
826+
805827
/**
806828
* If the returned map contains any entries it will be mutable. If it is empty it might be immutable.
807829
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
16-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1716
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
1817
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
1918
import org.elasticsearch.core.Predicates;
@@ -37,6 +36,7 @@
3736
import java.util.Map;
3837
import java.util.Objects;
3938
import java.util.Set;
39+
import java.util.concurrent.ConcurrentHashMap;
4040
import java.util.concurrent.ConcurrentMap;
4141
import java.util.function.BiFunction;
4242
import java.util.function.Predicate;
@@ -66,7 +66,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
6666
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
6767
// Updates to the Cluster occur with the updateCluster method that given the key to map transforms an
6868
// old Cluster Object to a new Cluster Object with the remapping function.
69-
public final Map<String, Cluster> clusterInfo;
69+
public final ConcurrentMap<String, Cluster> clusterInfo;
7070
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
7171
private final boolean includeCCSMetadata;
7272

@@ -90,7 +90,7 @@ public EsqlExecutionInfo(boolean includeCCSMetadata) {
9090
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
9191
*/
9292
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
93-
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
93+
this.clusterInfo = new ConcurrentHashMap<>();
9494
this.skipUnavailablePredicate = skipUnavailablePredicate;
9595
this.includeCCSMetadata = includeCCSMetadata;
9696
this.relativeStart = TimeSpan.start();
@@ -108,26 +108,9 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
108108

109109
public EsqlExecutionInfo(StreamInput in) throws IOException {
110110
this.overallTook = in.readOptionalTimeValue();
111-
List<EsqlExecutionInfo.Cluster> clusterList = in.readCollectionAsList(EsqlExecutionInfo.Cluster::new);
112-
if (clusterList.isEmpty()) {
113-
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
114-
} else {
115-
Map<String, EsqlExecutionInfo.Cluster> m = ConcurrentCollections.newConcurrentMap();
116-
clusterList.forEach(c -> m.put(c.getClusterAlias(), c));
117-
this.clusterInfo = m;
118-
}
119-
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
120-
this.includeCCSMetadata = in.readBoolean();
121-
} else {
122-
this.includeCCSMetadata = false;
123-
}
124-
125-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) {
126-
this.isPartial = in.readBoolean();
127-
} else {
128-
this.isPartial = false;
129-
}
130-
111+
this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new);
112+
this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false;
113+
this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false;
131114
this.skipUnavailablePredicate = Predicates.always();
132115
this.relativeStart = null;
133116
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION)
@@ -141,7 +124,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
141124
public void writeTo(StreamOutput out) throws IOException {
142125
out.writeOptionalTimeValue(overallTook);
143126
if (clusterInfo != null) {
144-
out.writeCollection(clusterInfo.values().stream().toList());
127+
out.writeCollection(clusterInfo.values());
145128
} else {
146129
out.writeCollection(Collections.emptyList());
147130
}

0 commit comments

Comments
 (0)