- 
                Notifications
    
You must be signed in to change notification settings  - Fork 25.6k
 
Replace auto-read with proper flow-control in HTTP pipeline #127817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace auto-read with proper flow-control in HTTP pipeline #127817
Conversation
Re-applying elastic#126441 (cf. elastic#127259) with: - the extra `FlowControlHandler` needed to ensure one-chunk-per-read semantics (also present in elastic#127259). - no extra `read()` after exhausting a `Netty4HttpRequestBodyStream` (the bug behind elastic#127391 and elastic#127391). See elastic#127111 for related tests.
| 
           Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)  | 
    
| 
           Hi @DaveCTurner, I've created a changelog YAML for you.  | 
    
| 
           The diff over #127259 is the following: diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
index b4396569ae35..f7c9d1404c61 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
@@ -31,6 +31,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     private final ThreadContext threadContext;
     private final ChannelHandlerContext ctx;
     private boolean closing = false;
+    private boolean readLastChunk = false;
     private HttpBody.ChunkHandler handler;
     private ThreadContext.StoredContext requestContext;
     private final ChannelFutureListener closeListener = future -> doClose();
@@ -49,6 +50,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
 
     @Override
     public void setHandler(ChunkHandler chunkHandler) {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         this.handler = chunkHandler;
     }
 
@@ -70,10 +72,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     }
 
     public void handleNettyContent(HttpContent httpContent) {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         if (closing) {
             httpContent.release();
             read();
         } else {
+            assert readLastChunk == false;
             try (var ignored = threadContext.restoreExistingContext(requestContext)) {
                 var isLast = httpContent instanceof LastHttpContent;
                 var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
@@ -82,8 +86,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
                 }
                 handler.onNext(buf, isLast);
                 if (isLast) {
-                    read();
+                    readLastChunk = true;
                     ctx.channel().closeFuture().removeListener(closeListener);
+                    read();
                 }
             }
         }
@@ -99,6 +104,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     }
 
     private void doClose() {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         closing = true;
         try (var ignored = threadContext.restoreExistingContext(requestContext)) {
             for (var tracer : tracingHandlers) {
@@ -108,6 +114,8 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
                 handler.close();
             }
         }
-        read();
+        if (readLastChunk == false) {
+            read();
+        }
     }
 }This avoids a spurious extra   | 
    
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you
…127817) Re-applying elastic#126441 (cf. elastic#127259) with: - the extra `FlowControlHandler` needed to ensure one-chunk-per-read semantics (also present in elastic#127259). - no extra `read()` after exhausting a `Netty4HttpRequestBodyStream` (the bug behind elastic#127391 and elastic#127391). See elastic#127111 for related tests.
…127817) Re-applying elastic#126441 (cf. elastic#127259) with: - the extra `FlowControlHandler` needed to ensure one-chunk-per-read semantics (also present in elastic#127259). - no extra `read()` after exhausting a `Netty4HttpRequestBodyStream` (the bug behind elastic#127391 and elastic#127391). See elastic#127111 for related tests.
Re-applying #126441 (cf. #127259) with:
the extra
FlowControlHandlerneeded to ensure one-chunk-per-readsemantics (also present in Replace auto-read with proper flow-control in HTTP pipeline #127259).
no extra
read()after exhausting aNetty4HttpRequestBodyStream(the bug behind [CI] Netty4IncrementalRequestHandlingIT testHttpClientStats failing #127391 and [CI] Netty4IncrementalRequestHandlingIT testHttpClientStats failing #127391).
See #127111 for related tests.