Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/127259.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 127259
summary: Replace auto-read with proper flow-control in HTTP pipeline
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

assertFalse(handler.isClosed());

Expand All @@ -207,7 +206,6 @@ public void testClientConnectionCloseMidStream() throws Exception {
assertEquals(requestTransmittedLength, handler.readUntilClose());

assertTrue(handler.isClosed());
assertEquals(0, handler.stream.bufSize());
}
}

Expand All @@ -224,7 +222,6 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.isClosed());

// terminate connection on server and wait resources are released
Expand All @@ -233,7 +230,6 @@ public void testServerCloseConnectionMidStream() throws Exception {
handler.channel.request().getHttpChannel().close();
assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class));
assertTrue(handler.isClosed());
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
}
}

Expand All @@ -249,7 +245,6 @@ public void testServerExceptionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.isClosed());

// terminate connection on server and wait resources are released
Expand All @@ -261,7 +256,6 @@ public void testServerExceptionMidStream() throws Exception {
final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture));
assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage());
safeAwait(handler.closedLatch);
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
}
}

Expand Down Expand Up @@ -302,7 +296,7 @@ public void testClientBackpressure() throws Exception {
});
handler.readBytes(partSize);
}
assertTrue(handler.stream.hasLast());
assertTrue(handler.receivedLastChunk);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.util.concurrent.TimeUnit;

/**
* When channel auto-read is disabled handlers are responsible to read from channel.
* But it's hard to detect when read is missing. This helper class print warnings
* when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
* to avoid test hang for too long, but can be increased if needed.
*/
class MissingReadDetector extends ChannelDuplexHandler {

private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);

private final long interval;
private final TimeProvider timer;
private boolean pendingRead;
private long lastRead;
private ScheduledFuture<?> checker;

MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
this.interval = missingReadIntervalMillis;
this.timer = timer;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
if (pendingRead == false) {
long now = timer.absoluteTimeInMillis();
if (now >= lastRead + interval) {
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
}
}
}, interval, interval, TimeUnit.MILLISECONDS);
super.handlerAdded(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (checker != null) {
FutureUtils.cancel(checker);
}
super.handlerRemoved(ctx);
}

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
pendingRead = true;
ctx.read();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
pendingRead = false;
lastRead = timer.absoluteTimeInMillis();
ctx.fireChannelRead(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand Down Expand Up @@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
return;
}
}
Expand All @@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
decoder.reset();
}
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.read();
} else {
ignoreContent = false;
currentContentLength = 0;
Expand All @@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
if (ignoreContent) {
msg.release();
ctx.read();
} else {
currentContentLength += msg.content().readableBytes();
if (currentContentLength > maxContentLength) {
msg.release();
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
} else {
ctx.fireChannelRead(msg);
}
Expand Down
Loading
Loading