diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java index d20bf6971..3badb5445 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java @@ -105,6 +105,7 @@ private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN} private volatile ProtocolVersion version; private volatile EndpointDetails endpointDetails; + private volatile boolean endOfStream; AbstractHttp1StreamDuplexer( final ProtocolIOSession ioSession, @@ -265,6 +266,17 @@ IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, return messageHead; } + private int fillBuffer() throws IOException { + final int bytesRead = inbuf.fill(ioSession); + if (bytesRead > 0) { + inTransportMetrics.incrementBytesTransferred(bytesRead); + } + if (bytesRead == -1) { + endOfStream = true; + } + return bytesRead; + } + public final void onInput(final ByteBuffer src) throws HttpException, IOException { if (src != null) { final int n = src.remaining(); @@ -277,17 +289,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio return; } - boolean endOfStream = false; - if (incomingMessage == null) { - final int bytesRead = inbuf.fill(ioSession); - if (bytesRead > 0) { - inTransportMetrics.incrementBytesTransferred(bytesRead); - } - endOfStream = bytesRead == -1; - } - do { if (incomingMessage == null) { + fillBuffer(); final IncomingMessage messageHead = parseMessageHead(endOfStream); if (messageHead != null) { @@ -342,6 +346,7 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio incomingMessage = null; ioSession.setEvent(SelectionKey.OP_READ); inputEnd(); + fillBuffer(); } else if (bytesRead == 0) { break; } @@ -561,7 +566,7 @@ public void close(final CloseMode closeMode) { @Override public boolean isOpen() { - return connState.compareTo(ConnectionState.ACTIVE) <= 0; + return connState.compareTo(ConnectionState.ACTIVE) <= 0 && !endOfStream; } @Override