Skip to content

Commit 02558cb

Browse files
committed
CSHARP-1720: Allow users to set a limit on acceptable staleness.
1 parent eeb85dc commit 02558cb

File tree

344 files changed

+13405
-4742
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

344 files changed

+13405
-4742
lines changed

src/MongoDB.Driver.Core/Core/Clusters/ServerSelectors/ReadPreferenceServerSelector.cs

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright 2013-2015 MongoDB Inc.
1+
/* Copyright 2013-2016 MongoDB Inc.
22
*
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616
using System;
1717
using System.Collections.Generic;
1818
using System.Linq;
19+
using System.Threading;
1920
using MongoDB.Driver.Core.Misc;
2021
using MongoDB.Driver.Core.Servers;
2122

@@ -45,6 +46,7 @@ public static ReadPreferenceServerSelector Primary
4546
#endregion
4647

4748
// fields
49+
private readonly TimeSpan? _maxStaleness; // with Zero and InfiniteTimespan converted to null
4850
private readonly ReadPreference _readPreference;
4951

5052
// constructors
@@ -55,20 +57,36 @@ public static ReadPreferenceServerSelector Primary
5557
public ReadPreferenceServerSelector(ReadPreference readPreference)
5658
{
5759
_readPreference = Ensure.IsNotNull(readPreference, nameof(readPreference));
60+
if (readPreference.MaxStaleness == TimeSpan.Zero || readPreference.MaxStaleness == Timeout.InfiniteTimeSpan)
61+
{
62+
_maxStaleness = null;
63+
}
64+
else
65+
{
66+
_maxStaleness = readPreference.MaxStaleness;
67+
}
5868
}
5969

6070
// methods
6171
/// <inheritdoc/>
6272
public IEnumerable<ServerDescription> SelectServers(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
6373
{
74+
if (_maxStaleness.HasValue)
75+
{
76+
if (cluster.Servers.Any(s => s.Type != ServerType.Unknown && !Feature.MaxStaleness.IsSupported(s.Version)))
77+
{
78+
throw new NotSupportedException("All servers must be version 3.4 or newer to use max staleness.");
79+
}
80+
}
81+
6482
if (cluster.ConnectionMode == ClusterConnectionMode.Direct)
6583
{
6684
return servers;
6785
}
6886

6987
switch (cluster.Type)
7088
{
71-
case ClusterType.ReplicaSet: return SelectForReplicaSet(servers);
89+
case ClusterType.ReplicaSet: return SelectForReplicaSet(cluster, servers);
7290
case ClusterType.Sharded: return SelectForShardedCluster(servers);
7391
case ClusterType.Standalone: return SelectForStandaloneCluster(servers);
7492
case ClusterType.Unknown: return __noServers;
@@ -112,42 +130,55 @@ private IEnumerable<ServerDescription> SelectByTagSets(IEnumerable<ServerDescrip
112130
return __noServers;
113131
}
114132

115-
private IEnumerable<ServerDescription> SelectForReplicaSet(IEnumerable<ServerDescription> servers)
133+
private IEnumerable<ServerDescription> SelectForReplicaSet(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
116134
{
117-
var materializedList = servers as IReadOnlyList<ServerDescription> ?? servers.ToList();
135+
if (_maxStaleness.HasValue)
136+
{
137+
var minHeartBeatIntervalTicks = servers.Select(s => s.HeartbeatInterval.Ticks).Min();
138+
if (_maxStaleness.Value.Ticks < 2 * minHeartBeatIntervalTicks)
139+
{
140+
throw new MongoClientException("MaxStaleness must be at least twice the heartbeat frequency.");
141+
}
142+
143+
servers = new CachedEnumerable<ServerDescription>(SelectFreshServers(cluster, servers)); // prevent multiple enumeration
144+
}
145+
else
146+
{
147+
servers = new CachedEnumerable<ServerDescription>(servers); // prevent multiple enumeration
148+
}
118149

119150
switch (_readPreference.ReadPreferenceMode)
120151
{
121152
case ReadPreferenceMode.Primary:
122-
return materializedList.Where(n => n.Type == ServerType.ReplicaSetPrimary);
153+
return servers.Where(n => n.Type == ServerType.ReplicaSetPrimary);
123154

124155
case ReadPreferenceMode.PrimaryPreferred:
125-
var primary = materializedList.FirstOrDefault(n => n.Type == ServerType.ReplicaSetPrimary);
156+
var primary = servers.FirstOrDefault(n => n.Type == ServerType.ReplicaSetPrimary);
126157
if (primary != null)
127158
{
128159
return new[] { primary };
129160
}
130161
else
131162
{
132-
return SelectByTagSets(materializedList.Where(n => n.Type == ServerType.ReplicaSetSecondary));
163+
return SelectByTagSets(servers.Where(n => n.Type == ServerType.ReplicaSetSecondary));
133164
}
134165

135166
case ReadPreferenceMode.Secondary:
136-
return SelectByTagSets(materializedList.Where(n => n.Type == ServerType.ReplicaSetSecondary));
167+
return SelectByTagSets(servers.Where(n => n.Type == ServerType.ReplicaSetSecondary));
137168

138169
case ReadPreferenceMode.SecondaryPreferred:
139-
var matchingSecondaries = SelectByTagSets(materializedList.Where(n => n.Type == ServerType.ReplicaSetSecondary)).ToList();
170+
var matchingSecondaries = SelectByTagSets(servers.Where(n => n.Type == ServerType.ReplicaSetSecondary)).ToList();
140171
if (matchingSecondaries.Count != 0)
141172
{
142173
return matchingSecondaries;
143174
}
144175
else
145176
{
146-
return materializedList.Where(n => n.Type == ServerType.ReplicaSetPrimary);
177+
return servers.Where(n => n.Type == ServerType.ReplicaSetPrimary);
147178
}
148179

149180
case ReadPreferenceMode.Nearest:
150-
return SelectByTagSets(materializedList.Where(n => n.Type == ServerType.ReplicaSetPrimary || n.Type == ServerType.ReplicaSetSecondary));
181+
return SelectByTagSets(servers.Where(n => n.Type == ServerType.ReplicaSetPrimary || n.Type == ServerType.ReplicaSetSecondary));
151182

152183
default:
153184
throw new ArgumentException("Invalid ReadPreferenceMode.");
@@ -163,5 +194,45 @@ private IEnumerable<ServerDescription> SelectForStandaloneCluster(IEnumerable<Se
163194
{
164195
return servers.Where(n => n.Type == ServerType.Standalone); // standalone servers match any ReadPreference (to facilitate testing)
165196
}
197+
198+
private IReadOnlyList<ServerDescription> SelectFreshServers(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
199+
{
200+
var primary = cluster.Servers.SingleOrDefault(s => s.Type == ServerType.ReplicaSetPrimary);
201+
if (primary == null)
202+
{
203+
return SelectFreshServersWithNoPrimary(cluster, servers);
204+
}
205+
else
206+
{
207+
return SelectFreshServersWithPrimary(cluster, primary, servers);
208+
}
209+
}
210+
211+
private IReadOnlyList<ServerDescription> SelectFreshServersWithNoPrimary(ClusterDescription cluster, IEnumerable<ServerDescription> servers)
212+
{
213+
var smax = servers
214+
.Where(s => s.Type == ServerType.ReplicaSetSecondary)
215+
.OrderByDescending(s => s.LastWriteTimestamp)
216+
.FirstOrDefault();
217+
return servers
218+
.Where(s =>
219+
{
220+
var estimatedStaleness = smax.LastWriteTimestamp.Value - s.LastWriteTimestamp.Value + s.HeartbeatInterval;
221+
return estimatedStaleness <= _maxStaleness;
222+
})
223+
.ToList();
224+
}
225+
226+
private IReadOnlyList<ServerDescription> SelectFreshServersWithPrimary(ClusterDescription cluster, ServerDescription primary, IEnumerable<ServerDescription> servers)
227+
{
228+
var p = primary;
229+
return servers
230+
.Where(s =>
231+
{
232+
var estimatedStaleness = (s.LastUpdateTimestamp - s.LastWriteTimestamp.Value) - (p.LastUpdateTimestamp - p.LastWriteTimestamp.Value) + s.HeartbeatInterval;
233+
return estimatedStaleness <= _maxStaleness;
234+
})
235+
.ToList();
236+
}
166237
}
167238
}

src/MongoDB.Driver.Core/Core/Configuration/ClusterBuilderExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ public static ClusterBuilder ConfigureWithConnectionString(this ClusterBuilder b
6565
{
6666
builder = builder.ConfigureTcp(s => s.With(connectTimeout: connectionString.ConnectTimeout.Value));
6767
}
68+
if (connectionString.HeartbeatInterval.HasValue)
69+
{
70+
builder = builder.ConfigureServer(s => s.With(heartbeatInterval: connectionString.HeartbeatInterval.Value));
71+
}
72+
if (connectionString.HeartbeatTimeout.HasValue)
73+
{
74+
builder = builder.ConfigureServer(s => s.With(heartbeatTimeout: connectionString.HeartbeatTimeout.Value));
75+
}
6876
if (connectionString.Ipv6.HasValue && connectionString.Ipv6.Value)
6977
{
7078
builder = builder.ConfigureTcp(s => s.With(addressFamily: AddressFamily.InterNetworkV6));

src/MongoDB.Driver.Core/Core/Configuration/ConnectionString.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,16 @@ public sealed class ConnectionString
4747
private TimeSpan? _connectTimeout;
4848
private string _databaseName;
4949
private bool? _fsync;
50+
private TimeSpan? _heartbeatInterval;
51+
private TimeSpan? _heartbeatTimeout;
5052
private IReadOnlyList<EndPoint> _hosts;
5153
private bool? _ipv6;
5254
private bool? _journal;
5355
private TimeSpan? _localThreshold;
5456
private TimeSpan? _maxIdleTime;
5557
private TimeSpan? _maxLifeTime;
5658
private int? _maxPoolSize;
59+
private TimeSpan? _maxStaleness;
5760
private int? _minPoolSize;
5861
private string _password;
5962
private ReadConcernLevel? _readConcernLevel;
@@ -160,6 +163,22 @@ public bool? FSync
160163
get { return _fsync; }
161164
}
162165

166+
/// <summary>
167+
/// Gets the heartbeat interval.
168+
/// </summary>
169+
public TimeSpan? HeartbeatInterval
170+
{
171+
get { return _heartbeatInterval; }
172+
}
173+
174+
/// <summary>
175+
/// Gets the heartbeat timeout.
176+
/// </summary>
177+
public TimeSpan? HeartbeatTimeout
178+
{
179+
get { return _heartbeatTimeout; }
180+
}
181+
163182
/// <summary>
164183
/// Gets the hosts.
165184
/// </summary>
@@ -216,6 +235,14 @@ public int? MaxPoolSize
216235
get { return _maxPoolSize; }
217236
}
218237

238+
/// <summary>
239+
/// Gets the max staleness.
240+
/// </summary>
241+
public TimeSpan? MaxStaleness
242+
{
243+
get { return _maxStaleness; }
244+
}
245+
219246
/// <summary>
220247
/// Gets the min size of the connection pool.
221248
/// </summary>
@@ -480,6 +507,16 @@ private void ParseOption(string name, string value)
480507
case "gssapiservicename":
481508
_authMechanismProperties.Add("SERVICE_NAME", value);
482509
break;
510+
case "heartbeatfrequency":
511+
case "heartbeatfrequencyms":
512+
case "heartbeatinterval":
513+
case "heartbeatintervalms":
514+
_heartbeatInterval = ParseTimeSpan(name, value);
515+
break;
516+
case "heartbeattimeout":
517+
case "heartbeattimeoutms":
518+
_heartbeatTimeout = ParseTimeSpan(name, value);
519+
break;
483520
case "ipv6":
484521
_ipv6 = ParseBoolean(name, value);
485522
break;
@@ -498,6 +535,10 @@ private void ParseOption(string name, string value)
498535
case "maxpoolsize":
499536
_maxPoolSize = ParseInt32(name, value);
500537
break;
538+
case "maxstaleness":
539+
case "maxstalenessms":
540+
_maxStaleness = ParseTimeSpan(name, value);
541+
break;
501542
case "minpoolsize":
502543
_minPoolSize = ParseInt32(name, value);
503544
break;

src/MongoDB.Driver.Core/Core/Configuration/ServerSettings.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ namespace MongoDB.Driver.Core.Configuration
2424
/// </summary>
2525
public class ServerSettings
2626
{
27+
#region static
28+
// public static methods
29+
/// <summary>
30+
/// Gets the default heartbeat interval.
31+
/// </summary>
32+
public static TimeSpan DefaultHeartbeatInterval => TimeSpan.FromSeconds(10);
33+
34+
/// <summary>
35+
/// Gets the default heartbeat timeout.
36+
/// </summary>
37+
public static TimeSpan DefaultHeartbeatTimeout => TimeSpan.FromSeconds(10);
38+
#endregion
39+
2740
// fields
2841
private readonly TimeSpan _heartbeatInterval;
2942
private readonly TimeSpan _heartbeatTimeout;
@@ -38,8 +51,8 @@ public ServerSettings(
3851
Optional<TimeSpan> heartbeatInterval = default(Optional<TimeSpan>),
3952
Optional<TimeSpan> heartbeatTimeout = default(Optional<TimeSpan>))
4053
{
41-
_heartbeatInterval = Ensure.IsInfiniteOrGreaterThanOrEqualToZero(heartbeatInterval.WithDefault(TimeSpan.FromSeconds(10)), "heartbeatInterval");
42-
_heartbeatTimeout = Ensure.IsInfiniteOrGreaterThanOrEqualToZero(heartbeatTimeout.WithDefault(TimeSpan.FromSeconds(10)), "heartbeatTimeout");
54+
_heartbeatInterval = Ensure.IsInfiniteOrGreaterThanOrEqualToZero(heartbeatInterval.WithDefault(DefaultHeartbeatInterval), "heartbeatInterval");
55+
_heartbeatTimeout = Ensure.IsInfiniteOrGreaterThanOrEqualToZero(heartbeatTimeout.WithDefault(DefaultHeartbeatTimeout), "heartbeatTimeout");
4356
}
4457

4558
// properties

src/MongoDB.Driver.Core/Core/Connections/IsMasterResult.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,26 @@ public bool IsReplicaSetMember
8585
get { return ServerType.IsReplicaSetMember(); }
8686
}
8787

88+
/// <summary>
89+
/// Gets the last write timestamp.
90+
/// </summary>
91+
/// <value>
92+
/// The last write timestamp.
93+
/// </value>
94+
public DateTime? LastWriteTimestamp
95+
{
96+
get
97+
{
98+
BsonValue value;
99+
if (_wrapped.TryGetValue("lastWrite", out value))
100+
{
101+
return value["lastWriteDate"].ToUniversalTime();
102+
}
103+
104+
return null;
105+
}
106+
}
107+
88108
/// <summary>
89109
/// Gets the maximum number of documents in a batch.
90110
/// </summary>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/* Copyright 2013-2015 MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Collections;
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
20+
namespace MongoDB.Driver.Core.Misc
21+
{
22+
internal class CachedEnumerable<T> : IEnumerable<T>
23+
{
24+
private readonly IEnumerable<T> _enumerable;
25+
private IReadOnlyList<T> _cached;
26+
27+
public CachedEnumerable(IEnumerable<T> enumerable)
28+
{
29+
_enumerable = enumerable;
30+
}
31+
32+
public IEnumerator<T> GetEnumerator()
33+
{
34+
if (_cached == null)
35+
{
36+
_cached = (_enumerable as IReadOnlyList<T>) ?? _enumerable.ToList();
37+
}
38+
return _cached.GetEnumerator();
39+
}
40+
41+
IEnumerator IEnumerable.GetEnumerator()
42+
{
43+
return GetEnumerator();
44+
}
45+
}
46+
}

src/MongoDB.Driver.Core/Core/Misc/Ensure.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,20 @@ public static void That(bool assertion, string message)
393393
}
394394
}
395395

396+
/// <summary>
397+
/// Ensures that an assertion is true.
398+
/// </summary>
399+
/// <param name="assertion">The assertion.</param>
400+
/// <param name="message">The message to use with the exception that is thrown if the assertion is false.</param>
401+
/// <param name="paramName">The parameter name.</param>
402+
public static void That(bool assertion, string message, string paramName)
403+
{
404+
if (!assertion)
405+
{
406+
throw new ArgumentException(message, paramName);
407+
}
408+
}
409+
396410
/// <summary>
397411
/// Ensures that the value of a parameter meets an assertion.
398412
/// </summary>

0 commit comments

Comments
 (0)