@@ -105,6 +105,7 @@ private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
105
105
106
106
private volatile ProtocolVersion version ;
107
107
private volatile EndpointDetails endpointDetails ;
108
+ private volatile boolean endOfStream ;
108
109
109
110
AbstractHttp1StreamDuplexer (
110
111
final ProtocolIOSession ioSession ,
@@ -265,6 +266,17 @@ IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException,
265
266
return messageHead ;
266
267
}
267
268
269
+ private int fillBuffer () throws IOException {
270
+ final int bytesRead = inbuf .fill (ioSession );
271
+ if (bytesRead > 0 ) {
272
+ inTransportMetrics .incrementBytesTransferred (bytesRead );
273
+ }
274
+ if (bytesRead == -1 ) {
275
+ endOfStream = true ;
276
+ }
277
+ return bytesRead ;
278
+ }
279
+
268
280
public final void onInput (final ByteBuffer src ) throws HttpException , IOException {
269
281
if (src != null ) {
270
282
final int n = src .remaining ();
@@ -277,17 +289,9 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
277
289
return ;
278
290
}
279
291
280
- boolean endOfStream = false ;
281
- if (incomingMessage == null ) {
282
- final int bytesRead = inbuf .fill (ioSession );
283
- if (bytesRead > 0 ) {
284
- inTransportMetrics .incrementBytesTransferred (bytesRead );
285
- }
286
- endOfStream = bytesRead == -1 ;
287
- }
288
-
289
292
do {
290
293
if (incomingMessage == null ) {
294
+ fillBuffer ();
291
295
292
296
final IncomingMessage messageHead = parseMessageHead (endOfStream );
293
297
if (messageHead != null ) {
@@ -342,6 +346,7 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
342
346
incomingMessage = null ;
343
347
ioSession .setEvent (SelectionKey .OP_READ );
344
348
inputEnd ();
349
+ fillBuffer ();
345
350
} else if (bytesRead == 0 ) {
346
351
break ;
347
352
}
@@ -561,7 +566,7 @@ public void close(final CloseMode closeMode) {
561
566
562
567
@ Override
563
568
public boolean isOpen () {
564
- return connState .compareTo (ConnectionState .ACTIVE ) <= 0 ;
569
+ return connState .compareTo (ConnectionState .ACTIVE ) <= 0 && ! endOfStream ;
565
570
}
566
571
567
572
@ Override
0 commit comments