Skip to content

Commit 68348ab

Browse files
committed
Ensure that the HTTP server supports WebSocket upgrades for HTTP/1.0 non persistent connections.
1 parent 5224240 commit 68348ab

File tree

2 files changed

+46
-37
lines changed

2 files changed

+46
-37
lines changed

src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ private abstract static class Stream {
381381

382382
private Object trace;
383383
private Object metric;
384+
private HttpRequestHead request;
384385
private HttpResponseHead response;
385386
private boolean responseEnded;
386387
private long bytesRead;
@@ -422,7 +423,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
422423
private final InboundBuffer<Object> queue;
423424
private boolean reset;
424425
private boolean closed;
425-
private HttpRequestHead request;
426426
private Handler<HttpResponseHead> headHandler;
427427
private Handler<Buffer> chunkHandler;
428428
private Handler<MultiMap> endHandler;
@@ -554,7 +554,7 @@ public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boo
554554
private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler<AsyncResult<Void>> handler) {
555555
EventLoop eventLoop = conn.context.nettyEventLoop();
556556
if (eventLoop.inEventLoop()) {
557-
this.request = request;
557+
((Stream)this).request = request;
558558
conn.beginRequest(this, request, chunked, buf, end, connect, handler);
559559
} else {
560560
eventLoop.execute(() -> writeHead(request, chunked, buf, end, connect, handler));
@@ -840,40 +840,14 @@ private void handleResponseBegin(Stream stream, HttpResponseHead response) {
840840
} else {
841841
HttpRequestHead request;
842842
synchronized (this) {
843-
request = ((StreamImpl)stream).request;
843+
request = stream.request;
844844
stream.response = response;
845845

846846
if (metrics != null) {
847847
metrics.responseBegin(stream.metric, response);
848848
}
849-
850-
//
851-
if (response.statusCode != 100 && request.method != HttpMethod.CONNECT) {
852-
// See https://tools.ietf.org/html/rfc7230#section-6.3
853-
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
854-
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
855-
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
856-
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
857-
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
858-
this.close = true;
859-
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
860-
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
861-
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
862-
this.close = true;
863-
}
864-
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
865-
if (keepAliveHeader != null) {
866-
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
867-
if (timeout != -1) {
868-
this.keepAliveTimeout = timeout;
869-
}
870-
}
871-
}
872849
}
873-
874-
//
875850
stream.handleHead(response);
876-
877851
if (isConnect) {
878852
if ((request.method == HttpMethod.CONNECT &&
879853
response.statusCode == 200) || (
@@ -922,19 +896,44 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {
922896

923897
private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
924898
boolean check;
899+
HttpResponseHead response;
925900
synchronized (this) {
926-
if (stream.response == null) {
901+
response = stream.response;
902+
if (response == null) {
927903
// 100-continue
928904
return;
929905
}
930906
responses.pop();
931-
close |= !options.isKeepAlive();
907+
HttpRequestHead request = stream.request;
908+
if ((request.method != HttpMethod.CONNECT && response.statusCode != 101)) {
909+
// See https://tools.ietf.org/html/rfc7230#section-6.3
910+
String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
911+
String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
912+
// We don't need to protect against concurrent changes on forceClose as it only goes from false -> true
913+
boolean close = !options.isKeepAlive();
914+
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
915+
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
916+
close = true;
917+
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
918+
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
919+
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
920+
close = true;
921+
}
922+
this.close = close;
923+
String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
924+
if (keepAliveHeader != null) {
925+
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
926+
if (timeout != -1) {
927+
this.keepAliveTimeout = timeout;
928+
}
929+
}
930+
}
932931
stream.responseEnded = true;
933932
check = requests.peek() != stream;
934933
}
935934
VertxTracer tracer = context.tracer();
936935
if (tracer != null) {
937-
tracer.receiveResponse(stream.context, stream.response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
936+
tracer.receiveResponse(stream.context, response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
938937
}
939938
if (metrics != null) {
940939
metrics.responseEnd(stream.metric, stream.bytesRead);

src/test/java/io/vertx/core/http/WebSocketTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.vertx.test.tls.Cert;
4545
import io.vertx.test.tls.Trust;
4646
import org.junit.Test;
47+
import org.junit.Ignore;
4748

4849
import java.io.IOException;
4950
import java.io.UnsupportedEncodingException;
@@ -2478,7 +2479,7 @@ public void testServerWebSocketPingPong() {
24782479
}));
24792480
await();
24802481
}
2481-
2482+
24822483
@Test
24832484
public void testWebSocketPausePing() throws InterruptedException {
24842485
server = vertx.createHttpServer(new HttpServerOptions().setIdleTimeout(1).setPort(DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST));
@@ -2951,17 +2952,26 @@ private void testCloseCustomPayloadFromClient(Consumer<WebSocket> closeOp) {
29512952
}
29522953

29532954
@Test
2954-
public void testServerWebSocketHandshakeWithNonPersistentConnection() {
2955+
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_0Connection() {
2956+
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_0);
2957+
}
2958+
2959+
@Ignore
2960+
@Test
2961+
public void testServerWebSocketHandshakeWithNonPersistentHTTP1_1Connection() {
2962+
// Cannot pass until we merge connection header as it implies a "Connection: upgrade, close" header
2963+
testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion.HTTP_1_1);
2964+
}
2965+
2966+
private void testServerWebSocketHandshakeWithNonPersistentConnection(HttpVersion version) {
29552967
server = vertx.createHttpServer();
29562968
server.webSocketHandler(ws -> {
29572969
ws.frameHandler(frame -> {
29582970
ws.close();
29592971
});
29602972
});
29612973
server.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(v1 -> {
2962-
handshake(vertx.createHttpClient(), req -> {
2963-
MultiMap headers = req.headers();
2964-
headers.add("Connection", "close");
2974+
handshake(vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(version).setKeepAlive(false)), req -> {
29652975
req.send(onSuccess(resp -> {
29662976
assertEquals(101, resp.statusCode());
29672977
resp.endHandler(v -> {

0 commit comments

Comments
 (0)