Skip to content

Commit 6a87af8

Browse files
committed
8293786: HttpClient will not send more than 64 kb of data from the 2nd request in http2
Backport-of: ad90fb6da38da066dfc7a5439196887bbcda766f
1 parent 9221440 commit 6a87af8

File tree

6 files changed

+233
-17
lines changed

6 files changed

+233
-17
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ private void schedule() {
190190
}
191191
while (!inputQ.isEmpty() && errorRef.get() == null) {
192192
Http2Frame frame = inputQ.peek();
193-
if (frame instanceof ResetFrame) {
193+
if (frame instanceof ResetFrame rf) {
194194
inputQ.remove();
195-
handleReset((ResetFrame)frame, subscriber);
195+
handleReset(rf, subscriber);
196196
return;
197197
}
198198
DataFrame df = (DataFrame)frame;
@@ -472,24 +472,23 @@ private boolean checkRequestCancelled() {
472472
void incoming(Http2Frame frame) throws IOException {
473473
if (debug.on()) debug.log("incoming: %s", frame);
474474
var cancelled = checkRequestCancelled() || closed;
475-
if ((frame instanceof HeaderFrame)) {
476-
HeaderFrame hframe = (HeaderFrame) frame;
477-
if (hframe.endHeaders()) {
475+
if ((frame instanceof HeaderFrame hf)) {
476+
if (hf.endHeaders()) {
478477
Log.logTrace("handling response (streamid={0})", streamid);
479478
handleResponse();
480479
}
481-
if (hframe.getFlag(HeaderFrame.END_STREAM)) {
480+
if (hf.getFlag(HeaderFrame.END_STREAM)) {
482481
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
483482
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
484483
}
485-
} else if (frame instanceof DataFrame) {
484+
} else if (frame instanceof DataFrame df) {
486485
if (cancelled) {
487486
if (debug.on()) {
488487
debug.log("request cancelled or stream closed: dropping data frame");
489488
}
490-
connection.dropDataFrame((DataFrame) frame);
489+
connection.dropDataFrame(df);
491490
} else {
492-
receiveDataFrame((DataFrame) frame);
491+
receiveDataFrame(df);
493492
}
494493
} else {
495494
if (!cancelled) otherFrame(frame);
@@ -606,6 +605,16 @@ void incoming_reset(ResetFrame frame) {
606605
} else {
607606
Flow.Subscriber<?> subscriber =
608607
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
608+
if (!requestBodyCF.isDone()) {
609+
// If a RST_STREAM is received, complete the requestBody. This will allow the
610+
// response to be read before the Reset is handled in the case where the client's
611+
// input stream is partially consumed or not consumed at all by the server.
612+
if (frame.getErrorCode() != ResetFrame.NO_ERROR) {
613+
requestBodyCF.completeExceptionally(new IOException("RST_STREAM received"));
614+
} else {
615+
requestBodyCF.complete(null);
616+
}
617+
}
609618
if (response == null && subscriber == null) {
610619
// we haven't received the headers yet, and won't receive any!
611620
// handle reset now.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
/*
25+
* @test
26+
* @bug 8293786
27+
* @summary Checks to see if the HttpClient can process a request to cancel a transmission from a remote if the server
28+
* does not process any data. The client should read all data from the server and close the connection.
29+
* @library /test/jdk/java/net/httpclient/lib
30+
* @build jdk.httpclient.test.lib.http2.Http2TestServer
31+
* @run testng/othervm/timeout=50 -Djdk.httpclient.HttpClient.log=all
32+
* PostPutTest
33+
*/
34+
35+
import jdk.httpclient.test.lib.http2.Http2Handler;
36+
import jdk.httpclient.test.lib.http2.Http2TestExchange;
37+
import jdk.httpclient.test.lib.http2.Http2TestServer;
38+
39+
import org.testng.annotations.AfterTest;
40+
import org.testng.annotations.BeforeTest;
41+
import org.testng.annotations.DataProvider;
42+
import org.testng.annotations.Test;
43+
44+
import java.io.IOException;
45+
import java.io.PrintStream;
46+
import java.net.URI;
47+
import java.net.http.HttpClient;
48+
import java.net.http.HttpRequest;
49+
import java.net.http.HttpResponse;
50+
51+
import static java.net.http.HttpClient.Version.HTTP_2;
52+
import static java.net.http.HttpRequest.BodyPublishers.ofByteArray;
53+
54+
public class PostPutTest {
55+
56+
Http2TestServer http2TestServer;
57+
URI warmupURI, testHandlerBasicURI, testHandlerCloseBosURI, testHandleNegativeContentLengthURI;
58+
static PrintStream testLog = System.err;
59+
60+
// As per jdk.internal.net.http.WindowController.DEFAULT_INITIAL_WINDOW_SIZE
61+
final int DEFAULT_INITIAL_WINDOW_SIZE = (64 * 1024) - 1;
62+
// Add on a small amount of arbitrary bytes to see if client hangs when receiving RST_STREAM
63+
byte[] data = new byte[DEFAULT_INITIAL_WINDOW_SIZE + 10];
64+
65+
@BeforeTest
66+
public void setup() throws Exception {
67+
http2TestServer = new Http2TestServer(false, 0);
68+
http2TestServer.addHandler(new WarmupHandler(), "/Warmup");
69+
http2TestServer.addHandler(new TestHandlerBasic(), "/TestHandlerBasic");
70+
http2TestServer.addHandler(new TestHandlerCloseBos(), "/TestHandlerCloseBos");
71+
http2TestServer.addHandler(new TestHandleNegativeContentLength(), "/TestHandleNegativeContentLength");
72+
http2TestServer.start();
73+
testLog.println("PostPutTest.setup(): Starting server");
74+
warmupURI = new URI("http://" + http2TestServer.serverAuthority() + "/Warmup");
75+
testHandlerBasicURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerBasic");
76+
testHandlerCloseBosURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerCloseBos");
77+
testHandleNegativeContentLengthURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandleNegativeContentLength");
78+
testLog.println("PostPutTest.setup(): warmupURI: " + warmupURI);
79+
testLog.println("PostPutTest.setup(): testHandlerBasicURI: " + testHandlerBasicURI);
80+
testLog.println("PostPutTest.setup(): testHandlerCloseBosURI: " + testHandlerCloseBosURI);
81+
testLog.println("PostPutTest.setup(): testHandleNegativeContentLengthURI: " + testHandleNegativeContentLengthURI);
82+
}
83+
84+
@AfterTest
85+
public void teardown() {
86+
testLog.println("PostPutTest.teardown(): Stopping server");
87+
http2TestServer.stop();
88+
data = null;
89+
}
90+
91+
@DataProvider(name = "variants")
92+
public Object[][] variants() {
93+
HttpRequest over64kPost, over64kPut, over64kPostCloseBos, over64kPutCloseBos, over64kPostNegativeContentLength, over64kPutNegativeContentLength;
94+
over64kPost = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerBasicURI).build();
95+
over64kPut = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerBasicURI).build();
96+
97+
over64kPostCloseBos = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerCloseBosURI).build();
98+
over64kPutCloseBos = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerCloseBosURI).build();
99+
100+
over64kPostNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build();
101+
over64kPutNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build();
102+
103+
return new Object[][] {
104+
{ over64kPost, "POST data over 64k bytes" },
105+
{ over64kPut, "PUT data over 64k bytes" },
106+
{ over64kPostCloseBos, "POST data over 64k bytes with close bos" },
107+
{ over64kPutCloseBos, "PUT data over 64k bytes with close bos" },
108+
{ over64kPostNegativeContentLength, "POST data over 64k bytes with negative content length" },
109+
{ over64kPutNegativeContentLength, "PUT data over 64k bytes with negative content length" }
110+
};
111+
}
112+
113+
public HttpRequest getWarmupReq() {
114+
return HttpRequest.newBuilder()
115+
.GET()
116+
.uri(warmupURI)
117+
.build();
118+
}
119+
120+
@Test(dataProvider = "variants")
121+
public void testOver64kPUT(HttpRequest req, String testMessage) {
122+
testLog.println("PostPutTest: Performing test: " + testMessage);
123+
HttpClient hc = HttpClient.newBuilder().version(HTTP_2).build();
124+
hc.sendAsync(getWarmupReq(), HttpResponse.BodyHandlers.ofString()).join();
125+
hc.sendAsync(req, HttpResponse.BodyHandlers.ofString()).join();
126+
/*
127+
If this test fails in timeout, it is likely due to one of two reasons:
128+
- The responseSubscriber is null, so no incoming frames are being processed by the client
129+
(See Stream::schedule)
130+
- The test server is for some reason not sending a RST_STREAM with the NO_ERROR flag set after
131+
sending an empty DATA frame with the END_STREAM flag set.
132+
*/
133+
}
134+
135+
private static class TestHandlerBasic implements Http2Handler {
136+
137+
@Override
138+
public void handle(Http2TestExchange exchange) throws IOException {
139+
// The input stream is not read in this bug as this will trigger window updates for the server. This bug
140+
// concerns the case where no updates are sent and the server instead tells the client to abort the transmission.
141+
exchange.sendResponseHeaders(200, 0);
142+
}
143+
}
144+
145+
private static class TestHandlerCloseBos implements Http2Handler {
146+
147+
@Override
148+
public void handle(Http2TestExchange exchange) throws IOException {
149+
// This case does actually cause the test to hang due to the body input stream being closed before it can send
150+
// the RST_STREAM frame.
151+
exchange.sendResponseHeaders(200, 0);
152+
exchange.getResponseBody().close();
153+
}
154+
}
155+
156+
private static class TestHandleNegativeContentLength implements Http2Handler {
157+
158+
@Override
159+
public void handle(Http2TestExchange exchange) throws IOException {
160+
exchange.sendResponseHeaders(200, -1);
161+
}
162+
}
163+
164+
private static class WarmupHandler implements Http2Handler {
165+
166+
@Override
167+
public void handle(Http2TestExchange exchange) throws IOException {
168+
exchange.sendResponseHeaders(200, 0);
169+
}
170+
}
171+
}

test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyInputStream.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ class BodyInputStream extends InputStream {
4141

4242
final Queue<Http2Frame> q;
4343
final int streamid;
44-
boolean closed;
45-
boolean eof;
44+
volatile boolean closed;
45+
volatile boolean eof;
4646
final Http2TestServerConnection conn;
4747

4848
@SuppressWarnings({"rawtypes","unchecked"})
@@ -100,6 +100,11 @@ private ByteBuffer getBuffer() throws IOException {
100100
return buffer;
101101
}
102102

103+
104+
public boolean isEof() {
105+
return eof;
106+
}
107+
103108
@Override
104109
public int read(byte[] buf, int offset, int length) throws IOException {
105110
if (closed) {
@@ -128,9 +133,12 @@ public int read() throws IOException {
128133
return one[0] & 0xFF;
129134
}
130135

136+
public boolean unconsumed() {
137+
return (!isEof() || q.size() > 0);
138+
}
139+
131140
@Override
132141
public void close() {
133-
// TODO reset this stream
134142
closed = true;
135143
}
136144
}

test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.ByteBuffer;
2828

2929
import jdk.internal.net.http.frame.DataFrame;
30+
import jdk.internal.net.http.frame.ResetFrame;
3031

3132
/**
3233
* OutputStream. Incoming window updates handled by the main connection
@@ -39,6 +40,8 @@ public class BodyOutputStream extends OutputStream {
3940
final int streamid;
4041
int window;
4142
volatile boolean closed;
43+
volatile BodyInputStream bis;
44+
volatile int resetErrorCode;
4245
boolean goodToGo = false; // not allowed to send until headers sent
4346
final Http2TestServerConnection conn;
4447
final Queue outputQ;
@@ -131,13 +134,25 @@ public void close() {
131134
}
132135
try {
133136
sendEndStream();
137+
if (bis!= null && bis.unconsumed()) {
138+
// Send a reset if there is still unconsumed data in the input stream
139+
sendReset(EMPTY_BARRAY, 0, 0, ResetFrame.NO_ERROR);
140+
}
134141
} catch (IOException ex) {
135-
System.err.println("TestServer: OutputStream.close exception: " + ex);
136142
ex.printStackTrace();
137143
}
138144
}
139145

140-
protected void sendEndStream() throws IOException {
146+
public void sendEndStream() throws IOException {
141147
send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
142148
}
149+
150+
public void sendReset(byte[] buf, int offset, int len, int flags) throws IOException {
151+
ByteBuffer buffer = ByteBuffer.allocate(len);
152+
buffer.put(buf, offset, len);
153+
buffer.flip();
154+
assert streamid != 0;
155+
ResetFrame rf = new ResetFrame(streamid, flags);
156+
outputQ.put(rf);
157+
}
143158
}

test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import jdk.internal.net.http.frame.HeaderFrame;
2828
import jdk.internal.net.http.frame.HeadersFrame;
2929
import jdk.internal.net.http.frame.Http2Frame;
30+
import jdk.internal.net.http.frame.ResetFrame;
3031

3132
import javax.net.ssl.SSLSession;
3233
import java.io.IOException;
@@ -47,7 +48,7 @@ public class Http2TestExchangeImpl implements Http2TestExchange {
4748
protected final HttpHeadersBuilder rspheadersBuilder;
4849
final URI uri;
4950
final String method;
50-
final InputStream is;
51+
protected final InputStream is;
5152
protected final BodyOutputStream os;
5253
final SSLSession sslSession;
5354
protected final int streamid;
@@ -156,9 +157,16 @@ public void sendResponseHeaders(int rCode, long responseLength,
156157

157158
if (responseLength < 0 || rCode == 204) {
158159
response.setFlag(HeadersFrame.END_STREAM);
160+
conn.outputQ.put(response);
161+
// Put a reset frame on the outputQ if there is still unconsumed data in the input stream and output stream
162+
// is going to be marked closed.
163+
if (is instanceof BodyInputStream bis && bis.unconsumed()) {
164+
conn.outputQ.put(new ResetFrame(streamid, ResetFrame.NO_ERROR));
165+
}
159166
os.markClosed();
167+
} else {
168+
conn.outputQ.put(response);
160169
}
161-
conn.outputQ.put(response);
162170
os.goodToGo();
163171
System.err.println("Sent response headers " + rCode);
164172
}

test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,11 @@ headers, rspheadersBuilder, uri, bis, getSSLSession(),
791791

792792
// give to user
793793
Http2Handler handler = server.getHandlerFor(uri.getPath());
794+
795+
// Need to pass the BodyInputStream reference to the BodyOutputStream, so it can determine if the stream
796+
// must be reset due to the BodyInputStream not being consumed by the handler when invoked.
797+
if (bis instanceof BodyInputStream bodyInputStream) bos.bis = bodyInputStream;
798+
794799
try {
795800
handler.handle(exchange);
796801
} catch (IOException closed) {
@@ -1114,7 +1119,7 @@ private void handlePush(OutgoingPushPromise op) throws IOException {
11141119
SettingsFrame.INITIAL_WINDOW_SIZE), this) {
11151120

11161121
@Override
1117-
protected void sendEndStream() throws IOException {
1122+
public void sendEndStream() throws IOException {
11181123
if (properties.getProperty("sendTrailingHeadersAfterPushPromise", "0").equals("1")) {
11191124
conn.outputQ.put(getTrailingHeadersFrame(promisedStreamid, List.of()));
11201125
} else {

0 commit comments

Comments
 (0)