Skip to content

Commit 603f11a

Browse files
authored
Make buffer pool shared and support pruning (#887)
* Make buffer pool shared across all MongoClient instances * Enable pruning on the shared buffer pool JAVA-4511 JAVA-4518
1 parent 0bbf441 commit 603f11a

File tree

10 files changed

+119
-49
lines changed

10 files changed

+119
-49
lines changed

driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/AbstractBsonDocumentBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
public abstract class AbstractBsonDocumentBenchmark<T> extends Benchmark {
3434

35-
protected final PowerOfTwoBufferPool bufferPool = new PowerOfTwoBufferPool();
35+
protected final PowerOfTwoBufferPool bufferPool = PowerOfTwoBufferPool.DEFAULT;
3636
protected final Codec<T> codec;
3737

3838
private final String name;

driver-core/src/main/com/mongodb/connection/AsynchronousSocketChannelStreamFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* @since 3.0
3232
*/
3333
public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
34-
private final PowerOfTwoBufferPool bufferProvider = new PowerOfTwoBufferPool();
34+
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
3535
private final SocketSettings settings;
3636
private final AsynchronousChannelGroup group;
3737

driver-core/src/main/com/mongodb/connection/SocketStreamFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class SocketStreamFactory implements StreamFactory {
3838
private final SocketSettings settings;
3939
private final SslSettings sslSettings;
4040
private final SocketFactory socketFactory;
41-
private final BufferProvider bufferProvider = new PowerOfTwoBufferPool();
41+
private final BufferProvider bufferProvider = PowerOfTwoBufferPool.DEFAULT;
4242

4343
/**
4444
* Creates a new factory with the given settings for connecting to servers and the given SSL settings

driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory, Clo
6363
private final SelectorMonitor selectorMonitor;
6464
private final AsynchronousTlsChannelGroup group;
6565
private final boolean ownsGroup;
66-
private final PowerOfTwoBufferPool bufferPool = new PowerOfTwoBufferPool();
66+
private final PowerOfTwoBufferPool bufferPool = PowerOfTwoBufferPool.DEFAULT;
6767

6868
/**
6969
* Construct a new instance

driver-core/src/main/com/mongodb/internal/connection/PowerOfTwoBufferPool.java

Lines changed: 93 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.connection.BufferProvider;
2020
import com.mongodb.internal.connection.ConcurrentPool.Prune;
21+
import com.mongodb.internal.thread.DaemonThreadFactory;
2122
import org.bson.ByteBuf;
2223
import org.bson.ByteBufNIO;
2324

@@ -26,6 +27,9 @@
2627
import java.nio.ByteOrder;
2728
import java.util.HashMap;
2829
import java.util.Map;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.TimeUnit;
2933

3034
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
3135

@@ -36,12 +40,61 @@
3640
*/
3741
public class PowerOfTwoBufferPool implements BufferProvider {
3842

39-
private final Map<Integer, ConcurrentPool<ByteBuffer>> powerOfTwoToPoolMap = new HashMap<Integer, ConcurrentPool<ByteBuffer>>();
43+
/**
44+
* The global default pool. Pruning is enabled on this pool. Idle buffers are pruned after one minute.
45+
*/
46+
public static final PowerOfTwoBufferPool DEFAULT = new PowerOfTwoBufferPool().enablePruning();
47+
48+
private static final class IdleTrackingByteBuffer {
49+
private final long lastUsedNanos;
50+
private final ByteBuffer buffer;
51+
52+
private IdleTrackingByteBuffer(final ByteBuffer buffer) {
53+
this.lastUsedNanos = System.nanoTime();
54+
this.buffer = buffer;
55+
}
56+
57+
public long getLastUsedNanos() {
58+
return lastUsedNanos;
59+
}
60+
61+
public ByteBuffer getBuffer() {
62+
return buffer;
63+
}
64+
}
65+
66+
private final class ItemFactory implements ConcurrentPool.ItemFactory<IdleTrackingByteBuffer> {
67+
private final int size;
68+
69+
private ItemFactory(final int size) {
70+
this.size = size;
71+
}
72+
73+
@Override
74+
public IdleTrackingByteBuffer create() {
75+
return new IdleTrackingByteBuffer(createNew(size));
76+
}
77+
78+
@Override
79+
public void close(final IdleTrackingByteBuffer idleTrackingByteBuffer) {
80+
}
81+
82+
@Override
83+
public Prune shouldPrune(final IdleTrackingByteBuffer idleTrackingByteBuffer) {
84+
return System.nanoTime() - idleTrackingByteBuffer.getLastUsedNanos() >= maxIdleTimeNanos
85+
? Prune.YES : Prune.STOP;
86+
}
87+
}
88+
89+
private final Map<Integer, ConcurrentPool<IdleTrackingByteBuffer>> powerOfTwoToPoolMap
90+
= new HashMap<Integer, ConcurrentPool<IdleTrackingByteBuffer>>();
91+
private final long maxIdleTimeNanos;
92+
private final ScheduledExecutorService pruner;
4093

4194
/**
4295
* Construct an instance with a highest power of two of 24.
4396
*/
44-
public PowerOfTwoBufferPool() {
97+
PowerOfTwoBufferPool() {
4598
this(24);
4699
}
47100

@@ -50,28 +103,38 @@ public PowerOfTwoBufferPool() {
50103
*
51104
* @param highestPowerOfTwo the highest power of two buffer size that will be pooled
52105
*/
53-
public PowerOfTwoBufferPool(final int highestPowerOfTwo) {
106+
PowerOfTwoBufferPool(final int highestPowerOfTwo) {
107+
this(highestPowerOfTwo, 1, TimeUnit.MINUTES);
108+
}
109+
110+
/**
111+
* Construct an instance.
112+
*
113+
* @param highestPowerOfTwo the highest power of two buffer size that will be pooled
114+
* @param maxIdleTime max idle time when pruning is enabled
115+
* @param timeUnit time unit of maxIdleTime
116+
*/
117+
PowerOfTwoBufferPool(final int highestPowerOfTwo, final long maxIdleTime, final TimeUnit timeUnit) {
54118
int powerOfTwo = 1;
55119
for (int i = 0; i <= highestPowerOfTwo; i++) {
56-
final int size = powerOfTwo;
57-
powerOfTwoToPoolMap.put(i, new ConcurrentPool<>(INFINITE_SIZE,
58-
new ConcurrentPool.ItemFactory<ByteBuffer>() {
59-
@Override
60-
public ByteBuffer create() {
61-
return createNew(size);
62-
}
63-
64-
@Override
65-
public void close(final ByteBuffer byteBuffer) {
66-
}
67-
68-
@Override
69-
public Prune shouldPrune(final ByteBuffer byteBuffer) {
70-
return Prune.STOP;
71-
}
72-
}));
120+
int size = powerOfTwo;
121+
powerOfTwoToPoolMap.put(i, new ConcurrentPool<>(INFINITE_SIZE, new ItemFactory(size)));
73122
powerOfTwo = powerOfTwo << 1;
74123
}
124+
maxIdleTimeNanos = timeUnit.toNanos(maxIdleTime);
125+
pruner = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("BufferPoolPruner"));
126+
}
127+
128+
/**
129+
* Call this method at most once to enable a background thread that prunes idle buffers from the pool
130+
*/
131+
PowerOfTwoBufferPool enablePruning() {
132+
pruner.scheduleAtFixedRate(this::prune, maxIdleTimeNanos, maxIdleTimeNanos / 2, TimeUnit.NANOSECONDS);
133+
return this;
134+
}
135+
136+
void disablePruning() {
137+
pruner.shutdownNow();
75138
}
76139

77140
@Override
@@ -80,27 +143,32 @@ public ByteBuf getBuffer(final int size) {
80143
}
81144

82145
public ByteBuffer getByteBuffer(final int size) {
83-
ConcurrentPool<ByteBuffer> pool = powerOfTwoToPoolMap.get(log2(roundUpToNextHighestPowerOfTwo(size)));
84-
ByteBuffer byteBuffer = (pool == null) ? createNew(size) : pool.get();
146+
ConcurrentPool<IdleTrackingByteBuffer> pool = powerOfTwoToPoolMap.get(log2(roundUpToNextHighestPowerOfTwo(size)));
147+
ByteBuffer byteBuffer = (pool == null) ? createNew(size) : pool.get().getBuffer();
85148

86149
((Buffer) byteBuffer).clear();
87150
((Buffer) byteBuffer).limit(size);
88151
return byteBuffer;
89152
}
90153

91154
private ByteBuffer createNew(final int size) {
92-
ByteBuffer buf = ByteBuffer.allocate(size); // TODO: configure whether this uses allocateDirect or allocate
155+
ByteBuffer buf = ByteBuffer.allocate(size);
93156
buf.order(ByteOrder.LITTLE_ENDIAN);
94157
return buf;
95158
}
96159

97160
public void release(final ByteBuffer buffer) {
98-
ConcurrentPool<ByteBuffer> pool = powerOfTwoToPoolMap.get(log2(roundUpToNextHighestPowerOfTwo(buffer.capacity())));
161+
ConcurrentPool<IdleTrackingByteBuffer> pool =
162+
powerOfTwoToPoolMap.get(log2(roundUpToNextHighestPowerOfTwo(buffer.capacity())));
99163
if (pool != null) {
100-
pool.release(buffer);
164+
pool.release(new IdleTrackingByteBuffer(buffer));
101165
}
102166
}
103167

168+
private void prune() {
169+
powerOfTwoToPoolMap.values().forEach(ConcurrentPool::prune);
170+
}
171+
104172
static int log2(final int powerOfTwo) {
105173
return 31 - Integer.numberOfLeadingZeros(powerOfTwo);
106174
}

driver-core/src/test/unit/com/mongodb/internal/connection/PowerOfTwoBufferPoolTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.junit.Test;
2222

2323
import java.nio.ByteBuffer;
24+
import java.util.concurrent.TimeUnit;
2425

2526
import static org.junit.Assert.assertEquals;
2627
import static org.junit.Assert.assertNotSame;
@@ -73,4 +74,21 @@ public void testHugeBufferRequest() {
7374
buf.release();
7475
assertNotSame(buf, pool.getBuffer((int) Math.pow(2, 10) + 1));
7576
}
77+
78+
// Racy test
79+
@Test
80+
public void testPruning() throws InterruptedException {
81+
PowerOfTwoBufferPool pool = new PowerOfTwoBufferPool(10, 5, TimeUnit.MILLISECONDS)
82+
.enablePruning();
83+
try {
84+
ByteBuf byteBuf = pool.getBuffer(256);
85+
ByteBuffer wrappedByteBuf = byteBuf.asNIO();
86+
byteBuf.release();
87+
Thread.sleep(50);
88+
ByteBuf newByteBuf = pool.getBuffer(256);
89+
assertNotSame(wrappedByteBuf, newByteBuf.asNIO());
90+
} finally {
91+
pool.disablePruning();
92+
}
93+
}
7694
}

driver-legacy/src/main/com/mongodb/DB.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.mongodb.client.model.DBCreateViewOptions;
2424
import com.mongodb.client.model.ValidationAction;
2525
import com.mongodb.client.model.ValidationLevel;
26-
import com.mongodb.connection.BufferProvider;
2726
import com.mongodb.internal.operation.BatchCursor;
2827
import com.mongodb.internal.operation.CommandReadOperation;
2928
import com.mongodb.internal.operation.CreateCollectionOperation;
@@ -528,10 +527,6 @@ OperationExecutor getExecutor() {
528527
return executor;
529528
}
530529

531-
BufferProvider getBufferPool() {
532-
return getMongoClient().getBufferProvider();
533-
}
534-
535530
private BsonDocument wrap(final DBObject document) {
536531
return new BsonDocumentWrapper<DBObject>(document, commandCodec);
537532
}

driver-legacy/src/main/com/mongodb/DBCollection.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import com.mongodb.client.model.DBCollectionFindOptions;
2727
import com.mongodb.client.model.DBCollectionRemoveOptions;
2828
import com.mongodb.client.model.DBCollectionUpdateOptions;
29-
import com.mongodb.connection.BufferProvider;
3029
import com.mongodb.internal.bulk.DeleteRequest;
3130
import com.mongodb.internal.bulk.IndexRequest;
3231
import com.mongodb.internal.bulk.InsertRequest;
3332
import com.mongodb.internal.bulk.UpdateRequest;
3433
import com.mongodb.internal.bulk.WriteRequest.Type;
34+
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
3535
import com.mongodb.internal.operation.AggregateOperation;
3636
import com.mongodb.internal.operation.AggregateToCollectionOperation;
3737
import com.mongodb.internal.operation.BaseWriteOperation;
@@ -1815,7 +1815,7 @@ public synchronized void setDBDecoderFactory(@Nullable final DBDecoderFactory fa
18151815
// If yes then we can use CollectibleDBObjectCodec directly, otherwise it will be wrapped.
18161816
Decoder<DBObject> decoder = (factory == null || factory == DefaultDBDecoder.FACTORY)
18171817
? getDefaultDBObjectCodec()
1818-
: new DBDecoderAdapter(factory.create(), this, getBufferPool());
1818+
: new DBDecoderAdapter(factory.create(), this, PowerOfTwoBufferPool.DEFAULT);
18191819
this.objectCodec = new CompoundDBObjectCodec(objectCodec.getEncoder(), decoder);
18201820
}
18211821

@@ -2150,10 +2150,6 @@ MongoNamespace getNamespace() {
21502150
return new MongoNamespace(getDB().getName(), getName());
21512151
}
21522152

2153-
BufferProvider getBufferPool() {
2154-
return getDB().getBufferPool();
2155-
}
2156-
21572153
@Nullable
21582154
BsonDocument wrapAllowNull(@Nullable final DBObject document) {
21592155
if (document == null) {

driver-legacy/src/main/com/mongodb/DBCursor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.client.model.Collation;
2424
import com.mongodb.client.model.DBCollectionCountOptions;
2525
import com.mongodb.client.model.DBCollectionFindOptions;
26+
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
2627
import com.mongodb.internal.operation.FindOperation;
2728
import com.mongodb.lang.Nullable;
2829
import org.bson.BsonString;
@@ -780,7 +781,7 @@ public DBCursor setDecoderFactory(final DBDecoderFactory factory) {
780781
this.decoderFactory = factory;
781782

782783
//Not creating new CompoundDBObjectCodec because we don't care about encoder.
783-
this.decoder = new DBDecoderAdapter(factory.create(), collection, getCollection().getBufferPool());
784+
this.decoder = new DBDecoderAdapter(factory.create(), collection, PowerOfTwoBufferPool.DEFAULT);
784785
return this;
785786
}
786787

driver-legacy/src/main/com/mongodb/MongoClient.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.mongodb.client.MongoIterable;
2424
import com.mongodb.client.internal.MongoClientImpl;
2525
import com.mongodb.client.internal.OperationExecutor;
26-
import com.mongodb.connection.BufferProvider;
2726
import com.mongodb.connection.ClusterConnectionMode;
2827
import com.mongodb.connection.ClusterDescription;
2928
import com.mongodb.connection.ClusterSettings;
@@ -34,7 +33,6 @@
3433
import com.mongodb.internal.binding.SingleServerBinding;
3534
import com.mongodb.internal.connection.Cluster;
3635
import com.mongodb.internal.connection.Connection;
37-
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
3836
import com.mongodb.internal.session.ServerSessionPool;
3937
import com.mongodb.internal.thread.DaemonThreadFactory;
4038
import com.mongodb.lang.Nullable;
@@ -116,8 +114,6 @@ public class MongoClient implements Closeable {
116114

117115
private final MongoClientOptions options;
118116

119-
private final BufferProvider bufferProvider = new PowerOfTwoBufferPool();
120-
121117
private final ConcurrentLinkedQueue<ServerCursorAndNamespace> orphanedCursors = new ConcurrentLinkedQueue<>();
122118
private final ExecutorService cursorCleaningService;
123119
private final MongoClientImpl delegate;
@@ -795,10 +791,6 @@ ServerSessionPool getServerSessionPool() {
795791
return delegate.getServerSessionPool();
796792
}
797793

798-
BufferProvider getBufferProvider() {
799-
return bufferProvider;
800-
}
801-
802794
@Nullable
803795
ExecutorService getCursorCleaningService() {
804796
return cursorCleaningService;

0 commit comments

Comments
 (0)