55import java .util .concurrent .CancellationException ;
66import java .util .concurrent .CompletableFuture ;
77import java .util .concurrent .atomic .AtomicReference ;
8+ import java .util .concurrent .locks .ReentrantLock ;
89
910import javax .annotation .Nullable ;
1011
1819import tech .ydb .core .Status ;
1920import tech .ydb .core .grpc .GrpcReadWriteStream ;
2021import tech .ydb .core .grpc .GrpcStatuses ;
21- import tech .ydb .core .grpc .GrpcTransport ;
2222import tech .ydb .core .impl .auth .AuthCallOptions ;
2323
2424/**
2828 * @param <W> type of message to be sent to the server
2929 */
3030public class ReadWriteStreamCall <R , W > extends ClientCall .Listener <R > implements GrpcReadWriteStream <R , W > {
31- private static final Logger logger = LoggerFactory .getLogger (GrpcTransport .class );
31+ private static final Logger logger = LoggerFactory .getLogger (ReadWriteStreamCall .class );
3232
3333 private final String traceId ;
3434 private final ClientCall <W , R > call ;
35+ private final ReentrantLock callLock = new ReentrantLock ();
3536 private final GrpcStatusHandler statusConsumer ;
3637 private final Metadata headers ;
3738 private final AuthCallOptions callOptions ;
@@ -65,32 +66,38 @@ public CompletableFuture<Status> start(Observer<R> observer) {
6566 throw new IllegalStateException ("Read stream call is already started" );
6667 }
6768
68- synchronized (call ) {
69+ callLock .lock ();
70+
71+ try {
72+ call .start (this , headers );
73+ call .request (1 );
74+ } catch (Throwable t ) {
6975 try {
70- call .start (this , headers );
71- call .request (1 );
72- } catch (Throwable t ) {
73- try {
74- call .cancel (null , t );
75- } catch (Throwable ex ) {
76- logger .error ("Exception encountered while closing the unary call" , ex );
77- }
78-
79- statusFuture .completeExceptionally (t );
76+ call .cancel (null , t );
77+ } catch (Throwable ex ) {
78+ logger .error ("Exception encountered while closing the unary call" , ex );
8079 }
80+
81+ statusFuture .completeExceptionally (t );
82+ } finally {
83+ callLock .unlock ();
8184 }
8285
8386 return statusFuture ;
8487 }
8588
8689 @ Override
8790 public void sendNext (W message ) {
88- synchronized (call ) {
91+ callLock .lock ();
92+
93+ try {
8994 if (flush ()) {
9095 call .sendMessage (message );
9196 } else {
9297 messagesQueue .add (message );
9398 }
99+ } finally {
100+ callLock .unlock ();
94101 }
95102 }
96103
@@ -112,8 +119,12 @@ private boolean flush() {
112119
113120 @ Override
114121 public void cancel () {
115- synchronized (call ) {
122+ callLock .lock ();
123+
124+ try {
116125 call .cancel ("Cancelled on user request" , new CancellationException ());
126+ } finally {
127+ callLock .unlock ();
117128 }
118129 }
119130
@@ -126,40 +137,53 @@ public void onMessage(R message) {
126137
127138 observerReference .get ().onNext (message );
128139 // request delivery of the next inbound message.
129- synchronized (call ) {
140+ callLock .lock ();
141+
142+ try {
130143 call .request (1 );
144+ } finally {
145+ callLock .unlock ();
131146 }
132147 } catch (Exception ex ) {
133148 statusFuture .completeExceptionally (ex );
149+ callLock .lock ();
134150
135151 try {
136- synchronized (call ) {
137- call .cancel ("Canceled by exception from observer" , ex );
138- }
152+ call .cancel ("Canceled by exception from observer" , ex );
139153 } catch (Throwable th ) {
140154 logger .error ("Exception encountered while canceling the read write stream call" , th );
155+ } finally {
156+ callLock .unlock ();
141157 }
142158 }
143159 }
144160
145161 @ Override
146162 public void onReady () {
147- synchronized (call ) {
163+ callLock .lock ();
164+
165+ try {
148166 flush ();
167+ } finally {
168+ callLock .unlock ();
149169 }
150170 }
151171
152172 @ Override
153173 public void close () {
154- synchronized (call ) {
174+ callLock .lock ();
175+
176+ try {
155177 call .halfClose ();
178+ } finally {
179+ callLock .unlock ();
156180 }
157181 }
158182
159183 @ Override
160184 public void onClose (io .grpc .Status status , @ Nullable Metadata trailers ) {
161185 if (logger .isTraceEnabled ()) {
162- logger .trace ("ReadWriteStreamCall[{}] closed with status {}" , status );
186+ logger .trace ("ReadWriteStreamCall[{}] closed with status {}" , traceId , status );
163187 }
164188 statusConsumer .accept (status , trailers );
165189
@@ -170,4 +194,3 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
170194 }
171195 }
172196}
173-
0 commit comments