Skip to content

Commit 4b2d777

Browse files
committed
8335181: Incorrect handling of HTTP/2 GOAWAY frames in HttpClient
Reviewed-by: rschmelter, abakhtin Backport-of: 209939a90e9b804da4c37ebd978b088cb8f41ead
1 parent 01d92b5 commit 4b2d777

File tree

12 files changed

+625
-54
lines changed

12 files changed

+625
-54
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/ExchangeImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -58,6 +58,10 @@ abstract class ExchangeImpl<T> {
5858

5959
final Exchange<T> exchange;
6060

61+
// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
62+
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
63+
private volatile boolean unprocessedByPeer;
64+
6165
ExchangeImpl(Exchange<T> e) {
6266
// e == null means a http/2 pushed stream
6367
this.exchange = e;
@@ -264,4 +268,13 @@ void upgraded() { }
264268
// Called when server returns non 100 response to
265269
// an Expect-Continue
266270
void expectContinueFailed(int rcode) { }
271+
272+
final boolean isUnprocessedByPeer() {
273+
return this.unprocessedByPeer;
274+
}
275+
276+
// Marks the exchange as unprocessed by the peer
277+
final void markUnprocessedByPeer() {
278+
this.unprocessedByPeer = true;
279+
}
267280
}

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.ConcurrentLinkedQueue;
4949
import java.util.concurrent.Flow;
5050
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.concurrent.atomic.AtomicLong;
5152
import java.util.concurrent.atomic.AtomicReference;
5253
import java.util.function.Function;
5354
import java.util.function.Supplier;
@@ -317,6 +318,7 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro
317318
private final String key; // for HttpClientImpl.connections map
318319
private final FramesDecoder framesDecoder;
319320
private final FramesEncoder framesEncoder = new FramesEncoder();
321+
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);
320322

321323
/**
322324
* Send Window controller for both connection and stream windows.
@@ -709,7 +711,9 @@ final int maxConcurrentServerInitiatedStreams() {
709711

710712
void close() {
711713
if (markHalfClosedLocal()) {
712-
if (connection.channel().isOpen()) {
714+
// we send a GOAWAY frame only if the remote side hasn't already indicated
715+
// the intention to close the connection by previously sending a GOAWAY of its own
716+
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
713717
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
714718
GoAwayFrame f = new GoAwayFrame(0,
715719
ErrorFrame.NO_ERROR,
@@ -1232,13 +1236,46 @@ private void handlePing(PingFrame frame)
12321236
sendUnorderedFrame(frame);
12331237
}
12341238

1235-
private void handleGoAway(GoAwayFrame frame)
1236-
throws IOException
1237-
{
1238-
if (markHalfClosedLRemote()) {
1239-
shutdown(new IOException(
1240-
connection.channel().getLocalAddress()
1241-
+ ": GOAWAY received"));
1239+
private void handleGoAway(final GoAwayFrame frame) {
1240+
final long lastProcessedStream = frame.getLastStream();
1241+
assert lastProcessedStream >= 0 : "unexpected last stream id: "
1242+
+ lastProcessedStream + " in GOAWAY frame";
1243+
1244+
markHalfClosedRemote();
1245+
setFinalStream(); // don't allow any new streams on this connection
1246+
if (debug.on()) {
1247+
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
1248+
lastProcessedStream, frame);
1249+
}
1250+
// see if this connection has previously received a GOAWAY from the peer and if yes
1251+
// then check if this new last processed stream id is lesser than the previous
1252+
// known last processed stream id. Only update the last processed stream id if the new
1253+
// one is lesser than the previous one.
1254+
long prevLastProcessed = lastProcessedStreamInGoAway.get();
1255+
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
1256+
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
1257+
lastProcessedStream)) {
1258+
break;
1259+
}
1260+
prevLastProcessed = lastProcessedStreamInGoAway.get();
1261+
}
1262+
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
1263+
}
1264+
1265+
private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
1266+
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
1267+
streams.forEach((id, exchange) -> {
1268+
if (id > lastProcessedStream) {
1269+
// any streams with an stream id higher than the last processed stream
1270+
// can be retried (on a new connection). we close the exchange as unprocessed
1271+
// to facilitate the retrying.
1272+
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
1273+
numClosed.incrementAndGet();
1274+
}
1275+
});
1276+
if (debug.on()) {
1277+
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
1278+
+ ", will be closed as unprocessed");
12421279
}
12431280
}
12441281

@@ -1716,7 +1753,7 @@ private boolean markHalfClosedLocal() {
17161753
return markClosedState(HALF_CLOSED_LOCAL);
17171754
}
17181755

1719-
private boolean markHalfClosedLRemote() {
1756+
private boolean markHalfClosedRemote() {
17201757
return markClosedState(HALF_CLOSED_REMOTE);
17211758
}
17221759

src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -90,7 +90,7 @@ class MultiExchange<T> implements Cancelable {
9090
Exchange<T> exchange; // the current exchange
9191
Exchange<T> previous;
9292
volatile Throwable retryCause;
93-
volatile boolean expiredOnce;
93+
volatile boolean retriedOnce;
9494
volatile HttpResponse<T> response;
9595

9696
// Maximum number of times a request will be retried/redirected
@@ -453,7 +453,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
453453
return exch.ignoreBody().handle((r,t) -> {
454454
previousreq = currentreq;
455455
currentreq = newrequest;
456-
expiredOnce = false;
456+
retriedOnce = false;
457457
setExchange(new Exchange<>(currentreq, this, acc));
458458
return responseAsyncImpl();
459459
}).thenCompose(Function.identity());
@@ -466,7 +466,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
466466
return completedFuture(response);
467467
}
468468
// all exceptions thrown are handled here
469-
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
469+
CompletableFuture<Response> errorCF = getExceptionalCF(ex, exch.exchImpl);
470470
if (errorCF == null) {
471471
return responseAsyncImpl();
472472
} else {
@@ -538,34 +538,38 @@ private Throwable retryCause(Throwable t) {
538538
* Takes a Throwable and returns a suitable CompletableFuture that is
539539
* completed exceptionally, or null.
540540
*/
541-
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
541+
private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?> exchImpl) {
542542
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
543543
if (t.getCause() != null) {
544544
t = t.getCause();
545545
}
546546
}
547+
final boolean retryAsUnprocessed = exchImpl != null && exchImpl.isUnprocessedByPeer();
547548
if (cancelled && !requestCancelled() && t instanceof IOException) {
548549
if (!(t instanceof HttpTimeoutException)) {
549550
t = toTimeoutException((IOException)t);
550551
}
551-
} else if (retryOnFailure(t)) {
552+
} else if (retryAsUnprocessed || retryOnFailure(t)) {
552553
Throwable cause = retryCause(t);
553554

554555
if (!(t instanceof ConnectException)) {
555556
// we may need to start a new connection, and if so
556557
// we want to start with a fresh connect timeout again.
557558
if (connectTimeout != null) connectTimeout.reset();
558-
if (!canRetryRequest(currentreq)) {
559-
return failedFuture(cause); // fails with original cause
559+
if (!retryAsUnprocessed && !canRetryRequest(currentreq)) {
560+
// a (peer) processed request which cannot be retried, fail with
561+
// the original cause
562+
return failedFuture(cause);
560563
}
561564
} // ConnectException: retry, but don't reset the connectTimeout.
562565

563566
// allow the retry mechanism to do its work
564567
retryCause = cause;
565-
if (!expiredOnce) {
568+
if (!retriedOnce) {
566569
if (debug.on())
567-
debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
568-
expiredOnce = true;
570+
debug.log(t.getClass().getSimpleName()
571+
+ " (async): retrying " + currentreq + " due to: ", t);
572+
retriedOnce = true;
569573
// The connection was abruptly closed.
570574
// We return null to retry the same request a second time.
571575
// The request filters have already been applied to the
@@ -576,7 +580,7 @@ private CompletableFuture<Response> getExceptionalCF(Throwable t) {
576580
} else {
577581
if (debug.on()) {
578582
debug.log(t.getClass().getSimpleName()
579-
+ " (async): already retried once.", t);
583+
+ " (async): already retried once " + currentreq, t);
580584
}
581585
t = cause;
582586
}

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -636,20 +636,39 @@ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
636636
closed = true;
637637
}
638638
try {
639-
int error = frame.getErrorCode();
640-
IOException e = new IOException("Received RST_STREAM: "
641-
+ ErrorFrame.stringForCode(error));
642-
if (errorRef.compareAndSet(null, e)) {
643-
if (subscriber != null) {
644-
subscriber.onError(e);
639+
final int error = frame.getErrorCode();
640+
// A REFUSED_STREAM error code implies that the stream wasn't processed by the
641+
// peer and the client is free to retry the request afresh.
642+
if (error == ErrorFrame.REFUSED_STREAM) {
643+
// Here we arrange for the request to be retried. Note that we don't call
644+
// closeAsUnprocessed() method here because the "closed" state is already set
645+
// to true a few lines above and calling close() from within
646+
// closeAsUnprocessed() will end up being a no-op. We instead do the additional
647+
// bookkeeping here.
648+
markUnprocessedByPeer();
649+
errorRef.compareAndSet(null, new IOException("request not processed by peer"));
650+
if (debug.on()) {
651+
debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
652+
}
653+
} else {
654+
final String reason = ErrorFrame.stringForCode(error);
655+
final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
656+
if (debug.on()) {
657+
debug.log(streamid + " received RST_STREAM with code: " + reason);
658+
}
659+
if (errorRef.compareAndSet(null, failureCause)) {
660+
if (subscriber != null) {
661+
subscriber.onError(failureCause);
662+
}
645663
}
646664
}
647-
completeResponseExceptionally(e);
665+
final Throwable failureCause = errorRef.get();
666+
completeResponseExceptionally(failureCause);
648667
if (!requestBodyCF.isDone()) {
649-
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
668+
requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
650669
}
651670
if (responseBodyCF != null) {
652-
responseBodyCF.completeExceptionally(errorRef.get());
671+
responseBodyCF.completeExceptionally(failureCause);
653672
}
654673
} finally {
655674
connection.decrementStreamsCount(streamid);
@@ -1632,7 +1651,35 @@ synchronized Throwable getCancelCause() {
16321651
}
16331652

16341653
final String dbgString() {
1635-
return connection.dbgString() + "/Stream("+streamid+")";
1654+
final int id = streamid;
1655+
final String sid = id == 0 ? "?" : String.valueOf(id);
1656+
return connection.dbgString() + "/Stream(" + sid + ")";
1657+
}
1658+
1659+
/**
1660+
* An unprocessed exchange is one that hasn't been processed by a peer. The local end of the
1661+
* connection would be notified about such exchanges when it receives a GOAWAY frame with
1662+
* a stream id that tells which exchanges have been unprocessed.
1663+
* This method is called on such unprocessed exchanges and the implementation of this method
1664+
* will arrange for the request, corresponding to this exchange, to be retried afresh on a
1665+
* new connection.
1666+
*/
1667+
void closeAsUnprocessed() {
1668+
try {
1669+
// We arrange for the request to be retried on a new connection as allowed by the RFC-9113
1670+
markUnprocessedByPeer();
1671+
this.errorRef.compareAndSet(null, new IOException("request not processed by peer"));
1672+
if (debug.on()) {
1673+
debug.log("closing " + this.request + " as unprocessed by peer");
1674+
}
1675+
// close the exchange and complete the response CF exceptionally
1676+
close();
1677+
completeResponseExceptionally(this.errorRef.get());
1678+
} finally {
1679+
// decrementStreamsCount isn't really needed but we do it to make sure
1680+
// the log messages, where these counts/states get reported, show the accurate state.
1681+
connection.decrementStreamsCount(streamid);
1682+
}
16361683
}
16371684

16381685
private class HeadersConsumer extends ValidatingHeadersConsumer implements DecodingCallback {

src/java.net.http/share/classes/jdk/internal/net/http/WindowController.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -100,13 +100,16 @@ void removeStream(int streamid) {
100100
controllerLock.lock();
101101
try {
102102
Integer old = streams.remove(streamid);
103-
// Odd stream numbers (client streams) should have been registered.
103+
// A client initiated stream might be closed (as unprocessed, due to a
104+
// GOAWAY received on the connection) even before the stream is
105+
// registered with this WindowController instance (when sending out request headers).
106+
// Thus, for client initiated streams, we don't enforce the presence of the
107+
// stream in the registered "streams" map.
108+
104109
// Even stream numbers (server streams - aka Push Streams) should
105110
// not be registered
106111
final boolean isClientStream = (streamid & 0x1) == 1;
107-
if (old == null && isClientStream) {
108-
throw new InternalError("Expected entry for streamid: " + streamid);
109-
} else if (old != null && !isClientStream) {
112+
if (old != null && !isClientStream) {
110113
throw new InternalError("Unexpected entry for streamid: " + streamid);
111114
}
112115
} finally {

src/java.net.http/share/classes/jdk/internal/net/http/frame/GoAwayFrame.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -57,7 +57,9 @@ int length() {
5757

5858
@Override
5959
public String toString() {
60-
return super.toString() + " Debugdata: " + new String(debugData, UTF_8);
60+
return super.toString()
61+
+ " lastStreamId=" + lastStream
62+
+ ", Debugdata: " + new String(debugData, UTF_8);
6163
}
6264

6365
public int getLastStream() {

0 commit comments

Comments
 (0)