Skip to content

Commit f399388

Browse files
committed
Use the electionId reported by ismaster to detect a stale primary by invalidating primaries that have an older electionId than one already seen.
JAVA-1799
1 parent 1f8f13e commit f399388

File tree

6 files changed

+111
-17
lines changed

6 files changed

+111
-17
lines changed

src/main/com/mongodb/MultiServerCluster.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb;
1818

19+
import org.bson.types.ObjectId;
20+
1921
import java.net.UnknownHostException;
2022
import java.util.ArrayList;
2123
import java.util.HashSet;
@@ -42,6 +44,7 @@ final class MultiServerCluster extends BaseCluster {
4244

4345
private ClusterType clusterType;
4446
private String replicaSetName;
47+
private ObjectId maxElectionId;
4548
private final ConcurrentMap<ServerAddress, ServerTuple> addressToServerTupleMap =
4649
new ConcurrentHashMap<ServerAddress, ServerTuple>();
4750

@@ -117,6 +120,7 @@ private void onChange(final ChangeEvent<ServerDescription> event) {
117120
return;
118121
}
119122

123+
boolean shouldUpdateDescription = true;
120124
synchronized (this) {
121125
ServerDescription newDescription = event.getNewValue();
122126
ServerTuple serverTuple = addressToServerTupleMap.get(newDescription.getAddress());
@@ -132,36 +136,40 @@ private void onChange(final ChangeEvent<ServerDescription> event) {
132136

133137
switch (clusterType) {
134138
case ReplicaSet:
135-
handleReplicaSetMemberChanged(newDescription);
139+
shouldUpdateDescription = handleReplicaSetMemberChanged(newDescription);
136140
break;
137141
case Sharded:
138-
handleShardRouterChanged(newDescription);
142+
shouldUpdateDescription = handleShardRouterChanged(newDescription);
139143
break;
140144
case StandAlone:
141-
handleStandAloneChanged(newDescription);
145+
shouldUpdateDescription = handleStandAloneChanged(newDescription);
142146
break;
143147
default:
144148
break;
145149
}
146150
}
147151

148-
serverTuple.description = newDescription;
149-
updateDescription();
152+
if (shouldUpdateDescription) {
153+
serverTuple.description = newDescription;
154+
updateDescription();
155+
}
156+
}
157+
if (shouldUpdateDescription) {
158+
fireChangeEvent();
150159
}
151-
fireChangeEvent();
152160
}
153161

154-
private void handleReplicaSetMemberChanged(final ServerDescription newDescription) {
162+
private boolean handleReplicaSetMemberChanged(final ServerDescription newDescription) {
155163
if (!newDescription.isReplicaSetMember()) {
156164
LOGGER.severe(format("Expecting replica set member, but found a %s. Removing %s from client view of cluster.",
157165
newDescription.getType(), newDescription.getAddress()));
158166
removeServer(newDescription.getAddress());
159-
return;
167+
return true;
160168
}
161169

162170
if (newDescription.getType() == ReplicaSetGhost) {
163171
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
164-
return;
172+
return true;
165173
}
166174

167175
if (replicaSetName == null) {
@@ -174,39 +182,51 @@ private void handleReplicaSetMemberChanged(final ServerDescription newDescriptio
174182
replicaSetName, newDescription.getSetName(), newDescription.getAddress()
175183
));
176184
removeServer(newDescription.getAddress());
177-
return;
185+
return true;
178186
}
179187

180188
ensureServers(newDescription);
181189

182190
if (newDescription.isPrimary()) {
191+
if (newDescription.getElectionId() != null) {
192+
if (maxElectionId != null && maxElectionId.compareTo(newDescription.getElectionId()) > 0) {
193+
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
194+
return false;
195+
}
196+
197+
maxElectionId = newDescription.getElectionId();
198+
}
199+
183200
if (isNotAlreadyPrimary(newDescription.getAddress())) {
184201
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
185202
}
186203
invalidateOldPrimaries(newDescription.getAddress());
187204
}
205+
return true;
188206
}
189207

190208
private boolean isNotAlreadyPrimary(final ServerAddress address) {
191209
ServerTuple serverTuple = addressToServerTupleMap.get(address);
192210
return serverTuple == null || !serverTuple.description.isPrimary();
193211
}
194212

195-
private void handleShardRouterChanged(final ServerDescription newDescription) {
213+
private boolean handleShardRouterChanged(final ServerDescription newDescription) {
196214
if (newDescription.getClusterType() != Sharded) {
197215
LOGGER.severe(format("Expecting a %s, but found a %s. Removing %s from client view of cluster.",
198216
ShardRouter, newDescription.getType(), newDescription.getAddress()));
199217
removeServer(newDescription.getAddress());
200218
}
219+
return true;
201220
}
202221

203-
private void handleStandAloneChanged(final ServerDescription newDescription) {
222+
private boolean handleStandAloneChanged(final ServerDescription newDescription) {
204223
if (getSettings().getHosts().size() > 1) {
205224
LOGGER.severe(format("Expecting a single %s, but found more than one. Removing %s from client view of cluster.",
206225
ServerType.StandAlone, newDescription.getAddress()));
207226
clusterType = Unknown;
208227
removeServer(newDescription.getAddress());
209228
}
229+
return true;
210230
}
211231

212232
private void addServer(final ServerAddress serverAddress) {

src/main/com/mongodb/ServerDescription.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb;
1818

19+
import org.bson.types.ObjectId;
1920
import org.bson.util.annotations.Immutable;
2021

2122
import java.text.DecimalFormat;
@@ -71,6 +72,8 @@ class ServerDescription {
7172
private final int minWireVersion;
7273
private final int maxWireVersion;
7374

75+
private final ObjectId electionId;
76+
7477
private Throwable exception;
7578

7679
static class Builder {
@@ -91,6 +94,7 @@ static class Builder {
9194
private ServerVersion version = new ServerVersion();
9295
private int minWireVersion = 0;
9396
private int maxWireVersion = 0;
97+
private ObjectId electionId;
9498
private Throwable exception;
9599

96100
// CHECKSTYLE:OFF
@@ -180,6 +184,17 @@ public Builder maxWireVersion(final int maxWireVersion) {
180184
return this;
181185
}
182186

187+
/**
188+
* Sets the electionId reported by this server.
189+
*
190+
* @param electionId the electionId
191+
* @return this
192+
*/
193+
public Builder electionId(final ObjectId electionId) {
194+
this.electionId = electionId;
195+
return this;
196+
}
197+
183198
public Builder exception(final Throwable exception) {
184199
this.exception = exception;
185200
return this;
@@ -302,6 +317,15 @@ public int getMaxWireVersion() {
302317
return maxWireVersion;
303318
}
304319

320+
/**
321+
* The replica set electionId reported by this MongoDB server.
322+
*
323+
* @return the electionId, which may be null
324+
*/
325+
public ObjectId getElectionId() {
326+
return electionId;
327+
}
328+
305329
/**
306330
* Returns true if the server has the given tags. A server of either type {@code ServerType.StandAlone} or
307331
* {@code ServerType.ShardRouter} is considered to have all tags, so this method will always return true for instances of either of
@@ -420,6 +444,9 @@ public boolean equals(final Object o) {
420444
if (maxWireVersion != that.maxWireVersion) {
421445
return false;
422446
}
447+
if (electionId != null ? !electionId.equals(that.electionId) : that.electionId != null) {
448+
return false;
449+
}
423450

424451
// Compare class equality and message as exceptions rarely override equals
425452
Class thisExceptionClass = exception != null ? exception.getClass() : null;
@@ -456,6 +483,7 @@ public int hashCode() {
456483
result = 31 * result + version.hashCode();
457484
result = 31 * result + minWireVersion;
458485
result = 31 * result + maxWireVersion;
486+
result = 31 * result + (electionId != null ? electionId.hashCode() : 0);
459487
result = 31 * result + (exception == null ? 0 : exception.getClass().hashCode());
460488
result = 31 * result + (exception == null ? 0 : exception.getMessage().hashCode());
461489
return result;
@@ -473,6 +501,7 @@ public String toString() {
473501
+ ", maxDocumentSize=" + maxDocumentSize
474502
+ ", maxMessageSize=" + maxMessageSize
475503
+ ", maxWriteBatchSize=" + maxWriteBatchSize
504+
+ ", electionId=" + electionId
476505
+ ", tagSet=" + tagSet
477506
+ ", setName='" + setName + '\''
478507
+ ", averageLatencyNanos=" + averageLatencyNanos
@@ -534,6 +563,7 @@ private String getAverageLatencyFormattedInMilliseconds() {
534563
ok = builder.ok;
535564
minWireVersion = builder.minWireVersion;
536565
maxWireVersion = builder.maxWireVersion;
566+
electionId = builder.electionId;
537567
exception = builder.exception;
538568
}
539569
}

src/main/com/mongodb/ServerMonitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ private ServerDescription createDescription(final CommandResult commandResult, f
257257
.setName(commandResult.getString("setName"))
258258
.minWireVersion(commandResult.getInt("minWireVersion", ServerDescription.getDefaultMinWireVersion()))
259259
.maxWireVersion(commandResult.getInt("maxWireVersion", ServerDescription.getDefaultMaxWireVersion()))
260+
.electionId(commandResult.containsKey("electionId") ? commandResult.getObjectId("electionId") : null)
260261
.averageLatency(averageLatencyNanos, TimeUnit.NANOSECONDS)
261262
.ok(commandResult.ok()).build();
262263
}
@@ -319,4 +320,4 @@ private ServerDescription getConnectingServerDescription(final Throwable throwab
319320
.exception(throwable)
320321
.build();
321322
}
322-
}
323+
}

src/test/com/mongodb/MultiServerClusterSpecification.groovy

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb
1818

19+
import org.bson.types.ObjectId
1920
import spock.lang.Specification
2021

2122
import static com.mongodb.ClusterConnectionMode.Multiple
@@ -259,6 +260,22 @@ class MultiServerClusterSpecification extends Specification {
259260
getClusterDescription(cluster).all == getServerDescriptions(firstServer, secondServer, thirdServer)
260261
}
261262

263+
def 'should invalidate new primary if its electionId is less than the previously reported electionId'() {
264+
given:
265+
def cluster = new MultiServerCluster(CLUSTER_ID, ClusterSettings.builder().hosts([firstServer, secondServer]).build(), factory,
266+
CLUSTER_LISTENER)
267+
sendNotification(firstServer, ReplicaSetPrimary, [firstServer, secondServer, thirdServer], new ObjectId(new Date(1000)))
268+
269+
when:
270+
sendNotification(secondServer, ReplicaSetPrimary, [firstServer, secondServer, thirdServer], new ObjectId(new Date(999)))
271+
272+
then:
273+
getServerDescription(firstServer).state == Connected
274+
getServerDescription(firstServer).type == ReplicaSetPrimary
275+
getServerDescription(secondServer).state == Connecting
276+
getClusterDescription(cluster).all == getServerDescriptions(firstServer, secondServer, thirdServer)
277+
}
278+
262279
def 'should remove a server when a server in the seed list is not in hosts list, it should be removed'() {
263280
given:
264281
def serverAddressAlias = new ServerAddress('alternate')
@@ -465,11 +482,17 @@ class MultiServerClusterSpecification extends Specification {
465482

466483
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives,
467484
String setName) {
468-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName).build())
485+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, passives, true, setName, null)
486+
.build())
487+
}
488+
489+
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, ObjectId electionId) {
490+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], true, 'test', electionId)
491+
.build())
469492
}
470493

471494
def sendNotification(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, boolean ok) {
472-
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null).build())
495+
factory.getServer(serverAddress).sendNotification(getBuilder(serverAddress, serverType, hosts, [], ok, null, null).build())
473496
}
474497

475498
def getClusterDescription(MultiServerCluster cluster) {
@@ -485,7 +508,7 @@ class MultiServerClusterSpecification extends Specification {
485508
}
486509

487510
def getBuilder(ServerAddress serverAddress, ServerType serverType, List<ServerAddress> hosts, List<ServerAddress> passives, boolean ok,
488-
String setName) {
511+
String setName, ObjectId electionId) {
489512
ServerDescription.builder()
490513
.address(serverAddress)
491514
.type(serverType)
@@ -494,5 +517,6 @@ class MultiServerClusterSpecification extends Specification {
494517
.hosts(hosts*.toString() as Set)
495518
.passives(passives*.toString() as Set)
496519
.setName(setName)
520+
.electionId(electionId)
497521
}
498522
}

src/test/com/mongodb/ServerDescriptionTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb;
1818

1919

20+
import org.bson.types.ObjectId;
2021
import org.junit.Test;
2122

2223
import java.net.UnknownHostException;
@@ -85,6 +86,7 @@ public void testDefaults() throws UnknownHostException {
8586
assertEquals(new ServerVersion(), serverDescription.getVersion());
8687
assertEquals(0, serverDescription.getMinWireVersion());
8788
assertEquals(0, serverDescription.getMaxWireVersion());
89+
assertNull(serverDescription.getElectionId());
8890
assertNull(serverDescription.getException());
8991
}
9092

@@ -112,6 +114,7 @@ public void testBuilder() throws UnknownHostException {
112114
.version(new ServerVersion(asList(2, 4, 1)))
113115
.minWireVersion(1)
114116
.maxWireVersion(2)
117+
.electionId(new ObjectId("123412341234123412341234"))
115118
.exception(exception)
116119
.build();
117120

@@ -144,6 +147,7 @@ public void testBuilder() throws UnknownHostException {
144147
assertEquals(new ServerVersion(asList(2, 4, 1)), serverDescription.getVersion());
145148
assertEquals(1, serverDescription.getMinWireVersion());
146149
assertEquals(2, serverDescription.getMaxWireVersion());
150+
assertEquals(new ObjectId("123412341234123412341234"), serverDescription.getElectionId());
147151
assertEquals(exception, serverDescription.getException());
148152
}
149153

@@ -167,6 +171,7 @@ public void testObjectOverrides() throws UnknownHostException {
167171
.version(new ServerVersion(asList(2, 4, 1)))
168172
.minWireVersion(1)
169173
.maxWireVersion(2)
174+
.electionId(new ObjectId())
170175
.exception(new IllegalArgumentException("This is illegal"));
171176
assertEquals(builder.build(), builder.build());
172177
assertEquals(builder.build().hashCode(), builder.build().hashCode());
@@ -389,4 +394,4 @@ public void serverWithMaxWireVersionLessThanDriverMinWireVersionShouldBeIncompat
389394

390395
}
391396

392-
}
397+
}

src/test/com/mongodb/ServerMonitorSpecification.groovy

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package com.mongodb
2+
23
import spock.lang.IgnoreIf
34

45
import java.util.concurrent.CountDownLatch
@@ -45,6 +46,19 @@ class ServerMonitorSpecification extends FunctionalSpecification {
4546
newDescription.maxWriteBatchSize == expected
4647
}
4748

49+
def 'should set electionId'() {
50+
given:
51+
initializeServerMonitor(new ServerAddress())
52+
CommandResult commandResult = database.command(new BasicDBObject('ismaster', 1))
53+
def expected = commandResult.get('electionId')
54+
55+
when:
56+
latch.await()
57+
58+
then:
59+
newDescription.electionId == expected
60+
}
61+
4862
@IgnoreIf( { serverIsAtLeastVersion(2.6) } )
4963
def 'should set default max wire batch size when not provided by server'() {
5064
given:

0 commit comments

Comments
 (0)