Skip to content

Commit 8782c7e

Browse files
authored
Merge pull request #568 from alex268/master
Fixed session creation loop
2 parents cfe4187 + 19c2d07 commit 8782c7e

File tree

3 files changed

+49
-39
lines changed

3 files changed

+49
-39
lines changed

table/src/main/java/tech/ydb/table/impl/pool/WaitingQueue.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public interface Handler<T> {
6161

6262
@VisibleForTesting
6363
WaitingQueue(Handler<T> handler, int maxSize, int waitingsLimit) {
64-
Preconditions.checkArgument(maxSize > 0, "WaitingQueue max size (%d) must be positive", maxSize);
64+
Preconditions.checkArgument(maxSize > 0, "WaitingQueue max size (%s) must be positive", maxSize);
6565
Preconditions.checkArgument(handler != null, "WaitingQueue handler must be not null");
6666

6767
this.handler = handler;
@@ -267,23 +267,20 @@ private boolean tryToCompleteWaiting(T object) {
267267
}
268268

269269
private void checkNextWaitingAcquire() {
270-
if (stopped || waitingAcquires.isEmpty()) {
270+
if (stopped) {
271271
return;
272272
}
273273

274-
// Try to create new pending request
275-
CompletableFuture<T> pending = new CompletableFuture<>();
276-
if (tryToCreateNewPending(pending)) {
277-
pending.whenComplete((object, th) -> {
278-
if (th != null) {
279-
checkNextWaitingAcquire();
280-
}
281-
if (object != null) {
282-
if (!tryToCompleteWaiting(object)) {
283-
idle.offerFirst(object);
284-
}
285-
}
286-
});
274+
CompletableFuture<T> next = waitingAcquires.peek();
275+
while (next != null && next.isDone()) {
276+
waitingAcquires.remove();
277+
waitingAcqueireCount.decrementAndGet();
278+
next = waitingAcquires.peek();
279+
}
280+
281+
if (next != null && tryToCreateNewPending(next)) {
282+
waitingAcquires.remove();
283+
waitingAcqueireCount.decrementAndGet();
287284
}
288285
}
289286

@@ -343,15 +340,19 @@ public void accept(T object, Throwable th) {
343340
if (th != null) {
344341
queueSize.decrementAndGet();
345342
acquire.completeExceptionally(th);
343+
checkNextWaitingAcquire();
346344
return;
347345
}
348346

349-
if (!acquire.isDone() && safeAcquireObject(acquire, object)) {
347+
if (safeAcquireObject(acquire, object)) {
350348
return;
351349
}
352350

353-
// If acquire future is already canceled, put new object to hot queue
354-
idle.offerFirst(object); // ConcurrentLinkedQueue always return true
351+
// If acquire future is already canceled, try to complete waiting or put to hot queue
352+
if (!tryToCompleteWaiting(object)) {
353+
idle.offerFirst(object); // ConcurrentLinkedQueue always return true
354+
}
355+
355356
if (stopped) {
356357
clear();
357358
}

table/src/test/java/tech/ydb/table/impl/pool/SessionPoolTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,10 @@ public void increaseLimitsTest() {
117117
// increase pool limit
118118
pool.updateMaxSize(2);
119119
tableRpc.check().sessionRequests(2);
120-
// NOTE: we have increasing of pendings because one request may be in both queues (pendings + waitings)
121-
check(pool).idle(0).acquired(0).pending(2 + wsize).size(0, 2).totalClient(0, 0).totalServer(2, 0, 0, 0);
120+
check(pool).idle(0).acquired(0).pending(1 + wsize).size(0, 2).totalClient(0, 0).totalServer(2, 0, 0, 0);
122121

123122
tableRpc.nextCreateSession().completeSuccess();
124-
check(pool).idle(0).acquired(1).pending(1 + wsize).size(0, 2).totalClient(1, 0).totalServer(2, 1, 0, 0);
123+
check(pool).idle(0).acquired(1).pending(wsize).size(0, 2).totalClient(1, 0).totalServer(2, 1, 0, 0);
125124

126125
tableRpc.nextCreateSession().completeSuccess();
127126
check(pool).idle(0).acquired(2).pending(-1 + wsize).size(0, 2).totalClient(2, 0).totalServer(2, 2, 0, 0);

table/src/test/java/tech/ydb/table/impl/pool/WaitingQueueTest.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -579,14 +579,16 @@ public void canceledWaitingTest() {
579579
@Test
580580
public void checkWaitingAfterDeleteTest() {
581581
ResourceHandler rs = new ResourceHandler();
582-
WaitingQueue<Resource> queue = new WaitingQueue<>(rs, 1, 3);
582+
WaitingQueue<Resource> queue = new WaitingQueue<>(rs, 1, 5);
583583

584584
CompletableFuture<Resource> f1 = pendingFuture(acquire(queue));
585585
CompletableFuture<Resource> w1 = pendingFuture(acquire(queue));
586586
CompletableFuture<Resource> w2 = pendingFuture(acquire(queue));
587587
CompletableFuture<Resource> w3 = pendingFuture(acquire(queue));
588+
CompletableFuture<Resource> w4 = pendingFuture(acquire(queue));
589+
CompletableFuture<Resource> w5 = pendingFuture(acquire(queue));
588590

589-
check(queue).queueSize(1).idleSize(0).waitingsCount(3);
591+
check(queue).queueSize(1).idleSize(0).waitingsCount(5);
590592
rs.completeNext();
591593
check(rs).requestsCount(0).activeCount(1);
592594

@@ -596,41 +598,46 @@ public void checkWaitingAfterDeleteTest() {
596598
queue.delete(r1);
597599

598600
check(rs).requestsCount(1).activeCount(0);
599-
check(queue).queueSize(1).idleSize(0).waitingsCount(3);
601+
check(queue).queueSize(1).idleSize(0).waitingsCount(4);
600602
futureIsPending(w1);
601603
futureIsPending(w2);
602604
futureIsPending(w3);
605+
futureIsPending(w4);
603606

604607
rs.completeNext();
605608
check(rs).requestsCount(0).activeCount(1);
606-
check(queue).queueSize(1).idleSize(0).waitingsCount(2);
609+
check(queue).queueSize(1).idleSize(0).waitingsCount(4);
607610

608611
Resource r2 = pendingIsReady(w1);
609612
futureIsPending(w2);
613+
futureIsPending(w3);
614+
w3.cancel(true);
610615

611616
Assert.assertNotEquals("After deleting waiting got different resource ", r1, r2);
612617

613618
queue.delete(r2);
614619
check(rs).requestsCount(1).activeCount(0);
615620

616-
// If pending completed with exception - queue must repeat it
621+
// If pending completed with exception - next waiting got that trouble
617622
rs.completeNextWithException(new RuntimeException("Trouble"));
623+
futureIsExceptionally(w2, "Trouble");
624+
618625
check(rs).requestsCount(1).activeCount(0);
619626

620627
rs.completeNext();
621628

622-
Resource r3 = pendingIsReady(w2);
623-
futureIsPending(w3);
629+
Resource r4 = pendingIsReady(w4);
630+
futureIsPending(w5);
624631

625-
Assert.assertNotEquals("After deleting waiting got different resource ", r2, r3);
632+
Assert.assertNotEquals("After deleting waiting got different resource ", r2, r4);
626633
check(rs).requestsCount(0).activeCount(1);
627634
check(queue).queueSize(1).idleSize(0).waitingsCount(1);
628635

629-
queue.delete(r3);
636+
queue.delete(r4);
630637

631638
// After canceling of pending waiting queue will move resource to idle
632639
check(rs).requestsCount(1).activeCount(0);
633-
w3.cancel(true);
640+
w5.cancel(true);
634641
rs.completeNext();
635642

636643
check(rs).requestsCount(0).activeCount(1);
@@ -660,14 +667,17 @@ public void cleanWaitingAfterClosingTest() {
660667
queue.release(r1);
661668
}
662669

663-
@Test(expected = IllegalArgumentException.class)
664-
public void validateQueueMaxSize() {
665-
new WaitingQueue<>(new ResourceHandler(), 0, 3);
666-
}
667-
668-
@Test(expected = IllegalArgumentException.class)
669-
public void validateQueueHandler() {
670-
new WaitingQueue<>(null, 1);
670+
@Test
671+
public void validateQueueArguments() {
672+
IllegalArgumentException ex1 = Assert.assertThrows("Invalid max size expected", IllegalArgumentException.class,
673+
() -> new WaitingQueue<>(new ResourceHandler(), 0, 3)
674+
);
675+
Assert.assertEquals("WaitingQueue max size (0) must be positive", ex1.getMessage());
676+
677+
IllegalArgumentException ex2 = Assert.assertThrows("Invalid handler expected", IllegalArgumentException.class,
678+
() -> new WaitingQueue<>(null, 1, 3)
679+
);
680+
Assert.assertEquals("WaitingQueue handler must be not null", ex2.getMessage());
671681
}
672682

673683
@Test

0 commit comments

Comments
 (0)