Skip to content

Commit 0ba0235

Browse files
committed
Use the electionId reported by ismaster to detect a stale primary by invalidate primaries that have an older electionId than one already seen.
JAVA-1799
1 parent a57392b commit 0ba0235

File tree

11 files changed

+455
-21
lines changed

11 files changed

+455
-21
lines changed

driver-core/src/main/com/mongodb/connection/DescriptionHelper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.bson.BsonInt32;
2626
import org.bson.BsonString;
2727
import org.bson.BsonValue;
28+
import org.bson.types.ObjectId;
2829

2930
import java.util.ArrayList;
3031
import java.util.Collections;
@@ -77,10 +78,15 @@ static ServerDescription createServerDescription(final ServerAddress serverAddre
7778
new BsonInt32(getDefaultMinWireVersion())).getValue())
7879
.maxWireVersion(isMasterResult.getInt32("maxWireVersion",
7980
new BsonInt32(getDefaultMaxWireVersion())).getValue())
81+
.electionId(getElectionId(isMasterResult))
8082
.roundTripTime(roundTripTime, NANOSECONDS)
8183
.ok(CommandHelper.isCommandOk(isMasterResult)).build();
8284
}
8385

86+
private static ObjectId getElectionId(final BsonDocument isMasterResult) {
87+
return isMasterResult.containsKey("electionId") ? isMasterResult.getObjectId("electionId").getValue() : null;
88+
}
89+
8490
private static int getMaxMessageSizeBytes(final BsonDocument isMasterResult) {
8591
return isMasterResult.getInt32("maxMessageSizeBytes", new BsonInt32(getDefaultMaxMessageSize())).getValue();
8692
}

driver-core/src/main/com/mongodb/connection/MultiServerCluster.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.mongodb.diagnostics.logging.Logger;
2121
import com.mongodb.diagnostics.logging.Loggers;
2222
import com.mongodb.event.ClusterListener;
23+
import org.bson.types.ObjectId;
2324

2425
import java.util.ArrayList;
2526
import java.util.HashSet;
@@ -45,6 +46,8 @@ final class MultiServerCluster extends BaseCluster {
4546

4647
private ClusterType clusterType;
4748
private String replicaSetName;
49+
private ObjectId maxElectionId;
50+
4851
private final ConcurrentMap<ServerAddress, ServerTuple> addressToServerTupleMap =
4952
new ConcurrentHashMap<ServerAddress, ServerTuple>();
5053

@@ -123,6 +126,7 @@ private void onChange(final ChangeEvent<ServerDescription> event) {
123126
return;
124127
}
125128

129+
boolean shouldUpdateDescription = true;
126130
synchronized (this) {
127131
ServerDescription newDescription = event.getNewValue();
128132
ServerTuple serverTuple = addressToServerTupleMap.get(newDescription.getAddress());
@@ -140,38 +144,42 @@ private void onChange(final ChangeEvent<ServerDescription> event) {
140144

141145
switch (clusterType) {
142146
case REPLICA_SET:
143-
handleReplicaSetMemberChanged(newDescription);
147+
shouldUpdateDescription = handleReplicaSetMemberChanged(newDescription);
144148
break;
145149
case SHARDED:
146-
handleShardRouterChanged(newDescription);
150+
shouldUpdateDescription = handleShardRouterChanged(newDescription);
147151
break;
148152
case STANDALONE:
149-
handleStandAloneChanged(newDescription);
153+
shouldUpdateDescription = handleStandAloneChanged(newDescription);
150154
break;
151155
default:
152156
break;
153157
}
154158
}
155159

156-
serverTuple.description = newDescription;
157-
updateDescription();
160+
if (shouldUpdateDescription) {
161+
serverTuple.description = newDescription;
162+
updateDescription();
163+
}
164+
}
165+
if (shouldUpdateDescription) {
166+
fireChangeEvent();
158167
}
159-
fireChangeEvent();
160168
}
161169

162-
private void handleReplicaSetMemberChanged(final ServerDescription newDescription) {
170+
private boolean handleReplicaSetMemberChanged(final ServerDescription newDescription) {
163171
if (!newDescription.isReplicaSetMember()) {
164172
LOGGER.error(format("Expecting replica set member, but found a %s. Removing %s from client view of cluster.",
165-
newDescription.getType(), newDescription.getAddress()));
173+
newDescription.getType(), newDescription.getAddress()));
166174
removeServer(newDescription.getAddress());
167-
return;
175+
return true;
168176
}
169177

170178
if (newDescription.getType() == REPLICA_SET_GHOST) {
171179
if (LOGGER.isInfoEnabled()) {
172180
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
173181
}
174-
return;
182+
return true;
175183
}
176184

177185
if (replicaSetName == null) {
@@ -183,39 +191,51 @@ private void handleReplicaSetMemberChanged(final ServerDescription newDescriptio
183191
+ "Removing %s from client view of cluster.",
184192
replicaSetName, newDescription.getSetName(), newDescription.getAddress()));
185193
removeServer(newDescription.getAddress());
186-
return;
194+
return true;
187195
}
188196

189197
ensureServers(newDescription);
190198

191199
if (newDescription.isPrimary()) {
200+
if (newDescription.getElectionId() != null) {
201+
if (maxElectionId != null && maxElectionId.compareTo(newDescription.getElectionId()) > 0) {
202+
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
203+
return false;
204+
}
205+
206+
maxElectionId = newDescription.getElectionId();
207+
}
208+
192209
if (isNotAlreadyPrimary(newDescription.getAddress())) {
193210
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
194211
}
195212
invalidateOldPrimaries(newDescription.getAddress());
196213
}
214+
return true;
197215
}
198216

199217
private boolean isNotAlreadyPrimary(final ServerAddress address) {
200218
ServerTuple serverTuple = addressToServerTupleMap.get(address);
201219
return serverTuple == null || !serverTuple.description.isPrimary();
202220
}
203221

204-
private void handleShardRouterChanged(final ServerDescription newDescription) {
222+
private boolean handleShardRouterChanged(final ServerDescription newDescription) {
205223
if (!newDescription.isShardRouter()) {
206224
LOGGER.error(format("Expecting a %s, but found a %s. Removing %s from client view of cluster.",
207225
SHARD_ROUTER, newDescription.getType(), newDescription.getAddress()));
208226
removeServer(newDescription.getAddress());
209227
}
228+
return true;
210229
}
211230

212-
private void handleStandAloneChanged(final ServerDescription newDescription) {
231+
private boolean handleStandAloneChanged(final ServerDescription newDescription) {
213232
if (getSettings().getHosts().size() > 1) {
214233
LOGGER.error(format("Expecting a single %s, but found more than one. Removing %s from client view of cluster.",
215234
STANDALONE, newDescription.getAddress()));
216235
clusterType = UNKNOWN;
217236
removeServer(newDescription.getAddress());
218237
}
238+
return true;
219239
}
220240

221241
private void addServer(final ServerAddress serverAddress) {

driver-core/src/main/com/mongodb/connection/ServerDescription.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.TagSet;
2121
import com.mongodb.annotations.Immutable;
22+
import org.bson.types.ObjectId;
2223

2324
import java.text.DecimalFormat;
2425
import java.util.Collections;
@@ -65,7 +66,9 @@ public class ServerDescription {
6566
private final int minWireVersion;
6667
private final int maxWireVersion;
6768

68-
private Throwable exception;
69+
private final ObjectId electionId;
70+
71+
private final Throwable exception;
6972

7073
/**
7174
* Gets a Builder for creating a new ServerDescription instance.
@@ -95,6 +98,7 @@ public static class Builder {
9598
private ServerVersion version = new ServerVersion();
9699
private int minWireVersion = 0;
97100
private int maxWireVersion = 0;
101+
private ObjectId electionId;
98102
private Throwable exception;
99103

100104
/**
@@ -269,6 +273,17 @@ public Builder maxWireVersion(final int maxWireVersion) {
269273
return this;
270274
}
271275

276+
/**
277+
* Sets the electionId reported by this server.
278+
*
279+
* @param electionId the electionId
280+
* @return this
281+
*/
282+
public Builder electionId(final ObjectId electionId) {
283+
this.electionId = electionId;
284+
return this;
285+
}
286+
272287
/**
273288
* Sets the exception thrown while attempting to determine the server description.
274289
*
@@ -468,6 +483,15 @@ public int getMaxWireVersion() {
468483
return maxWireVersion;
469484
}
470485

486+
/**
487+
* The replica set electionid reported by this MongoDB server.
488+
*
489+
* @return the electionId, which may be null
490+
*/
491+
public ObjectId getElectionId() {
492+
return electionId;
493+
}
494+
471495
/**
472496
* Returns true if the server has the given tags. A server of either type {@code ServerType.STANDALONE} or {@code
473497
* ServerType.SHARD_ROUTER} is considered to have all tags, so this method will always return true for instances of either of those
@@ -620,6 +644,9 @@ public boolean equals(final Object o) {
620644
if (maxWireVersion != that.maxWireVersion) {
621645
return false;
622646
}
647+
if (electionId != null ? !electionId.equals(that.electionId) : that.electionId != null) {
648+
return false;
649+
}
623650

624651
// Compare class equality and message as exceptions rarely override equals
625652
Class<?> thisExceptionClass = exception != null ? exception.getClass() : null;
@@ -648,6 +675,7 @@ public int hashCode() {
648675
result = 31 * result + maxDocumentSize;
649676
result = 31 * result + tagSet.hashCode();
650677
result = 31 * result + (setName != null ? setName.hashCode() : 0);
678+
result = 31 * result + (electionId != null ? electionId.hashCode() : 0);
651679
result = 31 * result + (ok ? 1 : 0);
652680
result = 31 * result + state.hashCode();
653681
result = 31 * result + version.hashCode();
@@ -670,6 +698,7 @@ public String toString() {
670698
+ ", version=" + version
671699
+ ", minWireVersion=" + minWireVersion
672700
+ ", maxWireVersion=" + maxWireVersion
701+
+ ", electionId=" + electionId
673702
+ ", maxDocumentSize=" + maxDocumentSize
674703
+ ", roundTripTimeNanos=" + roundTripTimeNanos
675704
: "")
@@ -740,6 +769,7 @@ private String getRoundTripFormattedInMilliseconds() {
740769
ok = builder.ok;
741770
minWireVersion = builder.minWireVersion;
742771
maxWireVersion = builder.maxWireVersion;
772+
electionId = builder.electionId;
743773
exception = builder.exception;
744774
}
745775
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
{
2+
"description": "New primary with equal electionId",
3+
"phases": [
4+
{
5+
"outcome": {
6+
"maxElectionId": {
7+
"$oid": "000000000000000000000001"
8+
},
9+
"servers": {
10+
"a:27017": {
11+
"electionId": null,
12+
"setName": null,
13+
"type": "Unknown"
14+
},
15+
"b:27017": {
16+
"electionId": {
17+
"$oid": "000000000000000000000001"
18+
},
19+
"setName": "rs",
20+
"type": "RSPrimary"
21+
}
22+
},
23+
"setName": "rs",
24+
"topologyType": "ReplicaSetWithPrimary"
25+
},
26+
"responses": [
27+
[
28+
"a:27017",
29+
{
30+
"electionId": {
31+
"$oid": "000000000000000000000001"
32+
},
33+
"hosts": [
34+
"a:27017",
35+
"b:27017"
36+
],
37+
"ismaster": true,
38+
"ok": 1,
39+
"setName": "rs"
40+
}
41+
],
42+
[
43+
"b:27017",
44+
{
45+
"electionId": {
46+
"$oid": "000000000000000000000001"
47+
},
48+
"hosts": [
49+
"a:27017",
50+
"b:27017"
51+
],
52+
"ismaster": true,
53+
"ok": 1,
54+
"setName": "rs"
55+
}
56+
]
57+
]
58+
}
59+
],
60+
"uri": "mongodb://a/?replicaSet=rs"
61+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"description": "Ignore a secondary's electionId",
3+
"phases": [
4+
{
5+
"outcome": {
6+
"maxElectionId": null,
7+
"servers": {
8+
"a:27017": {
9+
"electionId": {
10+
"$oid": "000000000000000000000001"
11+
},
12+
"setName": "rs",
13+
"type": "RSSecondary"
14+
}
15+
},
16+
"setName": "rs",
17+
"topologyType": "ReplicaSetNoPrimary"
18+
},
19+
"responses": [
20+
[
21+
"a:27017",
22+
{
23+
"electionId": {
24+
"$oid": "000000000000000000000001"
25+
},
26+
"hosts": [
27+
"a:27017"
28+
],
29+
"ismaster": false,
30+
"ok": 1,
31+
"secondary": true,
32+
"setName": "rs"
33+
}
34+
]
35+
]
36+
}
37+
],
38+
"uri": "mongodb://a/?replicaSet=rs"
39+
}

0 commit comments

Comments
 (0)