Skip to content

Commit 2f6b74b

Browse files
authored
Merge pull request #5747 from eclipse-vertx/http2-simplification
HTTP stream internal API
2 parents b4c0625 + 1c713f8 commit 2f6b74b

31 files changed

+538
-349
lines changed

vertx-core/src/main/java/io/vertx/core/http/HttpServerResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.vertx.core.Handler;
1717
import io.vertx.core.MultiMap;
1818
import io.vertx.core.buffer.Buffer;
19+
import io.vertx.core.http.impl.HttpServerResponseImpl;
1920
import io.vertx.core.net.HostAndPort;
2021
import io.vertx.core.streams.ReadStream;
2122
import io.vertx.core.streams.WriteStream;
@@ -41,7 +42,7 @@
4142
* serving files from the server since buffers do not have to be read one by one
4243
* from the file and written to the outgoing socket. If the developer wants to use directly a
4344
* {@link java.nio.channels.FileChannel} and manage its lifecycle use {@link #sendFile(FileChannel)}.
44-
* This is not yet supported in HTTP/2 for {@link io.vertx.core.http.impl.Http2ServerResponse}.
45+
* This is not yet supported in HTTP/2 for {@link HttpServerResponseImpl}.
4546
* <p>
4647
* It implements {@link io.vertx.core.streams.WriteStream} so it can be used with
4748
* {@link io.vertx.core.streams.Pipe} to pipe data with flow control.

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import io.vertx.core.http.HttpVersion;
3838
import io.vertx.core.http.WebSocketVersion;
3939
import io.vertx.core.http.impl.headers.HeadersAdaptor;
40-
import io.vertx.core.http.impl.http2.Http2ClientPush;
4140
import io.vertx.core.internal.ContextInternal;
4241
import io.vertx.core.internal.PromiseInternal;
4342
import io.vertx.core.internal.buffer.BufferInternal;
@@ -378,6 +377,7 @@ private abstract static class Stream {
378377
protected final ContextInternal context;
379378
protected final int id;
380379

380+
private HttpVersion version;
381381
private Object trace;
382382
private Object metric;
383383
private HttpRequestHead request;
@@ -500,7 +500,7 @@ public boolean isWritable() {
500500
}
501501

502502
@Override
503-
public HttpClientStream headersHandler(Handler<HttpResponseHead> handler) {
503+
public HttpClientStream headHandler(Handler<HttpResponseHead> handler) {
504504
this.headHandler = handler;
505505
return this;
506506
}
@@ -523,7 +523,7 @@ public HttpClientStream priorityChangeHandler(Handler<StreamPriority> handler) {
523523
}
524524

525525
@Override
526-
public HttpClientStream pushHandler(Handler<Http2ClientPush> handler) {
526+
public HttpClientStream pushHandler(Handler<HttpClientPush> handler) {
527527
// No op
528528
return this;
529529
}
@@ -565,25 +565,25 @@ public ContextInternal context() {
565565
}
566566

567567
@Override
568-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
568+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
569569
PromiseInternal<Void> promise = context.promise();
570-
conn.writeHead(this, request, chunked, buf, end, connect, promise);
570+
conn.writeHead(this, request, chunked, buf != null ? ((BufferInternal)buf).getByteBuf() : null, end, connect, promise);
571571
return promise.future();
572572
}
573573

574574
@Override
575-
public Future<Void> write(ByteBuf buff, boolean end) {
575+
public Future<Void> writeChunk(Buffer buff, boolean end) {
576576
if (buff != null || end) {
577577
Promise<Void> listener = context.promise();
578-
conn.writeBuffer(this, buff, end, listener);
578+
conn.writeBuffer(this, buff != null ? ((BufferInternal)buff).getByteBuf() : null, end, listener);
579579
return listener.future();
580580
} else {
581581
throw new IllegalStateException("???");
582582
}
583583
}
584584

585585
@Override
586-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
586+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
587587
throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
588588
}
589589

@@ -793,8 +793,7 @@ private void handleHttpMessage(HttpObject obj) {
793793
} else {
794794
version = io.vertx.core.http.HttpVersion.HTTP_1_1;
795795
}
796-
handleResponseBegin(stream, new HttpResponseHead(
797-
version,
796+
handleResponseBegin(stream, version, new HttpResponseHead(
798797
response.status().code(),
799798
response.status().reasonPhrase(),
800799
new HeadersAdaptor(response.headers())));
@@ -822,7 +821,7 @@ private void handleChunk(ByteBuf chunk) {
822821
}
823822
}
824823

825-
private void handleResponseBegin(Stream stream, HttpResponseHead response) {
824+
private void handleResponseBegin(Stream stream, HttpVersion version, HttpResponseHead response) {
826825
// How can we handle future undefined 1xx informational response codes?
827826
if (response.statusCode == HttpResponseStatus.CONTINUE.code()) {
828827
stream.handleContinue();
@@ -832,6 +831,7 @@ private void handleResponseBegin(Stream stream, HttpResponseHead response) {
832831
HttpRequestHead request;
833832
synchronized (this) {
834833
request = stream.request;
834+
stream.version = version;
835835
stream.response = response;
836836
if (metrics != null) {
837837
metrics.responseBegin(stream.metric, response);
@@ -885,9 +885,11 @@ private void handleResponseChunk(Stream stream, ByteBuf chunk) {
885885

886886
private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
887887
boolean check;
888-
HttpResponseHead response ;
888+
HttpResponseHead response;
889+
HttpVersion version;
889890
synchronized (this) {
890891
response = stream.response;
892+
version = stream.version;
891893
if (response == null) {
892894
// 100-continue
893895
return;
@@ -903,7 +905,7 @@ private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
903905
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
904906
// In all cases, if we have a close connection option then we SHOULD NOT treat the connection as persistent
905907
close = true;
906-
} else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
908+
} else if (version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
907909
// In the HTTP/1.0 case both request/response need a keep-alive connection header the connection to be persistent
908910
// currently Vertx forces the Connection header if keepalive is enabled for 1.0
909911
close = true;

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.vertx.core.*;
2929
import io.vertx.core.buffer.Buffer;
3030
import io.vertx.core.http.ServerWebSocketHandshake;
31+
import io.vertx.core.http.impl.headers.HeadersMultiMap;
3132
import io.vertx.core.internal.buffer.BufferInternal;
3233
import io.vertx.core.http.HttpServerOptions;
3334
import io.vertx.core.http.HttpServerRequest;
@@ -114,6 +115,16 @@ protected void handleShutdown(ChannelPromise promise) {
114115
}
115116
}
116117

118+
@Override
119+
public MultiMap newHeaders() {
120+
return io.vertx.core.http.HttpHeaders.headers();
121+
}
122+
123+
@Override
124+
public boolean supportsSendFile() {
125+
return true;
126+
}
127+
117128
TracingPolicy tracingPolicy() {
118129
return tracingPolicy;
119130
}

vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
*/
1111
package io.vertx.core.http.impl;
1212

13-
import io.netty.buffer.ByteBuf;
1413
import io.netty.channel.Channel;
1514
import io.netty.channel.ChannelHandlerContext;
1615
import io.netty.channel.ChannelPipeline;
@@ -23,7 +22,6 @@
2322
import io.vertx.core.buffer.Buffer;
2423
import io.vertx.core.http.*;
2524
import io.vertx.core.http.HttpVersion;
26-
import io.vertx.core.http.impl.http2.Http2ClientPush;
2725
import io.vertx.core.internal.ContextInternal;
2826
import io.vertx.core.internal.PromiseInternal;
2927
import io.vertx.core.internal.logging.Logger;
@@ -147,17 +145,17 @@ public ContextInternal context() {
147145
}
148146

149147
@Override
150-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
148+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
151149
return delegate.writeHead(request, chunked, buf, end, priority, connect);
152150
}
153151

154152
@Override
155-
public Future<Void> write(ByteBuf buf, boolean end) {
156-
return delegate.write(buf, end);
153+
public Future<Void> writeChunk(Buffer buf, boolean end) {
154+
return delegate.writeChunk(buf, end);
157155
}
158156

159157
@Override
160-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
158+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
161159
return delegate.writeFrame(type, flags, payload);
162160
}
163161

@@ -174,7 +172,7 @@ public HttpClientStream earlyHintsHandler(Handler<MultiMap> handler) {
174172
}
175173

176174
@Override
177-
public HttpClientStream pushHandler(Handler<Http2ClientPush> handler) {
175+
public HttpClientStream pushHandler(Handler<HttpClientPush> handler) {
178176
delegate.pushHandler(handler);
179177
return this;
180178
}
@@ -186,8 +184,8 @@ public HttpClientStream customFrameHandler(Handler<HttpFrame> handler) {
186184
}
187185

188186
@Override
189-
public HttpClientStream headersHandler(Handler<HttpResponseHead> handler) {
190-
delegate.headersHandler(handler);
187+
public HttpClientStream headHandler(Handler<HttpResponseHead> handler) {
188+
delegate.headHandler(handler);
191189
return this;
192190
}
193191

@@ -281,7 +279,7 @@ private static class UpgradingStream implements HttpClientStream {
281279

282280
void handleUpgrade(HttpClientConnection conn, HttpClientStream stream) {
283281
upgradedStream = stream;
284-
upgradedStream.headersHandler(headHandler);
282+
upgradedStream.headHandler(headHandler);
285283
upgradedStream.dataHandler(chunkHandler);
286284
upgradedStream.trailersHandler(trailersHandler);
287285
upgradedStream.priorityChangeHandler(priorityHandler);
@@ -293,7 +291,7 @@ void handleUpgrade(HttpClientConnection conn, HttpClientStream stream) {
293291
upgradedStream.pushHandler(pushHandler);
294292
upgradedStream.customFrameHandler(unknownFrameHandler);
295293
upgradedStream.closeHandler(closeHandler);
296-
upgradingStream.headersHandler(null);
294+
upgradingStream.headHandler(null);
297295
upgradingStream.dataHandler(null);
298296
upgradingStream.trailersHandler(null);
299297
upgradingStream.priorityChangeHandler(null);
@@ -352,7 +350,7 @@ void handleUpgrade(HttpClientConnection conn, HttpClientStream stream) {
352350
private Handler<Void> drainHandler;
353351
private Handler<Void> continueHandler;
354352
private Handler<MultiMap> earlyHintsHandler;
355-
private Handler<Http2ClientPush> pushHandler;
353+
private Handler<HttpClientPush> pushHandler;
356354
private Handler<HttpFrame> unknownFrameHandler;
357355
private Handler<Void> closeHandler;
358356

@@ -373,11 +371,11 @@ public HttpClientConnection connection() {
373371
*/
374372
@Override
375373
public Future<Void> writeHead(HttpRequestHead request,
376-
boolean chunked,
377-
ByteBuf buf,
378-
boolean end,
379-
StreamPriority priority,
380-
boolean connect) {
374+
boolean chunked,
375+
Buffer buf,
376+
boolean end,
377+
StreamPriority priority,
378+
boolean connect) {
381379
UpgradeResult blah = new UpgradeResult() {
382380
@Override
383381
public void upgradeAccepted(HttpClientConnection connection, HttpClientStream upgradedStream) {
@@ -408,7 +406,7 @@ public void upgradeFailure(Throwable cause) {
408406

409407
private void writeHead(HttpRequestHead head,
410408
boolean chunked,
411-
ByteBuf buf,
409+
Buffer buf,
412410
boolean end,
413411
StreamPriority priority,
414412
boolean connect,
@@ -477,7 +475,7 @@ public HttpClientStream earlyHintsHandler(Handler<MultiMap> handler) {
477475
}
478476

479477
@Override
480-
public HttpClientStream pushHandler(Handler<Http2ClientPush> handler) {
478+
public HttpClientStream pushHandler(Handler<HttpClientPush> handler) {
481479
if (upgradedStream != null) {
482480
upgradedStream.pushHandler(handler);
483481
} else {
@@ -532,11 +530,11 @@ public UpgradingStream exceptionHandler(Handler<Throwable> handler) {
532530
}
533531

534532
@Override
535-
public HttpClientStream headersHandler(Handler<HttpResponseHead> handler) {
533+
public HttpClientStream headHandler(Handler<HttpResponseHead> handler) {
536534
if (upgradedStream != null) {
537-
upgradedStream.headersHandler(handler);
535+
upgradedStream.headHandler(handler);
538536
} else {
539-
upgradingStream.headersHandler(handler);
537+
upgradingStream.headHandler(handler);
540538
headHandler = handler;
541539
}
542540
return this;
@@ -606,10 +604,10 @@ public boolean isWritable() {
606604
}
607605

608606
@Override
609-
public Future<Void> write(ByteBuf buf, boolean end) {
607+
public Future<Void> writeChunk(Buffer buf, boolean end) {
610608
EventExecutor exec = upgradingConnection.channelHandlerContext().executor();
611609
if (exec.inEventLoop()) {
612-
Future<Void> future = upgradingStream.write(buf, end);
610+
Future<Void> future = upgradingStream.writeChunk(buf, end);
613611
if (end) {
614612
ChannelPipeline pipeline = upgradingConnection.channelHandlerContext().pipeline();
615613
future = future.andThen(ar -> {
@@ -622,15 +620,15 @@ public Future<Void> write(ByteBuf buf, boolean end) {
622620
} else {
623621
Promise<Void> promise = upgradingStream.context().promise();
624622
exec.execute(() -> {
625-
Future<Void> future = write(buf, end);
623+
Future<Void> future = writeChunk(buf, end);
626624
future.onComplete(promise);
627625
});
628626
return promise.future();
629627
}
630628
}
631629

632630
@Override
633-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
631+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
634632
if (upgradedStream != null) {
635633
return upgradedStream.writeFrame(type, flags, payload);
636634
} else {
@@ -903,7 +901,7 @@ public interface Http2ChannelUpgrade {
903901

904902
void upgrade(HttpClientStream upgradingStream,
905903
HttpRequestHead request,
906-
ByteBuf content,
904+
Buffer content,
907905
boolean end,
908906
Channel channel,
909907
boolean pooled,

vertx-core/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ private void http1xConnected(HttpVersion version,
259259
conn2.createStream(conn.context()).onComplete(ar -> {
260260
if (ar.succeeded()) {
261261
HttpClientStream stream = ar.result();
262-
stream.headersHandler(resp -> {
262+
stream.headHandler(resp -> {
263263
Http2UpgradeClientConnection connection = (Http2UpgradeClientConnection) stream.connection();
264264
HttpClientConnection unwrap = connection.unwrap();
265265
future.tryComplete(unwrap);

vertx-core/src/main/java/io/vertx/core/http/impl/http2/Http2ClientPush.java renamed to vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientPush.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,39 @@
88
*
99
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
1010
*/
11-
package io.vertx.core.http.impl.http2;
11+
package io.vertx.core.http.impl;
1212

1313
import io.vertx.core.MultiMap;
1414
import io.vertx.core.http.HttpMethod;
15-
import io.vertx.core.http.impl.HttpClientStream;
1615
import io.vertx.core.net.HostAndPort;
1716
import io.vertx.core.net.SocketAddress;
1817
import io.vertx.core.spi.observability.HttpRequest;
1918

2019
/**
2120
* @author <a href="mailto:[email protected]">Julien Viet</a>
2221
*/
23-
public class Http2ClientPush implements HttpRequest {
22+
public class HttpClientPush implements HttpRequest {
2423

2524
private final String uri;
2625
private final HttpMethod method;
2726
private final HostAndPort authority;
2827
private final HttpClientStream stream;
2928
private final MultiMap headers;
3029

31-
Http2ClientPush(Http2HeadersMultiMap headers, HttpClientStream stream) {
30+
public HttpClientPush(HttpRequestHead head, HttpClientStream stream) {
3231

33-
String rawMethod = headers.method().toString();
34-
String authority = headers.authority() != null ? headers.authority().toString() : null;
32+
String rawMethod = head.method().toString();
33+
String authority = head.authority != null ? head.authority.toString() : null;
3534
int pos = authority == null ? -1 : authority.indexOf(':');
3635
if (pos == -1) {
3736
this.authority = HostAndPort.create(authority, 80);
3837
} else {
3938
this.authority = HostAndPort.create(authority.substring(0, pos), Integer.parseInt(authority.substring(pos + 1)));
4039
}
4140
this.method = HttpMethod.valueOf(rawMethod);
42-
this.uri = headers.path().toString();
41+
this.uri = head.uri;
4342
this.stream = stream;
44-
this.headers = headers;
43+
this.headers = head.headers;
4544
}
4645

4746
public HttpClientStream stream() {
@@ -77,4 +76,5 @@ public String uri() {
7776
public HttpMethod method() {
7877
return method;
7978
}
79+
8080
}

0 commit comments

Comments
 (0)