Skip to content

Commit 2eb08f0

Browse files
committed
Introduce StreamingXContentResponse
Similar to `ChunkedZipResponse` (elastic#109820) this utility allows Elasticsearch to send an `XContent`-based response constructed out of a sequence of `ChunkedToXContent` fragments, provided in a streaming and asynchronous fashion. This will enable elastic#93735 to proceed without needing to create a temporary index to hold the intermediate results.
1 parent 6d8e6ad commit 2eb08f0

File tree

2 files changed

+735
-0
lines changed

2 files changed

+735
-0
lines changed
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.rest;
10+
11+
import org.apache.http.ConnectionClosedException;
12+
import org.apache.http.HttpResponse;
13+
import org.apache.http.nio.ContentDecoder;
14+
import org.apache.http.nio.IOControl;
15+
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
16+
import org.apache.http.protocol.HttpContext;
17+
import org.elasticsearch.action.ActionListener;
18+
import org.elasticsearch.action.ActionRunnable;
19+
import org.elasticsearch.action.support.RefCountingRunnable;
20+
import org.elasticsearch.client.Request;
21+
import org.elasticsearch.client.RequestOptions;
22+
import org.elasticsearch.client.internal.node.NodeClient;
23+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
24+
import org.elasticsearch.cluster.node.DiscoveryNodes;
25+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
26+
import org.elasticsearch.common.settings.ClusterSettings;
27+
import org.elasticsearch.common.settings.IndexScopedSettings;
28+
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.settings.SettingsFilter;
30+
import org.elasticsearch.common.unit.ByteSizeUnit;
31+
import org.elasticsearch.common.util.CollectionUtils;
32+
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
34+
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
35+
import org.elasticsearch.common.xcontent.XContentHelper;
36+
import org.elasticsearch.features.NodeFeature;
37+
import org.elasticsearch.plugins.ActionPlugin;
38+
import org.elasticsearch.plugins.Plugin;
39+
import org.elasticsearch.plugins.PluginsService;
40+
import org.elasticsearch.test.ESIntegTestCase;
41+
import org.elasticsearch.threadpool.ThreadPool;
42+
import org.elasticsearch.xcontent.json.JsonXContent;
43+
44+
import java.io.IOException;
45+
import java.nio.ByteBuffer;
46+
import java.util.Collection;
47+
import java.util.HashMap;
48+
import java.util.Iterator;
49+
import java.util.List;
50+
import java.util.Map;
51+
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.Semaphore;
53+
import java.util.concurrent.atomic.AtomicReference;
54+
import java.util.function.Predicate;
55+
import java.util.function.Supplier;
56+
import java.util.stream.Stream;
57+
import java.util.stream.StreamSupport;
58+
59+
import static org.hamcrest.Matchers.hasSize;
60+
61+
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
62+
public class StreamingXContentResponseIT extends ESIntegTestCase {
63+
64+
@Override
65+
protected boolean addMockHttpTransport() {
66+
return false;
67+
}
68+
69+
@Override
70+
protected Collection<Class<? extends Plugin>> nodePlugins() {
71+
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), RandomXContentResponsePlugin.class);
72+
}
73+
74+
public static class RandomXContentResponsePlugin extends Plugin implements ActionPlugin {
75+
76+
public static final String ROUTE = "/_random_xcontent_response";
77+
78+
public static final String INFINITE_ROUTE = "/_random_infinite_xcontent_response";
79+
80+
public final AtomicReference<Response> responseRef = new AtomicReference<>();
81+
82+
public record Response(Map<String, String> fragments, CountDownLatch completedLatch) {}
83+
84+
@Override
85+
public Collection<RestHandler> getRestHandlers(
86+
Settings settings,
87+
NamedWriteableRegistry namedWriteableRegistry,
88+
RestController restController,
89+
ClusterSettings clusterSettings,
90+
IndexScopedSettings indexScopedSettings,
91+
SettingsFilter settingsFilter,
92+
IndexNameExpressionResolver indexNameExpressionResolver,
93+
Supplier<DiscoveryNodes> nodesInCluster,
94+
Predicate<NodeFeature> clusterSupportsFeature
95+
) {
96+
return List.of(
97+
// handler that returns a normal (finite) response
98+
new RestHandler() {
99+
@Override
100+
public List<Route> routes() {
101+
return List.of(new Route(RestRequest.Method.GET, ROUTE));
102+
}
103+
104+
@Override
105+
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws IOException {
106+
final var response = new Response(new HashMap<>(), new CountDownLatch(1));
107+
final var entryCount = between(0, 10000);
108+
for (int i = 0; i < entryCount; i++) {
109+
response.fragments().put(randomIdentifier(), randomIdentifier());
110+
}
111+
assertTrue(responseRef.compareAndSet(null, response));
112+
handleStreamingXContentRestRequest(
113+
channel,
114+
client.threadPool(),
115+
response.completedLatch(),
116+
response.fragments().entrySet().iterator()
117+
);
118+
}
119+
},
120+
121+
// handler that just keeps on yielding chunks until aborted
122+
new RestHandler() {
123+
@Override
124+
public List<Route> routes() {
125+
return List.of(new Route(RestRequest.Method.GET, INFINITE_ROUTE));
126+
}
127+
128+
@Override
129+
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws IOException {
130+
final var response = new Response(new HashMap<>(), new CountDownLatch(1));
131+
assertTrue(responseRef.compareAndSet(null, new Response(null, response.completedLatch())));
132+
handleStreamingXContentRestRequest(channel, client.threadPool(), response.completedLatch(), new Iterator<>() {
133+
134+
private long id;
135+
136+
// carry on yielding content even after the channel closes
137+
private final Semaphore trailingContentPermits = new Semaphore(between(0, 20));
138+
139+
@Override
140+
public boolean hasNext() {
141+
return request.getHttpChannel().isOpen() || trailingContentPermits.tryAcquire();
142+
}
143+
144+
@Override
145+
public Map.Entry<String, String> next() {
146+
return new Map.Entry<>() {
147+
private final String key = Long.toString(id++);
148+
private final String content = randomIdentifier();
149+
150+
@Override
151+
public String getKey() {
152+
return key;
153+
}
154+
155+
@Override
156+
public String getValue() {
157+
return content;
158+
}
159+
160+
@Override
161+
public String setValue(String value) {
162+
return fail(null, "must not setValue");
163+
}
164+
};
165+
}
166+
});
167+
}
168+
}
169+
);
170+
}
171+
172+
private static void handleStreamingXContentRestRequest(
173+
RestChannel channel,
174+
ThreadPool threadPool,
175+
CountDownLatch completionLatch,
176+
Iterator<Map.Entry<String, String>> fragmentIterator
177+
) throws IOException {
178+
try (var refs = new RefCountingRunnable(completionLatch::countDown)) {
179+
final var streamingXContentResponse = new StreamingXContentResponse(channel, channel.request(), refs.acquire());
180+
streamingXContentResponse.writeFragment(p -> ChunkedToXContentHelper.startObject(), refs.acquire());
181+
final var finalRef = refs.acquire();
182+
ThrottledIterator.run(
183+
fragmentIterator,
184+
(ref, fragment) -> randomFrom(EsExecutors.DIRECT_EXECUTOR_SERVICE, threadPool.generic()).execute(
185+
ActionRunnable.run(ActionListener.releaseAfter(refs.acquireListener(), ref), () -> {
186+
Thread.yield();
187+
streamingXContentResponse.writeFragment(
188+
p -> ChunkedToXContentHelper.field(fragment.getKey(), fragment.getValue()),
189+
refs.acquire()
190+
);
191+
})
192+
),
193+
between(1, 10),
194+
() -> {},
195+
() -> {
196+
try (streamingXContentResponse; finalRef) {
197+
streamingXContentResponse.writeFragment(p -> ChunkedToXContentHelper.endObject(), refs.acquire());
198+
}
199+
}
200+
);
201+
}
202+
}
203+
}
204+
205+
public void testRandomStreamingXContentResponse() throws IOException {
206+
final var request = new Request("GET", RandomXContentResponsePlugin.ROUTE);
207+
final var response = getRestClient().performRequest(request);
208+
final var actualEntries = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
209+
assertEquals(getExpectedEntries(), actualEntries);
210+
}
211+
212+
public void testAbort() throws IOException {
213+
final var request = new Request("GET", RandomXContentResponsePlugin.INFINITE_ROUTE);
214+
final var responseStarted = new CountDownLatch(1);
215+
final var bodyConsumed = new CountDownLatch(1);
216+
request.setOptions(RequestOptions.DEFAULT.toBuilder().setHttpAsyncResponseConsumerFactory(() -> new HttpAsyncResponseConsumer<>() {
217+
218+
final ByteBuffer readBuffer = ByteBuffer.allocate(ByteSizeUnit.KB.toIntBytes(4));
219+
int bytesToConsume = ByteSizeUnit.MB.toIntBytes(1);
220+
221+
@Override
222+
public void responseReceived(HttpResponse response) {
223+
responseStarted.countDown();
224+
}
225+
226+
@Override
227+
public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException {
228+
readBuffer.clear();
229+
final var bytesRead = decoder.read(readBuffer);
230+
if (bytesRead > 0) {
231+
bytesToConsume -= bytesRead;
232+
}
233+
234+
if (bytesToConsume <= 0) {
235+
bodyConsumed.countDown();
236+
ioControl.shutdown();
237+
}
238+
}
239+
240+
@Override
241+
public void responseCompleted(HttpContext context) {}
242+
243+
@Override
244+
public void failed(Exception ex) {}
245+
246+
@Override
247+
public Exception getException() {
248+
return null;
249+
}
250+
251+
@Override
252+
public HttpResponse getResult() {
253+
return null;
254+
}
255+
256+
@Override
257+
public boolean isDone() {
258+
return false;
259+
}
260+
261+
@Override
262+
public void close() {}
263+
264+
@Override
265+
public boolean cancel() {
266+
return false;
267+
}
268+
}));
269+
270+
try {
271+
try (var restClient = createRestClient(internalCluster().getRandomNodeName())) {
272+
// one-node REST client to avoid retries
273+
expectThrows(ConnectionClosedException.class, () -> restClient.performRequest(request));
274+
}
275+
safeAwait(responseStarted);
276+
safeAwait(bodyConsumed);
277+
} finally {
278+
assertNull(getExpectedEntries()); // mainly just checking that all refs are released
279+
}
280+
}
281+
282+
private static Map<String, String> getExpectedEntries() {
283+
final List<Map<String, String>> nodeResponses = StreamSupport
284+
// concatenate all the chunks in all the entries
285+
.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
286+
.flatMap(p -> p.filterPlugins(RandomXContentResponsePlugin.class))
287+
.flatMap(p -> {
288+
final var response = p.responseRef.getAndSet(null);
289+
if (response == null) {
290+
return Stream.of();
291+
} else {
292+
safeAwait(response.completedLatch()); // ensures that all refs have been released
293+
return Stream.of(response.fragments());
294+
}
295+
})
296+
.toList();
297+
assertThat(nodeResponses, hasSize(1));
298+
return nodeResponses.get(0);
299+
}
300+
}

0 commit comments

Comments
 (0)