diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 0158384b47aa4..856d6e294f817 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -39,11 +39,16 @@ import org.apache.logging.log4j.Level; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -52,6 +57,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpServerTransport; @@ -72,14 +79,19 @@ import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.netty4.Netty4Utils; +import java.nio.channels.ClosedChannelException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingDeque; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.function.Function; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; @@ -88,11 +100,15 @@ import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON; import static io.netty.handler.codec.http.HttpMethod.POST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(numDataNodes = 1) public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { - private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50); + private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(10); @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { @@ -106,163 +122,169 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { // ensure empty http content has single 0 size chunk public void testEmptyContent() throws Exception { - try (var ctx = setupClientCtx()) { + try (var clientContext = newClientContext()) { var totalRequests = randomIntBetween(1, 10); - for (int reqNo = 0; reqNo < totalRequests; reqNo++) { - var opaqueId = opaqueId(reqNo); + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); // send request with empty content - ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, Unpooled.EMPTY_BUFFER)); - var handler = ctx.awaitRestChannelAccepted(opaqueId); - handler.stream.next(); + clientContext.channel().writeAndFlush(fullHttpRequest(opaqueId, Unpooled.EMPTY_BUFFER)); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); // should receive a single empty chunk - var recvChunk = safePoll(handler.recvChunks); - assertTrue(recvChunk.isLast); - assertEquals(0, recvChunk.chunk.length()); - recvChunk.chunk.close(); - assertFalse(handler.streamClosed); + try (var chunk = handler.getNextChunk()) { + assertTrue(chunk.isLast()); + assertEquals(0, chunk.length()); + } + assertFalse(handler.isClosed()); // send response to process following request handler.sendResponse(new RestResponse(RestStatus.OK, "")); - assertBusy(() -> assertTrue(handler.streamClosed)); + safeAwait(handler.closedLatch); + } + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + clientContext.getNextResponse().release(); } - assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size())); } } - // ensures content integrity, no loses and re-order + // ensures content integrity, no losses or re-ordering public void testReceiveAllChunks() throws Exception { - try (var ctx = setupClientCtx()) { + try (var clientContext = newClientContext()) { var totalRequests = randomIntBetween(1, 10); - for (int reqNo = 0; reqNo < totalRequests; reqNo++) { - var opaqueId = opaqueId(reqNo); + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); // this dataset will be compared with one on server side - var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH); + var dataSize = randomIntBetween(1, MAX_CONTENT_LENGTH); var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize)); sendData.retain(); - ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData)); + clientContext.channel().writeAndFlush(fullHttpRequest(opaqueId, sendData)); - var handler = ctx.awaitRestChannelAccepted(opaqueId); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); - var recvData = Unpooled.buffer(dataSize); + var receivedData = Unpooled.buffer(dataSize); while (true) { - handler.stream.next(); - var recvChunk = safePoll(handler.recvChunks); - try (recvChunk.chunk) { - recvData.writeBytes(Netty4Utils.toByteBuf(recvChunk.chunk)); - if (recvChunk.isLast) { + try (var chunk = handler.getNextChunk()) { + receivedData.writeBytes(Netty4Utils.toByteBuf(chunk.data())); + if (chunk.isLast()) { break; } } } - assertFalse(handler.streamClosed); - assertEquals("sent and received payloads are not the same", sendData, recvData); + assertFalse(handler.isClosed()); + assertEquals("sent and received payloads are not the same", sendData, receivedData); handler.sendResponse(new RestResponse(RestStatus.OK, "")); - assertBusy(() -> assertTrue(handler.streamClosed)); + safeAwait(handler.closedLatch); + } + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + clientContext.getNextResponse().release(); } - assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size())); } } // ensures that all received chunks are released when connection closed and handler notified public void testClientConnectionCloseMidStream() throws Exception { - try (var ctx = setupClientCtx()) { - var opaqueId = opaqueId(0); + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); - // write half of http request - ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024)); - ctx.clientChannel.writeAndFlush(randomContent(1024, false)); + // write less than the complete request body + final var requestContentLength = between(2, ByteSizeUnit.KB.toIntBytes(10)); + clientContext.channel().write(httpRequest(opaqueId, requestContentLength)); + final var requestTransmittedLength = between(1, requestContentLength - 1); + clientContext.channel().writeAndFlush(randomContent(requestTransmittedLength, false)); // await stream handler is ready and request full content - var handler = ctx.awaitRestChannelAccepted(opaqueId); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); - assertFalse(handler.streamClosed); + assertFalse(handler.isClosed()); // terminate client connection - ctx.clientChannel.close(); - // read the first half of the request - handler.stream.next(); - // attempt to read more data and it should notice channel being closed eventually - handler.stream.next(); + clientContext.channel().close(); + // read the transmitted bytes and notice the channel being closed + assertEquals(requestTransmittedLength, handler.readUntilClose()); - // wait for resources to be released - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + assertTrue(handler.isClosed()); + assertEquals(0, handler.stream.bufSize()); } } - // ensures that all recieved chunks are released when server decides to close connection + // ensures that all received chunks are released when server decides to close connection public void testServerCloseConnectionMidStream() throws Exception { - try (var ctx = setupClientCtx()) { - var opaqueId = opaqueId(0); + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); - // write half of http request - ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024)); - ctx.clientChannel.writeAndFlush(randomContent(1024, false)); + // write less than the complete request body + final var requestContentLength = between(2, ByteSizeUnit.KB.toIntBytes(10)); + clientContext.channel().write(httpRequest(opaqueId, requestContentLength)); + final var requestTransmittedLength = between(1, requestContentLength - 1); + clientContext.channel().writeAndFlush(randomContent(requestTransmittedLength, false)); // await stream handler is ready and request full content - var handler = ctx.awaitRestChannelAccepted(opaqueId); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); - assertFalse(handler.streamClosed); + assertFalse(handler.isClosed()); // terminate connection on server and wait resources are released + final var exceptionFuture = new PlainActionFuture(); + assertNull(handler.nextChunkListenerRef.getAndSet(ActionTestUtils.assertNoSuccessListener(exceptionFuture::onResponse))); handler.channel.request().getHttpChannel().close(); - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class)); + assertTrue(handler.isClosed()); + assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } public void testServerExceptionMidStream() throws Exception { - try (var ctx = setupClientCtx()) { - var opaqueId = opaqueId(0); + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); - // write half of http request - ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024)); - ctx.clientChannel.writeAndFlush(randomContent(1024, false)); + // write less than the complete request body + final var requestContentLength = between(2, ByteSizeUnit.KB.toIntBytes(10)); + clientContext.channel().write(httpRequest(opaqueId, requestContentLength)); + final var requestTransmittedLength = between(1, requestContentLength - 1); + clientContext.channel().writeAndFlush(randomContent(requestTransmittedLength, false)); // await stream handler is ready and request full content - var handler = ctx.awaitRestChannelAccepted(opaqueId); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); - assertFalse(handler.streamClosed); + assertFalse(handler.isClosed()); + // terminate connection on server and wait resources are released + final var exceptionFuture = new PlainActionFuture(); + assertNull(handler.nextChunkListenerRef.getAndSet(ActionTestUtils.assertNoSuccessListener(exceptionFuture::onResponse))); handler.shouldThrowInsideHandleChunk = true; handler.stream.next(); - - assertBusy(() -> { - assertEquals(0, handler.stream.bufSize()); - assertTrue(handler.streamClosed); - }); + // simulated exception inside handleChunk + final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture)); + assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage()); + safeAwait(handler.closedLatch); + assertBusy(() -> assertEquals(0, handler.stream.bufSize())); } } // ensure that client's socket buffers data when server is not consuming data public void testClientBackpressure() throws Exception { - try (var ctx = setupClientCtx()) { - var opaqueId = opaqueId(0); + try (var clientContext = newClientContext()) { + var opaqueId = clientContext.newOpaqueId(); var payloadSize = MAX_CONTENT_LENGTH; var totalParts = 10; var partSize = payloadSize / totalParts; - ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize)); + clientContext.channel().writeAndFlush(httpRequest(opaqueId, payloadSize)); for (int i = 0; i < totalParts; i++) { - ctx.clientChannel.writeAndFlush(randomContent(partSize, false)); + clientContext.channel().writeAndFlush(randomContent(partSize, false)); } assertFalse( "should not flush last content immediately", - ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).isDone() + clientContext.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).isDone() ); - var handler = ctx.awaitRestChannelAccepted(opaqueId); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); // some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb - var osBufferOffset = MBytes(10); + var osBufferOffset = ByteSizeUnit.MB.toIntBytes(10); // incrementally read data on server side and ensure client side buffer drains accordingly for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) { @@ -272,7 +294,7 @@ public void testClientBackpressure() throws Exception { // it might take a few busy-iterations before channel buffer flush to OS // and bytesBeforeWritable will stop changing assertBusy(() -> { - var bufSize = ctx.clientChannel.bytesBeforeWritable(); + var bufSize = clientContext.channel().bytesBeforeWritable(); assertTrue( "client's channel buffer should be in range [" + minBufSize + "," + maxBufSize + "], got " + bufSize, bufSize >= minBufSize && bufSize <= maxBufSize @@ -286,188 +308,182 @@ public void testClientBackpressure() throws Exception { // ensures that server reply 100-continue on acceptable request size public void test100Continue() throws Exception { - try (var ctx = setupClientCtx()) { - for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { - var id = opaqueId(reqNo); + try (var clientContext = newClientContext()) { + var totalRequests = randomIntBetween(1, 10); + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH); // send request header and await 100-continue - var req = httpRequest(id, acceptableContentLength); - HttpUtil.set100ContinueExpected(req, true); - ctx.clientChannel.writeAndFlush(req); - var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); - assertEquals(HttpResponseStatus.CONTINUE, resp.status()); - resp.release(); + final var request = httpRequest(opaqueId, acceptableContentLength); + HttpUtil.set100ContinueExpected(request, true); + clientContext.channel().writeAndFlush(request); + final var continueResponse = clientContext.getNextResponse(); + assertEquals(HttpResponseStatus.CONTINUE, continueResponse.status()); + continueResponse.release(); // send content - var content = randomContent(acceptableContentLength, true); - ctx.clientChannel.writeAndFlush(content); + clientContext.channel().writeAndFlush(randomContent(acceptableContentLength, true)); // consume content and reply 200 - var handler = ctx.awaitRestChannelAccepted(id); - var consumed = handler.readAllBytes(); + final var handler = clientContext.awaitRestChannelAccepted(opaqueId); + final var consumed = handler.readAllBytes(); assertEquals(acceptableContentLength, consumed); handler.sendResponse(new RestResponse(RestStatus.OK, "")); - resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); - assertEquals(HttpResponseStatus.OK, resp.status()); - resp.release(); + final var finalResponse = clientContext.getNextResponse(); + assertEquals(HttpResponseStatus.OK, finalResponse.status()); + finalResponse.release(); } } } // ensures that server reply 413-too-large on oversized request with expect-100-continue public void test413TooLargeOnExpect100Continue() throws Exception { - try (var ctx = setupClientCtx()) { - for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { - var id = opaqueId(reqNo); - var oversized = MAX_CONTENT_LENGTH + 1; + try (var clientContext = newClientContext()) { + var totalRequests = randomIntBetween(1, 10); + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); // send request header and await 413 too large - var req = httpRequest(id, oversized); - HttpUtil.set100ContinueExpected(req, true); - ctx.clientChannel.writeAndFlush(req); - var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); - assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); - resp.release(); + final var request = httpRequest(opaqueId, MAX_CONTENT_LENGTH + 1); + HttpUtil.set100ContinueExpected(request, true); + clientContext.channel().writeAndFlush(request); + final var response = clientContext.getNextResponse(); + assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.status()); + response.release(); // terminate request - ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + clientContext.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); } } } // ensures that oversized chunked encoded request has maxContentLength limit and returns 413 public void testOversizedChunkedEncoding() throws Exception { - try (var ctx = setupClientCtx()) { - var id = opaqueId(0); - var contentSize = MAX_CONTENT_LENGTH + 1; - var content = randomByteArrayOfLength(contentSize); - var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content)); - var chunkedIs = new ChunkedStream(is); - var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT); - var req = httpRequest(id, 0); - HttpUtil.setTransferEncodingChunked(req, true); - - ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler()); - ctx.clientChannel.writeAndFlush(req); - ctx.clientChannel.writeAndFlush(httpChunkedIs); - var handler = ctx.awaitRestChannelAccepted(id); - var consumed = handler.readAllBytes(); - assertTrue(consumed <= MAX_CONTENT_LENGTH); - - var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); - assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status()); - resp.release(); + try (var clientContext = newClientContext(t -> {/* ignore exception from e.g. server closing socket */})) { + var opaqueId = clientContext.newOpaqueId(); + final var requestBodyStream = new HttpChunkedInput( + new ChunkedStream(new ByteBufInputStream(Unpooled.wrappedBuffer(randomByteArrayOfLength(MAX_CONTENT_LENGTH + 1)))), + LastHttpContent.EMPTY_LAST_CONTENT + ); + final var request = httpRequest(opaqueId, 0); + HttpUtil.setTransferEncodingChunked(request, true); + + clientContext.channel().pipeline().addLast(new ChunkedWriteHandler()); + clientContext.channel().writeAndFlush(request); + clientContext.channel().writeAndFlush(requestBodyStream); + var handler = clientContext.awaitRestChannelAccepted(opaqueId); + assertThat(handler.readUntilClose(), lessThanOrEqualTo(MAX_CONTENT_LENGTH)); + + var response = clientContext.getNextResponse(); + assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.status()); + response.release(); } } - // ensures that we dont leak buffers in stream on 400-bad-request + // ensures that we don't leak buffers in stream on 400-bad-request // some bad requests are dispatched from rest-controller before reaching rest handler // test relies on netty's buffer leak detection public void testBadRequestReleaseQueuedChunks() throws Exception { - try (var ctx = setupClientCtx()) { - for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { - var id = opaqueId(reqNo); - var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH); - var req = httpRequest(id, contentSize); - var content = randomContent(contentSize, true); + try (var clientContext = newClientContext()) { + var totalRequests = randomIntBetween(1, 10); + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); + + final var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH); + final var request = httpRequest(opaqueId, contentSize); + final var content = randomContent(contentSize, true); // set unacceptable content-type - req.headers().set(CONTENT_TYPE, "unknown"); - ctx.clientChannel.writeAndFlush(req); - ctx.clientChannel.writeAndFlush(content); + request.headers().set(CONTENT_TYPE, "unknown"); + clientContext.channel().writeAndFlush(request); + clientContext.channel().writeAndFlush(content); - var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue); - assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status()); - resp.release(); + final var response = clientContext.getNextResponse(); + assertEquals(HttpResponseStatus.BAD_REQUEST, response.status()); + response.release(); } } } - private static long transportStatsRequestBytesSize(Ctx ctx) { - var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); - var stats = httpTransport.stats().clientStats(); - var bytes = 0L; - for (var s : stats) { - bytes += s.requestSizeBytes(); - } - return bytes; - } - /** * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes */ public void testHttpClientStats() throws Exception { - try (var ctx = setupClientCtx()) { + try (var clientContext = newClientContext()) { // need to offset starting point, since we reuse cluster and other tests already sent some data - var totalBytesSent = transportStatsRequestBytesSize(ctx); + var totalBytesSent = clientContext.transportStatsRequestBytesSize(); - for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { - var id = opaqueId(reqNo); - var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH); + final var totalRequests = between(1, 10); + for (var requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + var opaqueId = clientContext.newOpaqueId(); + final var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH); totalBytesSent += contentSize; - ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize)); - ctx.clientChannel.writeAndFlush(randomContent(contentSize, true)); - var handler = ctx.awaitRestChannelAccepted(id); + clientContext.channel().writeAndFlush(httpRequest(opaqueId, contentSize)); + clientContext.channel().writeAndFlush(randomContent(contentSize, true)); + final var handler = clientContext.awaitRestChannelAccepted(opaqueId); handler.readAllBytes(); handler.sendResponse(new RestResponse(RestStatus.OK, "")); - assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx)); + assertEquals(totalBytesSent, clientContext.transportStatsRequestBytesSize()); + } + for (int requestIndex = 0; requestIndex < totalRequests; requestIndex++) { + clientContext.getNextResponse().release(); } } } - /** - * ensures that we log parts of http body and final line - */ + // ensures that we log parts of http body and final line @TestLogging( reason = "testing TRACE logging", value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" ) public void testHttpBodyLogging() throws Exception { - assertHttpBodyLogging((ctx) -> () -> { - try { - var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024)); - ctx.clientChannel.writeAndFlush(req); - var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); - handler.readAllBytes(); - } catch (Exception e) { - fail(e); - } + assertHttpBodyLogging(clientContext -> { + var opaqueId = clientContext.newOpaqueId(); + clientContext.channel() + .writeAndFlush(fullHttpRequest(opaqueId, Unpooled.wrappedBuffer(randomByteArrayOfLength(ByteSizeUnit.KB.toIntBytes(8))))); + final var handler = clientContext.awaitRestChannelAccepted(opaqueId); + handler.readAllBytes(); }); } - /** - * ensures that we log some parts of body and final line when connection is closed in the middle - */ + // ensures that we log some parts of body and final line when connection is closed in the middle @TestLogging( reason = "testing TRACE logging", value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" ) public void testHttpBodyLoggingChannelClose() throws Exception { - assertHttpBodyLogging((ctx) -> () -> { - try { - var req = httpRequest(opaqueId(0), 2 * 8192); - var halfContent = randomContent(8192, false); - ctx.clientChannel.writeAndFlush(req); - ctx.clientChannel.writeAndFlush(halfContent); - var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); - handler.readBytes(8192); - ctx.clientChannel.close(); - handler.stream.next(); - assertBusy(() -> assertTrue(handler.streamClosed)); - } catch (Exception e) { - fail(e); - } + assertHttpBodyLogging(clientContext -> { + var opaqueId = clientContext.newOpaqueId(); + + // write less than the complete request body, but still enough to get multiple log lines + final var requestContentLength = between(ByteSizeUnit.KB.toIntBytes(8) + 1, ByteSizeUnit.KB.toIntBytes(16)); + clientContext.channel().write(httpRequest(opaqueId, requestContentLength)); + final var requestTransmittedLength = between(ByteSizeUnit.KB.toIntBytes(8), requestContentLength - 1); + clientContext.channel().writeAndFlush(randomContent(requestTransmittedLength, false)); + + final var handler = clientContext.awaitRestChannelAccepted(opaqueId); + assertEquals(requestTransmittedLength, handler.readBytes(requestTransmittedLength)); + clientContext.channel().close(); + assertEquals(0, handler.readUntilClose()); + assertTrue(handler.isClosed()); }); } // asserts that we emit at least one logging event for a part and last line // http body should be large enough to split across multiple lines, > 4kb - private void assertHttpBodyLogging(Function test) throws Exception { - try (var ctx = setupClientCtx()) { + private void assertHttpBodyLogging(Consumer test) throws Exception { + try (var clientContext = newClientContext()) { + final Runnable testRunnable = () -> { + try { + test.accept(clientContext); + } catch (Exception e) { + fail(e); + } + }; MockLog.assertThatLogger( - test.apply(ctx), + testRunnable, HttpBodyTracer.class, new MockLog.SeenEventExpectation( "request part", @@ -485,43 +501,20 @@ private void assertHttpBodyLogging(Function test) throws Exceptio } } - private String opaqueId(int reqNo) { - return getTestName() + "-" + reqNo; - } - - static int MBytes(int m) { - return m * 1024 * 1024; - } - - static T safePoll(BlockingDeque queue) { - try { - var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); - assertNotNull("queue is empty", t); - return t; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new AssertionError(e); - } - } - static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { - var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); - req.headers().add(CONTENT_LENGTH, content.readableBytes()); - req.headers().add(CONTENT_TYPE, APPLICATION_JSON); - req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); - return req; + var request = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); + request.headers().add(CONTENT_LENGTH, content.readableBytes()); + request.headers().add(CONTENT_TYPE, APPLICATION_JSON); + request.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return request; } static HttpRequest httpRequest(String opaqueId, int contentLength) { - return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength); - } - - static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) { - var req = new DefaultHttpRequest(HTTP_1_1, POST, uri); - req.headers().add(CONTENT_LENGTH, contentLength); - req.headers().add(CONTENT_TYPE, APPLICATION_JSON); - req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); - return req; + var request = new DefaultHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE); + request.headers().add(CONTENT_LENGTH, contentLength); + request.headers().add(CONTENT_TYPE, APPLICATION_JSON); + request.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return request; } static HttpContent randomContent(int size, boolean isLast) { @@ -533,22 +526,29 @@ static HttpContent randomContent(int size, boolean isLast) { } } - static ByteBuf randomByteBuf(int size) { - return Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + @Override + protected Collection> nodePlugins() { + return CollectionUtils.concatLists(List.of(ControlServerRequestPlugin.class), super.nodePlugins()); } - Ctx setupClientCtx() throws Exception { - var nodeName = internalCluster().getRandomNodeName(); - var clientRespQueue = new LinkedBlockingDeque<>(16); - var bootstrap = bootstrapClient(nodeName, clientRespQueue); - var channel = bootstrap.connect().sync().channel(); - return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue); + @Override + protected boolean addMockHttpTransport() { + return false; // enable http } - Bootstrap bootstrapClient(String node, BlockingQueue queue) { - var httpServer = internalCluster().getInstance(HttpServerTransport.class, node); - var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses()); - return new Bootstrap().group(new NioEventLoopGroup(1)) + private static final LongSupplier idGenerator = new AtomicLong()::getAndIncrement; + + private ClientContext newClientContext() throws Exception { + return newClientContext(cause -> ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause))); + } + + private ClientContext newClientContext(Consumer exceptionHandler) throws Exception { + var nodeName = internalCluster().getRandomNodeName(); + var clientResponseQueue = new LinkedBlockingDeque(16); + final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, nodeName); + var remoteAddr = randomFrom(httpServerTransport.boundAddress().boundAddresses()); + var handlersByOpaqueId = internalCluster().getInstance(HandlersByOpaqueId.class, nodeName); + var bootstrap = new Bootstrap().group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .remoteAddress(remoteAddr.getAddress(), remoteAddr.getPort()) .handler(new ChannelInitializer() { @@ -556,68 +556,110 @@ Bootstrap bootstrapClient(String node, BlockingQueue queue) { protected void initChannel(SocketChannel ch) { var p = ch.pipeline(); p.addLast(new HttpClientCodec()); - p.addLast(new HttpObjectAggregator(4096)); + p.addLast(new HttpObjectAggregator(ByteSizeUnit.KB.toIntBytes(4))); p.addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { msg.retain(); - queue.add(msg); + clientResponseQueue.add(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - queue.add(cause); + exceptionHandler.accept(cause); } }); } }); + var channel = bootstrap.connect().sync().channel(); + return new ClientContext( + getTestName() + "-" + randomIdentifier() + "-" + idGenerator.getAsLong(), + httpServerTransport, + handlersByOpaqueId, + bootstrap, + channel, + clientResponseQueue + ); } - @Override - protected Collection> nodePlugins() { - return CollectionUtils.concatLists(List.of(ControlServerRequestPlugin.class), super.nodePlugins()); - } - - @Override - protected boolean addMockHttpTransport() { - return false; // enable http - } + /** + * Collects together the objects that make up the client and surrounding context in which each test runs. + */ + static final class ClientContext implements AutoCloseable { + private final String contextName; + private final HttpServerTransport httpServerTransport; + private final HandlersByOpaqueId handlersByOpaqueId; + private final Bootstrap bootstrap; + private final Channel channel; + private final BlockingDeque responseQueue; + + ClientContext( + String contextName, + HttpServerTransport httpServerTransport, + HandlersByOpaqueId handlersByOpaqueId, + Bootstrap bootstrap, + Channel channel, + BlockingDeque responseQueue + ) { + this.contextName = contextName; + this.httpServerTransport = httpServerTransport; + this.handlersByOpaqueId = handlersByOpaqueId; + this.bootstrap = bootstrap; + this.channel = channel; + this.responseQueue = responseQueue; + } - record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque clientRespQueue) - implements - AutoCloseable { + String newOpaqueId() { + return contextName + "-" + idGenerator.getAsLong(); + } @Override - public void close() throws Exception { - safeGet(clientChannel.close()); - safeGet(clientBootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); - clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); }); - for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) { - if (opaqueId.startsWith(testName)) { - var handler = ControlServerRequestPlugin.handlers.get(opaqueId); - handler.recvChunks.forEach(c -> c.chunk.close()); - handler.channel.request().getHttpChannel().close(); - ControlServerRequestPlugin.handlers.remove(opaqueId); - } + public void close() { + safeGet(channel.close()); + safeGet(bootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); + assertThat(responseQueue, empty()); + handlersByOpaqueId.removeHandlers(contextName); + } + + FullHttpResponse getNextResponse() { + try { + var response = responseQueue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); + assertNotNull("queue is empty", response); + return response; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); } } - ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception { - assertBusy(() -> assertTrue(ControlServerRequestPlugin.handlers.containsKey(opaqueId))); - var handler = ControlServerRequestPlugin.handlers.get(opaqueId); + ServerRequestHandler awaitRestChannelAccepted(String opaqueId) { + var handler = safeAwait(handlersByOpaqueId.getHandlerFor(opaqueId)); safeAwait(handler.channelAccepted); return handler; } + + long transportStatsRequestBytesSize() { + var bytes = 0L; + for (final var clientStats : httpServerTransport.stats().clientStats()) { + bytes += clientStats.requestSizeBytes(); + } + return bytes; + } + + Channel channel() { + return channel; + } } + /** A streaming request handler which allows tests to consume exactly the data (bytes or chunks) they expect. */ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { final SubscribableListener channelAccepted = new SubscribableListener<>(); final String opaqueId; - final BlockingDeque recvChunks = new LinkedBlockingDeque<>(); + private final AtomicReference> nextChunkListenerRef = new AtomicReference<>(); final Netty4HttpRequestBodyStream stream; RestChannel channel; - boolean recvLast = false; - volatile boolean streamClosed = false; + boolean receivedLastChunk = false; + final CountDownLatch closedLatch = new CountDownLatch(1); volatile boolean shouldThrowInsideHandleChunk = false; ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) { @@ -625,50 +667,83 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon this.stream = stream; } + static final String SIMULATED_EXCEPTION_MESSAGE = "simulated exception inside handleChunk"; + + boolean isClosed() { + return closedLatch.getCount() == 0; + } + @Override public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { Transports.assertTransportThread(); + assertFalse("should not get any chunks after close", isClosed()); + final var nextChunkListener = nextChunkListenerRef.getAndSet(null); + assertNotNull("next chunk must be explicitly requested", nextChunkListener); if (shouldThrowInsideHandleChunk) { // Must close the chunk. This is the contract of this method. chunk.close(); - throw new RuntimeException("simulated exception inside handleChunk"); + final var exception = new RuntimeException(SIMULATED_EXCEPTION_MESSAGE); + nextChunkListener.onFailure(exception); + throw exception; } - recvChunks.add(new Chunk(chunk, isLast)); + nextChunkListener.onResponse(new Chunk(chunk, isLast)); } @Override - public void accept(RestChannel channel) throws Exception { + public void accept(RestChannel channel) { this.channel = channel; channelAccepted.onResponse(null); } @Override public void streamClose() { - streamClosed = true; + Transports.assertTransportThread(); + closedLatch.countDown(); + final var nextChunkListener = nextChunkListenerRef.getAndSet(null); + if (nextChunkListener != null) { + // might get a chunk and then a close in one read event, in which case the chunk consumes the listener + nextChunkListener.onFailure(new ClosedChannelException()); + } } void sendResponse(RestResponse response) { channel.sendResponse(response); } + Chunk getNextChunk() throws Exception { + final var exception = new AtomicReference(); + final var future = new PlainActionFuture(); + assertTrue(nextChunkListenerRef.compareAndSet(null, ActionListener.assertOnce(future.delegateResponse((l, e) -> { + assertTrue(exception.compareAndSet(null, e)); + l.onResponse(null); + })))); + if (isClosed()) { + // check streamClosed _after_ registering listener ref + nextChunkListenerRef.set(null); + if (randomBoolean()) { + stream.next(); // shouldn't do anything after close anyway + } + throw new ClosedChannelException(); + } + stream.next(); + final var chunk = safeGet(future); + if (exception.get() != null) { + throw exception.get(); + } + return chunk; + } + int readBytes(int bytes) { var consumed = 0; - if (recvLast == false) { - stream.next(); - while (consumed < bytes && streamClosed == false) { - try { - var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS); - if (recvChunk != null) { - consumed += recvChunk.chunk.length(); - recvChunk.chunk.close(); - if (recvChunk.isLast) { - recvLast = true; - break; - } - stream.next(); + if (receivedLastChunk == false) { + while ((bytes == -1 || consumed < bytes) && isClosed() == false) { + try (var chunk = getNextChunk()) { + consumed = Math.addExact(consumed, chunk.length()); + if (chunk.isLast()) { + receivedLastChunk = true; + break; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (Exception e) { throw new AssertionError(e); } } @@ -677,18 +752,77 @@ int readBytes(int bytes) { } int readAllBytes() { - return readBytes(Integer.MAX_VALUE); + return readBytes(-1); + } + + int readUntilClose() { + int handledBytes = 0; + while (true) { + try { + try (var chunk = getNextChunk()) { + handledBytes = Math.addExact(handledBytes, chunk.length()); + } + } catch (ClosedChannelException e) { + return handledBytes; + } catch (Exception e) { + throw new AssertionError(e); + } + } } - record Chunk(ReleasableBytesReference chunk, boolean isLast) {} + record Chunk(ReleasableBytesReference data, boolean isLast) implements Releasable { + @Override + public void close() { + Releasables.closeExpectNoException(data); + } + + public int length() { + return data.length(); + } + } } - // takes full control of rest handler from the outside + /** Collection of ServerRequestHandler instances keyed by the X-Opaque-Id header of the request that created it, on each node */ + public static class HandlersByOpaqueId extends AbstractLifecycleComponent { + private final Map> handlers = new ConcurrentHashMap<>(); + + SubscribableListener getHandlerFor(String opaqueId) { + return handlers.computeIfAbsent(opaqueId, ignored -> new SubscribableListener<>()); + } + + void removeHandlers(String contextName) { + for (var opaqueId : handlers.keySet()) { + if (opaqueId.startsWith(contextName + "-")) { + var handler = safeAwait(handlers.get(opaqueId)); + handler.channel.request().getHttpChannel().close(); + handlers.remove(opaqueId); + } + } + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() { + assertThat(handlers, anEmptyMap()); // should not have leaked any handlers at node shutdown + } + + @Override + protected void doClose() {} + } + + /** Exposes a RestHandler over which the test has full control */ public static class ControlServerRequestPlugin extends Plugin implements ActionPlugin { static final String ROUTE = "/_test/request-stream"; - static final ConcurrentHashMap handlers = new ConcurrentHashMap<>(); + private final HandlersByOpaqueId handlersByOpaqueId = new HandlersByOpaqueId(); + + @Override + public Collection createComponents(PluginServices services) { + return List.of(handlersByOpaqueId); + } @Override public Collection getRestHandlers( @@ -718,7 +852,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli var stream = (Netty4HttpRequestBodyStream) request.contentStream(); var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0); var handler = new ServerRequestHandler(opaqueId, stream); - handlers.put(opaqueId, handler); + handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler); return handler; } });