Skip to content

Commit bb50a17

Browse files
rschmittok2c
authored andcommitted
Adds StaleCheckCommand
The `StaleCheckCommand`, like all `Command`s, is modeled as a write operation, and `InternalDataChannel::onIOEvent` processes reads before writes. Therefore, by the time the stale check command is processed, the client's view of the connection is already up-to-date; any server-initiated connection closure (FIN, RST, GOAWAY) has already been read and processed.
1 parent ccb7b49 commit bb50a17

File tree

4 files changed

+88
-7
lines changed

4 files changed

+88
-7
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.ConcurrentLinkedQueue;
4343
import java.util.concurrent.atomic.AtomicBoolean;
4444
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.function.Consumer;
4546

4647
import javax.net.ssl.SSLHandshakeException;
4748
import javax.net.ssl.SSLSession;
@@ -68,6 +69,7 @@
6869
import org.apache.hc.core5.http.nio.command.CommandSupport;
6970
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
7071
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
7173
import org.apache.hc.core5.http.protocol.HttpCoreContext;
7274
import org.apache.hc.core5.http.protocol.HttpProcessor;
7375
import org.apache.hc.core5.http2.H2ConnectionException;
@@ -413,6 +415,14 @@ private int generateStreamId() {
413415
}
414416
}
415417

418+
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
419+
420+
void doStalecheck(final Consumer<Boolean> callback) throws IOException {
421+
callback.accept(ioSession.isOpen() &&
422+
connState.compareTo(ConnectionHandshake.ACTIVE) == 0 &&
423+
ioSession.read(EMPTY_BUFFER) == 0);
424+
}
425+
416426
public final void onConnect() throws HttpException, IOException {
417427
connState = ConnectionHandshake.ACTIVE;
418428
final RawFrame settingsFrame = frameFactory.createSettings(
@@ -656,6 +666,8 @@ private void processPendingCommands() throws IOException, HttpException {
656666
if (!outputQueue.isEmpty()) {
657667
return;
658668
}
669+
} else if (command instanceof StaleCheckCommand) {
670+
doStalecheck(((StaleCheckCommand) command).getCallback());
659671
}
660672
}
661673
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/pool/H2ConnPool.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@
3939
import org.apache.hc.core5.http.URIScheme;
4040
import org.apache.hc.core5.http.impl.DefaultAddressResolver;
4141
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
42+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
4243
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
43-
import org.apache.hc.core5.http2.nio.command.PingCommand;
44-
import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
4544
import org.apache.hc.core5.io.CloseMode;
4645
import org.apache.hc.core5.reactor.AbstractIOSessionPool;
4746
import org.apache.hc.core5.reactor.Command;
@@ -146,11 +145,7 @@ protected void validateSession(
146145
final long lastAccessTime = Math.min(ioSession.getLastReadTime(), ioSession.getLastWriteTime());
147146
final long deadline = lastAccessTime + timeValue.toMilliseconds();
148147
if (deadline <= System.currentTimeMillis()) {
149-
final Timeout socketTimeoutMillis = ioSession.getSocketTimeout();
150-
ioSession.enqueue(new PingCommand(new BasicPingHandler(result -> {
151-
ioSession.setSocketTimeout(socketTimeoutMillis);
152-
callback.execute(result);
153-
})), Command.Priority.NORMAL);
148+
ioSession.enqueue(new StaleCheckCommand(callback::execute), Command.Priority.NORMAL);
154149
return;
155150
}
156151
}

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.concurrent.atomic.AtomicInteger;
3838
import java.util.concurrent.locks.ReentrantLock;
39+
import java.util.function.Consumer;
3940

4041
import javax.net.ssl.SSLHandshakeException;
4142
import javax.net.ssl.SSLSession;
@@ -68,6 +69,7 @@
6869
import org.apache.hc.core5.http.nio.command.CommandSupport;
6970
import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
7071
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
72+
import org.apache.hc.core5.http.nio.command.StaleCheckCommand;
7173
import org.apache.hc.core5.io.CloseMode;
7274
import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
7375
import org.apache.hc.core5.reactor.Command;
@@ -244,6 +246,8 @@ private void processCommands() throws HttpException, IOException {
244246
execute((RequestExecutionCommand) command);
245247
return;
246248
}
249+
} else if (command instanceof StaleCheckCommand) {
250+
doStalecheck(((StaleCheckCommand) command).getCallback());
247251
} else {
248252
throw new HttpException("Unexpected command: " + command.getClass());
249253
}
@@ -429,6 +433,14 @@ void requestShutdown(final CloseMode closeMode) {
429433
ioSession.setEvent(SelectionKey.OP_WRITE);
430434
}
431435

436+
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
437+
438+
void doStalecheck(final Consumer<Boolean> callback) throws IOException {
439+
callback.accept(ioSession.isOpen() &&
440+
connState.compareTo(ConnectionState.ACTIVE) == 0 &&
441+
ioSession.read(EMPTY_BUFFER) == 0);
442+
}
443+
432444
void commitMessageHead(
433445
final OutgoingMessage messageHead,
434446
final boolean endStream,
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.core5.http.nio.command;
29+
30+
import java.util.function.Consumer;
31+
32+
import org.apache.hc.core5.annotation.Internal;
33+
import org.apache.hc.core5.reactor.Command;
34+
import org.apache.hc.core5.util.Args;
35+
36+
/**
37+
* Stale check command. The {@code callback} will be invoked after the client's event loop has finished processing
38+
* all pending reads on the connection. If the connection is still active at that point, the callback will be completed
39+
* with {@code true}. In any other event, the callback will be cancelled, failed, or completed with {@code false}.
40+
*
41+
* @since 5.4
42+
*/
43+
@Internal
44+
public final class StaleCheckCommand implements Command {
45+
46+
private final Consumer<Boolean> callback;
47+
48+
public StaleCheckCommand(final Consumer<Boolean> callback) {
49+
this.callback = Args.notNull(callback, "Callback");
50+
}
51+
52+
public Consumer<Boolean> getCallback() {
53+
return callback;
54+
}
55+
56+
@Override
57+
public boolean cancel() {
58+
callback.accept(false);
59+
return true;
60+
}
61+
62+
}

0 commit comments

Comments
 (0)