Skip to content

Commit 4f0eea7

Browse files
committed
Stale check command for async connections
1 parent 076296c commit 4f0eea7

File tree

3 files changed

+97
-0
lines changed

3 files changed

+97
-0
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import javax.net.ssl.SSLSession;
4747

4848
import org.apache.hc.core5.concurrent.CancellableDependency;
49+
import org.apache.hc.core5.concurrent.FutureCallback;
4950
import org.apache.hc.core5.http.ConnectionClosedException;
5051
import org.apache.hc.core5.http.EndpointDetails;
5152
import org.apache.hc.core5.http.Header;
@@ -65,6 +66,7 @@
6566
import org.apache.hc.core5.http.nio.HandlerFactory;
6667
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
6768
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
69+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
6870
import org.apache.hc.core5.http.protocol.HttpCoreContext;
6971
import org.apache.hc.core5.http.protocol.HttpProcessor;
7072
import org.apache.hc.core5.http2.H2ConnectionException;
@@ -410,6 +412,23 @@ private int generateStreamId() {
410412
}
411413
}
412414

415+
void doStalecheck(final FutureCallback<Boolean> callback) throws IOException {
416+
if (connState.compareTo(ConnectionHandshake.ACTIVE) > 0) {
417+
callback.completed(false);
418+
}
419+
final ByteBuffer buffer = ByteBuffer.allocate(0);
420+
final int bytesRead = ioSession.channel().read(buffer);
421+
if (bytesRead == -1) {
422+
if (connState == ConnectionHandshake.ACTIVE) {
423+
connState = ConnectionHandshake.GRACEFUL_SHUTDOWN;
424+
}
425+
requestSessionOutput();
426+
callback.completed(false);
427+
} else {
428+
callback.completed(true);
429+
}
430+
}
431+
413432
public final void onConnect() throws HttpException, IOException {
414433
connState = ConnectionHandshake.ACTIVE;
415434
final RawFrame settingsFrame = frameFactory.createSettings(
@@ -647,6 +666,8 @@ private void processPendingCommands() throws IOException, HttpException {
647666
if (!outputQueue.isEmpty()) {
648667
return;
649668
}
669+
} else if (command instanceof StaleCheckCommand) {
670+
doStalecheck(((StaleCheckCommand) command).getCallback());
650671
}
651672
}
652673
}

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import javax.net.ssl.SSLHandshakeException;
4141
import javax.net.ssl.SSLSession;
4242

43+
import org.apache.hc.core5.concurrent.FutureCallback;
4344
import org.apache.hc.core5.http.ConnectionClosedException;
4445
import org.apache.hc.core5.http.ContentLengthStrategy;
4546
import org.apache.hc.core5.http.EndpointDetails;
@@ -68,6 +69,7 @@
6869
import org.apache.hc.core5.http.nio.command.CommandSupport;
6970
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
7071
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
7173
import org.apache.hc.core5.io.CloseMode;
7274
import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
7375
import org.apache.hc.core5.reactor.Command;
@@ -244,6 +246,8 @@ private void processCommands() throws HttpException, IOException {
244246
execute((RequestExecutionCommand) command);
245247
return;
246248
}
249+
} else if (command instanceof StaleCheckCommand) {
250+
doStalecheck(((StaleCheckCommand) command).getCallback());
247251
} else {
248252
throw new HttpException("Unexpected command: " + command.getClass());
249253
}
@@ -429,6 +433,20 @@ void requestShutdown(final CloseMode closeMode) {
429433
ioSession.setEvent(SelectionKey.OP_WRITE);
430434
}
431435

436+
void doStalecheck(final FutureCallback<Boolean> callback) throws IOException {
437+
if (connState.compareTo(ConnectionState.ACTIVE) > 0) {
438+
callback.completed(false);
439+
}
440+
final ByteBuffer buffer = ByteBuffer.allocate(0);
441+
final int bytesRead = ioSession.channel().read(buffer);
442+
if (bytesRead == -1) {
443+
requestShutdown(CloseMode.GRACEFUL);
444+
callback.completed(false);
445+
} else {
446+
callback.completed(true);
447+
}
448+
}
449+
432450
void commitMessageHead(
433451
final OutgoingMessage messageHead,
434452
final boolean endStream,
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.core5.http.nio.command;
29+
30+
import org.apache.hc.core5.annotation.Internal;
31+
import org.apache.hc.core5.concurrent.FutureCallback;
32+
import org.apache.hc.core5.reactor.Command;
33+
import org.apache.hc.core5.util.Args;
34+
35+
/**
36+
* Stale check command.
37+
*
38+
* @since 5.4
39+
*/
40+
@Internal
41+
public final class StaleCheckCommand implements Command {
42+
43+
private final FutureCallback<Boolean> callback;
44+
45+
public StaleCheckCommand(final FutureCallback<Boolean> callback) {
46+
this.callback = Args.notNull(callback, "Callback");
47+
}
48+
49+
public FutureCallback<Boolean> getCallback() {
50+
return callback;
51+
}
52+
53+
@Override
54+
public boolean cancel() {
55+
return true;
56+
}
57+
58+
}

0 commit comments

Comments
 (0)