Skip to content

Commit e67636b

Browse files
committed
Stale check command for async connections
1 parent 076296c commit e67636b

File tree

3 files changed

+97
-2
lines changed

3 files changed

+97
-2
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@
4141
import java.util.concurrent.ConcurrentLinkedDeque;
4242
import java.util.concurrent.ConcurrentLinkedQueue;
4343
import java.util.concurrent.atomic.AtomicInteger;
44-
4544
import javax.net.ssl.SSLHandshakeException;
4645
import javax.net.ssl.SSLSession;
47-
4846
import org.apache.hc.core5.concurrent.CancellableDependency;
47+
import org.apache.hc.core5.concurrent.FutureCallback;
4948
import org.apache.hc.core5.http.ConnectionClosedException;
5049
import org.apache.hc.core5.http.EndpointDetails;
5150
import org.apache.hc.core5.http.Header;
@@ -65,6 +64,7 @@
6564
import org.apache.hc.core5.http.nio.HandlerFactory;
6665
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
6766
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
67+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
6868
import org.apache.hc.core5.http.protocol.HttpCoreContext;
6969
import org.apache.hc.core5.http.protocol.HttpProcessor;
7070
import org.apache.hc.core5.http2.H2ConnectionException;
@@ -410,6 +410,23 @@ private int generateStreamId() {
410410
}
411411
}
412412

413+
void doStalecheck(final FutureCallback<Boolean> callback) throws IOException {
414+
if (!ioSession.isOpen() || connState.compareTo(ConnectionHandshake.ACTIVE) > 0) {
415+
callback.completed(false);
416+
}
417+
final ByteBuffer buffer = ByteBuffer.allocate(0);
418+
final int bytesRead = ioSession.channel().read(buffer);
419+
if (bytesRead == -1) {
420+
if (connState == ConnectionHandshake.ACTIVE) {
421+
connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
422+
}
423+
requestSessionOutput();
424+
callback.completed(false);
425+
} else {
426+
callback.completed(true);
427+
}
428+
}
429+
413430
public final void onConnect() throws HttpException, IOException {
414431
connState = ConnectionHandshake.ACTIVE;
415432
final RawFrame settingsFrame = frameFactory.createSettings(
@@ -647,6 +664,8 @@ private void processPendingCommands() throws IOException, HttpException {
647664
if (!outputQueue.isEmpty()) {
648665
return;
649666
}
667+
} else if (command instanceof StaleCheckCommand) {
668+
doStalecheck(((StaleCheckCommand) command).getCallback());
650669
}
651670
}
652671
}

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import javax.net.ssl.SSLHandshakeException;
4141
import javax.net.ssl.SSLSession;
4242

43+
import org.apache.hc.core5.concurrent.FutureCallback;
4344
import org.apache.hc.core5.http.ConnectionClosedException;
4445
import org.apache.hc.core5.http.ContentLengthStrategy;
4546
import org.apache.hc.core5.http.EndpointDetails;
@@ -68,6 +69,7 @@
6869
import org.apache.hc.core5.http.nio.command.CommandSupport;
6970
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
7071
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
7173
import org.apache.hc.core5.io.CloseMode;
7274
import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
7375
import org.apache.hc.core5.reactor.Command;
@@ -244,6 +246,8 @@ private void processCommands() throws HttpException, IOException {
244246
execute((RequestExecutionCommand) command);
245247
return;
246248
}
249+
} else if (command instanceof StaleCheckCommand) {
250+
doStalecheck(((StaleCheckCommand) command).getCallback());
247251
} else {
248252
throw new HttpException("Unexpected command: " + command.getClass());
249253
}
@@ -429,6 +433,20 @@ void requestShutdown(final CloseMode closeMode) {
429433
ioSession.setEvent(SelectionKey.OP_WRITE);
430434
}
431435

436+
void doStalecheck(final FutureCallback<Boolean> callback) throws IOException {
437+
if (!ioSession.isOpen() || connState.compareTo(ConnectionState.ACTIVE) > 0) {
438+
callback.completed(false);
439+
}
440+
final ByteBuffer buffer = ByteBuffer.allocate(0);
441+
final int bytesRead = ioSession.channel().read(buffer);
442+
if (bytesRead == -1) {
443+
requestShutdown(CloseMode.GRACEFUL);
444+
callback.completed(false);
445+
} else {
446+
callback.completed(true);
447+
}
448+
}
449+
432450
void commitMessageHead(
433451
final OutgoingMessage messageHead,
434452
final boolean endStream,
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.core5.http.nio.command;
29+
30+
import org.apache.hc.core5.annotation.Internal;
31+
import org.apache.hc.core5.concurrent.FutureCallback;
32+
import org.apache.hc.core5.reactor.Command;
33+
import org.apache.hc.core5.util.Args;
34+
35+
/**
36+
* Stale check command.
37+
*
38+
* @since 5.4
39+
*/
40+
@Internal
41+
public final class StaleCheckCommand implements Command {
42+
43+
private final FutureCallback<Boolean> callback;
44+
45+
public StaleCheckCommand(final FutureCallback<Boolean> callback) {
46+
this.callback = Args.notNull(callback, "Callback");
47+
}
48+
49+
public FutureCallback<Boolean> getCallback() {
50+
return callback;
51+
}
52+
53+
@Override
54+
public boolean cancel() {
55+
return true;
56+
}
57+
58+
}

0 commit comments

Comments
 (0)