|
34 | 34 | import com.mongodb.connection.ServerDescription;
|
35 | 35 | import com.mongodb.connection.SocketStreamFactory;
|
36 | 36 | import com.mongodb.event.ClusterListener;
|
37 |
| -import com.mongodb.internal.connection.ConcurrentPool; |
38 | 37 | import com.mongodb.internal.connection.PowerOfTwoBufferPool;
|
39 | 38 | import com.mongodb.internal.thread.DaemonThreadFactory;
|
40 | 39 | import com.mongodb.operation.BatchCursor;
|
|
44 | 43 | import com.mongodb.operation.ReadOperation;
|
45 | 44 | import com.mongodb.operation.WriteOperation;
|
46 | 45 | import com.mongodb.selector.LatencyMinimizingServerSelector;
|
47 |
| -import org.bson.BsonBinary; |
48 | 46 | import org.bson.BsonBoolean;
|
49 | 47 | import org.bson.BsonDocument;
|
50 |
| -import org.bson.BsonDocumentWriter; |
51 | 48 | import org.bson.BsonTimestamp;
|
52 |
| -import org.bson.UuidRepresentation; |
53 |
| -import org.bson.codecs.EncoderContext; |
54 |
| -import org.bson.codecs.UuidCodec; |
55 | 49 |
|
56 | 50 | import java.util.ArrayList;
|
57 | 51 | import java.util.Collection;
|
58 | 52 | import java.util.Collections;
|
59 | 53 | import java.util.List;
|
60 |
| -import java.util.UUID; |
61 | 54 | import java.util.concurrent.ConcurrentHashMap;
|
62 | 55 | import java.util.concurrent.ConcurrentLinkedQueue;
|
63 | 56 | import java.util.concurrent.ConcurrentMap;
|
@@ -106,6 +99,7 @@ public class Mongo {
|
106 | 99 |
|
107 | 100 | private final ConcurrentLinkedQueue<ServerCursorAndNamespace> orphanedCursors = new ConcurrentLinkedQueue<ServerCursorAndNamespace>();
|
108 | 101 | private final ExecutorService cursorCleaningService;
|
| 102 | + private final ServerSessionPool serverSessionPool = new ServerSessionPool(); |
109 | 103 |
|
110 | 104 | /**
|
111 | 105 | * Creates a Mongo instance based on a (single) mongodb node (localhost, default port)
|
@@ -538,6 +532,7 @@ public void dropDatabase(final String dbName) {
|
538 | 532 | */
|
539 | 533 | public void close() {
|
540 | 534 | cluster.close();
|
| 535 | + serverSessionPool.close(); |
541 | 536 | if (cursorCleaningService != null) {
|
542 | 537 | cursorCleaningService.shutdownNow();
|
543 | 538 | }
|
@@ -960,28 +955,6 @@ public void close() {
|
960 | 955 | }
|
961 | 956 | }
|
962 | 957 |
|
963 |
| - private final ConcurrentPool<ServerSession> serverSessionPool = |
964 |
| - new ConcurrentPool<ServerSession>(Integer.MAX_VALUE, new ServerSessionItemFactory()); |
965 |
| - |
966 |
| - private static class ServerSessionImpl implements ServerSession { |
967 |
| - private final BsonDocument identifier; |
968 |
| - private int transactionNumber; |
969 |
| - |
970 |
| - ServerSessionImpl(final BsonBinary identifier) { |
971 |
| - this.identifier = new BsonDocument("id", identifier); |
972 |
| - } |
973 |
| - |
974 |
| - @Override |
975 |
| - public BsonDocument getIdentifier() { |
976 |
| - return identifier; |
977 |
| - } |
978 |
| - |
979 |
| - @Override |
980 |
| - public long advanceTransactionNumber() { |
981 |
| - return transactionNumber++; |
982 |
| - } |
983 |
| - } |
984 |
| - |
985 | 958 | private ExecutorService createCursorCleaningService() {
|
986 | 959 | ScheduledExecutorService newTimer = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("CleanCursors"));
|
987 | 960 | newTimer.scheduleAtFixedRate(new Runnable() {
|
@@ -1033,35 +1006,6 @@ private static class ServerCursorAndNamespace {
|
1033 | 1006 | }
|
1034 | 1007 | }
|
1035 | 1008 |
|
1036 |
| - |
1037 |
| - private static final class ServerSessionItemFactory implements ConcurrentPool.ItemFactory<ServerSession> { |
1038 |
| - @Override |
1039 |
| - public ServerSession create(final boolean initialize) { |
1040 |
| - return new ServerSessionImpl(createNewServerSessionIdentifier()); |
1041 |
| - } |
1042 |
| - |
1043 |
| - @Override |
1044 |
| - public void close(final ServerSession serverSession) { |
1045 |
| - // TODO: pruning |
1046 |
| - } |
1047 |
| - |
1048 |
| - @Override |
1049 |
| - public boolean shouldPrune(final ServerSession serverSession) { |
1050 |
| - return false; |
1051 |
| - } |
1052 |
| - |
1053 |
| - private BsonBinary createNewServerSessionIdentifier() { |
1054 |
| - UuidCodec uuidCodec = new UuidCodec(UuidRepresentation.STANDARD); |
1055 |
| - BsonDocument holder = new BsonDocument(); |
1056 |
| - BsonDocumentWriter bsonDocumentWriter = new BsonDocumentWriter(holder); |
1057 |
| - bsonDocumentWriter.writeStartDocument(); |
1058 |
| - bsonDocumentWriter.writeName("id"); |
1059 |
| - uuidCodec.encode(bsonDocumentWriter, UUID.randomUUID(), EncoderContext.builder().build()); |
1060 |
| - bsonDocumentWriter.writeEndDocument(); |
1061 |
| - return holder.getBinary("id"); |
1062 |
| - } |
1063 |
| - } |
1064 |
| - |
1065 | 1009 | /**
|
1066 | 1010 | * Mongo.Holder can be used as a static place to hold several instances of Mongo. Security is not enforced at this level, and needs to
|
1067 | 1011 | * be done on the application side.
|
|
0 commit comments