docs/changelog/117787.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 117787
summary: "Move HTTP content aggregation from Netty to RestController"
area: Network
type: enhancement
issues: []
@@ -117,11 +117,10 @@ public void testEmptyContent() throws Exception {
assertTrue(recvChunk.isLast);
assertEquals(0, recvChunk.chunk.length());
recvChunk.chunk.close();
assertFalse(handler.streamClosed);
assertBusy(() -> assertTrue(handler.streamClosed));

// send response to process following request
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertBusy(() -> assertTrue(handler.streamClosed));
}
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
}
@@ -154,10 +153,9 @@ public void testReceiveAllChunks() throws Exception {
}
}

assertFalse(handler.streamClosed);
assertBusy(() -> assertTrue(handler.streamClosed));
assertEquals("sent and received payloads are not the same", sendData, recvData);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertBusy(() -> assertTrue(handler.streamClosed));
}
assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size()));
}
@@ -327,38 +325,35 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
resp.release();

// terminate request
// HttpRequestEncoder should properly close request, not required on server side
ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
}
}

// ensures that oversized chunked encoded request has no limits at http layer
// rest handler is responsible for oversized requests
public void testOversizedChunkedEncodingNoLimits() throws Exception {
// ensures that an oversized chunked-encoded request is limited at the http layer
// and that the connection is closed after the limit is reached
public void testOversizedChunkedEncodingLimits() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = maxContentLength() + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
var consumed = handler.readAllBytes();
assertEquals(contentSize, consumed);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.OK, resp.status());
resp.release();
}
var id = opaqueId(0);
var contentSize = maxContentLength() + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
handler.readAllBytes();

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
safeGet(ctx.clientChannel.closeFuture());
resp.release();
}
}

@@ -594,7 +589,7 @@ record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel
@Override
public void close() throws Exception {
safeGet(clientChannel.close());
safeGet(clientBootstrap.config().group().shutdownGracefully());
safeGet(clientBootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); });
for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) {
if (opaqueId.startsWith(testName)) {
@@ -655,24 +650,27 @@ void sendResponse(RestResponse response) {
channel.sendResponse(response);
}

int readBytes(int bytes) {
int readBytes(int bytes) throws InterruptedException {
Contributor review comment on readBytes:
Could we make this method interrupt-safe rather than pushing that responsibility up to the caller? I.e. catch InterruptedException, reinstate the thread's interrupt status flag, and throw an AssertionError since we do not expect to be interrupted in these tests.
(A sketch of this pattern follows this diff hunk.)

var consumed = 0;
if (recvLast == false) {
while (consumed < bytes) {
stream.next();
var recvChunk = safePoll(recvChunks);
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
stream.next();
while (consumed < bytes && streamClosed == false) {
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
if (recvChunk != null) {
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
}
stream.next();
}
}
}
return consumed;
}

int readAllBytes() {
int readAllBytes() throws InterruptedException {
return readBytes(Integer.MAX_VALUE);
}
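
Below is a minimal sketch, not part of the PR, of the interrupt-safe polling pattern the reviewer suggests. The helper class and method names (InterruptSafePoll, pollOrFail) are hypothetical, and the BlockingQueue parameter stands in for the test's recvChunks queue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class InterruptSafePoll {
    // Poll without propagating InterruptedException: reinstate the thread's interrupt
    // status flag and fail with an AssertionError, since these tests never expect to
    // be interrupted.
    static <T> T pollOrFail(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            throw new AssertionError("unexpected interrupt in test", e);
        }
    }
}

With a helper like this, readBytes could keep a no-throws signature while retaining the timed polling loop shown above.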

@@ -704,6 +702,11 @@ public String getName() {
return ROUTE;
}

@Override
public boolean supportContentStream() {
return true;
}

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.POST, ROUTE));
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.util.AttributeKey;

import java.util.BitSet;

/**
* AutoReadSync provides coordinated access to {@link ChannelConfig#setAutoRead(boolean)}.
* We use the autoRead flag for data flow control in the channel pipeline, to prevent excessive
* buffering inside channel handlers. Every actor in the pipeline should obtain its own {@link Handle}
* by calling {@link AutoReadSync#getHandle} with the channel. Channel autoRead stays enabled as long
* as all handles are enabled; if any handle disables autoRead, channel autoRead is disabled too.
* In effect, {@code channel.setAutoRead(allHandlesEnabled)}.
* <br><br>
* TODO: this flow control should be removed when {@link Netty4HttpHeaderValidator} moves to the RestController,
* at which point the whole flow control can be simplified to {@link io.netty.handler.flow.FlowControlHandler}.
*/
class AutoReadSync {

private static final AttributeKey<AutoReadSync> AUTO_READ_SYNC_KEY = AttributeKey.valueOf("AutoReadSync");
private final Channel channel;
private final ChannelConfig config;

// A pool of reusable handles and their states. A handle id is its index in the bitsets.
// The handles bitset tracks which ids are currently leased; the toggles bitset tracks each
// handle's autoRead state. A toggle's default value of 0 (clear) means autoRead is enabled.
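// Example: if handle 0 calls disable() while handle 1 stays enabled, toggles becomes {0}
// and autoRead is turned off; once handle 0 calls enable() again, toggles is empty and
// autoRead is turned back on.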
private final BitSet handles;
private final BitSet toggles;

AutoReadSync(Channel channel) {
this.channel = channel;
this.config = channel.config();
this.handles = new BitSet();
this.toggles = new BitSet();
}

static Handle getHandle(Channel channel) {
assert channel.eventLoop().inEventLoop();
var autoRead = channel.attr(AUTO_READ_SYNC_KEY).get();
if (autoRead == null) {
autoRead = new AutoReadSync(channel);
channel.attr(AUTO_READ_SYNC_KEY).set(autoRead);
}
return autoRead.getHandle();
}

Handle getHandle() {
var handleId = handles.nextClearBit(0); // next unused handle id
handles.set(handleId, true); // acquire lease
return new Handle(handleId);
}

class Handle {
private final int id;
private boolean released;

Handle(int id) {
this.id = id;
}

private void assertState() {
assert channel.eventLoop().inEventLoop();
assert released == false;
}

boolean isEnabled() {
assertState();
return toggles.get(id) == false;
}

void enable() {
assertState();
toggles.set(id, false);
config.setAutoRead(toggles.isEmpty());
}

void disable() {
assertState();
toggles.set(id, true);
config.setAutoRead(false);
}

void release() {
assertState();
enable();
handles.set(id, false);
released = true;
}
}

}
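
Below is a brief usage sketch, not part of the PR, showing how a pipeline handler might use AutoReadSync under the semantics described in the Javadoc above. PausingHandler is a hypothetical handler and is assumed to live in the same package as AutoReadSync, since getHandle is package-private.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical handler: pauses reads while messages are in flight and resumes them on
// read-complete, using its own AutoReadSync handle.
class PausingHandler extends ChannelInboundHandlerAdapter {
    private AutoReadSync.Handle handle;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // Handles must be obtained and used on the channel's event loop.
        handle = AutoReadSync.getHandle(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        handle.disable(); // channel autoRead turns off immediately
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        handle.enable(); // autoRead resumes only if every other handle is also enabled
        ctx.fireChannelReadComplete();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        handle.release(); // re-enables this handle and returns its id to the pool
    }
}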

This file was deleted.
