Skip to content

Commit 53c26f4

Browse files
committed
CR
1 parent 56706d3 commit 53c26f4

File tree

2 files changed

+54
-35
lines changed

2 files changed

+54
-35
lines changed

servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import java.io.IOException;
2727
import java.util.Queue;
2828
import java.util.concurrent.ConcurrentLinkedQueue;
29-
import java.util.concurrent.locks.Lock;
30-
import java.util.concurrent.locks.ReentrantLock;
29+
import java.util.concurrent.locks.StampedLock;
3130
import java.util.function.BiFunction;
3231
import java.util.function.BooleanSupplier;
3332
import java.util.logging.Level;
@@ -38,13 +37,34 @@
3837
/** Handles write actions from the container thread and the application thread. */
3938
final class AsyncServletOutputStreamWriter {
4039

40+
private final StampedLock writeLock = new StampedLock();
41+
42+
/**
43+
* The servlet output stream is ready, and the writeQueue is empty.
44+
*
45+
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
46+
* application thread (calling {@code runOrBuffer()}) that read and update the
47+
* writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
48+
* only runOrBuffer() may turn it from true to false.
49+
*
50+
* <p>readyAndDrained turns from false to true when:
51+
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
52+
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
53+
*
54+
* <p>readyAndDrained turns from true to false when:
55+
* {@code runOrBuffer()} exits while either the action item is written directly to the
56+
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
57+
* right after that returns false, or the action item is buffered into the writeQueue.
58+
*/
59+
// @GuardedBy("writeLock")
60+
private boolean readyAndDrained;
61+
4162
private final Log log;
4263
private final BiFunction<byte[], Integer, ActionItem> writeAction;
4364
private final ActionItem flushAction;
4465
private final ActionItem completeAction;
4566
private final BooleanSupplier isReady;
4667

47-
private final Lock writeLock = new ReentrantLock();
4868
/**
4969
* New write actions will be buffered into this queue.
5070
*/
@@ -143,35 +163,21 @@ void complete() {
143163
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
144164
void onWritePossible() throws IOException {
145165
log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
146-
do {
147-
writeLock.lock();
148-
try {
149-
if (writeFromQueue() == WriteResult.OUTPUT_NOT_READY) {
150-
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
166+
long stamp = writeLock.writeLock();
167+
try {
168+
while (isReady.getAsBoolean()) {
169+
ActionItem actionItem = writeQueue.poll();
170+
if (actionItem == null) {
171+
readyAndDrained = true;
172+
log.finest("onWritePossible: EXIT. Queue drained");
151173
return;
152174
}
153-
} finally {
154-
writeLock.unlock();
175+
actionItem.run();
155176
}
156-
// retry if runOrBuffer() added tasks, but have not acquired the lock
157-
} while (!writeQueue.isEmpty());
158-
log.finest("onWritePossible: EXIT. Queue drained");
159-
}
160-
161-
private enum WriteResult {
162-
OUTPUT_NOT_READY,
163-
QUEUE_DRAINED
164-
}
165-
166-
private WriteResult writeFromQueue() throws IOException {
167-
while (isReady.getAsBoolean()) {
168-
ActionItem actionItem = writeQueue.poll();
169-
if (actionItem == null) {
170-
return WriteResult.QUEUE_DRAINED;
171-
}
172-
actionItem.run();
177+
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
178+
} finally {
179+
writeLock.unlockWrite(stamp);
173180
}
174-
return WriteResult.OUTPUT_NOT_READY;
175181
}
176182

177183
/**
@@ -182,12 +188,25 @@ private WriteResult writeFromQueue() throws IOException {
182188
*/
183189
private void runOrBuffer(ActionItem actionItem) throws IOException {
184190
writeQueue.offer(actionItem);
185-
if (writeLock.tryLock()) { // write to the outputStream directly
186-
try {
187-
writeFromQueue();
188-
} finally {
189-
writeLock.unlock();
191+
long stamp = writeLock.tryWriteLock();
192+
if (stamp == 0L) {
193+
return;
194+
}
195+
try {
196+
if (readyAndDrained) { // write to the outputStream directly
197+
ActionItem toWrite = writeQueue.poll();
198+
if (toWrite != null) {
199+
toWrite.run();
200+
if (toWrite == completeAction) {
201+
return;
202+
}
203+
if (!isReady.getAsBoolean()) {
204+
readyAndDrained = false;
205+
}
206+
}
190207
}
208+
} finally {
209+
writeLock.unlockWrite(stamp);
191210
}
192211
}
193212

servlet/src/threadingTest/java/io/grpc/servlet/AsyncServletOutputStreamWriterConcurrencyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ private void writeOrFlush() {
103103

104104
private boolean isReady() {
105105
if (!isReady) {
106-
//assertWithMessage("isReady() already returned false, onWritePossible() will be invoked")
107-
//.that(isReadyReturnedFalse).isFalse();
106+
assertWithMessage("isReady() already returned false, onWritePossible() will be invoked")
107+
.that(isReadyReturnedFalse).isFalse();
108108
isReadyReturnedFalse = true;
109109
}
110110
return isReady;

0 commit comments

Comments
 (0)