Skip to content

Commit c036a14

Browse files
committed
Added error handling and set a completion listener to the action
1 parent 5d7a5ef commit c036a14

File tree

4 files changed

+56
-11
lines changed

4 files changed

+56
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1818
import org.elasticsearch.xcontent.XContentParser;
1919
import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream;
20+
import org.elasticsearch.xpack.esql.action.stream.NonStreamingEsqlQueryResponseStream;
2021

2122
import java.io.IOException;
2223
import java.util.List;
@@ -53,21 +54,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
5354

5455
protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
5556
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
57+
final boolean stream = request.paramAsBoolean("stream", false);
5658
if (partialResults != null) {
5759
esqlRequest.allowPartialResults(partialResults);
5860
}
5961
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());
6062
// TODO: Create responseStream here, and add to the request object (?). See RestRepositoryVerifyIntegrityAction
6163
return channel -> {
62-
final var responseStream = EsqlQueryResponseStream.forMediaType(channel, request);
64+
final var responseStream = stream
65+
? EsqlQueryResponseStream.forMediaType(channel, request)
66+
: new NonStreamingEsqlQueryResponseStream(channel, request, esqlRequest);
6367
esqlRequest.responseStream(responseStream);
6468
RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
65-
cancellableClient.execute(
66-
EsqlQueryAction.INSTANCE,
67-
esqlRequest,
68-
// TODO: Replace this with a call to the responseStream
69-
new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging()
70-
);
69+
cancellableClient.execute(EsqlQueryAction.INSTANCE, esqlRequest, responseStream.completionListener());
7170
};
7271
}
7372

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/DefaultEsqlQueryResponseStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ protected void doSendEverything(EsqlQueryResponse response) {
7373
doFinishResponse(response);
7474
}
7575

76+
@Override
77+
protected void doHandleException(Exception e) {
78+
// TODO: Implement this
79+
}
80+
7681
private void sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns, List<Page> pages) {
7782
assert params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false) : "this method should only be called when dropping null columns";
7883

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/EsqlQueryResponseStream.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77

88
package org.elasticsearch.xpack.esql.action.stream;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.common.collect.Iterators;
1112
import org.elasticsearch.compute.data.Page;
1213
import org.elasticsearch.core.Nullable;
1314
import org.elasticsearch.core.Releasable;
15+
import org.elasticsearch.logging.LogManager;
16+
import org.elasticsearch.logging.Logger;
1417
import org.elasticsearch.rest.RestChannel;
1518
import org.elasticsearch.rest.RestRequest;
1619
import org.elasticsearch.rest.StreamingXContentResponse;
@@ -35,6 +38,7 @@
3538
* TODO: Took header wouldn't be available on streaming
3639
*/
3740
public abstract class EsqlQueryResponseStream implements Releasable {
41+
private static final Logger LOGGER = LogManager.getLogger(EsqlQueryResponseStream.class);
3842

3943
public static EsqlQueryResponseStream forMediaType(RestChannel restChannel, RestRequest request) {
4044
MediaType mediaType = EsqlMediaTypeParser.getResponseMediaType(request, XContentType.JSON);
@@ -53,7 +57,7 @@ public static EsqlQueryResponseStream forMediaType(RestChannel restChannel, Rest
5357
private final RestChannel restChannel;
5458
protected final ToXContent.Params params;
5559
/**
56-
* Initialized on the first call to {@link #startResponse()} and used to write the response chunks.
60+
* Initialized on the first call to {@link #startResponse} and used to write the response chunks.
5761
*/
5862
@Nullable
5963
private StreamingXContentResponse streamingXContentResponse;
@@ -79,7 +83,7 @@ protected EsqlQueryResponseStream(RestChannel restChannel, ToXContent.Params par
7983
*/
8084
public final void startResponse(List<ColumnInfoImpl> columns) throws IOException {
8185
assert streamingXContentResponse == null : "startResponse() called more than once";
82-
assert finished == false : "sendPages() called after finishResponse()";
86+
assert finished == false : "sendPages() called on a finished stream";
8387

8488
streamingXContentResponse = new StreamingXContentResponse(restChannel, restChannel.request(), () -> {});
8589

@@ -93,7 +97,7 @@ public final void startResponse(List<ColumnInfoImpl> columns) throws IOException
9397

9498
public final void sendPages(Iterable<Page> pages) {
9599
assert streamingXContentResponse != null : "sendPages() called before startResponse()";
96-
assert finished == false : "sendPages() called after finishResponse()";
100+
assert finished == false : "sendPages() called on a finished stream";
97101

98102
if (initialStreamChunkSent) {
99103
doSendPages(pages);
@@ -111,6 +115,36 @@ public final void finishResponse(EsqlQueryResponse response) {
111115
finished = true;
112116
}
113117

118+
public final void handleException(Exception e) {
119+
assert finished == false : "handleException() called on a finished stream";
120+
121+
// TODO: To be overridden by subclasses. This should append the error to the stream, if possible
122+
LOGGER.error("Error while streaming response", e);
123+
124+
finished = true;
125+
}
126+
127+
// TODO: For error handling, check RestActionListener error listener
128+
// TODO: Also ensure that we check if the channel is closed at some points (Also see RestActionListener)
129+
130+
public final ActionListener<EsqlQueryResponse> completionListener() {
131+
return new ActionListener<>() {
132+
@Override
133+
public void onResponse(EsqlQueryResponse esqlResponse) {
134+
assert finished == false : "completionListener() called on a finished stream";
135+
136+
finishResponse(esqlResponse);
137+
}
138+
139+
@Override
140+
public void onFailure(Exception e) {
141+
assert finished == false : "onFailure() called on a finished stream";
142+
143+
handleException(e);
144+
}
145+
};
146+
}
147+
114148
/**
115149
* Returns true if the response can be streamed, false otherwise.
116150
* <p>
@@ -126,6 +160,8 @@ public final void finishResponse(EsqlQueryResponse response) {
126160

127161
protected abstract void doFinishResponse(EsqlQueryResponse response);
128162

163+
protected abstract void doHandleException(Exception e);
164+
129165
protected void doSendEverything(EsqlQueryResponse response) {
130166
// TODO: Is this safe? Should this be abstract to ensure proper implementation? Add tests for both cases
131167
doStartResponse(response.columns());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/NonStreamingEsqlQueryResponseStream.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class NonStreamingEsqlQueryResponseStream extends EsqlQueryResponseStream
3131

3232
private final ActionListener<EsqlQueryResponse> listener;
3333

34-
NonStreamingEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
34+
public NonStreamingEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
3535
super(restChannel, restRequest);
3636

3737
this.listener = new EsqlResponseListener(restChannel, restRequest, esqlRequest).wrapWithLogging();
@@ -57,6 +57,11 @@ protected void doFinishResponse(EsqlQueryResponse response) {
5757
throw new UnsupportedOperationException("This class does not support streaming");
5858
}
5959

60+
@Override
61+
protected void doHandleException(Exception e) {
62+
listener.onFailure(e);
63+
}
64+
6065
@Override
6166
protected void doSendEverything(EsqlQueryResponse response) {
6267
// The base class will close the response, and the listener will do so too.

0 commit comments

Comments
 (0)