1616
1717package io .grpc .servlet ;
1818
19- import static com .google .common .base .Preconditions .checkState ;
2019import static io .grpc .servlet .ServletServerStream .toHexString ;
2120import static java .util .logging .Level .FINE ;
2221import static java .util .logging .Level .FINEST ;
2322
2423import com .google .common .annotations .VisibleForTesting ;
25- import com .google .errorprone .annotations .CheckReturnValue ;
2624import io .grpc .InternalLogId ;
2725import io .grpc .servlet .ServletServerStream .ServletTransportState ;
2826import java .io .IOException ;
29- import java .time .Duration ;
3027import java .util .Queue ;
3128import java .util .concurrent .ConcurrentLinkedQueue ;
32- import java .util .concurrent .atomic .AtomicReference ;
33- import java .util .concurrent .locks .LockSupport ;
29+ import java .util .concurrent .locks .StampedLock ;
3430import java .util .function .BiFunction ;
3531import java .util .function .BooleanSupplier ;
3632import java .util .logging .Level ;
3733import java .util .logging .Logger ;
38- import javax .annotation .Nullable ;
3934import javax .servlet .AsyncContext ;
4035import javax .servlet .ServletOutputStream ;
4136
4237/** Handles write actions from the container thread and the application thread. */
4338final class AsyncServletOutputStreamWriter {
4439
40+ private final StampedLock writeLock = new StampedLock ();
41+
4542 /**
46- * Memory boundary for write actions.
47- *
48- * <pre>
49- * WriteState curState = writeState.get(); // mark a boundary
50- * doSomething(); // do something within the boundary
51- * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
52- * if (successful) {
53- * // state has not changed since
54- * return;
55- * } else {
56- * // state is changed by another thread while doSomething(), need recompute
57- * }
58- * </pre>
43+ * The servlet output stream is ready, and the writeQueue is empty.
5944 *
6045 * <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
6146 * application thread (calling {@code runOrBuffer()}) that read and update the
6247 * writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
6348 * only runOrBuffer() may turn it from true to false.
49+ *
50+ * <p>readyAndDrained turns from false to true when:
51+ * {@code onWritePossible()} exits while currently there is no more data to write, but the last
52+ * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
53+ *
54+ * <p>readyAndDrained turns from true to false when:
55+ * {@code runOrBuffer()} exits while either the action item is written directly to the
56+ * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
57+ * right after that returns false, or the action item is buffered into the writeQueue.
6458 */
65- private final AtomicReference <WriteState > writeState = new AtomicReference <>(WriteState .DEFAULT );
59+ // @GuardedBy("writeLock")
60+ private boolean readyAndDrained ;
6661
6762 private final Log log ;
6863 private final BiFunction <byte [], Integer , ActionItem > writeAction ;
@@ -71,15 +66,10 @@ final class AsyncServletOutputStreamWriter {
7166 private final BooleanSupplier isReady ;
7267
7368 /**
74- * New write actions will be buffered into this queue if the servlet output stream is not ready or
75- * the queue is not drained.
69+ * New write actions will be buffered into this queue.
7670 */
7771 // SPSC queue would do
78- private final Queue <ActionItem > writeChain = new ConcurrentLinkedQueue <>();
79- // for a theoretical race condition that onWritePossible() is called immediately after isReady()
80- // returns false and before writeState.compareAndSet()
81- @ Nullable
82- private volatile Thread parkingThread ;
72+ private final Queue <ActionItem > writeQueue = new ConcurrentLinkedQueue <>();
8373
8474 AsyncServletOutputStreamWriter (
8575 AsyncContext asyncContext ,
@@ -128,7 +118,7 @@ public void finest(String str, Object... params) {
128118 log .fine ("call completed" );
129119 });
130120 };
131- this .isReady = () -> outputStream . isReady () ;
121+ this .isReady = outputStream :: isReady ;
132122 }
133123
134124 /**
@@ -173,40 +163,21 @@ void complete() {
173163 /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
174164 void onWritePossible () throws IOException {
175165 log .finest ("onWritePossible: ENTRY. The servlet output stream becomes ready" );
176- assureReadyAndDrainedTurnsFalse ();
177- while (isReady .getAsBoolean ()) {
178- WriteState curState = writeState .get ();
179-
180- ActionItem actionItem = writeChain .poll ();
181- if (actionItem != null ) {
166+ long stamp = writeLock .writeLock ();
167+ try {
168+ while (isReady .getAsBoolean ()) {
169+ ActionItem actionItem = writeQueue .poll ();
170+ if (actionItem == null ) {
171+ readyAndDrained = true ;
172+ log .finest ("onWritePossible: EXIT. Queue drained" );
173+ return ;
174+ }
182175 actionItem .run ();
183- continue ;
184176 }
185-
186- if (writeState .compareAndSet (curState , curState .withReadyAndDrained (true ))) {
187- // state has not changed since.
188- log .finest (
189- "onWritePossible: EXIT. All data available now is sent out and the servlet output"
190- + " stream is still ready" );
191- return ;
192- }
193- // else, state changed by another thread (runOrBuffer()), need to drain the writeChain
194- // again
195- }
196- log .finest ("onWritePossible: EXIT. The servlet output stream becomes not ready" );
197- }
198-
199- private void assureReadyAndDrainedTurnsFalse () {
200- // readyAndDrained should have been set to false already.
201- // Just in case due to a race condition readyAndDrained is still true at this moment and is
202- // being set to false by runOrBuffer() concurrently.
203- while (writeState .get ().readyAndDrained ) {
204- parkingThread = Thread .currentThread ();
205- // Try to sleep for an extremely long time to avoid writeState being changed at exactly
206- // the time when sleep time expires (in extreme scenario, such as #9917).
207- LockSupport .parkNanos (Duration .ofHours (1 ).toNanos ()); // should return immediately
177+ log .finest ("onWritePossible: EXIT. The servlet output stream becomes not ready" );
178+ } finally {
179+ writeLock .unlockWrite (stamp );
208180 }
209- parkingThread = null ;
210181 }
211182
212183 /**
@@ -216,31 +187,26 @@ private void assureReadyAndDrainedTurnsFalse() {
216187 * <p>Called from application thread.
217188 */
218189 private void runOrBuffer (ActionItem actionItem ) throws IOException {
219- WriteState curState = writeState .get ();
220- if (curState .readyAndDrained ) { // write to the outputStream directly
221- actionItem .run ();
222- if (actionItem == completeAction ) {
223- return ;
224- }
225- if (!isReady .getAsBoolean ()) {
226- boolean successful =
227- writeState .compareAndSet (curState , curState .withReadyAndDrained (false ));
228- LockSupport .unpark (parkingThread );
229- checkState (successful , "Bug: curState is unexpectedly changed by another thread" );
230- log .finest ("the servlet output stream becomes not ready" );
231- }
232- } else { // buffer to the writeChain
233- writeChain .offer (actionItem );
234- if (!writeState .compareAndSet (curState , curState .withReadyAndDrained (false ))) {
235- checkState (
236- writeState .get ().readyAndDrained ,
237- "Bug: onWritePossible() should have changed readyAndDrained to true, but not" );
238- ActionItem lastItem = writeChain .poll ();
239- if (lastItem != null ) {
240- checkState (lastItem == actionItem , "Bug: lastItem != actionItem" );
241- runOrBuffer (lastItem );
190+ writeQueue .offer (actionItem );
191+ long stamp = writeLock .tryWriteLock ();
192+ if (stamp == 0L ) {
193+ return ;
194+ }
195+ try {
196+ if (readyAndDrained ) { // write to the outputStream directly
197+ ActionItem toWrite = writeQueue .poll ();
198+ if (toWrite != null ) {
199+ toWrite .run ();
200+ if (toWrite == completeAction ) {
201+ return ;
202+ }
203+ if (!isReady .getAsBoolean ()) {
204+ readyAndDrained = false ;
205+ }
242206 }
243- } // state has not changed since
207+ }
208+ } finally {
209+ writeLock .unlockWrite (stamp );
244210 }
245211 }
246212
@@ -254,43 +220,11 @@ interface ActionItem {
254220 @ VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
255221 interface Log {
256222 default boolean isLoggable (Level level ) {
257- return false ;
223+ return false ;
258224 }
259225
260226 default void fine (String str , Object ...params ) {}
261227
262228 default void finest (String str , Object ...params ) {}
263229 }
264-
265- private static final class WriteState {
266-
267- static final WriteState DEFAULT = new WriteState (false );
268-
269- /**
270- * The servlet output stream is ready and the writeChain is empty.
271- *
272- * <p>readyAndDrained turns from false to true when:
273- * {@code onWritePossible()} exits while currently there is no more data to write, but the last
274- * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
275- *
276- * <p>readyAndDrained turns from true to false when:
277- * {@code runOrBuffer()} exits while either the action item is written directly to the
278- * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
279- * right after that returns false, or the action item is buffered into the writeChain.
280- */
281- final boolean readyAndDrained ;
282-
283- WriteState (boolean readyAndDrained ) {
284- this .readyAndDrained = readyAndDrained ;
285- }
286-
287- /**
288- * Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
289- * runOrBuffer()} can set it to false.
290- */
291- @ CheckReturnValue
292- WriteState withReadyAndDrained (boolean readyAndDrained ) {
293- return new WriteState (readyAndDrained );
294- }
295- }
296230}
0 commit comments