Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/127817.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 127817
summary: Replace auto-read with proper flow-control in HTTP pipeline
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

assertFalse(handler.isClosed());

Expand All @@ -218,7 +217,6 @@ public void testClientConnectionCloseMidStream() throws Exception {
assertEquals(requestTransmittedLength, handler.readUntilClose());

assertTrue(handler.isClosed());
assertEquals(0, handler.stream.bufSize());
}
}

Expand All @@ -235,7 +233,6 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.isClosed());

// terminate connection on server and wait resources are released
Expand All @@ -244,7 +241,6 @@ public void testServerCloseConnectionMidStream() throws Exception {
handler.channel.request().getHttpChannel().close();
assertThat(safeGet(exceptionFuture), instanceOf(ClosedChannelException.class));
assertTrue(handler.isClosed());
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
}
}

Expand All @@ -260,7 +256,6 @@ public void testServerExceptionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = clientContext.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.isClosed());

// terminate connection on server and wait resources are released
Expand All @@ -272,7 +267,6 @@ public void testServerExceptionMidStream() throws Exception {
final var exception = asInstanceOf(RuntimeException.class, safeGet(exceptionFuture));
assertEquals(ServerRequestHandler.SIMULATED_EXCEPTION_MESSAGE, exception.getMessage());
safeAwait(handler.closedLatch);
assertBusy(() -> assertEquals(0, handler.stream.bufSize()));
}
}

Expand Down Expand Up @@ -313,7 +307,7 @@ public void testClientBackpressure() throws Exception {
});
handler.readBytes(partSize);
}
assertTrue(handler.stream.hasLast());
assertTrue(handler.receivedLastChunk);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.concurrent.FutureUtils;

import java.util.concurrent.TimeUnit;

/**
* When channel auto-read is disabled handlers are responsible to read from channel.
* But it's hard to detect when read is missing. This helper class print warnings
* when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough
* to avoid test hang for too long, but can be increased if needed.
*/
class MissingReadDetector extends ChannelDuplexHandler {

private static final Logger logger = LogManager.getLogger(MissingReadDetector.class);

private final long interval;
private final TimeProvider timer;
private boolean pendingRead;
private long lastRead;
private ScheduledFuture<?> checker;

MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) {
this.interval = missingReadIntervalMillis;
this.timer = timer;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> {
if (pendingRead == false) {
long now = timer.absoluteTimeInMillis();
if (now >= lastRead + interval) {
logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead));
}
}
}, interval, interval, TimeUnit.MILLISECONDS);
super.handlerAdded(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (checker != null) {
FutureUtils.cancel(checker);
}
super.handlerRemoved(ctx);
}

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
pendingRead = true;
ctx.read();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled";
pendingRead = false;
lastRead = timer.absoluteTimeInMillis();
ctx.fireChannelRead(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand Down Expand Up @@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
if (msg instanceof LastHttpContent == false) {
ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf
}
} else {
streamContentSizeHandler.channelRead(ctx, msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
isContinueExpected = true;
} else {
ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
return;
}
}
Expand All @@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
decoder.reset();
}
ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.read();
} else {
ignoreContent = false;
currentContentLength = 0;
Expand All @@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) {
private void handleContent(ChannelHandlerContext ctx, HttpContent msg) {
if (ignoreContent) {
msg.release();
ctx.read();
} else {
currentContentLength += msg.content().readableBytes();
if (currentContentLength > maxContentLength) {
msg.release();
ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE);
ctx.read();
} else {
ctx.fireChannelRead(msg);
}
Expand Down
Loading
Loading