-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add flow-control and remove auto-read in netty4 HTTP pipeline #126441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3fe635b
a89fec2
d83f45b
8170e19
b749dd3
3be681b
f8d7fb1
e9df377
fea33ed
d8c89c2
c90d4d7
55681ed
2a6a792
294002e
34f5884
c849a98
9edc660
d6464a4
6b3083e
833884b
33ec141
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 126441 | ||
| summary: Add flow-control and remove auto-read in netty4 http pipeline | ||
| area: Network | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
|
|
||
| package org.elasticsearch.http.netty4; | ||
|
|
||
| import io.netty.channel.ChannelDuplexHandler; | ||
| import io.netty.channel.ChannelHandlerContext; | ||
| import io.netty.util.concurrent.ScheduledFuture; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.common.time.TimeProvider; | ||
| import org.elasticsearch.common.util.concurrent.FutureUtils; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| /** | ||
| * When channel auto-read is disabled handlers are responsible to read from channel. | ||
| * But it's hard to detect when read is missing. This helper class print warnings | ||
| * when no reads where detected in given time interval. Normally, in tests, 10 seconds is enough | ||
| * to avoid test hang for too long, but can be increased if needed. | ||
| */ | ||
| class MissingReadDetector extends ChannelDuplexHandler { | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(MissingReadDetector.class); | ||
|
|
||
| private final long interval; | ||
| private final TimeProvider timer; | ||
| private boolean pendingRead; | ||
| private long lastRead; | ||
| private ScheduledFuture<?> checker; | ||
|
|
||
| MissingReadDetector(TimeProvider timer, long missingReadIntervalMillis) { | ||
| this.interval = missingReadIntervalMillis; | ||
| this.timer = timer; | ||
| } | ||
|
|
||
| @Override | ||
| public void handlerAdded(ChannelHandlerContext ctx) throws Exception { | ||
| checker = ctx.channel().eventLoop().scheduleAtFixedRate(() -> { | ||
| if (pendingRead == false) { | ||
| long now = timer.absoluteTimeInMillis(); | ||
| if (now >= lastRead + interval) { | ||
| logger.warn("chan-id={} haven't read from channel for [{}ms]", ctx.channel().id(), (now - lastRead)); | ||
| } | ||
| } | ||
| }, interval, interval, TimeUnit.MILLISECONDS); | ||
| super.handlerAdded(ctx); | ||
| } | ||
|
|
||
| @Override | ||
| public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { | ||
| if (checker != null) { | ||
| FutureUtils.cancel(checker); | ||
| } | ||
| super.handlerRemoved(ctx); | ||
| } | ||
|
|
||
| @Override | ||
| public void read(ChannelHandlerContext ctx) throws Exception { | ||
| pendingRead = true; | ||
| ctx.read(); | ||
| } | ||
|
|
||
| @Override | ||
| public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
| assert ctx.channel().config().isAutoRead() == false : "auto-read must be always disabled"; | ||
| pendingRead = false; | ||
| lastRead = timer.absoluteTimeInMillis(); | ||
| ctx.fireChannelRead(msg); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| import io.netty.handler.codec.http.HttpObjectAggregator; | ||
| import io.netty.handler.codec.http.HttpRequest; | ||
| import io.netty.handler.codec.http.HttpRequestDecoder; | ||
| import io.netty.handler.codec.http.LastHttpContent; | ||
|
|
||
| import org.elasticsearch.http.HttpPreRequest; | ||
| import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; | ||
|
|
@@ -48,6 +49,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception | |
| } | ||
| if (aggregating || msg instanceof FullHttpRequest) { | ||
| super.channelRead(ctx, msg); | ||
| if (msg instanceof LastHttpContent == false) { | ||
| ctx.read(); // HttpObjectAggregator is tricky with auto-read off, it might not call read again, calling on its behalf | ||
| } | ||
| } else { | ||
| streamContentSizeHandler.channelRead(ctx, msg); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This branch is apparently not covered by the unit test suite, could we add a test that hits it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, next step is to remove aggregator from netty. There is no incentive to add more tests here, more to remove in coming days. |
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -123,6 +123,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { | |
| isContinueExpected = true; | ||
| } else { | ||
| ctx.writeAndFlush(EXPECTATION_FAILED_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); | ||
| ctx.read(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| return; | ||
| } | ||
| } | ||
|
|
@@ -136,6 +137,7 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { | |
| decoder.reset(); | ||
| } | ||
| ctx.writeAndFlush(TOO_LARGE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); | ||
| ctx.read(); | ||
| } else { | ||
| ignoreContent = false; | ||
| currentContentLength = 0; | ||
|
|
@@ -150,11 +152,13 @@ private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) { | |
| private void handleContent(ChannelHandlerContext ctx, HttpContent msg) { | ||
| if (ignoreContent) { | ||
| msg.release(); | ||
| ctx.read(); | ||
| } else { | ||
| currentContentLength += msg.content().readableBytes(); | ||
| if (currentContentLength > maxContentLength) { | ||
| msg.release(); | ||
| ctx.writeAndFlush(TOO_LARGE_CLOSE.retainedDuplicate()).addListener(ChannelFutureListener.CLOSE); | ||
| ctx.read(); | ||
| } else { | ||
| ctx.fireChannelRead(msg); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether it is worthwhile to warn faster if the last seen message is not either a
FullHttpRequestorLastHttpContent?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is use-case for this? Not reading from stream at anytime sounds equally bad. Our transport code that missed read at the end of request or stream handler that forgot to read chunk or close stream, both problematic.