-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Simplify EsqlExecution info serialization #131823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,7 +13,6 @@ | |
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
| import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; | ||
| import org.elasticsearch.common.xcontent.ChunkedToXContentObject; | ||
| import org.elasticsearch.core.Predicates; | ||
|
|
@@ -37,6 +36,7 @@ | |
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.function.BiFunction; | ||
| import java.util.function.Predicate; | ||
|
|
@@ -66,7 +66,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { | |
| // The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search. | ||
| // Updates to the Cluster occur with the updateCluster method that given the key to map transforms an | ||
| // old Cluster Object to a new Cluster Object with the remapping function. | ||
| public final Map<String, Cluster> clusterInfo; | ||
| public final ConcurrentMap<String, Cluster> clusterInfo; | ||
| // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) | ||
| private final boolean includeCCSMetadata; | ||
|
|
||
|
|
@@ -90,7 +90,7 @@ public EsqlExecutionInfo(boolean includeCCSMetadata) { | |
| * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response | ||
| */ | ||
| public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) { | ||
| this.clusterInfo = ConcurrentCollections.newConcurrentMap(); | ||
| this.clusterInfo = new ConcurrentHashMap<>(); | ||
| this.skipUnavailablePredicate = skipUnavailablePredicate; | ||
| this.includeCCSMetadata = includeCCSMetadata; | ||
| this.relativeStart = TimeSpan.start(); | ||
|
|
@@ -108,26 +108,9 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc | |
|
|
||
| public EsqlExecutionInfo(StreamInput in) throws IOException { | ||
| this.overallTook = in.readOptionalTimeValue(); | ||
| List<EsqlExecutionInfo.Cluster> clusterList = in.readCollectionAsList(EsqlExecutionInfo.Cluster::new); | ||
| if (clusterList.isEmpty()) { | ||
| this.clusterInfo = ConcurrentCollections.newConcurrentMap(); | ||
| } else { | ||
| Map<String, EsqlExecutionInfo.Cluster> m = ConcurrentCollections.newConcurrentMap(); | ||
| clusterList.forEach(c -> m.put(c.getClusterAlias(), c)); | ||
| this.clusterInfo = m; | ||
| } | ||
| if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { | ||
| this.includeCCSMetadata = in.readBoolean(); | ||
| } else { | ||
| this.includeCCSMetadata = false; | ||
| } | ||
|
|
||
| if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) { | ||
| this.isPartial = in.readBoolean(); | ||
| } else { | ||
| this.isPartial = false; | ||
| } | ||
|
|
||
| this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new); | ||
| this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false; | ||
| this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false; | ||
| this.skipUnavailablePredicate = Predicates.always(); | ||
| this.relativeStart = null; | ||
| if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION) | ||
|
|
@@ -141,7 +124,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { | |
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeOptionalTimeValue(overallTook); | ||
| if (clusterInfo != null) { | ||
| out.writeCollection(clusterInfo.values().stream().toList()); | ||
| out.writeCollection(clusterInfo.values()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not think we need to copy this value before serializing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you are right here, the nodes would only send the info at the end of the computation, AFAIR. |
||
| } else { | ||
| out.writeCollection(Collections.emptyList()); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elasticsearch/server/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentCollections.java
Lines 44 to 46 in a59c182
newConcurrentMap simply delegates to
new ConcurrentHashMap. Not sure why we have it. I am replacing it with what it actually is.