Skip to content

Commit 02ecea7

Browse files
committed
Changed all to work with Iterators and return them instead of calling sendChunks
1 parent 8db1bd6 commit 02ecea7

File tree

3 files changed

+94
-39
lines changed

3 files changed

+94
-39
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/DefaultEsqlQueryResponseStream.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class DefaultEsqlQueryResponseStream extends EsqlQueryResponseStream {
3636
@Nullable
3737
private List<ColumnInfoImpl> columns;
3838

39-
private boolean dropNullColumns;
39+
private final boolean dropNullColumns;
4040

4141
DefaultEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) throws IOException {
4242
super(restChannel, restRequest, esqlRequest);
@@ -50,7 +50,7 @@ protected boolean canBeStreamed() {
5050
}
5151

5252
@Override
53-
protected void doStartResponse(List<ColumnInfoImpl> columns) {
53+
protected Iterator<? extends ToXContent> doStartResponse(List<ColumnInfoImpl> columns) {
5454
assert dropNullColumns == false : "this method doesn't support dropping null columns";
5555

5656
this.columns = columns;
@@ -63,18 +63,18 @@ protected void doStartResponse(List<ColumnInfoImpl> columns) {
6363
// Start the values array, to be filled in
6464
content.add(ChunkedToXContentHelper.startArray("values"));
6565

66-
sendChunks(content);
66+
return asIterator(content);
6767
}
6868

6969
@Override
70-
protected void doSendPages(Iterable<Page> pages) {
70+
protected Iterator<? extends ToXContent> doSendPages(Iterable<Page> pages) {
7171
assert columns != null : "columns must be set before sending pages";
7272

73-
sendChunks(List.of(ResponseXContentUtils.rowValues(columns, pages, null)));
73+
return ResponseXContentUtils.rowValues(columns, pages, null);
7474
}
7575

7676
@Override
77-
protected void doFinishResponse(EsqlQueryResponse response) {
77+
protected Iterator<? extends ToXContent> doFinishResponse(EsqlQueryResponse response) {
7878
var content = new ArrayList<Iterator<? extends ToXContent>>(10);
7979

8080
// End the values array
@@ -120,35 +120,36 @@ protected void doFinishResponse(EsqlQueryResponse response) {
120120

121121
content.add(ChunkedToXContentHelper.endObject());
122122

123-
sendChunks(content);
123+
return asIterator(content);
124124
}
125125

126126
@Override
127-
protected void doSendEverything(EsqlQueryResponse response) {
128-
// TODO: Close the response
127+
protected Iterator<? extends ToXContent> doSendEverything(EsqlQueryResponse response) {
129128
// final Releasable releasable = releasableFromResponse(esqlResponse);
130129

131130
// TODO: Instead of sendChunks, implement a flush() to attach the response to it? Or pass the response to the methods
132131
// TODO: Or make "doX" methods return an Iterator<? extends ToXContent> and then concat them all together
133132

133+
var content = new ArrayList<Iterator<? extends ToXContent>>(3);
134134
boolean[] nullColumns = null;
135135
if (dropNullColumns) {
136136
nullColumns = nullColumns(response.columns(), response.pages());
137-
sendStartResponseDroppingNullColumns(response.columns(), nullColumns);
137+
content.add(sendStartResponseDroppingNullColumns(response.columns(), nullColumns));
138138
} else {
139-
doStartResponse(response.columns());
139+
content.add(doStartResponse(response.columns()));
140140
}
141141
// doSendPages doesn't work with nullColumns or columnar, so we generate them here directly
142-
sendChunks(List.of(ResponseXContentUtils.columnValues(response.columns(), response.pages(), esqlRequest.columnar(), nullColumns)));
143-
doFinishResponse(response);
142+
content.add(ResponseXContentUtils.columnValues(response.columns(), response.pages(), esqlRequest.columnar(), nullColumns));
143+
content.add(doFinishResponse(response));
144+
return asIterator(content);
144145
}
145146

146147
@Override
147-
protected void doHandleException(Exception e) {
148-
// TODO: Implement this
148+
protected Iterator<? extends ToXContent> doHandleException(Exception e) {
149+
throw new UnsupportedOperationException("Not implemented yet");
149150
}
150151

151-
private void sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns) {
152+
private Iterator<? extends ToXContent> sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns) {
152153
assert dropNullColumns : "this method should only be called when dropping null columns";
153154

154155
var content = new ArrayList<Iterator<? extends ToXContent>>(3);
@@ -157,7 +158,7 @@ private void sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns,
157158
content.add(ResponseXContentUtils.allColumns(columns, "all_columns"));
158159
content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns"));
159160

160-
sendChunks(content);
161+
return asIterator(content);
161162
}
162163

163164
private boolean[] nullColumns(List<ColumnInfoImpl> columns, List<Page> pages) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/EsqlQueryResponseStream.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -120,28 +120,35 @@ public final void startResponse(List<Attribute> schema) {
120120
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
121121
}).toList();
122122

123-
doStartResponse(columns);
123+
sendChunks(doStartResponse(columns));
124+
124125
initialStreamChunkSent = true;
125126
}
126127

127128
public final void sendPages(Iterable<Page> pages) {
128129
assert finished == false : "sendPages() called on a finished stream";
129130

130131
if (initialStreamChunkSent) {
131-
doSendPages(pages);
132+
sendChunks(doSendPages(pages));
132133
}
133134
}
134135

135136
public final void finishResponse(EsqlQueryResponse response) {
136137
assert finished == false : "finishResponse() called more than once";
137138

138-
try (response) {
139+
// TODO: Also, is this closing right? EsqlResponseListener uses releasableFromResponse(), which increments the ref first
140+
boolean success = false;
141+
try {
139142
if (initialStreamChunkSent) {
140-
doFinishResponse(response);
143+
sendChunks(doFinishResponse(response), response);
141144
} else {
142-
doSendEverything(response);
145+
sendChunks(doSendEverything(response), response);
143146
}
147+
success = true;
144148
} finally {
149+
if (success == false) {
150+
response.close();
151+
}
145152
finished = true;
146153
}
147154
}
@@ -152,6 +159,8 @@ public final void handleException(Exception e) {
152159
// TODO: To be overridden by subclasses. This should append the error to the stream, if possible
153160
LOGGER.error("Error while streaming response", e);
154161

162+
sendChunks(doHandleException(e));
163+
155164
finished = true;
156165
}
157166

@@ -185,25 +194,65 @@ public void onFailure(Exception e) {
185194
*/
186195
protected abstract boolean canBeStreamed();
187196

188-
protected abstract void doStartResponse(List<ColumnInfoImpl> columns);
197+
/**
198+
* Returns the chunks to be sent at the beginning of the response. Called once, at the start.
199+
* <p>
200+
* Only called if {@link #canBeStreamed()} returns {@code true}.
201+
* </p>
202+
*/
203+
protected abstract Iterator<? extends ToXContent> doStartResponse(List<ColumnInfoImpl> columns);
189204

190-
protected abstract void doSendPages(Iterable<Page> pages);
205+
/**
206+
* Returns the chunks for the given page. Called 0 to N times, after {@link #doStartResponse} and before {@link #doFinishResponse}.
207+
* <p>
208+
* Only called if {@link #canBeStreamed()} returns {@code true}.
209+
* </p>
210+
*/
211+
protected abstract Iterator<? extends ToXContent> doSendPages(Iterable<Page> pages);
191212

192-
protected abstract void doFinishResponse(EsqlQueryResponse response);
213+
/**
214+
* Returns the remaining chunks of the response. Called once, at the end of the response.
215+
* <p>
216+
* Only called if {@link #canBeStreamed()} returns {@code true}.
217+
* </p>
218+
*/
219+
protected abstract Iterator<? extends ToXContent> doFinishResponse(EsqlQueryResponse response);
193220

194-
protected abstract void doHandleException(Exception e);
221+
/**
222+
* Returns the chunks to be sent for the given exception.
223+
* <p>
224+
* This may be called at any time, so the code should track what was sent already
225+
* and how to send a meaningful response given the chunks sent in previous calls.
226+
* </p>
227+
*/
228+
protected abstract Iterator<? extends ToXContent> doHandleException(Exception e);
195229

196-
protected void doSendEverything(EsqlQueryResponse response) {
197-
// TODO: Is this safe? Should this be abstract to ensure proper implementation? Add tests for both cases
198-
doStartResponse(response.columns());
199-
doSendPages(response.pages());
200-
doFinishResponse(response);
230+
/**
231+
* Returns the chunks of the full response. Called once for the full response.
232+
* <p>
233+
* Only called if {@link #canBeStreamed()} returns {@code false}.
234+
* </p>
235+
*/
236+
protected Iterator<? extends ToXContent> doSendEverything(EsqlQueryResponse response) {
237+
// TODO: Is this safe? Should this be abstract to ensure proper implementation? Add tests for both streamed and "everything" cases
238+
return Iterators.concat(
239+
doStartResponse(response.columns()),
240+
doSendPages(response.pages()),
241+
doFinishResponse(response)
242+
);
201243
}
202244

203245
@SuppressWarnings("unchecked")
204-
protected final void sendChunks(List<Iterator<? extends ToXContent>> chunkedContent) {
205-
// TODO: Maybe accept a single chunk here, and do a flush() inside of each method?
206-
streamingXContentResponse.writeFragment(p0 -> Iterators.concat(chunkedContent.toArray(Iterator[]::new)), () -> {});
246+
protected static Iterator<? extends ToXContent> asIterator(List<Iterator<? extends ToXContent>> chunks) {
247+
return Iterators.concat(chunks.toArray(Iterator[]::new));
248+
}
249+
250+
private void sendChunks(Iterator<? extends ToXContent> chunks) {
251+
sendChunks(chunks, () -> {});
252+
}
253+
254+
private void sendChunks(Iterator<? extends ToXContent> chunks, Releasable releasable) {
255+
streamingXContentResponse.writeFragment(p0 -> chunks, releasable);
207256
}
208257

209258
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/NonStreamingEsqlQueryResponseStream.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@
1111
import org.elasticsearch.compute.data.Page;
1212
import org.elasticsearch.rest.RestChannel;
1313
import org.elasticsearch.rest.RestRequest;
14+
import org.elasticsearch.xcontent.ToXContent;
1415
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
1516
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
1617
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
1718
import org.elasticsearch.xpack.esql.action.EsqlResponseListener;
1819

1920
import java.io.IOException;
21+
import java.util.Collections;
22+
import java.util.Iterator;
2023
import java.util.List;
2124

2225
/**
@@ -44,32 +47,34 @@ protected boolean canBeStreamed() {
4447
}
4548

4649
@Override
47-
protected void doStartResponse(List<ColumnInfoImpl> columns) {
50+
protected Iterator<? extends ToXContent> doStartResponse(List<ColumnInfoImpl> columns) {
4851
throw new UnsupportedOperationException("This class does not support streaming");
4952
}
5053

5154
@Override
52-
protected void doSendPages(Iterable<Page> pages) {
55+
protected Iterator<? extends ToXContent> doSendPages(Iterable<Page> pages) {
5356
throw new UnsupportedOperationException("This class does not support streaming");
5457
}
5558

5659
@Override
57-
protected void doFinishResponse(EsqlQueryResponse response) {
60+
protected Iterator<? extends ToXContent> doFinishResponse(EsqlQueryResponse response) {
5861
throw new UnsupportedOperationException("This class does not support streaming");
5962
}
6063

6164
@Override
62-
protected void doHandleException(Exception e) {
65+
protected Iterator<? extends ToXContent> doHandleException(Exception e) {
6366
listener.onFailure(e);
67+
return Collections.emptyIterator();
6468
}
6569

6670
@Override
67-
protected void doSendEverything(EsqlQueryResponse response) {
71+
protected Iterator<? extends ToXContent> doSendEverything(EsqlQueryResponse response) {
6872
// The base class will close the response, and the listener will do so too.
6973
// So we need to increment the reference count here
7074
// TODO: Check this
7175
response.incRef();
7276
listener.onResponse(response);
77+
return Collections.emptyIterator();
7378
}
7479

7580
@Override

0 commit comments

Comments
 (0)