Skip to content

Commit 4df30ad

Browse files
committed
Fixed session pool deadlock
1 parent 4ece287 commit 4df30ad

File tree

4 files changed

+26
-15
lines changed

4 files changed

+26
-15
lines changed

core/src/test/java/tech/ydb/core/impl/pool/EndpointPoolTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ public class EndpointPoolTest {
3737

3838
@Before
3939
public void setUp() throws IOException {
40+
Mockito.doNothing().when(socket).connect(Mockito.any(SocketAddress.class));
41+
Mockito.when(socketFactory.createSocket()).thenReturn(socket);
42+
4043
mocks = MockitoAnnotations.openMocks(this);
4144
threadLocalStaticMock.when(ThreadLocalRandom::current).thenReturn(random);
4245
socketFactoryStaticMock.when(SocketFactory::getDefault).thenReturn(socketFactory);
43-
Mockito.when(socketFactory.createSocket()).thenReturn(socket);
44-
Mockito.doNothing().when(socket).connect(Mockito.any(SocketAddress.class));
4546
}
4647

4748
@After

table/src/main/java/tech/ydb/table/impl/pool/WaitingQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void delete(T object) {
140140
handler.destroy(object);
141141

142142
// After deleting one object we can try to create new pending if it needed
143-
checkNextWaitingAcquire();
143+
CompletableFuture.runAsync(this::checkNextWaitingAcquire);
144144
}
145145

146146
@Override

table/src/test/java/tech/ydb/table/impl/pool/MockedScheduler.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.concurrent.ScheduledFuture;
1717
import java.util.concurrent.TimeUnit;
1818
import java.util.concurrent.TimeoutException;
19+
1920
import org.junit.Assert;
2021

2122
/**
@@ -30,11 +31,11 @@ public class MockedScheduler implements ScheduledExecutorService {
3031
public MockedScheduler(MockedClock clock) {
3132
this.clock = clock;
3233
}
33-
34+
3435
public Checker check() {
3536
return new Checker();
3637
}
37-
38+
3839
public void runTasksTo(Instant timestamp, Runnable... runs) {
3940
int runIdx = 0;
4041
MockedTask<?> next = tasks.peek();
@@ -57,7 +58,7 @@ public void runTasksTo(Instant timestamp, Runnable... runs) {
5758

5859
clock.goToFuture(timestamp);
5960
}
60-
61+
6162
@Override
6263
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
6364
Instant time = clock.instant().plusNanos(unit.toNanos(delay));
@@ -133,27 +134,27 @@ public <T> Future<T> submit(Callable<T> task) {
133134

134135
@Override
135136
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
136-
throw new UnsupportedOperationException("Not supported yet.");
137+
throw new UnsupportedOperationException("Not supported yet.");
137138
}
138139

139140
@Override
140141
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
141-
throw new UnsupportedOperationException("Not supported yet.");
142+
throw new UnsupportedOperationException("Not supported yet.");
142143
}
143144

144145
@Override
145146
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
146-
throw new UnsupportedOperationException("Not supported yet.");
147+
throw new UnsupportedOperationException("Not supported yet.");
147148
}
148149

149150
@Override
150151
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
151-
throw new UnsupportedOperationException("Not supported yet.");
152+
throw new UnsupportedOperationException("Not supported yet.");
152153
}
153154

154155
@Override
155156
public void execute(Runnable command) {
156-
throw new UnsupportedOperationException("Not supported yet.");
157+
throw new UnsupportedOperationException("Not supported yet.");
157158
}
158159

159160
private class MockedTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
@@ -195,7 +196,7 @@ public int compareTo(Delayed other) {
195196
MockedTask<?> x = (MockedTask<?>)other;
196197
return time.compareTo(x.time);
197198
}
198-
199+
199200
@SuppressWarnings("null")
200201
long diff = getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
201202
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
@@ -225,7 +226,7 @@ public void run() {
225226
cancel(false);
226227
return;
227228
}
228-
229+
229230
if (isPeriodic()) {
230231
if (super.runAndReset()) {
231232
setNextRunTime();
@@ -236,14 +237,14 @@ public void run() {
236237
}
237238
}
238239
}
239-
240+
240241
public class Checker {
241242
public Checker isClosed() {
242243
Assert.assertTrue("Scheduler is shutdown", isShutdown());
243244
Assert.assertTrue("Scheduler is terminated", isTerminated());
244245
return this;
245246
}
246-
247+
247248
public Checker hasNoTasks() {
248249
Assert.assertTrue("Scheduler hasn't tasks", tasks.isEmpty());
249250
return this;

table/src/test/java/tech/ydb/table/impl/pool/WaitingQueueTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.util.Set;
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.concurrent.ConcurrentLinkedQueue;
11+
import java.util.concurrent.ForkJoinPool;
12+
import java.util.concurrent.TimeUnit;
1113
import java.util.concurrent.atomic.AtomicInteger;
1214
import java.util.function.Supplier;
1315

@@ -595,6 +597,7 @@ public void checkWaitingAfterDeleteTest() {
595597
// After deleting current resource queue must create new pending to complete waiting
596598
queue.delete(r1);
597599

600+
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
598601
check(rs).requestsCount(1).activeCount(0);
599602
check(queue).queueSize(1).idleSize(0).waitingsCount(3);
600603
futureIsPending(w1);
@@ -611,6 +614,8 @@ public void checkWaitingAfterDeleteTest() {
611614
Assert.assertNotEquals("After deleting waiting got different resource ", r1, r2);
612615

613616
queue.delete(r2);
617+
618+
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
614619
check(rs).requestsCount(1).activeCount(0);
615620

616621
// If pending completed with exception - queue must repeat it
@@ -623,16 +628,20 @@ public void checkWaitingAfterDeleteTest() {
623628
futureIsPending(w3);
624629

625630
Assert.assertNotEquals("After deleting waiting got different resource ", r2, r3);
631+
632+
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
626633
check(rs).requestsCount(0).activeCount(1);
627634
check(queue).queueSize(1).idleSize(0).waitingsCount(1);
628635

629636
queue.delete(r3);
630637

631638
// After canceling of pending waiting queue will move resource to idle
639+
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
632640
check(rs).requestsCount(1).activeCount(0);
633641
w3.cancel(true);
634642
rs.completeNext();
635643

644+
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
636645
check(rs).requestsCount(0).activeCount(1);
637646
check(queue).queueSize(1).idleSize(1).waitingsCount(0);
638647

0 commit comments

Comments
 (0)