Skip to content

Commit 7aaf94a

Browse files
committed
JAVA-2643: Gossip cluster time via the server monitor
1 parent 624b071 commit 7aaf94a

11 files changed

+135
-75
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2017 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.connection;
18+
19+
import org.bson.BsonDocument;
20+
import org.bson.BsonTimestamp;
21+
22+
final class ClusterClockAdvancingSessionContext implements SessionContext {
23+
24+
private final SessionContext wrapped;
25+
private final ClusterClock clusterClock;
26+
27+
ClusterClockAdvancingSessionContext(final SessionContext wrapped, final ClusterClock clusterClock) {
28+
this.wrapped = wrapped;
29+
this.clusterClock = clusterClock;
30+
}
31+
32+
@Override
33+
public boolean hasSession() {
34+
return wrapped.hasSession();
35+
}
36+
37+
@Override
38+
public BsonDocument getSessionId() {
39+
return wrapped.getSessionId();
40+
}
41+
42+
@Override
43+
public boolean isCausallyConsistent() {
44+
return wrapped.isCausallyConsistent();
45+
}
46+
47+
@Override
48+
public long advanceTransactionNumber() {
49+
return wrapped.advanceTransactionNumber();
50+
}
51+
52+
@Override
53+
public BsonTimestamp getOperationTime() {
54+
return wrapped.getOperationTime();
55+
}
56+
57+
@Override
58+
public void advanceOperationTime(final BsonTimestamp operationTime) {
59+
wrapped.advanceOperationTime(operationTime);
60+
}
61+
62+
@Override
63+
public BsonDocument getClusterTime() {
64+
return clusterClock.greaterOf(wrapped.getClusterTime());
65+
}
66+
67+
@Override
68+
public void advanceClusterTime(final BsonDocument clusterTime) {
69+
wrapped.advanceClusterTime(clusterTime);
70+
clusterClock.advance(clusterTime);
71+
}
72+
}

driver-core/src/main/com/mongodb/connection/CommandHelper.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,18 @@
3030

3131
final class CommandHelper {
3232
static BsonDocument executeCommand(final String database, final BsonDocument command, final InternalConnection internalConnection) {
33-
return sendAndReceive(database, command, internalConnection);
33+
return sendAndReceive(database, command, null, internalConnection);
34+
}
35+
36+
static BsonDocument executeCommand(final String database, final BsonDocument command, final ClusterClock clusterClock,
37+
final InternalConnection internalConnection) {
38+
return sendAndReceive(database, command, clusterClock, internalConnection);
3439
}
3540

3641
static BsonDocument executeCommandWithoutCheckingForFailure(final String database, final BsonDocument command,
3742
final InternalConnection internalConnection) {
3843
try {
39-
return sendAndReceive(database, command, internalConnection);
44+
return sendAndReceive(database, command, null, internalConnection);
4045
} catch (MongoServerException e) {
4146
return new BsonDocument();
4247
}
@@ -72,9 +77,11 @@ static boolean isCommandOk(final BsonDocument response) {
7277
}
7378

7479
private static BsonDocument sendAndReceive(final String database, final BsonDocument command,
75-
final InternalConnection internalConnection) {
80+
final ClusterClock clusterClock, final InternalConnection internalConnection) {
81+
SessionContext sessionContext = clusterClock == null ? NoOpSessionContext.INSTANCE
82+
: new ClusterClockAdvancingSessionContext(NoOpSessionContext.INSTANCE, clusterClock);
7683
return internalConnection.sendAndReceive(getCommandMessage(database, command, internalConnection), new BsonDocumentCodec(),
77-
NoOpSessionContext.INSTANCE);
84+
sessionContext);
7885
}
7986

8087
private static CommandMessage getCommandMessage(final String database, final BsonDocument command,

driver-core/src/main/com/mongodb/connection/CommandMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ private BsonDocument getCommandToEncode() {
169169
private List<BsonElement> getExtraElements(final SessionContext sessionContext) {
170170
List<BsonElement> extraElements = new ArrayList<BsonElement>();
171171
extraElements.add(new BsonElement("$db", new BsonString(new MongoNamespace(getCollectionName()).getDatabaseName())));
172+
if (sessionContext.getClusterTime() != null) {
173+
extraElements.add(new BsonElement("$clusterTime", sessionContext.getClusterTime()));
174+
}
172175
if (sessionContext.hasSession()) {
173-
if (sessionContext.getClusterTime() != null) {
174-
extraElements.add(new BsonElement("$clusterTime", sessionContext.getClusterTime()));
175-
}
176176
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
177177
}
178178
if (!isDefaultReadPreference(getReadPreference())) {

driver-core/src/main/com/mongodb/connection/DefaultClusterableServerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public ClusterableServer create(final ServerAddress serverAddress, final ServerL
6565
new InternalStreamConnectionFactory(streamFactory, credentialList, applicationName,
6666
mongoDriverInformation, compressorList, commandListener), connectionPoolSettings);
6767
ServerMonitorFactory serverMonitorFactory =
68-
new DefaultServerMonitorFactory(new ServerId(clusterId, serverAddress), serverSettings,
68+
new DefaultServerMonitorFactory(new ServerId(clusterId, serverAddress), serverSettings, clusterClock,
6969
new InternalStreamConnectionFactory(heartbeatStreamFactory, credentialList, applicationName,
7070
mongoDriverInformation, Collections.<MongoCompressor>emptyList(), null), connectionPool);
7171

driver-core/src/main/com/mongodb/connection/DefaultServer.java

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
import com.mongodb.event.ServerDescriptionChangedEvent;
3131
import com.mongodb.event.ServerListener;
3232
import com.mongodb.event.ServerOpeningEvent;
33-
import org.bson.BsonDocument;
34-
import org.bson.BsonTimestamp;
3533

3634
import static com.mongodb.assertions.Assertions.isTrue;
3735
import static com.mongodb.assertions.Assertions.notNull;
@@ -186,7 +184,7 @@ public void onResult(final T result, final Throwable t) {
186184
public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection connection,
187185
final SessionContext sessionContext) {
188186
try {
189-
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext));
187+
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext, clusterClock));
190188
return protocol.execute(connection);
191189
} catch (MongoException e) {
192190
handleThrowable(e);
@@ -197,7 +195,7 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
197195
@Override
198196
public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalConnection connection,
199197
final SessionContext sessionContext, final SingleResultCallback<T> callback) {
200-
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext));
198+
protocol.sessionContext(new ClusterClockAdvancingSessionContext(sessionContext, clusterClock));
201199
protocol.executeAsync(connection, errorHandlingCallback(new SingleResultCallback<T>() {
202200
@Override
203201
public void onResult(final T result, final Throwable t) {
@@ -220,54 +218,4 @@ public void stateChanged(final ChangeEvent<ServerDescription> event) {
220218
serverListener.serverDescriptionChanged(new ServerDescriptionChangedEvent(serverId, description, oldDescription));
221219
}
222220
}
223-
224-
private final class ClusterClockAdvancingSessionContext implements SessionContext {
225-
226-
private SessionContext wrapped;
227-
228-
private ClusterClockAdvancingSessionContext(final SessionContext wrapped) {
229-
this.wrapped = wrapped;
230-
}
231-
232-
@Override
233-
public boolean hasSession() {
234-
return wrapped.hasSession();
235-
}
236-
237-
@Override
238-
public BsonDocument getSessionId() {
239-
return wrapped.getSessionId();
240-
}
241-
242-
@Override
243-
public boolean isCausallyConsistent() {
244-
return wrapped.isCausallyConsistent();
245-
}
246-
247-
@Override
248-
public long advanceTransactionNumber() {
249-
return wrapped.advanceTransactionNumber();
250-
}
251-
252-
@Override
253-
public BsonTimestamp getOperationTime() {
254-
return wrapped.getOperationTime();
255-
}
256-
257-
@Override
258-
public void advanceOperationTime(final BsonTimestamp operationTime) {
259-
wrapped.advanceOperationTime(operationTime);
260-
}
261-
262-
@Override
263-
public BsonDocument getClusterTime() {
264-
return clusterClock.greaterOf(wrapped.getClusterTime());
265-
}
266-
267-
@Override
268-
public void advanceClusterTime(final BsonDocument clusterTime) {
269-
wrapped.advanceClusterTime(clusterTime);
270-
clusterClock.advance(clusterTime);
271-
}
272-
}
273221
}

driver-core/src/main/com/mongodb/connection/DefaultServerMonitor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.locks.Lock;
3232
import java.util.concurrent.locks.ReentrantLock;
3333

34+
import static com.mongodb.assertions.Assertions.notNull;
3435
import static com.mongodb.connection.CommandHelper.executeCommand;
3536
import static com.mongodb.connection.DescriptionHelper.createServerDescription;
3637
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
@@ -47,6 +48,7 @@ class DefaultServerMonitor implements ServerMonitor {
4748

4849
private final ServerId serverId;
4950
private final ServerMonitorListener serverMonitorListener;
51+
private final ClusterClock clusterClock;
5052
private final ChangeListener<ServerDescription> serverStateListener;
5153
private final InternalConnectionFactory internalConnectionFactory;
5254
private final ConnectionPool connectionPool;
@@ -58,13 +60,14 @@ class DefaultServerMonitor implements ServerMonitor {
5860
private volatile boolean isClosed;
5961

6062
DefaultServerMonitor(final ServerId serverId, final ServerSettings serverSettings,
61-
final ChangeListener<ServerDescription> serverStateListener,
63+
final ClusterClock clusterClock, final ChangeListener<ServerDescription> serverStateListener,
6264
final InternalConnectionFactory internalConnectionFactory, final ConnectionPool connectionPool) {
63-
this.serverSettings = serverSettings;
64-
this.serverId = serverId;
65+
this.serverSettings = notNull("serverSettings", serverSettings);
66+
this.serverId = notNull("serverId", serverId);
6567
this.serverMonitorListener = getServerMonitorListener(serverSettings);
68+
this.clusterClock = notNull("clusterClock", clusterClock);
6669
this.serverStateListener = serverStateListener;
67-
this.internalConnectionFactory = internalConnectionFactory;
70+
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
6871
this.connectionPool = connectionPool;
6972
monitor = new ServerMonitorRunnable();
7073
monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
@@ -170,7 +173,8 @@ private ServerDescription lookupServerDescription(final InternalConnection conne
170173

171174
long start = System.nanoTime();
172175
try {
173-
BsonDocument isMasterResult = executeCommand("admin", new BsonDocument("ismaster", new BsonInt32(1)), connection);
176+
BsonDocument isMasterResult =
177+
executeCommand("admin", new BsonDocument("ismaster", new BsonInt32(1)), clusterClock, connection);
174178
long elapsedTimeNanos = System.nanoTime() - start;
175179
averageRoundTripTime.addSample(elapsedTimeNanos);
176180

driver-core/src/main/com/mongodb/connection/DefaultServerMonitorFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,22 @@
2121
class DefaultServerMonitorFactory implements ServerMonitorFactory {
2222
private final ServerId serverId;
2323
private final ServerSettings settings;
24+
private final ClusterClock clusterClock;
2425
private final InternalConnectionFactory internalConnectionFactory;
2526
private final ConnectionPool connectionPool;
2627

2728
DefaultServerMonitorFactory(final ServerId serverId, final ServerSettings settings,
28-
final InternalConnectionFactory internalConnectionFactory,
29+
final ClusterClock clusterClock, final InternalConnectionFactory internalConnectionFactory,
2930
final ConnectionPool connectionPool) {
3031
this.serverId = notNull("serverId", serverId);
3132
this.settings = notNull("settings", settings);
33+
this.clusterClock = notNull("clusterClock", clusterClock);
3234
this.internalConnectionFactory = notNull("internalConnectionFactory", internalConnectionFactory);
3335
this.connectionPool = notNull("connectionPool", connectionPool);
3436
}
3537

3638
@Override
3739
public ServerMonitor create(final ChangeListener<ServerDescription> serverStateListener) {
38-
return new DefaultServerMonitor(serverId, settings, serverStateListener, internalConnectionFactory, connectionPool);
40+
return new DefaultServerMonitor(serverId, settings, clusterClock, serverStateListener, internalConnectionFactory, connectionPool);
3941
}
4042
}

driver-core/src/test/functional/com/mongodb/connection/CommandHelperSpecification.groovy

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package com.mongodb.connection
33
import category.Async
44
import com.mongodb.MongoCommandException
55
import com.mongodb.MongoException
6+
import com.mongodb.ServerAddress
67
import com.mongodb.connection.netty.NettyStreamFactory
78
import org.bson.BsonDocument
89
import org.bson.BsonInt32
10+
import org.bson.BsonTimestamp
911
import org.junit.experimental.categories.Category
1012
import spock.lang.Specification
1113

@@ -14,6 +16,7 @@ import java.util.concurrent.CountDownLatch
1416
import static com.mongodb.ClusterFixture.getCredentialList
1517
import static com.mongodb.ClusterFixture.getPrimary
1618
import static com.mongodb.ClusterFixture.getSslSettings
19+
import static com.mongodb.connection.CommandHelper.executeCommand
1720
import static com.mongodb.connection.CommandHelper.executeCommandAsync
1821

1922
class CommandHelperSpecification extends Specification {
@@ -25,10 +28,27 @@ class CommandHelperSpecification extends Specification {
2528
.create(new ServerId(new ClusterId(), getPrimary()))
2629
connection.open()
2730
}
31+
2832
def cleanup() {
2933
connection?.close()
3034
}
3135

36+
def 'should gossip cluster time'() {
37+
given:
38+
def connection = Mock(InternalStreamConnection) {
39+
getDescription() >> new ConnectionDescription(new ConnectionId(new ServerId(new ClusterId(), new ServerAddress())),
40+
new ServerVersion(3, 6), ServerType.REPLICA_SET_PRIMARY, 1000, 1000, 1000)
41+
}
42+
def clusterClock = new ClusterClock()
43+
clusterClock.advance(new BsonDocument('clusterTime', new BsonTimestamp(42L)))
44+
45+
when:
46+
executeCommand('admin', new BsonDocument('ismaster', new BsonInt32(1)), clusterClock, connection)
47+
48+
then:
49+
1 * connection.sendAndReceive(_, _, ) { it instanceof ClusterClockAdvancingSessionContext }
50+
}
51+
3252
@Category(Async)
3353
def 'should execute command asynchronously'() {
3454
when:

driver-core/src/test/functional/com/mongodb/connection/ServerMonitorSpecification.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ class ServerMonitorSpecification extends OperationFunctionalSpecification {
207207

208208
def initializeServerMonitor(ServerAddress address) {
209209
serverMonitor = new DefaultServerMonitor(new ServerId(new ClusterId(), address), ServerSettings.builder().build(),
210+
new ClusterClock(),
210211
new ChangeListener<ServerDescription>() {
211212
@Override
212213
void stateChanged(final ChangeEvent<ServerDescription> event) {

driver-core/src/test/unit/com/mongodb/connection/CommandMessageSpecification.groovy

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ class CommandMessageSpecification extends Specification {
5757
def expectedCommandDocument = command.clone()
5858
.append('$db', new BsonString('db'))
5959

60+
if (sessionContext.clusterTime != null) {
61+
expectedCommandDocument.append('$clusterTime', sessionContext.clusterTime)
62+
}
6063
if (sessionContext.hasSession()) {
61-
if (sessionContext.clusterTime != null) {
62-
expectedCommandDocument.append('$clusterTime', sessionContext.clusterTime)
63-
}
6464
expectedCommandDocument.append('lsid', sessionContext.sessionId)
6565
}
6666

@@ -75,6 +75,12 @@ class CommandMessageSpecification extends Specification {
7575
[
7676
Stub(SessionContext) {
7777
hasSession() >> false
78+
getClusterTime() >> null
79+
getSessionId() >> new BsonDocument('id', new BsonBinary([1, 2, 3] as byte[]))
80+
},
81+
Stub(SessionContext) {
82+
hasSession() >> false
83+
getClusterTime() >> new BsonDocument('clusterTime', new BsonTimestamp(42, 1))
7884
},
7985
Stub(SessionContext) {
8086
hasSession() >> true

0 commit comments

Comments
 (0)