29
29
import java .util .concurrent .BlockingQueue ;
30
30
import java .util .concurrent .TimeUnit ;
31
31
import java .util .concurrent .TimeoutException ;
32
+ import java .util .concurrent .atomic .AtomicReference ;
32
33
import java .util .logging .Level ;
33
34
import java .util .logging .Logger ;
34
35
@@ -69,7 +70,7 @@ public final class BlockingClientCall<ReqT, RespT> {
69
70
private final ThreadSafeThreadlessExecutor executor ;
70
71
71
72
private boolean writeClosed ;
72
- private volatile Status closedStatus ; // null if not closed
73
+ private AtomicReference < CloseState > closeState = new AtomicReference <>();
73
74
74
75
BlockingClientCall (ClientCall <ReqT , RespT > call , ThreadSafeThreadlessExecutor executor ) {
75
76
this .call = call ;
@@ -120,22 +121,22 @@ private RespT read(boolean waitForever, long endNanoTime)
120
121
logger .finer ("Client Blocking read had value: " + bufferedValue );
121
122
}
122
123
123
- Status currentClosedStatus ;
124
+ CloseState currentCloseState ;
124
125
if (bufferedValue != null ) {
125
126
call .request (1 );
126
127
return bufferedValue ;
127
- } else if ((currentClosedStatus = closedStatus ) == null ) {
128
+ } else if ((currentCloseState = closeState . get () ) == null ) {
128
129
throw new IllegalStateException (
129
130
"The message disappeared... are you reading from multiple threads?" );
130
- } else if (!currentClosedStatus .isOk ()) {
131
- throw currentClosedStatus . asException ();
131
+ } else if (!currentCloseState . status .isOk ()) {
132
+ throw currentCloseState . status . asException (currentCloseState . trailers );
132
133
} else {
133
134
return null ;
134
135
}
135
136
}
136
137
137
138
boolean skipWaitingForRead () {
138
- return closedStatus != null || !buffer .isEmpty ();
139
+ return closeState . get () != null || !buffer .isEmpty ();
139
140
}
140
141
141
142
/**
@@ -148,11 +149,11 @@ boolean skipWaitingForRead() {
148
149
* @throws StatusException If the stream was closed in an error state
149
150
*/
150
151
public boolean hasNext () throws InterruptedException , StatusException {
151
- executor .waitAndDrain ((x ) -> !x .buffer .isEmpty () || x .closedStatus != null , this );
152
+ executor .waitAndDrain ((x ) -> !x .buffer .isEmpty () || x .closeState . get () != null , this );
152
153
153
- Status currentClosedStatus = closedStatus ;
154
- if (currentClosedStatus != null && !currentClosedStatus .isOk ()) {
155
- throw currentClosedStatus . asException ();
154
+ CloseState currentCloseState = closeState . get () ;
155
+ if (currentCloseState != null && !currentCloseState . status .isOk ()) {
156
+ throw currentCloseState . status . asException (currentCloseState . trailers );
156
157
}
157
158
158
159
return !buffer .isEmpty ();
@@ -221,17 +222,16 @@ private boolean write(boolean waitForever, ReqT request, long endNanoTime)
221
222
}
222
223
223
224
Predicate <BlockingClientCall <ReqT , RespT >> predicate =
224
- (x ) -> x .call .isReady () || x .closedStatus != null ;
225
+ (x ) -> x .call .isReady () || x .closeState . get () != null ;
225
226
executor .waitAndDrainWithTimeout (waitForever , endNanoTime , predicate , this );
226
- Status savedClosedStatus = closedStatus ;
227
- if (savedClosedStatus == null ) {
227
+ CloseState savedCloseState = closeState . get () ;
228
+ if (savedCloseState == null || savedCloseState . status == null ) {
228
229
call .sendMessage (request );
229
230
return true ;
230
- } else if (savedClosedStatus .isOk ()) {
231
+ } else if (savedCloseState . status .isOk ()) {
231
232
return false ;
232
233
} else {
233
- // Propagate any errors returned from the server
234
- throw savedClosedStatus .asException ();
234
+ throw savedCloseState .status .asException (savedCloseState .trailers );
235
235
}
236
236
}
237
237
@@ -274,7 +274,8 @@ public void halfClose() {
274
274
@ VisibleForTesting
275
275
Status getClosedStatus () {
276
276
drainQuietly ();
277
- return closedStatus ;
277
+ CloseState state = closeState .get ();
278
+ return (state == null ) ? null : state .status ;
278
279
}
279
280
280
281
/**
@@ -317,7 +318,7 @@ boolean isWriteReady() {
317
318
* @return True if writes haven't been closed and the server hasn't closed the stream
318
319
*/
319
320
private boolean isWriteLegal () {
320
- return !writeClosed && closedStatus == null ;
321
+ return !writeClosed && closeState . get () == null ;
321
322
}
322
323
323
324
ClientCall .Listener <RespT > getListener () {
@@ -335,15 +336,25 @@ private void drainQuietly() {
335
336
private final class QueuingListener extends ClientCall .Listener <RespT > {
336
337
@ Override
337
338
public void onMessage (RespT value ) {
338
- Preconditions .checkState (closedStatus == null , "ClientCall already closed" );
339
+ Preconditions .checkState (closeState . get () == null , "ClientCall already closed" );
339
340
buffer .add (value );
340
341
}
341
342
342
343
@ Override
343
344
public void onClose (Status status , Metadata trailers ) {
344
- Preconditions .checkState (closedStatus == null , "ClientCall already closed" );
345
- closedStatus = status ;
345
+ CloseState newCloseState = new CloseState (status , trailers );
346
+ boolean wasSet = closeState .compareAndSet (null , newCloseState );
347
+ Preconditions .checkState (wasSet , "ClientCall already closed" );
346
348
}
347
349
}
348
350
351
+ private static final class CloseState {
352
+ final Status status ;
353
+ final Metadata trailers ;
354
+
355
+ CloseState (Status status , Metadata trailers ) {
356
+ this .status = Preconditions .checkNotNull (status , "status" );
357
+ this .trailers = trailers ;
358
+ }
359
+ }
349
360
}
0 commit comments