Skip to content

Commit 2e93d41

Browse files
committed
JAVA-2564: Enhance pruning support in ConcurrentPool so that pruning can be halted
1 parent e5e1b98 commit 2e93d41

File tree

7 files changed

+178
-29
lines changed

7 files changed

+178
-29
lines changed

driver-core/src/main/com/mongodb/connection/DefaultConnectionPool.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.mongodb.event.ConnectionPoolWaitQueueExitedEvent;
3737
import com.mongodb.event.ConnectionRemovedEvent;
3838
import com.mongodb.internal.connection.ConcurrentPool;
39+
import com.mongodb.internal.connection.ConcurrentPool.Prune;
3940
import com.mongodb.internal.thread.DaemonThreadFactory;
4041
import org.bson.ByteBuf;
4142
import org.bson.codecs.Decoder;
@@ -551,8 +552,8 @@ private String getReasonForClosing(final UsageTrackingInternalConnection connect
551552
}
552553

553554
@Override
554-
public boolean shouldPrune(final UsageTrackingInternalConnection usageTrackingConnection) {
555-
return DefaultConnectionPool.this.shouldPrune(usageTrackingConnection);
555+
public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConnection) {
556+
return DefaultConnectionPool.this.shouldPrune(usageTrackingConnection) ? Prune.YES : Prune.NO;
556557
}
557558
}
558559
}

driver-core/src/main/com/mongodb/internal/connection/ConcurrentLinkedDeque.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -866,11 +866,11 @@ public <T> T[] toArray(T[] a) {
866866
*
867867
* @return an iterator over the elements in this deque in proper sequence
868868
*/
869-
public Iterator<E> iterator() {
869+
public RemovalReportingIterator<E> iterator() {
870870
return new CLDIterator();
871871
}
872872

873-
final class CLDIterator implements Iterator<E> {
873+
final class CLDIterator implements RemovalReportingIterator<E> {
874874
Node<E> last;
875875
Node<E> next = header.forward();
876876

@@ -887,11 +887,19 @@ public E next() {
887887
}
888888

889889
public void remove() {
890+
reportingRemove();
891+
}
892+
893+
@Override
894+
public boolean reportingRemove() {
890895
Node<E> l = last;
891896
if (l == null)
892897
throw new IllegalStateException();
893-
while (!l.delete() && !l.isDeleted())
894-
;
898+
boolean successfullyRemoved = l.delete();
899+
while (!successfullyRemoved && !l.isDeleted()) {
900+
successfullyRemoved = l.delete();
901+
}
902+
return successfullyRemoved;
895903
}
896904
}
897905

@@ -901,4 +909,19 @@ public void remove() {
901909
public Iterator<E> descendingIterator() {
902910
throw new UnsupportedOperationException();
903911
}
912+
913+
public interface RemovalReportingIterator<E> extends Iterator<E> {
914+
/**
915+
* Removes from the underlying collection the last element returned by this iterator and reports whether the current element was
916+
* removed by the call. This method can be called only once per call to {@link #next}.
917+
*
918+
* @return true if the element was successfully removed by this call, false if the element had already been removed by a concurrent
919+
* removal
920+
* @throws IllegalStateException if the {@code next} method has not
921+
* yet been called, or the {@code remove} method has already
922+
* been called after the last call to the {@code next}
923+
* method
924+
*/
925+
boolean reportingRemove();
926+
}
904927
}

driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import com.mongodb.MongoInternalException;
2020
import com.mongodb.MongoInterruptedException;
2121
import com.mongodb.MongoTimeoutException;
22+
import com.mongodb.internal.connection.ConcurrentLinkedDeque.RemovalReportingIterator;
2223

23-
import java.util.Deque;
2424
import java.util.Iterator;
2525
import java.util.concurrent.Semaphore;
2626
import java.util.concurrent.TimeUnit;
@@ -35,10 +35,24 @@ public class ConcurrentPool<T> implements Pool<T> {
3535
private final int maxSize;
3636
private final ItemFactory<T> itemFactory;
3737

38-
private final Deque<T> available = new ConcurrentLinkedDeque<T>();
38+
private final ConcurrentLinkedDeque<T> available = new ConcurrentLinkedDeque<T>();
3939
private final Semaphore permits;
4040
private volatile boolean closed;
4141

42+
public enum Prune {
43+
/**
44+
* Prune this element
45+
*/
46+
YES,
47+
/**
48+
* Don't prone this element
49+
*/
50+
NO,
51+
/**
52+
* Don't prune this element and stop attempting to prune additional elements
53+
*/
54+
STOP
55+
}
4256
/**
4357
* Factory for creating and closing pooled items.
4458
*
@@ -49,7 +63,7 @@ public interface ItemFactory<T> {
4963

5064
void close(T t);
5165

52-
boolean shouldPrune(T t);
66+
Prune shouldPrune(T t);
5367
}
5468

5569
/**
@@ -136,17 +150,20 @@ public T get(final long timeout, final TimeUnit timeUnit) {
136150
}
137151

138152
public void prune() {
139-
int currentAvailableCount = getAvailableCount();
140-
for (int numAttempts = 0; numAttempts < currentAvailableCount; numAttempts++) {
141-
if (!acquirePermit(10, TimeUnit.MILLISECONDS)) {
153+
for (RemovalReportingIterator<T> iter = available.iterator(); iter.hasNext();) {
154+
T cur = iter.next();
155+
Prune shouldPrune = itemFactory.shouldPrune(cur);
156+
157+
if (shouldPrune == Prune.STOP) {
142158
break;
143159
}
144-
T cur = available.pollFirst();
145-
if (cur == null) {
146-
releasePermit();
147-
break;
160+
161+
if (shouldPrune == Prune.YES) {
162+
boolean removed = iter.reportingRemove();
163+
if (removed) {
164+
close(cur);
165+
}
148166
}
149-
release(cur, itemFactory.shouldPrune(cur));
150167
}
151168
}
152169

driver-core/src/main/com/mongodb/internal/connection/PowerOfTwoBufferPool.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.connection;
1818

1919
import com.mongodb.connection.BufferProvider;
20+
import com.mongodb.internal.connection.ConcurrentPool.Prune;
2021
import org.bson.ByteBuf;
2122
import org.bson.ByteBufNIO;
2223

@@ -63,8 +64,8 @@ public void close(final ByteBuffer byteBuffer) {
6364
}
6465

6566
@Override
66-
public boolean shouldPrune(final ByteBuffer byteBuffer) {
67-
return false;
67+
public Prune shouldPrune(final ByteBuffer byteBuffer) {
68+
return Prune.STOP;
6869
}
6970
}));
7071
powerOfTwo = powerOfTwo << 1;
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2017 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.connection
18+
19+
import com.mongodb.internal.connection.ConcurrentLinkedDeque
20+
import spock.lang.Specification
21+
22+
// Mostly untested since it was a straight copy from an existing high-quality open source implementation
23+
class ConcurrentLinkedDequeSpecification extends Specification {
24+
def 'should report successful removal from iterator'() {
25+
given:
26+
def deque = new ConcurrentLinkedDeque<Integer>()
27+
deque.add(1)
28+
deque.add(2)
29+
deque.add(3)
30+
def iter = deque.iterator()
31+
32+
when:
33+
def next = iter.next()
34+
def successfullyRemoved = iter.reportingRemove()
35+
36+
then:
37+
next == 1
38+
successfullyRemoved
39+
40+
when:
41+
next = iter.next()
42+
successfullyRemoved = iter.reportingRemove()
43+
44+
then:
45+
next == 2
46+
successfullyRemoved
47+
48+
when:
49+
next = iter.next()
50+
successfullyRemoved = iter.reportingRemove()
51+
52+
then:
53+
next == 3
54+
successfullyRemoved
55+
}
56+
57+
def 'should report unsuccessful removal from iterator'() {
58+
given:
59+
def deque = new ConcurrentLinkedDeque<Integer>()
60+
deque.add(1)
61+
deque.add(2)
62+
deque.add(3)
63+
def iter = deque.iterator()
64+
65+
when:
66+
def next = iter.next()
67+
deque.remove(next)
68+
def successfullyRemoved = iter.reportingRemove()
69+
70+
then:
71+
next == 1
72+
!successfullyRemoved
73+
74+
when:
75+
next = iter.next()
76+
deque.remove(next)
77+
successfullyRemoved = iter.reportingRemove()
78+
79+
then:
80+
next == 2
81+
!successfullyRemoved
82+
83+
when:
84+
next = iter.next()
85+
deque.remove(next)
86+
successfullyRemoved = iter.reportingRemove()
87+
88+
then:
89+
next == 3
90+
!successfullyRemoved
91+
}
92+
93+
}

driver-core/src/test/unit/com/mongodb/internal/connection/ConcurrentPoolTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,21 +190,34 @@ public void testThatEnsuringMinSizeReleasesPermitIfCreateFails() {
190190

191191
@Test
192192
public void testPrune() {
193-
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());
193+
pool = new ConcurrentPool<TestCloseable>(5, new TestItemFactory());
194194

195195
TestCloseable t1 = pool.get();
196196
TestCloseable t2 = pool.get();
197-
t1.shouldPrune = true;
198-
t2.shouldPrune = true;
197+
TestCloseable t3 = pool.get();
198+
TestCloseable t4 = pool.get();
199+
TestCloseable t5 = pool.get();
200+
t1.shouldPrune = ConcurrentPool.Prune.YES;
201+
t2.shouldPrune = ConcurrentPool.Prune.NO;
202+
t3.shouldPrune = ConcurrentPool.Prune.YES;
203+
t4.shouldPrune = ConcurrentPool.Prune.STOP;
204+
t5.shouldPrune = null;
199205

200206
pool.release(t1);
201207
pool.release(t2);
208+
pool.release(t3);
209+
pool.release(t4);
210+
pool.release(t5);
202211

203212
pool.prune();
204-
assertEquals(0, pool.getAvailableCount());
213+
214+
assertEquals(3, pool.getAvailableCount());
205215
assertEquals(0, pool.getInUseCount());
206216
assertTrue(t1.isClosed());
207-
assertTrue(t2.isClosed());
217+
assertTrue(!t2.isClosed());
218+
assertTrue(t3.isClosed());
219+
assertTrue(!t4.isClosed());
220+
assertTrue(!t5.isClosed());
208221
}
209222

210223
class TestItemFactory implements ConcurrentPool.ItemFactory<TestCloseable> {
@@ -232,14 +245,14 @@ public void close(final TestCloseable closeable) {
232245
}
233246

234247
@Override
235-
public boolean shouldPrune(final TestCloseable testCloseable) {
248+
public ConcurrentPool.Prune shouldPrune(final TestCloseable testCloseable) {
236249
return testCloseable.shouldPrune();
237250
}
238251
}
239252

240253
static class TestCloseable implements Closeable {
241254
private boolean closed;
242-
private boolean shouldPrune;
255+
private ConcurrentPool.Prune shouldPrune;
243256
private final boolean initialized;
244257

245258
TestCloseable(final boolean initialize) {
@@ -259,7 +272,7 @@ public boolean isInitialized() {
259272
return initialized;
260273
}
261274

262-
public boolean shouldPrune() {
275+
public ConcurrentPool.Prune shouldPrune() {
263276
return shouldPrune;
264277
}
265278
}

driver/src/main/com/mongodb/ServerSessionPool.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb;
1818

1919
import com.mongodb.internal.connection.ConcurrentPool;
20+
import com.mongodb.internal.connection.ConcurrentPool.Prune;
2021
import org.bson.BsonBinary;
2122
import org.bson.BsonDocument;
2223
import org.bson.BsonDocumentWriter;
@@ -73,8 +74,8 @@ public void close(final ServerSession serverSession) {
7374
}
7475

7576
@Override
76-
public boolean shouldPrune(final ServerSession serverSession) {
77-
return false;
77+
public Prune shouldPrune(final ServerSession serverSession) {
78+
return Prune.STOP;
7879
}
7980

8081
private BsonBinary createNewServerSessionIdentifier() {

0 commit comments

Comments
 (0)