1616
1717package io .grpc .servlet ;
1818
19- import static com .google .common .base .Preconditions .checkState ;
2019import static io .grpc .servlet .ServletServerStream .toHexString ;
2120import static java .util .logging .Level .FINE ;
2221import static java .util .logging .Level .FINEST ;
2322
2423import com .google .common .annotations .VisibleForTesting ;
25- import com .google .errorprone .annotations .CheckReturnValue ;
2624import io .grpc .InternalLogId ;
2725import io .grpc .servlet .ServletServerStream .ServletTransportState ;
2826import java .io .IOException ;
29- import java .time .Duration ;
3027import java .util .Queue ;
3128import java .util .concurrent .ConcurrentLinkedQueue ;
32- import java .util .concurrent .atomic . AtomicReference ;
33- import java .util .concurrent .locks .LockSupport ;
29+ import java .util .concurrent .locks . Lock ;
30+ import java .util .concurrent .locks .ReentrantLock ;
3431import java .util .function .BiFunction ;
3532import java .util .function .BooleanSupplier ;
3633import java .util .logging .Level ;
3734import java .util .logging .Logger ;
38- import javax .annotation .Nullable ;
3935import javax .servlet .AsyncContext ;
4036import javax .servlet .ServletOutputStream ;
37+ import org .checkerframework .checker .lock .qual .GuardedBy ;
4138
4239/** Handles write actions from the container thread and the application thread. */
4340final class AsyncServletOutputStreamWriter {
4441
45- /**
46- * Memory boundary for write actions.
47- *
48- * <pre>
49- * WriteState curState = writeState.get(); // mark a boundary
50- * doSomething(); // do something within the boundary
51- * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
52- * if (successful) {
53- * // state has not changed since
54- * return;
55- * } else {
56- * // state is changed by another thread while doSomething(), need recompute
57- * }
58- * </pre>
59- *
60- * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
61- * application thread (calling {@code runOrBuffer()}) that read and update the
62- * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
63- * only runOrBuffer() may turn it from true to false.
64- */
65- private final AtomicReference <WriteState > writeState = new AtomicReference <>(WriteState .DEFAULT );
66-
6742 private final Log log ;
6843 private final BiFunction <byte [], Integer , ActionItem > writeAction ;
6944 private final ActionItem flushAction ;
7045 private final ActionItem completeAction ;
7146 private final BooleanSupplier isReady ;
7247
48+ private final Lock writeLock = new ReentrantLock ();
7349 /**
74- * New write actions will be buffered into this queue if the servlet output stream is not ready or
75- * the queue is not drained.
50+ * New write actions will be buffered into this queue if the servlet output stream is not ready
51+ * or the queue is not drained.
7652 */
7753 // SPSC queue would do
7854 private final Queue <ActionItem > writeChain = new ConcurrentLinkedQueue <>();
79- // for a theoretical race condition that onWritePossible() is called immediately after isReady()
80- // returns false and before writeState.compareAndSet()
81- @ Nullable
82- private volatile Thread parkingThread ;
8355
8456 AsyncServletOutputStreamWriter (
8557 AsyncContext asyncContext ,
@@ -173,40 +145,41 @@ void complete() {
173145 /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
174146 void onWritePossible () throws IOException {
175147 log .finest ("onWritePossible: ENTRY. The servlet output stream becomes ready" );
176- assureReadyAndDrainedTurnsFalse ();
177- while (isReady .getAsBoolean ()) {
178- WriteState curState = writeState .get ();
179-
180- ActionItem actionItem = writeChain .poll ();
181- if (actionItem != null ) {
182- actionItem .run ();
183- continue ;
148+ do {
149+ writeLock .lock ();
150+ try {
151+ writeFromQueue ();
152+ if (!outputKnownToBeReady ) {
153+ log .finest ("onWritePossible: EXIT. The servlet output stream becomes not ready" );
154+ return ;
155+ }
156+ } finally {
157+ writeLock .unlock ();
184158 }
159+ } while (!writeChain .isEmpty ());
160+ log .finest ("onWritePossible: EXIT. Queue drained" );
161+ }
185162
186- if (writeState .compareAndSet (curState , curState .withReadyAndDrained (true ))) {
187- // state has not changed since.
188- log .finest (
189- "onWritePossible: EXIT. All data available now is sent out and the servlet output"
190- + " stream is still ready" );
163+ @ GuardedBy ("writeLock" )
164+ private boolean outputKnownToBeReady = false ;
165+
166+ private void writeFromQueue () throws IOException {
167+ for (;;) {
168+ ActionItem actionItem ;
169+ if (outputKnownToBeReady ) {
170+ actionItem = writeChain .poll ();
171+ } else if (isReady .getAsBoolean ()) {
172+ outputKnownToBeReady = true ;
173+ actionItem = writeChain .poll ();
174+ } else {
191175 return ;
192176 }
193- // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
194- // again
195- }
196- log .finest ("onWritePossible: EXIT. The servlet output stream becomes not ready" );
197- }
198-
199- private void assureReadyAndDrainedTurnsFalse () {
200- // readyAndDrained should have been set to false already.
201- // Just in case due to a race condition readyAndDrained is still true at this moment and is
202- // being set to false by runOrBuffer() concurrently.
203- while (writeState .get ().readyAndDrained ) {
204- parkingThread = Thread .currentThread ();
205- // Try to sleep for an extremely long time to avoid writeState being changed at exactly
206- // the time when sleep time expires (in extreme scenario, such as #9917).
207- LockSupport .parkNanos (Duration .ofHours (1 ).toNanos ()); // should return immediately
177+ if (actionItem == null ) {
178+ return ;
179+ }
180+ actionItem .run ();
181+ outputKnownToBeReady = false ;
208182 }
209- parkingThread = null ;
210183 }
211184
212185 /**
@@ -216,31 +189,13 @@ private void assureReadyAndDrainedTurnsFalse() {
216189 * <p>Called from application thread.
217190 */
218191 private void runOrBuffer (ActionItem actionItem ) throws IOException {
219- WriteState curState = writeState .get ();
220- if (curState .readyAndDrained ) { // write to the outputStream directly
221- actionItem .run ();
222- if (actionItem == completeAction ) {
223- return ;
224- }
225- if (!isReady .getAsBoolean ()) {
226- boolean successful =
227- writeState .compareAndSet (curState , curState .withReadyAndDrained (false ));
228- LockSupport .unpark (parkingThread );
229- checkState (successful , "Bug: curState is unexpectedly changed by another thread" );
230- log .finest ("the servlet output stream becomes not ready" );
192+ writeChain .offer (actionItem );
193+ if (writeLock .tryLock ()) { // write to the outputStream directly
194+ try {
195+ writeFromQueue ();
196+ } finally {
197+ writeLock .unlock ();
231198 }
232- } else { // buffer to the writeChain
233- writeChain .offer (actionItem );
234- if (!writeState .compareAndSet (curState , curState .withReadyAndDrained (false ))) {
235- checkState (
236- writeState .get ().readyAndDrained ,
237- "Bug: onWritePossible() should have changed readyAndDrained to true, but not" );
238- ActionItem lastItem = writeChain .poll ();
239- if (lastItem != null ) {
240- checkState (lastItem == actionItem , "Bug: lastItem != actionItem" );
241- runOrBuffer (lastItem );
242- }
243- } // state has not changed since
244199 }
245200 }
246201
@@ -254,43 +209,11 @@ interface ActionItem {
254209 @ VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
255210 interface Log {
256211 default boolean isLoggable (Level level ) {
257- return false ;
212+ return false ;
258213 }
259214
260215 default void fine (String str , Object ...params ) {}
261216
262217 default void finest (String str , Object ...params ) {}
263218 }
264-
265- private static final class WriteState {
266-
267- static final WriteState DEFAULT = new WriteState (false );
268-
269- /**
270- * The servlet output stream is ready and the writeChain is empty.
271- *
272- * <p>readyAndDrained turns from false to true when:
273- * {@code onWritePossible()} exits while currently there is no more data to write, but the last
274- * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
275- *
276- * <p>readyAndDrained turns from true to false when:
277- * {@code runOrBuffer()} exits while either the action item is written directly to the
278- * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
279- * right after that returns false, or the action item is buffered into the writeChain.
280- */
281- final boolean readyAndDrained ;
282-
283- WriteState (boolean readyAndDrained ) {
284- this .readyAndDrained = readyAndDrained ;
285- }
286-
287- /**
288- * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
289- * runOrBuffer()} can set it to false.
290- */
291- @ CheckReturnValue
292- WriteState withReadyAndDrained (boolean readyAndDrained ) {
293- return new WriteState (readyAndDrained );
294- }
295- }
296219}
0 commit comments