Skip to content

Commit 6e510b3

Browse files
committed
JAVA-2564: Implement server session pooling behavior from specification
1 parent 2e93d41 commit 6e510b3

File tree

4 files changed

+343
-14
lines changed

4 files changed

+343
-14
lines changed

driver/src/main/com/mongodb/Mongo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class Mongo {
9999

100100
private final ConcurrentLinkedQueue<ServerCursorAndNamespace> orphanedCursors = new ConcurrentLinkedQueue<ServerCursorAndNamespace>();
101101
private final ExecutorService cursorCleaningService;
102-
private final ServerSessionPool serverSessionPool = new ServerSessionPool();
102+
private final ServerSessionPool serverSessionPool;
103103

104104
/**
105105
* Creates a Mongo instance based on a (single) mongodb node (localhost, default port)
@@ -314,6 +314,7 @@ public Mongo(
314314

315315
Mongo(final Cluster cluster, final MongoClientOptions options, final List<MongoCredential> credentialsList) {
316316
this.cluster = cluster;
317+
this.serverSessionPool = new ServerSessionPool(cluster);
317318
this.options = options;
318319
this.readPreference = options.getReadPreference() != null ? options.getReadPreference() : primary();
319320
this.writeConcern = options.getWriteConcern() != null ? options.getWriteConcern() : WriteConcern.UNACKNOWLEDGED;
@@ -531,8 +532,8 @@ public void dropDatabase(final String dbName) {
531532
* databases obtained from it can no longer be used.
532533
*/
533534
public void close() {
534-
cluster.close();
535535
serverSessionPool.close();
536+
cluster.close();
536537
if (cursorCleaningService != null) {
537538
cursorCleaningService.shutdownNow();
538539
}

driver/src/main/com/mongodb/ServerSessionPool.java

Lines changed: 113 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,145 @@
1616

1717
package com.mongodb;
1818

19+
import com.mongodb.connection.Cluster;
20+
import com.mongodb.connection.Connection;
1921
import com.mongodb.internal.connection.ConcurrentPool;
2022
import com.mongodb.internal.connection.ConcurrentPool.Prune;
23+
import com.mongodb.internal.connection.NoOpSessionContext;
24+
import com.mongodb.internal.validator.NoOpFieldNameValidator;
25+
import com.mongodb.selector.ReadPreferenceServerSelector;
26+
import org.bson.BsonArray;
2127
import org.bson.BsonBinary;
2228
import org.bson.BsonDocument;
2329
import org.bson.BsonDocumentWriter;
2430
import org.bson.UuidRepresentation;
31+
import org.bson.codecs.BsonDocumentCodec;
2532
import org.bson.codecs.EncoderContext;
2633
import org.bson.codecs.UuidCodec;
2734

35+
import java.util.ArrayList;
36+
import java.util.List;
2837
import java.util.UUID;
2938

39+
import static com.mongodb.assertions.Assertions.isTrue;
40+
import static java.util.concurrent.TimeUnit.MINUTES;
41+
3042
class ServerSessionPool {
31-
private final ConcurrentPool<ServerSession> serverSessionPool =
32-
new ConcurrentPool<ServerSession>(Integer.MAX_VALUE, new ServerSessionItemFactory());
43+
44+
private static final int END_SESSIONS_BATCH_SIZE = 10000;
45+
46+
private final ConcurrentPool<ServerSessionImpl> serverSessionPool =
47+
new ConcurrentPool<ServerSessionImpl>(Integer.MAX_VALUE, new ServerSessionItemFactory());
48+
private final Cluster cluster;
49+
private final ServerSessionPool.Clock clock;
50+
private volatile boolean closing;
51+
private volatile boolean closed;
52+
private final List<BsonDocument> closedSessionIdentifiers = new ArrayList<BsonDocument>();
53+
54+
interface Clock {
55+
long millis();
56+
}
57+
58+
ServerSessionPool(final Cluster cluster) {
59+
this(cluster, new Clock() {
60+
@Override
61+
public long millis() {
62+
return System.currentTimeMillis();
63+
}
64+
});
65+
}
66+
67+
ServerSessionPool(final Cluster cluster, final Clock clock) {
68+
this.cluster = cluster;
69+
this.clock = clock;
70+
}
3371

3472
ServerSession get() {
35-
return serverSessionPool.get();
73+
isTrue("server session pool is open", !closed);
74+
ServerSessionImpl serverSession = serverSessionPool.get();
75+
while (shouldPrune(serverSession)) {
76+
serverSessionPool.release(serverSession, true);
77+
serverSession = serverSessionPool.get();
78+
}
79+
return serverSession;
3680
}
3781

3882
void release(final ServerSession serverSession) {
39-
serverSessionPool.release(serverSession);
83+
serverSessionPool.release((ServerSessionImpl) serverSession);
4084
}
4185

4286
void close() {
43-
serverSessionPool.close();
87+
try {
88+
closing = true;
89+
serverSessionPool.close();
90+
endClosedSessions();
91+
} finally {
92+
closed = true;
93+
}
94+
}
95+
96+
private void closeSession(final ServerSessionImpl serverSession) {
97+
// only track closed sessions when pool is in the process of closing
98+
if (!closing) {
99+
return;
100+
}
101+
102+
closedSessionIdentifiers.add(serverSession.getIdentifier());
103+
if (closedSessionIdentifiers.size() == END_SESSIONS_BATCH_SIZE) {
104+
endClosedSessions();
105+
}
106+
}
107+
108+
private void endClosedSessions() {
109+
if (closedSessionIdentifiers.isEmpty()) {
110+
return;
111+
}
112+
113+
Connection connection = cluster.selectServer(new ReadPreferenceServerSelector(ReadPreference.primaryPreferred())).getConnection();
114+
try {
115+
connection.command("admin",
116+
new BsonDocument("endSessions", new BsonArray(closedSessionIdentifiers)),
117+
ReadPreference.primaryPreferred(), new NoOpFieldNameValidator(),
118+
new BsonDocumentCodec(), NoOpSessionContext.INSTANCE);
119+
} catch (MongoException e) {
120+
// ignore exceptions
121+
} finally {
122+
closedSessionIdentifiers.clear();
123+
connection.release();
124+
}
44125
}
45126

46-
private static class ServerSessionImpl implements ServerSession {
127+
private boolean shouldPrune(final ServerSessionImpl serverSession) {
128+
Integer logicalSessionTimeoutMinutes = cluster.getDescription().getLogicalSessionTimeoutMinutes();
129+
if (logicalSessionTimeoutMinutes == null) {
130+
return false;
131+
}
132+
long currentTimeMillis = clock.millis();
133+
final long timeSinceLastUse = currentTimeMillis - serverSession.lastUsedAtMillis;
134+
final long oneMinuteFromTimeout = MINUTES.toMillis(logicalSessionTimeoutMinutes - 1);
135+
return timeSinceLastUse > oneMinuteFromTimeout;
136+
}
137+
138+
final class ServerSessionImpl implements ServerSession {
47139
private final BsonDocument identifier;
48140
private int transactionNumber;
141+
private volatile long lastUsedAtMillis = clock.millis();
49142

50143
ServerSessionImpl(final BsonBinary identifier) {
51144
this.identifier = new BsonDocument("id", identifier);
52145
}
53146

147+
long getLastUsedAtMillis() {
148+
return lastUsedAtMillis;
149+
}
150+
151+
int getTransactionNumber() {
152+
return transactionNumber;
153+
}
154+
54155
@Override
55156
public BsonDocument getIdentifier() {
157+
lastUsedAtMillis = clock.millis();
56158
return identifier;
57159
}
58160

@@ -62,19 +164,19 @@ public long advanceTransactionNumber() {
62164
}
63165
}
64166

65-
private static final class ServerSessionItemFactory implements ConcurrentPool.ItemFactory<ServerSession> {
167+
private final class ServerSessionItemFactory implements ConcurrentPool.ItemFactory<ServerSessionImpl> {
66168
@Override
67-
public ServerSession create(final boolean initialize) {
169+
public ServerSessionImpl create(final boolean initialize) {
68170
return new ServerSessionImpl(createNewServerSessionIdentifier());
69171
}
70172

71173
@Override
72-
public void close(final ServerSession serverSession) {
73-
// TODO: pruning
174+
public void close(final ServerSessionImpl serverSession) {
175+
closeSession(serverSession);
74176
}
75177

76178
@Override
77-
public Prune shouldPrune(final ServerSession serverSession) {
179+
public Prune shouldPrune(final ServerSessionImpl serverSession) {
78180
return Prune.STOP;
79181
}
80182

driver/src/test/functional/com/mongodb/MongoClientSessionSpecification.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ class MongoClientSessionSpecification extends FunctionalSpecification {
266266
then:
267267
def pingCommandStartedEvent = commandListener.events.get(0)
268268
!(pingCommandStartedEvent as CommandStartedEvent).command.containsKey('lsid')
269-
270269
cleanup:
271270
client?.close()
272271
}

0 commit comments

Comments
 (0)