Skip to content

Conversation

@DaveCTurner
Copy link
Contributor

Re-applying #126441 (cf. #127259) with:

See #127111 for related tests.

Re-applying elastic#126441 (cf. elastic#127259) with:

- the extra `FlowControlHandler` needed to ensure one-chunk-per-read
  semantics (also present in elastic#127259).

- no extra `read()` after exhausting a `Netty4HttpRequestBodyStream`
  (the bug behind elastic#127391 and elastic#127391).

See elastic#127111 for related tests.
@DaveCTurner DaveCTurner added >enhancement :Distributed Coordination/Network Http and internode communication implementations v9.1.0 labels May 7, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination Meta label for Distributed Coordination team label May 7, 2025
@elasticsearchmachine
Copy link
Collaborator

Hi @DaveCTurner, I've created a changelog YAML for you.

@DaveCTurner
Copy link
Contributor Author

The diff over #127259 is the following:

diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
index b4396569ae35..f7c9d1404c61 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
@@ -31,6 +31,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     private final ThreadContext threadContext;
     private final ChannelHandlerContext ctx;
     private boolean closing = false;
+    private boolean readLastChunk = false;
     private HttpBody.ChunkHandler handler;
     private ThreadContext.StoredContext requestContext;
     private final ChannelFutureListener closeListener = future -> doClose();
@@ -49,6 +50,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
 
     @Override
     public void setHandler(ChunkHandler chunkHandler) {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         this.handler = chunkHandler;
     }
 
@@ -70,10 +72,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     }
 
     public void handleNettyContent(HttpContent httpContent) {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         if (closing) {
             httpContent.release();
             read();
         } else {
+            assert readLastChunk == false;
             try (var ignored = threadContext.restoreExistingContext(requestContext)) {
                 var isLast = httpContent instanceof LastHttpContent;
                 var buf = Netty4Utils.toReleasableBytesReference(httpContent.content());
@@ -82,8 +86,9 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
                 }
                 handler.onNext(buf, isLast);
                 if (isLast) {
-                    read();
+                    readLastChunk = true;
                     ctx.channel().closeFuture().removeListener(closeListener);
+                    read();
                 }
             }
         }
@@ -99,6 +104,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
     }
 
     private void doClose() {
+        assert ctx.channel().eventLoop().inEventLoop() : Thread.currentThread().getName();
         closing = true;
         try (var ignored = threadContext.restoreExistingContext(requestContext)) {
             for (var tracer : tracingHandlers) {
@@ -108,6 +114,8 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
                 handler.close();
             }
         }
-        read();
+        if (readLastChunk == false) {
+            read();
+        }
     }
 }

This avoids a spurious extra read() from doClose() in the case that we already got to the end of the request in handleNettyContent (and potentially started processing the next request if pipelining).

Copy link
Contributor

@mhl-b mhl-b left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you

@DaveCTurner DaveCTurner added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label May 8, 2025
@elasticsearchmachine elasticsearchmachine merged commit 85d9990 into elastic:main May 8, 2025
17 checks passed
@DaveCTurner DaveCTurner deleted the 2025/05/06/netty-autoread-reapply-2 branch May 8, 2025 07:35
ywangd pushed a commit to ywangd/elasticsearch that referenced this pull request May 9, 2025
…127817)

Re-applying elastic#126441 (cf. elastic#127259) with:

- the extra `FlowControlHandler` needed to ensure one-chunk-per-read
  semantics (also present in elastic#127259).

- no extra `read()` after exhausting a `Netty4HttpRequestBodyStream`
  (the bug behind elastic#127391 and elastic#127391).

See elastic#127111 for related tests.
jfreden pushed a commit to jfreden/elasticsearch that referenced this pull request May 12, 2025
…127817)

Re-applying elastic#126441 (cf. elastic#127259) with:

- the extra `FlowControlHandler` needed to ensure one-chunk-per-read
  semantics (also present in elastic#127259).

- no extra `read()` after exhausting a `Netty4HttpRequestBodyStream`
  (the bug behind elastic#127391 and elastic#127391).

See elastic#127111 for related tests.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) :Distributed Coordination/Network Http and internode communication implementations >enhancement Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants