Skip to content

Commit b0e0272

Browse files
committed
Backport 32ac3e713ea4370e496717967fff7de9450d2f69
1 parent ae0f03b commit b0e0272

File tree

12 files changed

+889
-26
lines changed

12 files changed

+889
-26
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import jdk.internal.net.http.common.MinimalFuture;
4242
import jdk.internal.net.http.common.Utils;
4343
import jdk.internal.net.http.frame.SettingsFrame;
44+
45+
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_CONNECTION_WINDOW_SIZE;
4446
import static jdk.internal.net.http.frame.SettingsFrame.INITIAL_WINDOW_SIZE;
4547
import static jdk.internal.net.http.frame.SettingsFrame.ENABLE_PUSH;
4648
import static jdk.internal.net.http.frame.SettingsFrame.HEADER_TABLE_SIZE;
@@ -265,9 +267,13 @@ int getConnectionWindowSize(SettingsFrame clientSettings) {
265267
int defaultValue = Math.min(Integer.MAX_VALUE,
266268
Math.max(streamWindow, K*K*32));
267269

270+
// The min value is the max between the streamWindow and
271+
// the initial connection window size
272+
int minValue = Math.max(INITIAL_CONNECTION_WINDOW_SIZE, streamWindow);
273+
268274
return getParameter(
269275
"jdk.httpclient.connectionWindowSize",
270-
streamWindow, Integer.MAX_VALUE, defaultValue);
276+
minValue, Integer.MAX_VALUE, defaultValue);
271277
}
272278

273279
/**

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,34 @@ private String checkMaxOrphanedHeadersExceeded(HeaderFrame hf) {
975975
return null;
976976
}
977977

978+
// This method is called when a DataFrame that was added
979+
// to a Stream::inputQ is later dropped from the queue
980+
// without being consumed.
981+
//
982+
// Before adding a frame to the queue, the Stream calls
983+
// connection.windowUpdater.canBufferUnprocessedBytes(), which
984+
// increases the count of unprocessed bytes in the connection.
985+
// After consuming the frame, it calls connection.windowUpdater::processed,
986+
// which decrements the count of unprocessed bytes, and possibly
987+
// sends a window update to the peer.
988+
//
989+
// This method is called when connection.windowUpdater::processed
990+
// will not be called, which can happen when consuming the frame
991+
// fails, or when an empty DataFrame terminates the stream,
992+
// or when the stream is cancelled while data is still
993+
// sitting in its inputQ. In the later case, it is called for
994+
// each frame that is dropped from the queue.
995+
final void releaseUnconsumed(DataFrame df) {
996+
windowUpdater.released(df.payloadLength());
997+
dropDataFrame(df);
998+
}
999+
1000+
// This method can be called directly when a DataFrame is dropped
1001+
// before/without having been added to any Stream::inputQ.
1002+
// In that case, the number of unprocessed bytes hasn't been incremented
1003+
// by the stream, and does not need to be decremented.
1004+
// Otherwise, if the frame is dropped after having been added to the
1005+
// inputQ, releaseUnconsumed above should be called.
9781006
final void dropDataFrame(DataFrame df) {
9791007
if (isMarked(closedState, SHUTDOWN_REQUESTED)) return;
9801008
if (debug.on()) {
@@ -1331,11 +1359,12 @@ private void sendConnectionPreface() throws IOException {
13311359
// Note that the default initial window size, not to be confused
13321360
// with the initial window size, is defined by RFC 7540 as
13331361
// 64K -1.
1334-
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
1335-
if (len != 0) {
1362+
final int len = windowUpdater.initialWindowSize - INITIAL_CONNECTION_WINDOW_SIZE;
1363+
assert len >= 0;
1364+
if (len > 0) {
13361365
if (Log.channel()) {
13371366
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
1338-
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
1367+
len, windowUpdater.initialWindowSize, INITIAL_CONNECTION_WINDOW_SIZE);
13391368
}
13401369
windowUpdater.sendWindowUpdate(len);
13411370
}
@@ -1722,6 +1751,19 @@ public ConnectionWindowUpdateSender(Http2Connection connection,
17221751
int getStreamId() {
17231752
return 0;
17241753
}
1754+
1755+
@Override
1756+
protected boolean windowSizeExceeded(long received) {
1757+
if (connection.isOpen()) {
1758+
try {
1759+
connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR,
1760+
"connection window exceeded");
1761+
} catch (IOException io) {
1762+
connection.shutdown(io);
1763+
}
1764+
}
1765+
return true;
1766+
}
17251767
}
17261768

17271769
/**

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ private void schedule() {
203203
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
204204
if (size == 0 && finished) {
205205
inputQ.remove();
206-
connection.ensureWindowUpdated(df); // must update connection window
206+
// consumed will not be called
207+
connection.releaseUnconsumed(df); // must update connection window
207208
Log.logTrace("responseSubscriber.onComplete");
208209
if (debug.on()) debug.log("incoming: onComplete");
209210
sched.stop();
@@ -219,7 +220,11 @@ private void schedule() {
219220
try {
220221
subscriber.onNext(dsts);
221222
} catch (Throwable t) {
222-
connection.dropDataFrame(df); // must update connection window
223+
// Data frames that have been added to the inputQ
224+
// must be released using releaseUnconsumed() to
225+
// account for the amount of unprocessed bytes
226+
// tracked by the connection.windowUpdater.
227+
connection.releaseUnconsumed(df);
223228
throw t;
224229
}
225230
if (consumed(df)) {
@@ -271,8 +276,12 @@ private void schedule() {
271276
private void drainInputQueue() {
272277
Http2Frame frame;
273278
while ((frame = inputQ.poll()) != null) {
274-
if (frame instanceof DataFrame) {
275-
connection.dropDataFrame((DataFrame)frame);
279+
if (frame instanceof DataFrame df) {
280+
// Data frames that have been added to the inputQ
281+
// must be released using releaseUnconsumed() to
282+
// account for the amount of unprocessed bytes
283+
// tracked by the connection.windowUpdater.
284+
connection.releaseUnconsumed(df);
276285
}
277286
}
278287
}
@@ -298,12 +307,13 @@ private boolean consumed(DataFrame df) {
298307
boolean endStream = df.getFlag(DataFrame.END_STREAM);
299308
if (len == 0) return endStream;
300309

301-
connection.windowUpdater.update(len);
302-
310+
connection.windowUpdater.processed(len);
303311
if (!endStream) {
312+
streamWindowUpdater.processed(len);
313+
} else {
304314
// Don't send window update on a stream which is
305315
// closed or half closed.
306-
windowUpdater.update(len);
316+
streamWindowUpdater.released(len);
307317
}
308318

309319
// true: end of stream; false: more data coming
@@ -373,8 +383,21 @@ public String toString() {
373383
}
374384

375385
private void receiveDataFrame(DataFrame df) {
376-
inputQ.add(df);
377-
sched.runOrSchedule();
386+
try {
387+
int len = df.payloadLength();
388+
if (len > 0) {
389+
// we return from here if the connection is being closed.
390+
if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
391+
// we return from here if the stream is being closed.
392+
if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
393+
connection.releaseUnconsumed(df);
394+
return;
395+
}
396+
}
397+
inputQ.add(df);
398+
} finally {
399+
sched.runOrSchedule();
400+
}
378401
}
379402

380403
/** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -452,7 +475,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
452475
this.responseHeadersBuilder = new HttpHeadersBuilder();
453476
this.rspHeadersConsumer = new HeadersConsumer();
454477
this.requestPseudoHeaders = createPseudoHeaders(request);
455-
this.windowUpdater = new StreamWindowUpdateSender(connection);
478+
this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
456479
}
457480

458481
private boolean checkRequestCancelled() {
@@ -1331,12 +1354,18 @@ void cancel(IOException cause) {
13311354

13321355
@Override
13331356
void onProtocolError(final IOException cause) {
1357+
onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
1358+
}
1359+
1360+
void onProtocolError(final IOException cause, int code) {
13341361
if (debug.on()) {
1335-
debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
1362+
debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
1363+
streamid, ErrorFrame.stringForCode(code),
1364+
cause.getMessage());
13361365
}
13371366
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
13381367
// send a RESET frame and close the stream
1339-
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
1368+
cancelImpl(cause, code);
13401369
}
13411370

13421371
void connectionClosing(Throwable cause) {
@@ -1632,6 +1661,13 @@ String dbgString() {
16321661
return dbgString = dbg;
16331662
}
16341663
}
1664+
1665+
@Override
1666+
protected boolean windowSizeExceeded(long received) {
1667+
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
1668+
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
1669+
return true;
1670+
}
16351671
}
16361672

16371673
/**

src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ abstract class WindowUpdateSender {
5151

5252
WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) {
5353
this.connection = connection;
54+
this.windowSize = initWindowSize;
5455
int v0 = Math.max(0, initWindowSize - maxFrameSize);
5556
int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize;
5657
v1 = v1 * maxFrameSize / 2;
@@ -82,6 +83,7 @@ void update(int delta) {
8283

8384
void sendWindowUpdate(int delta) {
8485
if (debug.on()) debug.log("sending window update: %d", delta);
86+
assert delta > 0 : "illegal window update delta: " + delta;
8587
connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
8688
}
8789

@@ -99,4 +101,16 @@ String dbgString() {
99101
}
100102
}
101103

104+
/**
105+
* Called when the flow control window size is exceeded
106+
* This method may return false if flow control is disabled
107+
* in this endpoint.
108+
*
109+
* @param received the amount of data received, which is greater
110+
* than {@code windowSize}
111+
* @return {@code true} if the error was reported to the peer
112+
* and no further window update should be sent.
113+
*/
114+
protected abstract boolean windowSizeExceeded(long received);
115+
102116
}

src/java.net.http/share/classes/jdk/internal/net/http/frame/FramesDecoder.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -463,6 +463,16 @@ private Http2Frame parseSettingsFrame(int frameLength, int streamid, int flags)
463463
int val = getInt();
464464
if (id > 0 && id <= SettingsFrame.MAX_PARAM) {
465465
// a known parameter. Ignore otherwise
466+
if (id == SettingsFrame.INITIAL_WINDOW_SIZE && val < 0) {
467+
return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
468+
"SettingsFrame with INITIAL_WINDOW_SIZE > 2^31 -1: "
469+
+ (val & 0xffffffffL));
470+
}
471+
if (id == SettingsFrame.MAX_FRAME_SIZE && (val < 16384 || val > 16777215)) {
472+
return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
473+
"SettingsFrame with MAX_FRAME_SIZE out of range: "
474+
+ (val & 0xffffffffL));
475+
}
466476
sf.setParameter(id, val); // TODO parameters validation
467477
}
468478
}
@@ -530,7 +540,12 @@ private Http2Frame parseWindowUpdateFrame(int frameLength, int streamid, int fla
530540
return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
531541
"WindowUpdateFrame length is "+ frameLength+", expected 4");
532542
}
533-
return new WindowUpdateFrame(streamid, getInt() & 0x7fffffff);
543+
int update = getInt();
544+
if (update < 0) {
545+
return new MalformedFrame(ErrorFrame.FLOW_CONTROL_ERROR,
546+
"WindowUpdateFrame with value > 2^31 -1 " + (update & 0xffffffffL));
547+
}
548+
return new WindowUpdateFrame(streamid, update & 0x7fffffff);
534549
}
535550

536551
private Http2Frame parseContinuationFrame(int frameLength, int streamid, int flags) {

src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -166,6 +166,11 @@ public synchronized void update(SettingsFrame updated) {
166166
// The initial value is 2^14 (16,384) octets.
167167
public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
168168

169+
// Initial connection window size. This cannot be updated using the
170+
// SETTINGS frame.
171+
public static final int INITIAL_CONNECTION_WINDOW_SIZE = DEFAULT_INITIAL_WINDOW_SIZE;
172+
173+
169174
public static SettingsFrame defaultRFCSettings() {
170175
SettingsFrame f = new SettingsFrame();
171176
f.setParameter(ENABLE_PUSH, DEFAULT_ENABLE_PUSH);

test/jdk/java/net/httpclient/GZIPInputStreamTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* @bug 8217264
2727
* @summary Tests that you can map an InputStream to a GZIPInputStream
2828
* @library /test/lib /test/jdk/java/net/httpclient/lib
29-
* @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters
29+
* @build jdk.test.lib.net.SimpleSSLContext jdk.httpclient.test.lib.common.HttpServerAdapters ReferenceTracker
3030
* @run testng/othervm GZIPInputStreamTest
3131
*/
3232

test/jdk/java/net/httpclient/ProxySelectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void setup() throws Exception {
376376
public void teardown() throws Exception {
377377
client = null;
378378
Thread.sleep(100);
379-
AssertionError fail = TRACKER.check(500);
379+
AssertionError fail = TRACKER.check(1500);
380380
try {
381381
proxy.stop();
382382
authproxy.stop();

0 commit comments

Comments
 (0)