-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Simplify EsqlQueryResponse #129031
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify EsqlQueryResponse #129031
Changes from 4 commits
54ed9c5
1aae75a
01b90cc
397e902
f0e77fc
ad0a3d0
e04864c
4cbbb83
8a5b342
3661342
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED; | ||
|
|
||
|
|
@@ -122,7 +123,7 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { | |
| long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; | ||
| long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0; | ||
| if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { | ||
| profile = in.readOptionalWriteable(Profile::new); | ||
| profile = in.readOptionalWriteable(Profile::readFrom); | ||
| } | ||
| boolean columnar = in.readBoolean(); | ||
| EsqlExecutionInfo executionInfo = null; | ||
|
|
@@ -224,75 +225,68 @@ public EsqlExecutionInfo getExecutionInfo() { | |
| return executionInfo; | ||
| } | ||
|
|
||
| private Iterator<? extends ToXContent> asyncPropertiesOrEmpty() { | ||
| if (isAsync) { | ||
| return ChunkedToXContentHelper.chunk((builder, params) -> { | ||
| if (asyncExecutionId != null) { | ||
| builder.field("id", asyncExecutionId); | ||
| } | ||
| builder.field("is_running", isRunning); | ||
| return builder; | ||
| }); | ||
| } else { | ||
| return Collections.emptyIterator(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) { | ||
| boolean dropNullColumns = params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false); | ||
| boolean[] nullColumns = dropNullColumns ? nullColumns() : null; | ||
|
|
||
| Iterator<ToXContent> tookTime; | ||
| if (executionInfo != null && executionInfo.overallTook() != null) { | ||
| tookTime = ChunkedToXContentHelper.chunk( | ||
| (builder, p) -> builder.field("took", executionInfo.overallTook().millis()) | ||
| .field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial()) | ||
| ); | ||
| } else { | ||
| tookTime = Collections.emptyIterator(); | ||
| } | ||
|
|
||
| Iterator<ToXContent> meta = ChunkedToXContentHelper.chunk((builder, p) -> { | ||
| builder.field("documents_found", documentsFound); | ||
| builder.field("values_loaded", valuesLoaded); | ||
| return builder; | ||
| }); | ||
|
|
||
| Iterator<? extends ToXContent> columnHeadings = dropNullColumns | ||
| ? Iterators.concat( | ||
| ResponseXContentUtils.allColumns(columns, "all_columns"), | ||
| ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns") | ||
| ) | ||
| : ResponseXContentUtils.allColumns(columns, "columns"); | ||
| Iterator<? extends ToXContent> valuesIt = ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns); | ||
| Iterator<ToXContent> executionInfoRender = executionInfo != null && executionInfo.hasMetadataToReport() | ||
| ? ChunkedToXContentHelper.field("_clusters", executionInfo, params) | ||
| : Collections.emptyIterator(); | ||
| return Iterators.concat( | ||
| ChunkedToXContentHelper.startObject(), | ||
| asyncPropertiesOrEmpty(), | ||
| tookTime, | ||
| meta, | ||
| columnHeadings, | ||
| ChunkedToXContentHelper.array("values", valuesIt), | ||
| executionInfoRender, | ||
| profileRenderer(params), | ||
| conditionalChunkedXContent(isAsync, () -> ChunkedToXContentHelper.chunk((builder, p) -> { | ||
| if (asyncExecutionId != null) { | ||
| builder.field("id", asyncExecutionId); | ||
| } | ||
| builder.field("is_running", isRunning); | ||
| return builder; | ||
| })), | ||
| conditionalChunkedXContent( | ||
| executionInfo != null && executionInfo.overallTook() != null, | ||
| () -> ChunkedToXContentHelper.chunk( | ||
| (builder, p) -> builder // | ||
| .field("took", executionInfo.overallTook().millis()) | ||
| .field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial()) | ||
| ) | ||
| ), | ||
| ChunkedToXContentHelper.chunk( | ||
| (builder, p) -> builder // | ||
| .field("documents_found", documentsFound) | ||
| .field("values_loaded", valuesLoaded) | ||
| ), | ||
| dropNullColumns | ||
| ? Iterators.concat( | ||
| ResponseXContentUtils.allColumns(columns, "all_columns"), | ||
| ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns") | ||
| ) | ||
| : ResponseXContentUtils.allColumns(columns, "columns"), | ||
| ChunkedToXContentHelper.array("values", ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns)), | ||
| conditionalChunkedXContent( | ||
| executionInfo != null && executionInfo.hasMetadataToReport(), | ||
| () -> ChunkedToXContentHelper.field("_clusters", executionInfo, params) | ||
| ), | ||
| conditionalChunkedXContent( | ||
| profile != null, | ||
| () -> Iterators.concat( | ||
| ChunkedToXContentHelper.startObject("profile"), // | ||
| ChunkedToXContentHelper.chunk((b, p) -> { | ||
| if (executionInfo != null) { | ||
| b.field("query", executionInfo.overallTimeSpan()); | ||
| b.field("planning", executionInfo.planningTimeSpan()); | ||
| } | ||
| return b; | ||
| }), | ||
| ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params), | ||
| ChunkedToXContentHelper.endObject() | ||
| ) | ||
| ), | ||
| ChunkedToXContentHelper.endObject() | ||
| ); | ||
| } | ||
|
|
||
| private Iterator<ToXContent> profileRenderer(ToXContent.Params params) { | ||
| if (profile == null) { | ||
| return Collections.emptyIterator(); | ||
| } | ||
| return Iterators.concat(ChunkedToXContentHelper.startObject("profile"), ChunkedToXContentHelper.chunk((b, p) -> { | ||
| if (executionInfo != null) { | ||
| b.field("query", executionInfo.overallTimeSpan()); | ||
| b.field("planning", executionInfo.planningTimeSpan()); | ||
| } | ||
| return b; | ||
| }), ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params), ChunkedToXContentHelper.endObject()); | ||
| private Iterator<? extends ToXContent> conditionalChunkedXContent( | ||
| boolean condition, | ||
| Supplier<Iterator<? extends ToXContent>> chunkedXContent | ||
| ) { | ||
| return condition ? chunkedXContent.get() : Collections.emptyIterator(); | ||
| } | ||
|
|
||
| public boolean[] nullColumns() { | ||
|
|
@@ -396,41 +390,15 @@ public EsqlResponse responseInternal() { | |
| return esqlResponse; | ||
| } | ||
|
|
||
| public static class Profile implements Writeable { | ||
| private final List<DriverProfile> drivers; | ||
|
|
||
| public Profile(List<DriverProfile> drivers) { | ||
| this.drivers = drivers; | ||
| } | ||
| public record Profile(List<DriverProfile> drivers) implements Writeable { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we even need that? May be it is worth in-lining List instead?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe I wrote it as a record because I expected we'd add other |
||
|
|
||
| public Profile(StreamInput in) throws IOException { | ||
| this.drivers = in.readCollectionAsImmutableList(DriverProfile::readFrom); | ||
| public static Profile readFrom(StreamInput in) throws IOException { | ||
| return new Profile(in.readCollectionAsImmutableList(DriverProfile::readFrom)); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeCollection(drivers); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| Profile profile = (Profile) o; | ||
| return Objects.equals(drivers, profile.drivers); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(drivers); | ||
| } | ||
|
|
||
| List<DriverProfile> drivers() { | ||
| return drivers; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this'd be more readable as a traditional
I think so.