33import java .util .concurrent .CancellationException ;
44import java .util .concurrent .CompletableFuture ;
55import java .util .concurrent .atomic .AtomicReference ;
6- import java .util .concurrent .locks .ReentrantLock ;
76
87import javax .annotation .Nullable ;
98
@@ -30,7 +29,6 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
3029
3130 private final String traceId ;
3231 private final ClientCall <ReqT , RespT > call ;
33- private final ReentrantLock callLock = new ReentrantLock ();
3432 private final GrpcStatusHandler statusConsumer ;
3533 private final ReqT request ;
3634 private final Metadata headers ;
@@ -58,40 +56,34 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
5856 throw new IllegalStateException ("Read stream call is already started" );
5957 }
6058
61- callLock .lock ();
62-
63- try {
64- call .start (this , headers );
65- call .request (1 );
66- if (logger .isTraceEnabled ()) {
67- logger .trace ("ReadStreamCall[{}] --> {}" , traceId , TextFormat .shortDebugString ((Message ) request ));
68- }
69- call .sendMessage (request );
70- // close stream by client side
71- call .halfClose ();
72- } catch (Throwable t ) {
59+ synchronized (call ) {
7360 try {
74- call .cancel (null , t );
75- } catch (Throwable ex ) {
76- logger .error ("ReadStreamCall[{}] got exception while canceling" , traceId , ex );
61+ call .start (this , headers );
62+ call .request (1 );
63+ if (logger .isTraceEnabled ()) {
64+ logger .trace ("ReadStreamCall[{}] --> {}" , traceId , TextFormat .shortDebugString ((Message ) request ));
65+ }
66+ call .sendMessage (request );
67+ // close stream by client side
68+ call .halfClose ();
69+ } catch (Throwable t ) {
70+ try {
71+ call .cancel (null , t );
72+ } catch (Throwable ex ) {
73+ logger .error ("ReadStreamCall[{}] got exception while canceling" , traceId , ex );
74+ }
75+
76+ statusFuture .completeExceptionally (t );
7777 }
78-
79- statusFuture .completeExceptionally (t );
80- } finally {
81- callLock .unlock ();
8278 }
8379
8480 return statusFuture ;
8581 }
8682
8783 @ Override
8884 public void cancel () {
89- callLock .lock ();
90-
91- try {
85+ synchronized (call ) {
9286 call .cancel ("Cancelled on user request" , new CancellationException ());
93- } finally {
94- callLock .unlock ();
9587 }
9688 }
9789
@@ -103,23 +95,18 @@ public void onMessage(RespT message) {
10395 }
10496 observerReference .get ().onNext (message );
10597 // request delivery of the next inbound message.
106- callLock .lock ();
107-
108- try {
98+ synchronized (call ) {
10999 call .request (1 );
110- } finally {
111- callLock .unlock ();
112100 }
113101 } catch (Exception ex ) {
114102 statusFuture .completeExceptionally (ex );
115- callLock .lock ();
116103
117104 try {
118- call .cancel ("Canceled by exception from observer" , ex );
105+ synchronized (call ) {
106+ call .cancel ("Canceled by exception from observer" , ex );
107+ }
119108 } catch (Throwable th ) {
120109 logger .error ("ReadStreamCall[{}] got exception while canceling" , traceId , th );
121- } finally {
122- callLock .unlock ();
123110 }
124111 }
125112 }
0 commit comments