Skip to content

Commit 709878e

Browse files
committed
JAVA-2377: Update max staleness implementation to support the latest specification
Rename maxStalenessMS to maxStalenessSeconds in the connection string Add idleWritePeriodMillis to minimum max staleness check Add new tests from the specification
1 parent 99ae9d1 commit 709878e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+639
-68
lines changed

driver-core/src/main/com/mongodb/ConnectionString.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,10 @@
155155
* <li>Order matters when using multiple readPreferenceTags.</li>
156156
* </ul>
157157
* </li>
158-
* <li>{@code maxStalenessMS=ms}. The maximum staleness in milliseconds. For use with any non-primary read preference, the driver estimates
159-
* the staleness of each secondary, based on lastWriteDate values provided in server isMaster responses, and selects only those secondaries
160-
* whose staleness is less than or equal to maxStalenessMS. Not providing the parameter or explicitly setting it to -1 indicates that
161-
* there should be no max staleness check.
158+
* <li>{@code maxStalenessSeconds=seconds}. The maximum staleness in seconds. For use with any non-primary read preference, the driver
159+
* estimates the staleness of each secondary, based on lastWriteDate values provided in server isMaster responses, and selects only those
160+
* secondaries whose staleness is less than or equal to maxStalenessSeconds. Not providing the parameter or explicitly setting it to -1
161+
* indicates that there should be no max staleness check.
162162
* </li>
163163
* </ul>
164164
* <p>Authentication configuration:</p>
@@ -340,7 +340,7 @@ public ConnectionString(final String connectionString) {
340340

341341
READ_PREFERENCE_KEYS.add("readpreference");
342342
READ_PREFERENCE_KEYS.add("readpreferencetags");
343-
READ_PREFERENCE_KEYS.add("maxstalenessms");
343+
READ_PREFERENCE_KEYS.add("maxstalenessseconds");
344344

345345
WRITE_CONCERN_KEYS.add("safe");
346346
WRITE_CONCERN_KEYS.add("w");
@@ -448,7 +448,7 @@ private WriteConcern createWriteConcern(final Map<String, List<String>> optionsM
448448
private ReadPreference createReadPreference(final Map<String, List<String>> optionsMap) {
449449
String readPreferenceType = null;
450450
List<TagSet> tagSetList = new ArrayList<TagSet>();
451-
long maxStalenessMS = -1;
451+
double maxStalenessSeconds = -1;
452452

453453
for (final String key : READ_PREFERENCE_KEYS) {
454454
String value = getLastValue(optionsMap, key);
@@ -458,16 +458,16 @@ private ReadPreference createReadPreference(final Map<String, List<String>> opti
458458

459459
if (key.equals("readpreference")) {
460460
readPreferenceType = value;
461-
} else if (key.equals("maxstalenessms")) {
462-
maxStalenessMS = Integer.parseInt(value);
461+
} else if (key.equals("maxstalenessseconds")) {
462+
maxStalenessSeconds = Double.parseDouble(value);
463463
} else if (key.equals("readpreferencetags")) {
464464
for (final String cur : optionsMap.get(key)) {
465465
TagSet tagSet = getTags(cur.trim());
466466
tagSetList.add(tagSet);
467467
}
468468
}
469469
}
470-
return buildReadPreference(readPreferenceType, tagSetList, maxStalenessMS);
470+
return buildReadPreference(readPreferenceType, tagSetList, maxStalenessSeconds);
471471
}
472472

473473
private MongoCredential createCredentials(final Map<String, List<String>> optionsMap, final String userName,
@@ -601,16 +601,17 @@ private Map<String, List<String>> parseOptions(final String optionsPart) {
601601
}
602602

603603
private ReadPreference buildReadPreference(final String readPreferenceType,
604-
final List<TagSet> tagSetList, final long maxStalenessMS) {
604+
final List<TagSet> tagSetList, final double maxStalenessSeconds) {
605605
if (readPreferenceType != null) {
606-
if (tagSetList.isEmpty() && maxStalenessMS == -1) {
606+
if (tagSetList.isEmpty() && maxStalenessSeconds == -1) {
607607
return ReadPreference.valueOf(readPreferenceType);
608-
} else if (maxStalenessMS == -1) {
608+
} else if (maxStalenessSeconds == -1) {
609609
return ReadPreference.valueOf(readPreferenceType, tagSetList);
610610
} else {
611-
return ReadPreference.valueOf(readPreferenceType, tagSetList, maxStalenessMS, TimeUnit.MILLISECONDS);
611+
return ReadPreference.valueOf(readPreferenceType, tagSetList,
612+
Math.round(maxStalenessSeconds * 1000), TimeUnit.MILLISECONDS);
612613
}
613-
} else if (!(tagSetList.isEmpty() && maxStalenessMS == -1)) {
614+
} else if (!(tagSetList.isEmpty() && maxStalenessSeconds == -1)) {
614615
throw new IllegalArgumentException("Read preference mode must be specified if "
615616
+ "either read preference tags or max staleness is specified");
616617
}

driver-core/src/main/com/mongodb/TaggableReadPreference.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import com.mongodb.connection.ClusterDescription;
2121
import com.mongodb.connection.ClusterType;
2222
import com.mongodb.connection.ServerDescription;
23+
import com.mongodb.connection.ServerType;
2324
import org.bson.BsonArray;
2425
import org.bson.BsonDocument;
25-
import org.bson.BsonInt64;
26+
import org.bson.BsonDouble;
2627
import org.bson.BsonString;
2728

2829
import java.util.ArrayList;
@@ -32,7 +33,9 @@
3233

3334
import static com.mongodb.assertions.Assertions.isTrueArgument;
3435
import static com.mongodb.assertions.Assertions.notNull;
36+
import static java.lang.String.format;
3537
import static java.util.concurrent.TimeUnit.MILLISECONDS;
38+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
3639

3740
/**
3841
* Abstract class for all preference which can be combined with tags
@@ -70,7 +73,7 @@ public BsonDocument toDocument() {
7073
}
7174

7275
if (maxStalenessMS != null) {
73-
readPrefObject.put("maxStalenessMS", new BsonInt64(maxStalenessMS));
76+
readPrefObject.put("maxStalenessSeconds", new BsonDouble(maxStalenessMS / 1000.0));
7477
}
7578
return readPrefObject;
7679
}
@@ -175,8 +178,12 @@ protected List<ServerDescription> selectFreshServers(final ClusterDescription cl
175178

176179
long heartbeatFrequencyMS = clusterDescription.getServerSettings().getHeartbeatFrequency(MILLISECONDS);
177180

178-
if (getMaxStaleness(MILLISECONDS) < 2 * heartbeatFrequencyMS) {
179-
throw new MongoConfigurationException("Max staleness must be at least twice the heartbeat frequency");
181+
ServerDescription mostUpToDateServerDescription = getMostUpToDateServerDescription(clusterDescription);
182+
if (mostUpToDateServerDescription != null
183+
&& getMaxStaleness(MILLISECONDS) < heartbeatFrequencyMS + mostUpToDateServerDescription.getIdleWritePeriodMillis()) {
184+
throw new MongoConfigurationException(format("Max staleness (%d ms) must be at least the heartbeat period (%d ms) "
185+
+ "plus the idle write period (%d ms)",
186+
getMaxStaleness(MILLISECONDS), heartbeatFrequencyMS, mostUpToDateServerDescription.getIdleWritePeriodMillis()));
180187
}
181188

182189
List<ServerDescription> freshServers = new ArrayList<ServerDescription>(servers.size());
@@ -193,7 +200,7 @@ protected List<ServerDescription> selectFreshServers(final ClusterDescription cl
193200
}
194201
}
195202
}
196-
} else {
203+
} else {
197204
ServerDescription mostUpdateToDateSecondary = findMostUpToDateSecondary(clusterDescription);
198205
for (ServerDescription cur : servers) {
199206
if (mostUpdateToDateSecondary.getLastWriteDate().getTime() - cur.getLastWriteDate().getTime() + heartbeatFrequencyMS
@@ -206,6 +213,22 @@ protected List<ServerDescription> selectFreshServers(final ClusterDescription cl
206213
return freshServers;
207214
}
208215

216+
private ServerDescription getMostUpToDateServerDescription(final ClusterDescription clusterDescription) {
217+
ServerDescription mostUpToDateServerDescription = null;
218+
for (ServerDescription cur : clusterDescription.getServerDescriptions()) {
219+
if (cur.getType() == ServerType.REPLICA_SET_PRIMARY) {
220+
mostUpToDateServerDescription = cur;
221+
break;
222+
} else if (cur.getType() == ServerType.REPLICA_SET_SECONDARY) {
223+
if (mostUpToDateServerDescription == null
224+
|| cur.getLastUpdateTime(NANOSECONDS) > mostUpToDateServerDescription.getLastUpdateTime(NANOSECONDS)) {
225+
mostUpToDateServerDescription = cur;
226+
}
227+
}
228+
}
229+
return mostUpToDateServerDescription;
230+
}
231+
209232
private long getStalenessOfSecondaryRelativeToPrimary(final ServerDescription primary, final ServerDescription serverDescription,
210233
final long heartbeatFrequencyMS) {
211234
return primary.getLastWriteDate().getTime()
@@ -237,7 +260,7 @@ private ServerDescription findMostUpToDateSecondary(final ClusterDescription clu
237260

238261
private boolean serversAreAllThreeDotFour(final ClusterDescription clusterDescription) {
239262
for (ServerDescription cur : clusterDescription.getServerDescriptions()) {
240-
if (cur.getMaxWireVersion() < 5) {
263+
if (cur.isOk() && cur.getMaxWireVersion() < 5) {
241264
return false;
242265
}
243266
}

driver-core/src/main/com/mongodb/connection/DescriptionHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.bson.BsonBoolean;
2424
import org.bson.BsonDocument;
2525
import org.bson.BsonInt32;
26+
import org.bson.BsonInt64;
2627
import org.bson.BsonString;
2728
import org.bson.BsonValue;
2829
import org.bson.types.ObjectId;
@@ -38,6 +39,7 @@
3839
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize;
3940
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxWriteBatchSize;
4041
import static com.mongodb.connection.ServerConnectionState.CONNECTED;
42+
import static com.mongodb.connection.ServerDescription.getDefaultIdleWritePeriodMillis;
4143
import static com.mongodb.connection.ServerDescription.getDefaultMaxDocumentSize;
4244
import static com.mongodb.connection.ServerDescription.getDefaultMaxWireVersion;
4345
import static com.mongodb.connection.ServerDescription.getDefaultMinWireVersion;
@@ -85,6 +87,8 @@ static ServerDescription createServerDescription(final ServerAddress serverAddre
8587
.setVersion(getSetVersion(isMasterResult))
8688
.lastWriteDate(getLastWriteDate(isMasterResult))
8789
.roundTripTime(roundTripTime, NANOSECONDS)
90+
.idleWritePeriodMillis(isMasterResult.getNumber("idleWritePeriodMillis",
91+
new BsonInt64(getDefaultIdleWritePeriodMillis())).longValue())
8892
.ok(CommandHelper.isCommandOk(isMasterResult)).build();
8993
}
9094

driver-core/src/main/com/mongodb/connection/ServerDescription.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class ServerDescription {
4949
static final int MAX_DRIVER_WIRE_VERSION = 3;
5050

5151
private static final int DEFAULT_MAX_DOCUMENT_SIZE = 0x1000000; // 16MB
52+
static final int DEFAULT_IDLE_WRITE_PERIOD_MILLIS = 10000; // 10 seconds
5253

5354
private final ServerAddress address;
5455

@@ -73,6 +74,7 @@ public class ServerDescription {
7374
private final Integer setVersion;
7475
private final Date lastWriteDate;
7576
private final long lastUpdateTimeNanos;
77+
private final long idleWritePeriodMillis;
7678

7779
private final Throwable exception;
7880

@@ -120,6 +122,8 @@ public static class Builder {
120122
private Integer setVersion;
121123
private Date lastWriteDate;
122124
private long lastUpdateTimeNanos = Time.nanoTime();
125+
private long idleWritePeriodMillis = DEFAULT_IDLE_WRITE_PERIOD_MILLIS;
126+
123127
private Throwable exception;
124128

125129
/**
@@ -357,6 +361,18 @@ public Builder lastUpdateTimeNanos(final long lastUpdateTimeNanos) {
357361
return this;
358362
}
359363

364+
/**
365+
* Sets the idle write period in milliseconds for this description. The default 10,000 (10 seconds).
366+
*
367+
* @param idleWritePeriodMillis the idle write period in milliseconds
368+
* @return this
369+
*
370+
* @since 3.4
371+
*/
372+
public Builder idleWritePeriodMillis(final long idleWritePeriodMillis) {
373+
this.idleWritePeriodMillis = idleWritePeriodMillis;
374+
return this;
375+
}
360376
/**
361377
* Sets the exception thrown while attempting to determine the server description.
362378
*
@@ -427,6 +443,17 @@ public static int getDefaultMaxWireVersion() {
427443
return 0;
428444
}
429445

446+
/**
447+
* Get the default idle write period in milliseconds
448+
*
449+
* @return the default idle write period in milliseconds
450+
*
451+
* @since 3.4
452+
*/
453+
public static int getDefaultIdleWritePeriodMillis() {
454+
return DEFAULT_IDLE_WRITE_PERIOD_MILLIS;
455+
}
456+
430457
/**
431458
* Gets the address of this server
432459
*
@@ -679,6 +706,16 @@ public long getRoundTripTimeNanos() {
679706
return roundTripTimeNanos;
680707
}
681708

709+
/**
710+
* Gets the idle write period in milliseconds for this description. The default 10,000 (10 seconds).
711+
*
712+
* @return the idle write period in milliseconds
713+
* @since 3.4
714+
*/
715+
public long getIdleWritePeriodMillis() {
716+
return idleWritePeriodMillis;
717+
}
718+
682719
/**
683720
* Gets the exception thrown while attempting to determine the server description. This is useful for diagnostic purposed when
684721
* determining the root cause of a connectivity failure.
@@ -765,6 +802,10 @@ public boolean equals(final Object o) {
765802
return false;
766803
}
767804

805+
if (idleWritePeriodMillis != that.idleWritePeriodMillis) {
806+
return false;
807+
}
808+
768809
// Compare class equality and message as exceptions rarely override equals
769810
Class<?> thisExceptionClass = exception != null ? exception.getClass() : null;
770811
Class<?> thatExceptionClass = that.exception != null ? that.exception.getClass() : null;
@@ -797,6 +838,7 @@ public int hashCode() {
797838
result = 31 * result + (setVersion != null ? setVersion.hashCode() : 0);
798839
result = 31 * result + (lastWriteDate != null ? lastWriteDate.hashCode() : 0);
799840
result = 31 * result + (int) (lastUpdateTimeNanos ^ (lastUpdateTimeNanos >>> 32));
841+
result = 31 * result + (int) (idleWritePeriodMillis ^ (idleWritePeriodMillis >>> 32));
800842
result = 31 * result + (ok ? 1 : 0);
801843
result = 31 * result + state.hashCode();
802844
result = 31 * result + version.hashCode();
@@ -835,6 +877,7 @@ public String toString() {
835877
+ ", setVersion=" + setVersion
836878
+ ", lastWriteDate=" + lastWriteDate
837879
+ ", lastUpdateTimeNanos=" + lastUpdateTimeNanos
880+
+ ", idleWritePeriodMillis=" + idleWritePeriodMillis
838881
: "")
839882
+ (exception == null ? "" : ", exception=" + translateExceptionToString())
840883
+ '}';
@@ -899,6 +942,7 @@ private String getRoundTripFormattedInMilliseconds() {
899942
setVersion = builder.setVersion;
900943
lastWriteDate = builder.lastWriteDate;
901944
lastUpdateTimeNanos = builder.lastUpdateTimeNanos;
945+
idleWritePeriodMillis = builder.idleWritePeriodMillis;
902946
exception = builder.exception;
903947
}
904948
}

driver-core/src/test/resources/max-staleness/server_selection/README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ Configure it with the heartbeatFrequencyMS specified by the test,
4141
or accept the driver's default heartbeatFrequencyMS if the test omits this field.
4242

4343
(Single-threaded and multi-threaded clients now make heartbeatFrequencyMS configurable.
44-
This is a change in Server Discovery and Monitoring to support maxStalenessMS.
44+
This is a change in Server Discovery and Monitoring to support maxStalenessSeconds.
4545
Before, multi-threaded clients were allowed to make it configurable or not.)
4646

4747
For each test, create a new TopologyDescription object initialized with the
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
{
2+
"heartbeatFrequencyMS": 500,
3+
"in_latency_window": [
4+
{
5+
"address": "a:27017",
6+
"avg_rtt_ms": 5,
7+
"idleWritePeriodMillis": 9000,
8+
"lastUpdateTime": 1,
9+
"lastWrite": {
10+
"lastWriteDate": {
11+
"$numberLong": "1"
12+
}
13+
},
14+
"maxWireVersion": 5,
15+
"type": "RSSecondary"
16+
}
17+
],
18+
"read_preference": {
19+
"maxStalenessSeconds": 10,
20+
"mode": "Nearest"
21+
},
22+
"suitable_servers": [
23+
{
24+
"address": "a:27017",
25+
"avg_rtt_ms": 5,
26+
"idleWritePeriodMillis": 9000,
27+
"lastUpdateTime": 1,
28+
"lastWrite": {
29+
"lastWriteDate": {
30+
"$numberLong": "1"
31+
}
32+
},
33+
"maxWireVersion": 5,
34+
"type": "RSSecondary"
35+
},
36+
{
37+
"address": "b:27017",
38+
"avg_rtt_ms": 50,
39+
"idleWritePeriodMillis": 11000,
40+
"lastUpdateTime": 0,
41+
"lastWrite": {
42+
"lastWriteDate": {
43+
"$numberLong": "1"
44+
}
45+
},
46+
"maxWireVersion": 5,
47+
"type": "RSSecondary"
48+
}
49+
],
50+
"topology_description": {
51+
"servers": [
52+
{
53+
"address": "a:27017",
54+
"avg_rtt_ms": 5,
55+
"idleWritePeriodMillis": 9000,
56+
"lastUpdateTime": 1,
57+
"lastWrite": {
58+
"lastWriteDate": {
59+
"$numberLong": "1"
60+
}
61+
},
62+
"maxWireVersion": 5,
63+
"type": "RSSecondary"
64+
},
65+
{
66+
"address": "b:27017",
67+
"avg_rtt_ms": 50,
68+
"idleWritePeriodMillis": 11000,
69+
"lastUpdateTime": 0,
70+
"lastWrite": {
71+
"lastWriteDate": {
72+
"$numberLong": "1"
73+
}
74+
},
75+
"maxWireVersion": 5,
76+
"type": "RSSecondary"
77+
}
78+
],
79+
"type": "ReplicaSetNoPrimary"
80+
}
81+
}

0 commit comments

Comments
 (0)