@@ -47,7 +47,9 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -63,6 +65,7 @@
import org.elasticsearch.http.HttpBodyTracer;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
+import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
@@ -73,13 +76,17 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
+import org.elasticsearch.xcontent.json.JsonXContent;

import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -102,6 +109,7 @@
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -361,7 +369,12 @@ public void test413TooLargeOnExpect100Continue() throws Exception {

    // ensures that an oversized chunked-encoded request is capped by the maxContentLength limit and gets a 413 response
public void testOversizedChunkedEncoding() throws Exception {
-        try (var clientContext = newClientContext(t -> {/* ignore exception from e.g. server closing socket */})) {
+        try (
+            var clientContext = newClientContext(
+                internalCluster().getRandomNodeName(),
+                t -> {/* ignore exception from e.g. server closing socket */}
+            )
+        ) {
var opaqueId = clientContext.newOpaqueId();
final var requestBodyStream = new HttpChunkedInput(
new ChunkedStream(new ByteBufInputStream(Unpooled.wrappedBuffer(randomByteArrayOfLength(MAX_CONTENT_LENGTH + 1)))),
@@ -501,6 +514,72 @@ private void assertHttpBodyLogging(Consumer<ClientContext> test) throws Exception {
}
}

+    public void testBulkIndexingRequestSplitting() throws Exception {
+        final var watermarkBytes = between(100, 200);
+        final var tinyNode = internalCluster().startCoordinatingOnlyNode(
+            Settings.builder()
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), ByteSizeValue.ofBytes(watermarkBytes))
+                .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), ByteSizeValue.ofBytes(watermarkBytes))
+                .build()
+        );
+
+        try (var clientContext = newClientContext(tinyNode, cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)))) {
+            final var request = new DefaultHttpRequest(HTTP_1_1, POST, "/_bulk");
+            request.headers().add(CONTENT_TYPE, APPLICATION_JSON);
+            HttpUtil.setTransferEncodingChunked(request, true);
+
+            final var channel = clientContext.channel();
+            channel.writeAndFlush(request);
+
+            final var indexName = randomIdentifier();
+            final var indexCreatedListener = ClusterServiceUtils.addTemporaryStateListener(
+                cs -> Iterators.filter(
+                    cs.metadata().indicesAllProjects().iterator(),
+                    indexMetadata -> indexMetadata.getIndex().getName().equals(indexName)
+                ).hasNext()
+            );
+
+            indexCreatedListener.addListener(ActionListener.running(() -> logger.info("--> index created")));
+
+            final var valueLength = between(10, 30);
+            final var docSizeBytes = "{'field':''}".length() + valueLength;
+            final var itemCount = between(watermarkBytes / docSizeBytes + 1, 300); // enough to split at least once
+            assertThat(itemCount * docSizeBytes, greaterThan(watermarkBytes));
+            for (int i = 0; i < itemCount; i++) {
+                channel.write(new DefaultHttpContent(Unpooled.wrappedBuffer(Strings.format("""
+                    {"index":{"_index":"%s"}}
+                    {"field":"%s"}
+                    """, indexName, randomAlphaOfLength(valueLength)).getBytes(StandardCharsets.UTF_8))));
+            }
+
+            channel.flush();
+            safeAwait(indexCreatedListener); // index must be created before we finish sending the request
+
+            channel.writeAndFlush(new DefaultLastHttpContent());
+            final var response = clientContext.getNextResponse();
+            try {
+                assertEquals(RestStatus.OK.getStatus(), response.status().code());
+                final ObjectPath responseBody;
+                final var copy = response.content().copy(); // Netty4Utils doesn't handle direct buffers, so copy to heap first
+                try {
+                    responseBody = ObjectPath.createFromXContent(JsonXContent.jsonXContent, Netty4Utils.toBytesReference(copy));
+                } finally {
+                    copy.release();
+                }
+                assertFalse(responseBody.evaluate("errors"));
+                assertEquals(itemCount, responseBody.evaluateArraySize("items"));
+                for (int i = 0; i < itemCount; i++) {
+                    assertEquals(
+                        RestStatus.CREATED.getStatus(),
+                        (int) asInstanceOf(int.class, responseBody.evaluateExact("items", Integer.toString(i), "index", "status"))
+                    );
+                }
+            } finally {
+                response.release();
+            }
+        }
+    }
+
static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) {
var request = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content));
request.headers().add(CONTENT_LENGTH, content.readableBytes());
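
Side note (not part of the change): the size arithmetic in testBulkIndexingRequestSplitting works because docSizeBytes is a deliberate lower bound on the wire size of one bulk item. It counts only the 12-character {"field":""} skeleton plus the value, ignoring the action line and newlines, so asserting itemCount * docSizeBytes > watermarkBytes guarantees that the streamed body crosses the low watermark at least once. A standalone sketch of the same arithmetic, with hypothetical fixed values in place of the test's between(...) randomization:

// Standalone illustration (hypothetical fixed values) of the watermark
// arithmetic used by testBulkIndexingRequestSplitting.
public class WatermarkMath {
    public static void main(String[] args) {
        int watermarkBytes = 150;                                     // the test picks between(100, 200)
        int valueLength = 20;                                         // the test picks between(10, 30)
        int docSizeBytes = "{\"field\":\"\"}".length() + valueLength; // 12 + 20 = 32
        // the smallest item count whose lower-bound size still exceeds the watermark:
        int minItems = watermarkBytes / docSizeBytes + 1;             // 150 / 32 + 1 = 5
        System.out.println(minItems * docSizeBytes > watermarkBytes); // true: 160 > 150
    }
}
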
@@ -539,11 +618,13 @@ protected boolean addMockHttpTransport() {
private static final LongSupplier idGenerator = new AtomicLong()::getAndIncrement;

private ClientContext newClientContext() throws Exception {
-        return newClientContext(cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)));
+        return newClientContext(
+            internalCluster().getRandomNodeName(),
+            cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause))
+        );
}

-    private ClientContext newClientContext(Consumer<Throwable> exceptionHandler) throws Exception {
-        var nodeName = internalCluster().getRandomNodeName();
+    private ClientContext newClientContext(String nodeName, Consumer<Throwable> exceptionHandler) throws Exception {
var clientResponseQueue = new LinkedBlockingDeque<FullHttpResponse>(16);
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, nodeName);
var remoteAddr = randomFrom(httpServerTransport.boundAddress().boundAddresses());
@@ -556,7 +637,7 @@ private ClientContext newClientContext(Consumer<Throwable> exceptionHandler) throws Exception {
protected void initChannel(SocketChannel ch) {
var p = ch.pipeline();
p.addLast(new HttpClientCodec());
-                p.addLast(new HttpObjectAggregator(ByteSizeUnit.KB.toIntBytes(4)));
+                p.addLast(new HttpObjectAggregator(ByteSizeUnit.MB.toIntBytes(4)));
p.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
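
Side note (not part of the change): the final hunk raises the test client's HttpObjectAggregator limit from 4 KiB to 4 MiB, presumably because the bulk response now carries one JSON entry per item (up to 300 of them) and can outgrow the old limit. When aggregated content exceeds maxContentLength, Netty refuses to deliver a truncated message and aborts aggregation instead. A minimal standalone sketch of that behaviour, assuming Netty 4.1 on the classpath (the exact failure signal can vary across Netty versions):

import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;

import java.nio.charset.StandardCharsets;

// Feeds an 8 KiB chunked response through a 4 KiB aggregator: once the
// limit is hit, no FullHttpResponse is emitted downstream.
public class AggregatorLimitDemo {
    public static void main(String[] args) {
        var channel = new EmbeddedChannel(new HttpObjectAggregator(4096)); // 4 KiB cap, like the old test client
        var response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpUtil.setTransferEncodingChunked(response, true);
        channel.writeInbound(response);
        try {
            channel.writeInbound(new DefaultHttpContent(Unpooled.copiedBuffer("x".repeat(8192), StandardCharsets.UTF_8)));
            System.out.println("aggregated: " + (channel.readInbound() != null));
        } catch (Exception e) {
            // oversized responses surface as a TooLongFrameException
            System.out.println("aggregation failed: " + e);
        }
    }
}
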