Skip to content

Commit c58fbc6

Browse files
committed
HttpClientStream contract should declare Buffer types over ByteBuf
1 parent fa03336 commit c58fbc6

11 files changed

+58
-51
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -566,25 +566,25 @@ public ContextInternal context() {
566566
}
567567

568568
@Override
569-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
569+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
570570
PromiseInternal<Void> promise = context.promise();
571-
conn.writeHead(this, request, chunked, buf, end, connect, promise);
571+
conn.writeHead(this, request, chunked, buf != null ? ((BufferInternal)buf).getByteBuf() : null, end, connect, promise);
572572
return promise.future();
573573
}
574574

575575
@Override
576-
public Future<Void> write(ByteBuf buff, boolean end) {
576+
public Future<Void> write(Buffer buff, boolean end) {
577577
if (buff != null || end) {
578578
Promise<Void> listener = context.promise();
579-
conn.writeBuffer(this, buff, end, listener);
579+
conn.writeBuffer(this, buff != null ? ((BufferInternal)buff).getByteBuf() : null, end, listener);
580580
return listener.future();
581581
} else {
582582
throw new IllegalStateException("???");
583583
}
584584
}
585585

586586
@Override
587-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
587+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
588588
throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
589589
}
590590

vertx-core/src/main/java/io/vertx/core/http/impl/Http2UpgradeClientConnection.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
*/
1111
package io.vertx.core.http.impl;
1212

13-
import io.netty.buffer.ByteBuf;
1413
import io.netty.channel.Channel;
1514
import io.netty.channel.ChannelHandlerContext;
1615
import io.netty.channel.ChannelPipeline;
@@ -147,17 +146,17 @@ public ContextInternal context() {
147146
}
148147

149148
@Override
150-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
149+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
151150
return delegate.writeHead(request, chunked, buf, end, priority, connect);
152151
}
153152

154153
@Override
155-
public Future<Void> write(ByteBuf buf, boolean end) {
154+
public Future<Void> write(Buffer buf, boolean end) {
156155
return delegate.write(buf, end);
157156
}
158157

159158
@Override
160-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
159+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
161160
return delegate.writeFrame(type, flags, payload);
162161
}
163162

@@ -373,11 +372,11 @@ public HttpClientConnection connection() {
373372
*/
374373
@Override
375374
public Future<Void> writeHead(HttpRequestHead request,
376-
boolean chunked,
377-
ByteBuf buf,
378-
boolean end,
379-
StreamPriority priority,
380-
boolean connect) {
375+
boolean chunked,
376+
Buffer buf,
377+
boolean end,
378+
StreamPriority priority,
379+
boolean connect) {
381380
UpgradeResult blah = new UpgradeResult() {
382381
@Override
383382
public void upgradeAccepted(HttpClientConnection connection, HttpClientStream upgradedStream) {
@@ -408,7 +407,7 @@ public void upgradeFailure(Throwable cause) {
408407

409408
private void writeHead(HttpRequestHead head,
410409
boolean chunked,
411-
ByteBuf buf,
410+
Buffer buf,
412411
boolean end,
413412
StreamPriority priority,
414413
boolean connect,
@@ -606,7 +605,7 @@ public boolean isWritable() {
606605
}
607606

608607
@Override
609-
public Future<Void> write(ByteBuf buf, boolean end) {
608+
public Future<Void> write(Buffer buf, boolean end) {
610609
EventExecutor exec = upgradingConnection.channelHandlerContext().executor();
611610
if (exec.inEventLoop()) {
612611
Future<Void> future = upgradingStream.write(buf, end);
@@ -630,7 +629,7 @@ public Future<Void> write(ByteBuf buf, boolean end) {
630629
}
631630

632631
@Override
633-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
632+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
634633
if (upgradedStream != null) {
635634
return upgradedStream.writeFrame(type, flags, payload);
636635
} else {
@@ -903,7 +902,7 @@ public interface Http2ChannelUpgrade {
903902

904903
void upgrade(HttpClientStream upgradingStream,
905904
HttpRequestHead request,
906-
ByteBuf content,
905+
Buffer content,
907906
boolean end,
908907
Channel channel,
909908
boolean pooled,

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public Future<Void> writeCustomFrame(int type, int flags, Buffer payload) {
365365
synchronized (this) {
366366
checkEnded();
367367
}
368-
return stream.writeFrame(type, flags, ((BufferInternal)payload).getByteBuf());
368+
return stream.writeFrame(type, flags, payload);
369369
}
370370

371371
private void handleDrained(Void v) {
@@ -451,18 +451,21 @@ void handleResponse(Promise<HttpClientResponse> promise, HttpClientResponse resp
451451

452452
@Override
453453
public Future<Void> end(String chunk) {
454-
return write(BufferInternal.buffer(chunk).getByteBuf(), true);
454+
return write(BufferInternal.buffer(chunk), true);
455455
}
456456

457457
@Override
458458
public Future<Void> end(String chunk, String enc) {
459459
Objects.requireNonNull(enc, "no null encoding accepted");
460-
return write(BufferInternal.buffer(chunk, enc).getByteBuf(), true);
460+
return write(BufferInternal.buffer(chunk, enc), true);
461461
}
462462

463463
@Override
464464
public Future<Void> end(Buffer chunk) {
465-
return write(((BufferInternal)chunk).getByteBuf(), true);
465+
if (chunk == null) {
466+
throw new NullPointerException("no null chunk accepted");
467+
}
468+
return write(chunk, true);
466469
}
467470

468471
@Override
@@ -472,29 +475,31 @@ public Future<Void> end() {
472475

473476
@Override
474477
public Future<Void> write(Buffer chunk) {
475-
ByteBuf buf = ((BufferInternal)chunk).getByteBuf();
476-
return write(buf, false);
478+
if (chunk == null) {
479+
throw new NullPointerException("no null chunk accepted");
480+
}
481+
return write(chunk, false);
477482
}
478483

479484
@Override
480485
public Future<Void> write(String chunk) {
481-
return write(BufferInternal.buffer(chunk).getByteBuf(), false);
486+
return write(BufferInternal.buffer(chunk), false);
482487
}
483488

484489
@Override
485490
public Future<Void> write(String chunk, String enc) {
486491
Objects.requireNonNull(enc, "no null encoding accepted");
487-
return write(BufferInternal.buffer(chunk, enc).getByteBuf(), false);
492+
return write(BufferInternal.buffer(chunk, enc), false);
488493
}
489494

490495
private boolean requiresContentLength() {
491496
return !chunked && (headers == null || !headers.contains(CONTENT_LENGTH)) && !isConnect;
492497
}
493498

494-
private Future<Void> write(ByteBuf buff, boolean end) {
499+
private Future<Void> write(Buffer buff, boolean end) {
495500
if (end) {
496501
if (buff != null && requiresContentLength()) {
497-
headers().set(CONTENT_LENGTH, HttpUtils.positiveLongToString(buff.readableBytes()));
502+
headers().set(CONTENT_LENGTH, HttpUtils.positiveLongToString(buff.length()));
498503
}
499504
} else if (requiresContentLength()) {
500505
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
@@ -503,7 +508,7 @@ private Future<Void> write(ByteBuf buff, boolean end) {
503508
return doWrite(buff, end, false);
504509
}
505510

506-
private Future<Void> doWrite(ByteBuf buff, boolean end, boolean connect) {
511+
private Future<Void> doWrite(Buffer buff, boolean end, boolean connect) {
507512
boolean writeHead;
508513
boolean writeEnd;
509514
synchronized (this) {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientResponseImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,15 @@ public WriteStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler
8787
}
8888
@Override
8989
public Future<Void> write(Buffer data) {
90-
return stream.write(((BufferInternal)data).getByteBuf(), false);
90+
return stream.write(data, false);
9191
}
9292
@Override
9393
public Future<Void> end(Buffer data) {
94-
return stream.write(((BufferInternal)data).getByteBuf(), true);
94+
return stream.write(data, true);
9595
}
9696
@Override
9797
public Future<Void> end() {
98-
return stream.write(Unpooled.EMPTY_BUFFER, true);
98+
return stream.write(BufferInternal.buffer(Unpooled.EMPTY_BUFFER), true);
9999
}
100100
@Override
101101
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {

vertx-core/src/main/java/io/vertx/core/http/impl/HttpClientStream.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
package io.vertx.core.http.impl;
1313

14-
import io.netty.buffer.ByteBuf;
1514
import io.vertx.core.Future;
1615
import io.vertx.core.Handler;
1716
import io.vertx.core.MultiMap;
@@ -43,9 +42,9 @@ public interface HttpClientStream {
4342
HttpClientConnection connection();
4443
ContextInternal context();
4544

46-
Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect);
47-
Future<Void> write(ByteBuf buf, boolean end);
48-
Future<Void> writeFrame(int type, int flags, ByteBuf payload);
45+
Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect);
46+
Future<Void> write(Buffer buf, boolean end);
47+
Future<Void> writeFrame(int type, int flags, Buffer payload);
4948
Future<Void> writeReset(long code);
5049

5150
HttpClientStream resetHandler(Handler<Long> handler);

vertx-core/src/main/java/io/vertx/core/http/impl/StatisticsGatheringHttpClientStream.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
*/
1111
package io.vertx.core.http.impl;
1212

13-
import io.netty.buffer.ByteBuf;
1413
import io.vertx.codegen.annotations.Fluent;
1514
import io.vertx.codegen.annotations.Nullable;
1615
import io.vertx.core.Future;
@@ -70,7 +69,7 @@ public ContextInternal context() {
7069
}
7170

7271
@Override
73-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
72+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
7473
endpointRequest.reportRequestBegin();
7574
if (end) {
7675
endpointRequest.reportRequestEnd();
@@ -79,15 +78,15 @@ public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf
7978
}
8079

8180
@Override
82-
public Future<Void> write(ByteBuf buf, boolean end) {
81+
public Future<Void> write(Buffer buf, boolean end) {
8382
if (end) {
8483
endpointRequest.reportRequestEnd();
8584
}
8685
return delegate.write(buf, end);
8786
}
8887

8988
@Override
90-
public Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
89+
public Future<Void> writeFrame(int type, int flags, Buffer payload) {
9190
return delegate.writeFrame(type, flags, payload);
9291
}
9392

vertx-core/src/main/java/io/vertx/core/http/impl/http2/Http2ClientStream.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.vertx.core.Handler;
1919
import io.vertx.core.MultiMap;
2020
import io.vertx.core.Promise;
21+
import io.vertx.core.buffer.Buffer;
2122
import io.vertx.core.http.HttpConnection;
2223
import io.vertx.core.http.HttpHeaders;
2324
import io.vertx.core.http.HttpMethod;
@@ -27,6 +28,7 @@
2728
import io.vertx.core.http.impl.headers.HeadersMultiMap;
2829
import io.vertx.core.internal.ContextInternal;
2930
import io.vertx.core.internal.PromiseInternal;
31+
import io.vertx.core.internal.buffer.BufferInternal;
3032
import io.vertx.core.net.impl.MessageWrite;
3133
import io.vertx.core.spi.metrics.ClientMetrics;
3234
import io.vertx.core.spi.tracing.SpanKind;
@@ -83,17 +85,17 @@ public void upgrade(Object metric, Object trace) {
8385
}
8486

8587
@Override
86-
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect) {
88+
public Future<Void> writeHead(HttpRequestHead request, boolean chunked, Buffer buf, boolean end, StreamPriority priority, boolean connect) {
8789
PromiseInternal<Void> promise = context.promise();
8890
priority(priority);
89-
write(new HeadersWrite(request, buf, end, promise));
91+
write(new HeadersWrite(request, buf != null ? ((BufferInternal)buf).getByteBuf() : null, end, promise));
9092
return promise.future();
9193
}
9294

9395
@Override
94-
public Future<Void> write(ByteBuf buf, boolean end) {
96+
public Future<Void> write(Buffer buf, boolean end) {
9597
Promise<Void> promise = context.promise();
96-
writeData(buf, end, promise);
98+
writeData(buf != null ? ((BufferInternal)buf).getByteBuf() : null, end, promise);
9799
return promise.future();
98100
}
99101

vertx-core/src/main/java/io/vertx/core/http/impl/http2/Http2ServerResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ public Future<Void> writeCustomFrame(int type, int flags, Buffer payload) {
489489
checkValid();
490490
checkSendHeaders(false);
491491
}
492-
return stream.writeFrame(type, flags, ((BufferInternal)payload).getByteBuf());
492+
return stream.writeFrame(type, flags, payload);
493493
}
494494

495495
private void checkValid() {

vertx-core/src/main/java/io/vertx/core/http/impl/http2/Http2StreamBase.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import io.netty.buffer.Unpooled;
1616
import io.netty.channel.EventLoop;
1717
import io.netty.handler.codec.http2.EmptyHttp2Headers;
18-
import io.netty.handler.codec.http2.Http2Stream;
1918
import io.netty.handler.stream.ChunkedInput;
2019
import io.vertx.core.Future;
2120
import io.vertx.core.Handler;
@@ -28,6 +27,7 @@
2827
import io.vertx.core.http.impl.HttpUtils;
2928
import io.vertx.core.internal.ContextInternal;
3029
import io.vertx.core.internal.VertxInternal;
30+
import io.vertx.core.internal.buffer.BufferInternal;
3131
import io.vertx.core.internal.concurrent.InboundMessageQueue;
3232
import io.vertx.core.internal.concurrent.OutboundMessageQueue;
3333
import io.vertx.core.net.impl.MessageWrite;
@@ -264,13 +264,14 @@ public final S fetch(long amount) {
264264
return (S)this;
265265
}
266266

267-
public final Future<Void> writeFrame(int type, int flags, ByteBuf payload) {
267+
public final Future<Void> writeFrame(int type, int flags, Buffer payload) {
268268
Promise<Void> promise = context.promise();
269269
EventLoop eventLoop = connection.context().nettyEventLoop();
270+
ByteBuf byteBuf = ((BufferInternal) payload).getByteBuf();
270271
if (eventLoop.inEventLoop()) {
271-
connection.writeFrame(id, type, flags, payload, promise);
272+
connection.writeFrame(id, type, flags, byteBuf, promise);
272273
} else {
273-
eventLoop.execute(() -> connection.writeFrame(id, type, flags, payload, promise));
274+
eventLoop.execute(() -> connection.writeFrame(id, type, flags, byteBuf, promise));
274275
}
275276
return promise.future();
276277
}

vertx-core/src/main/java/io/vertx/core/http/impl/http2/codec/Http2CodecClientChannelInitializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.netty.handler.codec.http.HttpResponseStatus;
1616
import io.netty.util.ReferenceCountUtil;
1717
import io.vertx.core.Promise;
18+
import io.vertx.core.buffer.Buffer;
1819
import io.vertx.core.http.impl.Http1xClientConnection;
1920
import io.vertx.core.http.impl.Http2UpgradeClientConnection;
2021
import io.vertx.core.http.impl.HttpClientBase;
@@ -106,7 +107,7 @@ public CodecChannelUpgrade(HttpClientBase client,
106107
}
107108

108109
public void upgrade(HttpClientStream upgradingStream, HttpRequestHead request,
109-
ByteBuf content,
110+
Buffer content,
110111
boolean end,
111112
Channel channel,
112113
boolean pooled,

0 commit comments

Comments
 (0)