diff --git a/server/src/internalClusterTest/java/org/elasticsearch/rest/StreamingXContentResponseIT.java b/server/src/internalClusterTest/java/org/elasticsearch/rest/StreamingXContentResponseIT.java new file mode 100644 index 0000000000000..ae91caea888db --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/rest/StreamingXContentResponseIT.java @@ -0,0 +1,300 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest; + +import org.apache.http.ConnectionClosedException; +import org.apache.http.HttpResponse; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.protocol.HttpContext; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThrottledIterator; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(numDataNodes = 1) +public class StreamingXContentResponseIT extends ESIntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), RandomXContentResponsePlugin.class); + } + + public static class RandomXContentResponsePlugin extends Plugin implements ActionPlugin { + + public static final String ROUTE = 
"/_random_xcontent_response"; + + public static final String INFINITE_ROUTE = "/_random_infinite_xcontent_response"; + + public final AtomicReference responseRef = new AtomicReference<>(); + + public record Response(Map fragments, CountDownLatch completedLatch) {} + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of( + // handler that returns a normal (finite) response + new RestHandler() { + @Override + public List routes() { + return List.of(new Route(RestRequest.Method.GET, ROUTE)); + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws IOException { + final var response = new Response(new HashMap<>(), new CountDownLatch(1)); + final var entryCount = between(0, 10000); + for (int i = 0; i < entryCount; i++) { + response.fragments().put(randomIdentifier(), randomIdentifier()); + } + assertTrue(responseRef.compareAndSet(null, response)); + handleStreamingXContentRestRequest( + channel, + client.threadPool(), + response.completedLatch(), + response.fragments().entrySet().iterator() + ); + } + }, + + // handler that just keeps on yielding chunks until aborted + new RestHandler() { + @Override + public List routes() { + return List.of(new Route(RestRequest.Method.GET, INFINITE_ROUTE)); + } + + @Override + public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws IOException { + final var response = new Response(new HashMap<>(), new CountDownLatch(1)); + assertTrue(responseRef.compareAndSet(null, new Response(null, response.completedLatch()))); + handleStreamingXContentRestRequest(channel, client.threadPool(), response.completedLatch(), new Iterator<>() { + + private long id; + + // carry on yielding content even after the channel closes + private final Semaphore trailingContentPermits = new Semaphore(between(0, 20)); + + @Override + public boolean hasNext() { + return request.getHttpChannel().isOpen() || trailingContentPermits.tryAcquire(); + } + + @Override + public Map.Entry next() { + return new Map.Entry<>() { + private final String key = Long.toString(id++); + private final String content = randomIdentifier(); + + @Override + public String getKey() { + return key; + } + + @Override + public String getValue() { + return content; + } + + @Override + public String setValue(String value) { + return fail(null, "must not setValue"); + } + }; + } + }); + } + } + ); + } + + private static void handleStreamingXContentRestRequest( + RestChannel channel, + ThreadPool threadPool, + CountDownLatch completionLatch, + Iterator> fragmentIterator + ) throws IOException { + try (var refs = new RefCountingRunnable(completionLatch::countDown)) { + final var streamingXContentResponse = new StreamingXContentResponse(channel, channel.request(), refs.acquire()); + streamingXContentResponse.writeFragment(p -> ChunkedToXContentHelper.startObject(), refs.acquire()); + final var finalRef = refs.acquire(); + ThrottledIterator.run( + fragmentIterator, + (ref, fragment) -> randomFrom(EsExecutors.DIRECT_EXECUTOR_SERVICE, threadPool.generic()).execute( + ActionRunnable.run(ActionListener.releaseAfter(refs.acquireListener(), ref), () -> { + Thread.yield(); + 
streamingXContentResponse.writeFragment( + p -> ChunkedToXContentHelper.field(fragment.getKey(), fragment.getValue()), + refs.acquire() + ); + }) + ), + between(1, 10), + () -> {}, + () -> { + try (streamingXContentResponse; finalRef) { + streamingXContentResponse.writeFragment(p -> ChunkedToXContentHelper.endObject(), refs.acquire()); + } + } + ); + } + } + } + + public void testRandomStreamingXContentResponse() throws IOException { + final var request = new Request("GET", RandomXContentResponsePlugin.ROUTE); + final var response = getRestClient().performRequest(request); + final var actualEntries = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false); + assertEquals(getExpectedEntries(), actualEntries); + } + + public void testAbort() throws IOException { + final var request = new Request("GET", RandomXContentResponsePlugin.INFINITE_ROUTE); + final var responseStarted = new CountDownLatch(1); + final var bodyConsumed = new CountDownLatch(1); + request.setOptions(RequestOptions.DEFAULT.toBuilder().setHttpAsyncResponseConsumerFactory(() -> new HttpAsyncResponseConsumer<>() { + + final ByteBuffer readBuffer = ByteBuffer.allocate(ByteSizeUnit.KB.toIntBytes(4)); + int bytesToConsume = ByteSizeUnit.MB.toIntBytes(1); + + @Override + public void responseReceived(HttpResponse response) { + responseStarted.countDown(); + } + + @Override + public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException { + readBuffer.clear(); + final var bytesRead = decoder.read(readBuffer); + if (bytesRead > 0) { + bytesToConsume -= bytesRead; + } + + if (bytesToConsume <= 0) { + bodyConsumed.countDown(); + ioControl.shutdown(); + } + } + + @Override + public void responseCompleted(HttpContext context) {} + + @Override + public void failed(Exception ex) {} + + @Override + public Exception getException() { + return null; + } + + @Override + public HttpResponse getResult() { + return null; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public void close() {} + + @Override + public boolean cancel() { + return false; + } + })); + + try { + try (var restClient = createRestClient(internalCluster().getRandomNodeName())) { + // one-node REST client to avoid retries + expectThrows(ConnectionClosedException.class, () -> restClient.performRequest(request)); + } + safeAwait(responseStarted); + safeAwait(bodyConsumed); + } finally { + assertNull(getExpectedEntries()); // mainly just checking that all refs are released + } + } + + private static Map getExpectedEntries() { + final List> nodeResponses = StreamSupport + // concatenate all the chunks in all the entries + .stream(internalCluster().getInstances(PluginsService.class).spliterator(), false) + .flatMap(p -> p.filterPlugins(RandomXContentResponsePlugin.class)) + .flatMap(p -> { + final var response = p.responseRef.getAndSet(null); + if (response == null) { + return Stream.of(); + } else { + safeAwait(response.completedLatch()); // ensures that all refs have been released + return Stream.of(response.fragments()); + } + }) + .toList(); + assertThat(nodeResponses, hasSize(1)); + return nodeResponses.get(0); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/StreamingXContentResponse.java b/server/src/main/java/org/elasticsearch/rest/StreamingXContentResponse.java new file mode 100644 index 0000000000000..9f20416ff8b06 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/StreamingXContentResponse.java @@ -0,0 +1,435 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.core.Streams; +import org.elasticsearch.transport.Transports; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A REST response with an XContent body to which the caller can write fragments of content in an asynchronous and streaming fashion. + *
<p>
+ * Callers submit individual fragments of content using {@link #writeFragment}. Internally, the output entries are held in a queue.
+ * If the queue becomes empty then the response transmission is paused until the next entry becomes available.
+ * <p>
+ * The internal queue is unbounded. It is the caller's responsibility to ensure that the response does not consume an excess of resources
+ * while it's being sent.
+ * <p>
+ * The caller must eventually call {@link StreamingXContentResponse#close} to finish the transmission of the response. + */ +public final class StreamingXContentResponse implements Releasable { + + /** + * The underlying stream that collects the raw bytes to be transmitted. Mutable, because we collect the contents of each chunk in a + * distinct stream that is held in this field while that chunk is under construction. + */ + @Nullable // if there's no chunk under construction + private BytesStream targetStream; + + private final XContentBuilder xContentBuilder; + + private final RestChannel restChannel; + private final ToXContent.Params params; + private final Releasable onCompletion; + + /** + * A listener for the next fragment to become available for transmission after a pause. Completed with the newly-created unique active + * {@link AvailableFragmentsResponseBodyPart} within {@link #writeFragment}, and subscribed to via + * {@link AvailableFragmentsResponseBodyPart#getNextPart} when the current {@link AvailableFragmentsResponseBodyPart} + * becomes inactive because of a transmission pause. + */ + @Nullable // if the first fragment hasn't been sent yet + private SubscribableListener nextAvailableFragmentListener; + + /** + * A resource to be released when the transmission of the current fragment is complete. Note that we may complete the transmission of + * multiple fragments at the same time, if they are all processed by one call to {@link AvailableFragmentsResponseBodyPart#encodeChunk} + * and transmitted together. + */ + @Nullable // if not currently sending a fragment + private Releasable currentFragmentReleasable; + + /** + * @param restChannel The {@link RestChannel} on which to send the response. + * @param params The {@link ToXContent.Params} to control the serialization. + * @param onCompletion A resource which is released when the transmission is complete. + */ + public StreamingXContentResponse(RestChannel restChannel, ToXContent.Params params, Releasable onCompletion) throws IOException { + this.restChannel = restChannel; + this.params = params; + this.onCompletion = onCompletion; + this.xContentBuilder = restChannel.newBuilder( + restChannel.request().getXContentType(), + null, + true, + Streams.noCloseStream(new OutputStream() { + @Override + public void write(int b) throws IOException { + assert targetStream != null; + targetStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + assert targetStream != null; + targetStream.write(b, off, len); + } + }) + ); + } + + /** + * Close this {@link StreamingXContentResponse}, indicating that there will be no more fragments to send. + */ + @Override + public void close() { + writeFragment(p -> NO_MORE_FRAGMENTS, () -> { + if (isRestResponseFinished.compareAndSet(false, true)) { + queueRefs.decRef(); + } + }); + } + + private Iterator getChunksIterator(StreamingFragment fragment) { + return xContentBuilder.getRestApiVersion() == RestApiVersion.V_7 + ? fragment.fragment().toXContentChunkedV7(params) + : fragment.fragment().toXContentChunked(params); + } + + /** + * Enqueue the given fragment for transmission. + * @param fragment The fragment to send. + * @param releasable A resource which is released when the fragment has been completely processed, i.e. when + *
<ul>
+ * <li>it has been fully sent, or</li>
+ * <li>the overall response was cancelled before completion and all resources related to the partial transmission of
+ * this fragment have been released.</li>
+ * </ul>
+ */ + public void writeFragment(ChunkedToXContent fragment, Releasable releasable) { + if (tryAcquireQueueRef()) { + try { + fragmentQueue.add(new StreamingFragment(fragment, releasable)); + if (queueLength.getAndIncrement() == 0) { + // There is no active AvailableChunksZipResponseBodyPart, but there is now an entry in the queue, so we must create a + // AvailableChunksZipResponseBodyPart to process it (along with any other entries that are concurrently added to the + // queue). It's safe to mutate releasable and continuationListener here because they are only otherwise accessed by an + // active AvailableChunksZipResponseBodyPart (which does not exist) or when all queueRefs have been released (which they + // have not here). + final var nextFragment = fragmentQueue.poll(); + assert nextFragment != null; + final var availableFragments = new AvailableFragmentsResponseBodyPart(getChunksIterator(nextFragment)); + assert currentFragmentReleasable == null; + currentFragmentReleasable = nextFragment.releasable(); + final var currentAvailableFragmentListener = nextAvailableFragmentListener; + nextAvailableFragmentListener = new SubscribableListener<>(); + if (currentAvailableFragmentListener == null) { + // We are not resuming after a pause, this is the first fragment to be sent, so we start the response transmission. + restChannel.sendResponse(RestResponse.chunked(RestStatus.OK, availableFragments, this::restResponseFinished)); + } else { + // We are resuming transmission after a pause, so just carry on sending the response body. + assert currentAvailableFragmentListener.isDone() == false; + currentAvailableFragmentListener.onResponse(availableFragments); + } + } + } finally { + queueRefs.decRef(); + } + } else { + Releasables.closeExpectNoException(releasable); + } + } + + /** + * A fragment which is ready for transmission, to be stored in {@link #fragmentQueue}. + * + * @param fragment The fragment of XContent to send. + * @param releasable A resource to release when this fragment has been fully transmitted, or is no longer required because the + * transmission was cancelled. + */ + private record StreamingFragment(ChunkedToXContent fragment, Releasable releasable) {} + + /** + * Queue of fragments that are ready for transmission. + */ + private final Queue fragmentQueue = new LinkedBlockingQueue<>(); + + /** + * Upper bound on the number of fragments in the queue, atomically modified to ensure there's only one thread processing the queue + * at once. + */ + private final AtomicInteger queueLength = new AtomicInteger(); + + /** + * Ref-counting for access to the queue, to avoid clearing the queue on abort concurrently with a fragment being sent. + */ + private final RefCounted queueRefs = AbstractRefCounted.of(this::drainQueue); + + /** + * Flag to indicate if the request has been aborted, at which point we should stop enqueueing more fragments and promptly clean up the + * ones being sent. It's safe to ignore this, but without it in theory a constant stream of calls to {@link #writeFragment} could + * prevent {@link #drainQueue} from running for arbitrarily long. 
+ */ + private final AtomicBoolean isRestResponseFinished = new AtomicBoolean(); + + private boolean tryAcquireQueueRef() { + return isRestResponseFinished.get() == false && queueRefs.tryIncRef(); + } + + private void restResponseFinished() { + assert Transports.assertTransportThread(); + if (isRestResponseFinished.compareAndSet(false, true)) { + queueRefs.decRef(); + } + } + + private void drainQueue() { + assert isRestResponseFinished.get(); + assert queueRefs.hasReferences() == false; + final var taskCount = queueLength.get() + 2 /* currentFragmentReleasable and onCompletion */ ; + final var releasables = new ArrayList(taskCount); + try { + releasables.add(currentFragmentReleasable); + currentFragmentReleasable = null; + StreamingFragment fragment; + while ((fragment = fragmentQueue.poll()) != null) { + releasables.add(fragment.releasable()); + } + assert fragmentQueue.isEmpty() : fragmentQueue.size(); // no concurrent adds + assert releasables.size() == taskCount - 1 || releasables.size() == taskCount - 2 : taskCount + " vs " + releasables.size(); + } finally { + releasables.add(onCompletion); + Releasables.closeExpectNoException(Releasables.wrap(releasables)); + } + } + + /** + * A {@link ChunkedRestResponseBodyPart} which will yield all currently-available fragments by consuming from {@link #fragmentQueue}. + * There is only ever at most one active instance of this class at any time, in the sense that one such instance becoming inactive + * happens-before the creation of the next instance. One of these parts may send chunks for more than one fragment. + */ + private final class AvailableFragmentsResponseBodyPart implements ChunkedRestResponseBodyPart { + + /** + * An iterator over the chunks of the fragment currently being transmitted. + */ + private Iterator fragmentChunksIterator; + + /** + * True when we have run out of chunks ready for immediate transmission, so the response is paused, but we expect to send more data + * later. + */ + private boolean isResponsePaused; + + /** + * True when we have sent the last chunk of the last fragment, or the response was cancelled. + */ + private boolean isResponseComplete; + + /** + * A listener which is created when there are no more available fragments, so transmission is paused, subscribed to in + * {@link #getNextPart}, and then completed with the next body part (sequence of fragments, i.e. a new (unique) active + * {@link AvailableFragmentsResponseBodyPart}). + */ + private SubscribableListener getNextPartListener; + + /** + * A cache for an empty list to be used to collect the {@code Releasable} instances to be released when the next chunk has been + * fully transmitted. It's a list because a call to {@link #encodeChunk} may yield a chunk that completes several fragments, each of + * which has its own resources to release. We cache this value across chunks because most chunks won't release anything, so we can + * keep the empty list around for later to save on allocations. 
+ */ + private ArrayList nextReleasablesCache = new ArrayList<>(); + + AvailableFragmentsResponseBodyPart(Iterator fragmentChunksIterator) { + this.fragmentChunksIterator = fragmentChunksIterator; + } + + /** + * @return whether this part of the response is complete + */ + @Override + public boolean isPartComplete() { + return isResponsePaused || isResponseComplete; + } + + @Override + public boolean isLastPart() { + return isResponseComplete; + } + + @Override + public void getNextPart(ActionListener listener) { + assert getNextPartListener != null; + getNextPartListener.addListener(listener); + } + + /** + * Transfer {@link #currentFragmentReleasable} into the supplied collection (i.e. add it to {@code releasables} and then clear + * {@link #currentFragmentReleasable}). Called when the last chunk of the current fragment is serialized, so that we + * can start serializing chunks of the next fragment straight away whilst delaying the release of the current fragment's resources + * until the transmission of the chunk that is currently under construction. + */ + private void transferCurrentFragmentReleasable(ArrayList releasables) { + assert queueRefs.hasReferences(); + + if (currentFragmentReleasable == null) { + return; + } + + if (releasables == nextReleasablesCache) { + // adding the first value, so we must line up a new cached value for the next caller + nextReleasablesCache = new ArrayList<>(); + } + + releasables.add(currentFragmentReleasable); + currentFragmentReleasable = null; + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { + assert Transports.isTransportThread(Thread.currentThread()); + + final ArrayList releasables = nextReleasablesCache; + assert releasables.isEmpty(); + try { + if (tryAcquireQueueRef()) { + try { + assert queueLength.get() > 0; + // This is the current unique active AvailableFragmentsResponseBodyPart (i.e. queueLength is strictly positive and + // we hold a queueRef), so any concurrent calls to writeFragment() at this point will just add to the queue and + // won't spawn a new AvailableFragmentsResponseBodyPart or mutate any fields. + + final RecyclerBytesStreamOutput chunkStream = new RecyclerBytesStreamOutput(recycler); + assert targetStream == null; + targetStream = chunkStream; + + do { + if (fragmentChunksIterator.hasNext()) { + fragmentChunksIterator.next().toXContent(xContentBuilder, params); + } else { + completeCurrentFragment(releasables); + } + } while (isResponseComplete == false && isResponsePaused == false && chunkStream.size() < sizeHint); + + assert (releasables == nextReleasablesCache) == releasables.isEmpty(); + assert nextReleasablesCache.isEmpty(); + + final Releasable chunkStreamReleasable = () -> Releasables.closeExpectNoException(chunkStream); + final var result = new ReleasableBytesReference( + chunkStream.bytes(), + releasables.isEmpty() + ? 
chunkStreamReleasable + : Releasables.wrap(Iterators.concat(Iterators.single(chunkStreamReleasable), releasables.iterator())) + ); + targetStream = null; + return result; + } finally { + queueRefs.decRef(); + } + } else { + // request aborted, nothing more to send (queue is being cleared by queueRefs#closeInternal) + isResponseComplete = true; + return new ReleasableBytesReference(BytesArray.EMPTY, () -> {}); + } + } catch (Exception e) { + logger.error("failure encoding chunk", e); + throw e; + } finally { + if (targetStream != null) { + assert false : "failure encoding chunk"; + IOUtils.closeWhileHandlingException(targetStream, Releasables.wrap(releasables)); + targetStream = null; + } + } + } + + private void completeCurrentFragment(ArrayList releasables) throws IOException { + transferCurrentFragmentReleasable(releasables); + final var localNextAvailableFragmentListener = nextAvailableFragmentListener; // read before queue len decr + final var newQueueLength = queueLength.decrementAndGet(); + if (fragmentChunksIterator == NO_MORE_FRAGMENTS) { + // The current fragment is the last-fragment sentinel, so we stop processing the queue completely. Note + // that closing the XContentBuilder here ensures that the response is well-formed - it's up to the + // caller to ensure this, even if errors occur. + xContentBuilder.close(); + isResponseComplete = true; + } else if (newQueueLength == 0) { + // The current fragment is complete, but the next fragment isn't available yet, so we pause + // transmission. This means we are no longer an active AvailableFragmentsResponseBodyPart, so any + // concurrent calls to writeFragment() at this point will now spawn a new + // AvailableFragmentsResponseBodyPart to take our place. + xContentBuilder.flush(); + isResponsePaused = true; + assert getNextPartListener == null; + assert localNextAvailableFragmentListener != null; + // Calling our getNextPart() will eventually yield the next fragment supplied to writeFragment(): + getNextPartListener = localNextAvailableFragmentListener; + } else { + // The current fragment is complete, and the next fragment is already available, so we start sending its + // chunks too. This means we're still the unique active AvailableFragmentsResponseBodyPart. We re-use + // this AvailableFragmentsResponseBodyPart instance rather than creating a new one to avoid unnecessary + // allocations. + + final var nextFragment = fragmentQueue.poll(); + assert nextFragment != null; + currentFragmentReleasable = nextFragment.releasable(); + fragmentChunksIterator = getChunksIterator(nextFragment); + } + } + + @Override + public String getResponseContentTypeString() { + return xContentBuilder.getResponseContentTypeString(); + } + } + + /** + * Sentinel fragment indicating the end of the response. + */ + private static final Iterator NO_MORE_FRAGMENTS = new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public ToXContent next() { + assert false : "not called"; + return ToXContent.EMPTY; + } + }; +}
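
Usage sketch (not part of the diff above): a minimal illustration of how a RestHandler might drive StreamingXContentResponse, adapted from handleStreamingXContentRestRequest in the integration test. The handler body, the hard-coded fragments, and the no-op Releasables are hypothetical; real callers would typically track per-fragment resources (e.g. via RefCountingRunnable, as the test does) and may write fragments asynchronously from other threads before calling close().

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.StreamingXContentResponse;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

// Inside a RestHandler implementation (illustrative only):
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws IOException {
    // Hypothetical source of fragments; in practice these may be produced incrementally and asynchronously.
    final Iterator<Map.Entry<String, String>> fragments = Map.of("k1", "v1", "k2", "v2").entrySet().iterator();

    // The final Releasable is released once the whole response has been sent or the transmission was cancelled.
    final var streamingResponse = new StreamingXContentResponse(channel, channel.request(), () -> {});

    // Each fragment is a ChunkedToXContent; the per-fragment Releasable is released once that fragment has been
    // fully transmitted, or once the response is aborted.
    streamingResponse.writeFragment(p -> ChunkedToXContentHelper.startObject(), () -> {});
    while (fragments.hasNext()) {
        final var entry = fragments.next();
        streamingResponse.writeFragment(p -> ChunkedToXContentHelper.field(entry.getKey(), entry.getValue()), () -> {});
    }

    // Closing enqueues the end-of-response sentinel; the response body is streamed chunk by chunk as fragments
    // become available, pausing transmission whenever the internal queue runs empty.
    try (streamingResponse) {
        streamingResponse.writeFragment(p -> ChunkedToXContentHelper.endObject(), () -> {});
    }
}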