Skip to content

Commit 7c2b0a6

Browse files
committed
Simplify HTTP/2 framework.
Motivation: The introduction of the HTTP/2 multiplex implementation has introduced a level of indirection between the HTTP client implementation and the internal stream API. The internal stream API is implemented by the HTTP/2 framework client stream in order to adapt one to the other. Changes: Remove the un-necessary indirection and let Http2ClientStream implement the internal HttpClientStream API.
1 parent ea580e0 commit 7c2b0a6

24 files changed

+481
-782
lines changed

vertx-core/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,6 @@ private static class StreamImpl extends Stream implements HttpClientStream {
426426
private Handler<HttpResponseHead> headHandler;
427427
private Handler<Buffer> chunkHandler;
428428
private Handler<MultiMap> trailerHandler;
429-
private Handler<Void> endHandler;
430429
private Handler<Void> drainHandler;
431430
private Handler<Void> continueHandler;
432431

@@ -450,13 +449,9 @@ protected void handlePause() {
450449
@Override
451450
protected void handleMessage(Object item) {
452451
if (item instanceof MultiMap) {
453-
Handler<MultiMap> handler1 = trailerHandler;
454-
if (handler1 != null) {
455-
context.dispatch((MultiMap) item, handler1);
456-
}
457-
Handler<Void> handler2 = endHandler;
458-
if (handler2 != null) {
459-
context.dispatch(null, handler2);
452+
Handler<MultiMap> handler = trailerHandler;
453+
if (handler != null) {
454+
context.dispatch((MultiMap) item, handler);
460455
}
461456
} else {
462457
Buffer buffer = (Buffer) item;
@@ -500,24 +495,29 @@ public HttpClientStream setWriteQueueMaxSize(int maxSize) {
500495
}
501496

502497
@Override
503-
public boolean writeQueueFull() {
504-
return conn.writeQueueFull();
498+
public boolean isWritable() {
499+
return !conn.writeQueueFull();
505500
}
506501

507502
@Override
508-
public HttpClientStream headHandler(Handler<HttpResponseHead> handler) {
503+
public HttpClientStream headersHandler(Handler<HttpResponseHead> handler) {
509504
this.headHandler = handler;
510505
return this;
511506
}
512507

508+
@Override
509+
public HttpClientStream resetHandler(Handler<Long> handler) {
510+
return this;
511+
}
512+
513513
@Override
514514
public HttpClientStream closeHandler(Handler<Void> handler) {
515515
closeHandler = handler;
516516
return this;
517517
}
518518

519519
@Override
520-
public HttpClientStream priorityHandler(Handler<StreamPriority> handler) {
520+
public HttpClientStream priorityChangeHandler(Handler<StreamPriority> handler) {
521521
// No op
522522
return this;
523523
}
@@ -529,7 +529,7 @@ public HttpClientStream pushHandler(Handler<Http2ClientPush> handler) {
529529
}
530530

531531
@Override
532-
public HttpClientStream unknownFrameHandler(Handler<HttpFrame> handler) {
532+
public HttpClientStream customFrameHandler(Handler<HttpFrame> handler) {
533533
// No op
534534
return this;
535535
}
@@ -593,35 +593,30 @@ public HttpClientStream pause() {
593593
return this;
594594
}
595595

596-
@Override
597-
public HttpClientStream resume() {
598-
queue.fetch(Long.MAX_VALUE);
599-
return this;
600-
}
601-
602596
@Override
603597
public HttpClientStream fetch(long amount) {
604598
queue.fetch(amount);
605599
return this;
606600
}
607601

608602
@Override
609-
public Future<Void> reset(Throwable cause) {
603+
public Future<Void> writeReset(long code) {
610604
Promise<Void> promise = context.promise();
611605
EventLoop eventLoop = conn.context.nettyEventLoop();
612606
if (eventLoop.inEventLoop()) {
613-
reset(cause, promise);
607+
reset(code, promise);
614608
} else {
615-
eventLoop.execute(() -> reset(cause, promise));
609+
eventLoop.execute(() -> reset(code, promise));
616610
}
617611
return promise.future();
618612
}
619613

620-
private void reset(Throwable cause, Promise<Void> promise) {
614+
private void reset(long code, Promise<Void> promise) {
621615
Boolean removed = conn.reset(this);
622616
if (removed == null) {
623617
promise.fail("Stream already reset");
624618
} else {
619+
Throwable cause = new StreamResetException(code);
625620
if (removed) {
626621
context.execute(cause, this::handleClosed);
627622
} else {
@@ -674,7 +669,7 @@ void handleHead(HttpResponseHead response) {
674669
}
675670

676671
@Override
677-
public HttpClientStream handler(Handler<Buffer> handler) {
672+
public HttpClientStream dataHandler(Handler<Buffer> handler) {
678673
chunkHandler = handler;
679674
return this;
680675
}
@@ -685,12 +680,6 @@ public HttpClientStream trailersHandler(Handler<MultiMap> handler) {
685680
return this;
686681
}
687682

688-
@Override
689-
public HttpClientStream endHandler(Handler<Void> handler) {
690-
endHandler = handler;
691-
return this;
692-
}
693-
694683
void handleChunk(Buffer buff) {
695684
queue.write(buff);
696685
}

0 commit comments

Comments
 (0)