Skip to content

Commit 0ed2460

Browse files
author
Igor Melnichenko
committed
synchronized blocks were replaced with ReentrantLock in ReadStreamCall
1 parent 9a7adfa commit 0ed2460

File tree

1 file changed

+35
-22
lines changed

1 file changed

+35
-22
lines changed

core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.concurrent.CancellationException;
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.atomic.AtomicReference;
6+
import java.util.concurrent.locks.ReentrantLock;
67

78
import javax.annotation.Nullable;
89

@@ -29,6 +30,7 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
2930

3031
private final String traceId;
3132
private final ClientCall<ReqT, RespT> call;
33+
private final ReentrantLock callLock = new ReentrantLock();
3234
private final GrpcStatusHandler statusConsumer;
3335
private final ReqT request;
3436
private final Metadata headers;
@@ -56,34 +58,40 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
5658
throw new IllegalStateException("Read stream call is already started");
5759
}
5860

59-
synchronized (call) {
61+
callLock.lock();
62+
63+
try {
64+
call.start(this, headers);
65+
call.request(1);
66+
if (logger.isTraceEnabled()) {
67+
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
68+
}
69+
call.sendMessage(request);
70+
// close stream by client side
71+
call.halfClose();
72+
} catch (Throwable t) {
6073
try {
61-
call.start(this, headers);
62-
call.request(1);
63-
if (logger.isTraceEnabled()) {
64-
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
65-
}
66-
call.sendMessage(request);
67-
// close stream by client side
68-
call.halfClose();
69-
} catch (Throwable t) {
70-
try {
71-
call.cancel(null, t);
72-
} catch (Throwable ex) {
73-
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
74-
}
75-
76-
statusFuture.completeExceptionally(t);
74+
call.cancel(null, t);
75+
} catch (Throwable ex) {
76+
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
7777
}
78+
79+
statusFuture.completeExceptionally(t);
80+
} finally {
81+
callLock.unlock();
7882
}
7983

8084
return statusFuture;
8185
}
8286

8387
@Override
8488
public void cancel() {
85-
synchronized (call) {
89+
callLock.lock();
90+
91+
try {
8692
call.cancel("Cancelled on user request", new CancellationException());
93+
} finally {
94+
callLock.unlock();
8795
}
8896
}
8997

@@ -95,18 +103,23 @@ public void onMessage(RespT message) {
95103
}
96104
observerReference.get().onNext(message);
97105
// request delivery of the next inbound message.
98-
synchronized (call) {
106+
callLock.lock();
107+
108+
try {
99109
call.request(1);
110+
} finally {
111+
callLock.unlock();
100112
}
101113
} catch (Exception ex) {
102114
statusFuture.completeExceptionally(ex);
115+
callLock.lock();
103116

104117
try {
105-
synchronized (call) {
106-
call.cancel("Canceled by exception from observer", ex);
107-
}
118+
call.cancel("Canceled by exception from observer", ex);
108119
} catch (Throwable th) {
109120
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th);
121+
} finally {
122+
callLock.unlock();
110123
}
111124
}
112125
}

0 commit comments

Comments
 (0)