1
1
/*
2
- * Copyright (c) 2015, 2018 , Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2015, 2024 , Oracle and/or its affiliates. All rights reserved.
3
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
4
*
5
5
* This code is free software; you can redistribute it and/or modify it
31
31
import jdk .internal .net .http .frame .WindowUpdateFrame ;
32
32
import jdk .internal .net .http .common .Utils ;
33
33
import java .util .concurrent .atomic .AtomicInteger ;
34
+ import java .util .concurrent .atomic .AtomicLong ;
34
35
36
+ /**
37
+ * A class that tracks the amount of flow controlled
38
+ * data received on an HTTP/2 connection
39
+ */
35
40
abstract class WindowUpdateSender {
36
41
37
42
final Logger debug =
38
43
Utils .getDebugLogger (this ::dbgString , Utils .DEBUG );
39
44
45
+ // The threshold at which window updates are sent in bytes
40
46
final int limit ;
47
+ // The flow control window in bytes
48
+ final int windowSize ;
41
49
final Http2Connection connection ;
42
- final AtomicInteger received = new AtomicInteger ();
50
+ // The amount of flow controlled data received and processed, in bytes,
51
+ // since the start of the window.
52
+ // The window is exhausted when received + unprocessed >= windowSize
53
+ final AtomicLong received = new AtomicLong ();
54
+ // The amount of flow controlled data received and unprocessed, in bytes,
55
+ // since the start of the window.
56
+ // The window is exhausted when received + unprocessed >= windowSize
57
+ final AtomicLong unprocessed = new AtomicLong ();
43
58
44
59
WindowUpdateSender (Http2Connection connection ) {
45
60
this (connection , connection .clientSettings .getParameter (SettingsFrame .INITIAL_WINDOW_SIZE ));
@@ -65,15 +80,117 @@ abstract class WindowUpdateSender {
65
80
maxFrameSize , initWindowSize , limit );
66
81
}
67
82
83
+ // O for the connection window, > 0 for a stream window
68
84
abstract int getStreamId ();
69
85
86
+ /**
87
+ * {@return {@code true} if buffering the given amount of
88
+ * flow controlled data would not exceed the flow control
89
+ * window}
90
+ * <p>
91
+ * This method is called before buffering and processing
92
+ * a DataFrame. The count of unprocessed bytes is incremented
93
+ * by the given amount, and checked against the number of
94
+ * available bytes in the flow control window.
95
+ * <p>
96
+ * This method returns {@code true} if the bytes can be buffered
97
+ * without exceeding the flow control window, {@code false}
98
+ * if the flow control window is exceeded and corrective
99
+ * action (close/reset) has been taken.
100
+ * <p>
101
+ * When this method returns true, either {@link #processed(int)}
102
+ * or {@link #released(int)} must eventually be called to release
103
+ * the bytes from the flow control window.
104
+ *
105
+ * @implSpec
106
+ * an HTTP/2 endpoint may disable its own flow control
107
+ * (see <a href="https://www.rfc-editor.org/rfc/rfc9113.html#section-5.2.1">
108
+ * RFC 9113, section 5.2.1</a>), in which case this
109
+ * method may return true even if the flow control window would
110
+ * be exceeded: that is, the flow control window is exceeded but
111
+ * the endpoint decided to take no corrective action.
112
+ *
113
+ * @param len a number of unprocessed bytes, which
114
+ * the caller wants to buffer.
115
+ */
116
+ boolean canBufferUnprocessedBytes (int len ) {
117
+ return !checkWindowSizeExceeded (unprocessed .addAndGet (len ));
118
+ }
119
+
120
+ // adds the provided amount to the amount of already
121
+ // received and processed bytes and checks whether the
122
+ // flow control window is exceeded. If so, take
123
+ // corrective actions and return true.
124
+ private boolean checkWindowSizeExceeded (long len ) {
125
+ // because windowSize is bound by Integer.MAX_VALUE
126
+ // we will never reach the point where received.get() + len
127
+ // could overflow
128
+ long rcv = received .get () + len ;
129
+ return rcv > windowSize && windowSizeExceeded (rcv );
130
+ }
131
+
132
+ /**
133
+ * Called after unprocessed buffered bytes have been
134
+ * processed, to release part of the flow control window
135
+ *
136
+ * @apiNote this method is called only when releasing bytes
137
+ * that where buffered after calling
138
+ * {@link #canBufferUnprocessedBytes(int)}.
139
+ *
140
+ * @param delta the amount of processed bytes to release
141
+ */
142
+ void processed (int delta ) {
143
+ long rest = unprocessed .addAndGet (-delta );
144
+ assert rest >= 0 ;
145
+ update (delta );
146
+ }
147
+
148
+ /**
149
+ * Called when it is desired to release unprocessed bytes
150
+ * without processing them, or without triggering the
151
+ * sending of a window update. This method can be called
152
+ * instead of calling {@link #processed(int)}.
153
+ * When this method is called instead of calling {@link #processed(int)},
154
+ * it should generally be followed by a call to {@link #update(int)},
155
+ * unless the stream or connection is being closed.
156
+ *
157
+ * @apiNote this method should only be called to release bytes that
158
+ * have been buffered after calling {@link
159
+ * #canBufferUnprocessedBytes(int)}.
160
+ *
161
+ * @param delta the amount of bytes to release from the window
162
+ *
163
+ * @return the amount of remaining unprocessed bytes
164
+ */
165
+ long released (int delta ) {
166
+ long rest = unprocessed .addAndGet (-delta );
167
+ assert rest >= 0 ;
168
+ return rest ;
169
+ }
170
+
171
+ /**
172
+ * This method is called to update the flow control window,
173
+ * and possibly send a window update
174
+ *
175
+ * @apiNote this method can be called directly if a frame is
176
+ * dropped before calling {@link #canBufferUnprocessedBytes(int)}.
177
+ * Otherwise, either {@link #processed(int)} or {@link #released(int)}
178
+ * should be called, depending on whether sending a window update
179
+ * is desired or not. It is typically not desired to send an update
180
+ * if the stream or connection is being closed.
181
+ *
182
+ * @param delta the amount of bytes released from the window.
183
+ */
70
184
void update (int delta ) {
71
- int rcv = received .addAndGet (delta );
185
+ long rcv = received .addAndGet (delta );
72
186
if (debug .on ()) debug .log ("update: %d, received: %d, limit: %d" , delta , rcv , limit );
187
+ if (rcv > windowSize && windowSizeExceeded (rcv )) {
188
+ return ;
189
+ }
73
190
if (rcv > limit ) {
74
191
synchronized (this ) {
75
- int tosend = received .get ();
76
- if ( tosend > limit ) {
192
+ int tosend = ( int ) Math . min ( received .get (), Integer . MAX_VALUE );
193
+ if ( tosend > limit ) {
77
194
received .getAndAdd (-tosend );
78
195
sendWindowUpdate (tosend );
79
196
}
0 commit comments