Skip to content

Commit 52d188e

Browse files
authored
Merge pull request #381 from Eistern/feature/replace_synchronized_for_locks
Prevent Virtual Thread Deadlocks by Replacing Synchronized Blocks
2 parents 291f913 + 19ce14a commit 52d188e

File tree

7 files changed

+235
-122
lines changed

7 files changed

+235
-122
lines changed

core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import java.util.concurrent.CancellationException;
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.atomic.AtomicReference;
6+
import java.util.concurrent.locks.Lock;
7+
import java.util.concurrent.locks.ReentrantLock;
68

79
import javax.annotation.Nullable;
810

@@ -29,6 +31,7 @@ public class ReadStreamCall<ReqT, RespT> extends ClientCall.Listener<RespT> impl
2931

3032
private final String traceId;
3133
private final ClientCall<ReqT, RespT> call;
34+
private final Lock callLock = new ReentrantLock();
3235
private final GrpcStatusHandler statusConsumer;
3336
private final ReqT request;
3437
private final Metadata headers;
@@ -56,34 +59,38 @@ public CompletableFuture<Status> start(Observer<RespT> observer) {
5659
throw new IllegalStateException("Read stream call is already started");
5760
}
5861

59-
synchronized (call) {
62+
callLock.lock();
63+
try {
64+
call.start(this, headers);
65+
if (logger.isTraceEnabled()) {
66+
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
67+
}
68+
call.sendMessage(request);
69+
// close stream by client side
70+
call.halfClose();
71+
call.request(1);
72+
} catch (Throwable t) {
6073
try {
61-
call.start(this, headers);
62-
if (logger.isTraceEnabled()) {
63-
logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request));
64-
}
65-
call.sendMessage(request);
66-
// close stream by client side
67-
call.halfClose();
68-
call.request(1);
69-
} catch (Throwable t) {
70-
try {
71-
call.cancel(null, t);
72-
} catch (Throwable ex) {
73-
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
74-
}
75-
76-
statusFuture.completeExceptionally(t);
74+
call.cancel(null, t);
75+
} catch (Throwable ex) {
76+
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex);
7777
}
78+
79+
statusFuture.completeExceptionally(t);
80+
} finally {
81+
callLock.unlock();
7882
}
7983

8084
return statusFuture;
8185
}
8286

8387
@Override
8488
public void cancel() {
85-
synchronized (call) {
89+
callLock.lock();
90+
try {
8691
call.cancel("Cancelled on user request", new CancellationException());
92+
} finally {
93+
callLock.unlock();
8794
}
8895
}
8996

@@ -95,15 +102,21 @@ public void onMessage(RespT message) {
95102
}
96103
observerReference.get().onNext(message);
97104
// request delivery of the next inbound message.
98-
synchronized (call) {
105+
callLock.lock();
106+
try {
99107
call.request(1);
108+
} finally {
109+
callLock.unlock();
100110
}
101111
} catch (Exception ex) {
102112
statusFuture.completeExceptionally(ex);
103113

104114
try {
105-
synchronized (call) {
115+
callLock.lock();
116+
try {
106117
call.cancel("Canceled by exception from observer", ex);
118+
} finally {
119+
callLock.unlock();
107120
}
108121
} catch (Throwable th) {
109122
logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th);

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import java.util.concurrent.CancellationException;
66
import java.util.concurrent.CompletableFuture;
77
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.locks.Lock;
9+
import java.util.concurrent.locks.ReentrantLock;
810

911
import javax.annotation.Nullable;
1012

@@ -33,6 +35,7 @@ public class ReadWriteStreamCall<R, W> extends ClientCall.Listener<R> implements
3335

3436
private final String traceId;
3537
private final ClientCall<W, R> call;
38+
private final Lock callLock = new ReentrantLock();
3639
private final GrpcStatusHandler statusConsumer;
3740
private final Metadata headers;
3841
private final AuthCallOptions callOptions;
@@ -66,27 +69,29 @@ public CompletableFuture<Status> start(Observer<R> observer) {
6669
throw new IllegalStateException("Read stream call is already started");
6770
}
6871

69-
synchronized (call) {
72+
callLock.lock();
73+
try {
74+
call.start(this, headers);
75+
call.request(1);
76+
} catch (Throwable t) {
7077
try {
71-
call.start(this, headers);
72-
call.request(1);
73-
} catch (Throwable t) {
74-
try {
75-
call.cancel(null, t);
76-
} catch (Throwable ex) {
77-
logger.error("Exception encountered while closing the unary call", ex);
78-
}
79-
80-
statusFuture.completeExceptionally(t);
78+
call.cancel(null, t);
79+
} catch (Throwable ex) {
80+
logger.error("Exception encountered while closing the unary call", ex);
8181
}
82+
83+
statusFuture.completeExceptionally(t);
84+
} finally {
85+
callLock.unlock();
8286
}
8387

8488
return statusFuture;
8589
}
8690

8791
@Override
8892
public void sendNext(W message) {
89-
synchronized (call) {
93+
callLock.lock();
94+
try {
9095
if (flush()) {
9196
if (logger.isTraceEnabled()) {
9297
String msg = TextFormat.shortDebugString((Message) message);
@@ -96,6 +101,8 @@ public void sendNext(W message) {
96101
} else {
97102
messagesQueue.add(message);
98103
}
104+
} finally {
105+
callLock.unlock();
99106
}
100107
}
101108

@@ -118,8 +125,11 @@ private boolean flush() {
118125

119126
@Override
120127
public void cancel() {
121-
synchronized (call) {
128+
callLock.lock();
129+
try {
122130
call.cancel("Cancelled on user request", new CancellationException());
131+
} finally {
132+
callLock.unlock();
123133
}
124134
}
125135

@@ -132,15 +142,21 @@ public void onMessage(R message) {
132142

133143
observerReference.get().onNext(message);
134144
// request delivery of the next inbound message.
135-
synchronized (call) {
145+
callLock.lock();
146+
try {
136147
call.request(1);
148+
} finally {
149+
callLock.unlock();
137150
}
138151
} catch (Exception ex) {
139152
statusFuture.completeExceptionally(ex);
140153

141154
try {
142-
synchronized (call) {
155+
callLock.lock();
156+
try {
143157
call.cancel("Canceled by exception from observer", ex);
158+
} finally {
159+
callLock.unlock();
144160
}
145161
} catch (Throwable th) {
146162
logger.error("Exception encountered while canceling the read write stream call", th);
@@ -150,15 +166,21 @@ public void onMessage(R message) {
150166

151167
@Override
152168
public void onReady() {
153-
synchronized (call) {
169+
callLock.lock();
170+
try {
154171
flush();
172+
} finally {
173+
callLock.unlock();
155174
}
156175
}
157176

158177
@Override
159178
public void close() {
160-
synchronized (call) {
179+
callLock.lock();
180+
try {
161181
call.halfClose();
182+
} finally {
183+
callLock.unlock();
162184
}
163185
}
164186

@@ -176,4 +198,3 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
176198
}
177199
}
178200
}
179-

core/src/test/java/tech/ydb/core/impl/MockedScheduler.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.concurrent.ScheduledFuture;
1919
import java.util.concurrent.TimeUnit;
2020
import java.util.concurrent.TimeoutException;
21+
import java.util.concurrent.locks.Lock;
22+
import java.util.concurrent.locks.ReentrantLock;
2123

2224
import org.junit.Assert;
2325

@@ -27,6 +29,7 @@
2729
*/
2830
public class MockedScheduler implements ScheduledExecutorService {
2931
private final MockedClock clock;
32+
private final Lock nextTaskLock = new ReentrantLock();
3033
private final Queue<MockedTask<?>> tasks = new ConcurrentLinkedQueue<>();
3134

3235
private volatile boolean queueIsBlocked = false;
@@ -46,18 +49,23 @@ public MockedScheduler hasTasksCount(int count) {
4649
return this;
4750
}
4851

49-
public synchronized MockedScheduler runNextTask() {
50-
queueIsBlocked = true;
51-
MockedTask<?> next = tasks.poll();
52-
Assert.assertNotNull("Scheduler's queue is empty", next);
53-
clock.goToFuture(next.time);
54-
next.run();
55-
if (next.time != null) {
56-
tasks.add(next);
57-
}
52+
public MockedScheduler runNextTask() {
53+
nextTaskLock.lock();
54+
try {
55+
queueIsBlocked = true;
56+
MockedTask<?> next = tasks.poll();
57+
Assert.assertNotNull("Scheduler's queue is empty", next);
58+
clock.goToFuture(next.time);
59+
next.run();
60+
if (next.time != null) {
61+
tasks.add(next);
62+
}
5863

59-
queueIsBlocked = false;
60-
return this;
64+
queueIsBlocked = false;
65+
return this;
66+
} finally {
67+
nextTaskLock.unlock();
68+
}
6169
}
6270

6371
@Override

core/src/test/java/tech/ydb/core/impl/pool/ManagedChannelMock.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import java.util.concurrent.Executors;
77
import java.util.concurrent.LinkedBlockingDeque;
88
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.locks.Lock;
10+
import java.util.concurrent.locks.ReentrantLock;
911

1012
import io.grpc.CallOptions;
1113
import io.grpc.ClientCall;
@@ -26,7 +28,7 @@ public class ManagedChannelMock extends ManagedChannel {
2628

2729
private final ExecutorService executor = Executors.newFixedThreadPool(1);
2830
private final BlockingQueue<ConnectivityState> nextStates = new LinkedBlockingDeque<>();
29-
private final Object sync = new Object();
31+
private final Lock sync = new ReentrantLock();
3032

3133
private volatile ConnectivityState state;
3234
private Runnable listener = null;
@@ -49,14 +51,17 @@ private void requestUpdate() {
4951

5052
logger.trace("next mock state {}", next);
5153

52-
synchronized (sync) {
54+
sync.lock();
55+
try {
5356
this.state = next;
5457
if (this.listener != null) {
5558
Runnable callback = this.listener;
5659
logger.trace("call listener {}", callback.hashCode());
5760
this.listener = null;
5861
callback.run();
5962
}
63+
} finally {
64+
sync.unlock();
6065
}
6166
});
6267
}
@@ -90,25 +95,31 @@ public String authority() {
9095

9196
@Override
9297
public ConnectivityState getState(boolean requestConnection) {
93-
synchronized (sync) {
98+
sync.lock();
99+
try {
94100
logger.trace("get state {} with request {}", state, requestConnection);
95101
if (requestConnection) {
96102
requestUpdate();
97103
}
98104
return state;
105+
} finally {
106+
sync.unlock();
99107
}
100108
}
101109

102110
@Override
103111
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
104-
synchronized (sync) {
112+
sync.lock();
113+
try {
105114
logger.trace("notify of changes for state {} with current {} and callback {}",
106115
source, state, callback.hashCode());
107116
if (source != state) {
108117
callback.run();
109118
} else {
110119
this.listener = callback;
111120
}
121+
} finally {
122+
sync.unlock();
112123
}
113124
requestUpdate();
114125
}

0 commit comments

Comments
 (0)