Skip to content

Commit 0953908

Browse files
CSHARP-2288: Add option for applications to register a custom server selector
1 parent fc75cb4 commit 0953908

File tree

6 files changed

+239
-19
lines changed

6 files changed

+239
-19
lines changed

src/MongoDB.Driver.Core/Core/Clusters/Cluster.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ internal abstract class Cluster : ICluster
6767
private ClusterDescription _description;
6868
private TaskCompletionSource<bool> _descriptionChangedTaskCompletionSource;
6969
private readonly object _descriptionLock = new object();
70+
private readonly LatencyLimitingServerSelector _latencyLimitingServerSelector;
7071
private Timer _rapidHeartbeatTimer;
7172
private readonly object _serverSelectionWaitQueueLock = new object();
7273
private int _serverSelectionWaitQueueSize;
@@ -91,6 +92,7 @@ protected Cluster(ClusterSettings settings, IClusterableServerFactory serverFact
9192
_clusterId = new ClusterId();
9293
_description = ClusterDescription.CreateInitial(_clusterId, _settings.ConnectionMode);
9394
_descriptionChangedTaskCompletionSource = new TaskCompletionSource<bool>();
95+
_latencyLimitingServerSelector = new LatencyLimitingServerSelector(settings.LocalThreshold);
9496

9597
_rapidHeartbeatTimer = new Timer(RapidHeartbeatTimerCallback, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
9698

@@ -507,25 +509,23 @@ public void WaitingForDescriptionToChange()
507509
private IServerSelector DecorateSelector(IServerSelector selector)
508510
{
509511
var settings = _cluster.Settings;
510-
if (settings.PreServerSelector != null || settings.PostServerSelector != null)
512+
var allSelectors = new List<IServerSelector>();
513+
514+
if (settings.PreServerSelector != null)
511515
{
512-
var allSelectors = new List<IServerSelector>();
513-
if (settings.PreServerSelector != null)
514-
{
515-
allSelectors.Add(settings.PreServerSelector);
516-
}
516+
allSelectors.Add(settings.PreServerSelector);
517+
}
517518

518-
allSelectors.Add(selector);
519+
allSelectors.Add(selector);
519520

520-
if (settings.PostServerSelector != null)
521-
{
522-
allSelectors.Add(settings.PostServerSelector);
523-
}
524-
525-
return new CompositeServerSelector(allSelectors);
521+
if (settings.PostServerSelector != null)
522+
{
523+
allSelectors.Add(settings.PostServerSelector);
526524
}
527525

528-
return selector;
526+
allSelectors.Add(_cluster._latencyLimitingServerSelector);
527+
528+
return new CompositeServerSelector(allSelectors);
529529
}
530530
}
531531

src/MongoDB.Driver.Core/Core/Configuration/ClusterSettings.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class ClusterSettings
3838
private readonly ClusterConnectionMode _connectionMode;
3939
private readonly IReadOnlyList<EndPoint> _endPoints;
4040
private readonly IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>> _kmsProviders;
41+
private readonly TimeSpan _localThreshold;
4142
private readonly int _maxServerSelectionWaitQueueSize;
4243
private readonly string _replicaSetName;
4344
private readonly IReadOnlyDictionary<string, BsonDocument> _schemaMap;
@@ -53,6 +54,7 @@ public class ClusterSettings
5354
/// <param name="connectionMode">The connection mode.</param>
5455
/// <param name="endPoints">The end points.</param>
5556
/// <param name="kmsProviders">The kms providers.</param>
57+
/// <param name="localThreshold">The local threshold.</param>
5658
/// <param name="maxServerSelectionWaitQueueSize">Maximum size of the server selection wait queue.</param>
5759
/// <param name="replicaSetName">Name of the replica set.</param>
5860
/// <param name="serverSelectionTimeout">The server selection timeout.</param>
@@ -64,6 +66,7 @@ public ClusterSettings(
6466
Optional<ClusterConnectionMode> connectionMode = default(Optional<ClusterConnectionMode>),
6567
Optional<IEnumerable<EndPoint>> endPoints = default(Optional<IEnumerable<EndPoint>>),
6668
Optional<IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>>> kmsProviders = default(Optional<IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>>>),
69+
Optional<TimeSpan> localThreshold = default,
6770
Optional<int> maxServerSelectionWaitQueueSize = default(Optional<int>),
6871
Optional<string> replicaSetName = default(Optional<string>),
6972
Optional<TimeSpan> serverSelectionTimeout = default(Optional<TimeSpan>),
@@ -75,6 +78,7 @@ public ClusterSettings(
7578
_connectionMode = connectionMode.WithDefault(ClusterConnectionMode.Automatic);
7679
_endPoints = Ensure.IsNotNull(endPoints.WithDefault(__defaultEndPoints), "endPoints").ToList();
7780
_kmsProviders = kmsProviders.WithDefault(null);
81+
_localThreshold = Ensure.IsGreaterThanOrEqualToZero(localThreshold.WithDefault(TimeSpan.FromMilliseconds(15)), "localThreshold");
7882
_maxServerSelectionWaitQueueSize = Ensure.IsGreaterThanOrEqualToZero(maxServerSelectionWaitQueueSize.WithDefault(500), "maxServerSelectionWaitQueueSize");
7983
_replicaSetName = replicaSetName.WithDefault(null);
8084
_serverSelectionTimeout = Ensure.IsGreaterThanOrEqualToZero(serverSelectionTimeout.WithDefault(TimeSpan.FromSeconds(30)), "serverSelectionTimeout");
@@ -118,6 +122,17 @@ public IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>> KmsProvi
118122
get { return _kmsProviders; }
119123
}
120124

125+
/// <summary>
126+
/// Gets the local threshold.
127+
/// </summary>
128+
/// <value>
129+
/// The local threshold.
130+
/// </value>
131+
public TimeSpan LocalThreshold
132+
{
133+
get { return _localThreshold; }
134+
}
135+
121136
/// <summary>
122137
/// Gets the maximum size of the server selection wait queue.
123138
/// </summary>
@@ -202,6 +217,7 @@ public IServerSelector PostServerSelector
202217
/// <param name="connectionMode">The connection mode.</param>
203218
/// <param name="endPoints">The end points.</param>
204219
/// <param name="kmsProviders">The kms providers.</param>
220+
/// <param name="localThreshold">The local threshold.</param>
205221
/// <param name="maxServerSelectionWaitQueueSize">Maximum size of the server selection wait queue.</param>
206222
/// <param name="replicaSetName">Name of the replica set.</param>
207223
/// <param name="serverSelectionTimeout">The server selection timeout.</param>
@@ -214,6 +230,7 @@ public ClusterSettings With(
214230
Optional<ClusterConnectionMode> connectionMode = default(Optional<ClusterConnectionMode>),
215231
Optional<IEnumerable<EndPoint>> endPoints = default(Optional<IEnumerable<EndPoint>>),
216232
Optional<IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>>> kmsProviders = default(Optional<IReadOnlyDictionary<string, IReadOnlyDictionary<string, object>>>),
233+
Optional<TimeSpan> localThreshold = default(Optional<TimeSpan>),
217234
Optional<int> maxServerSelectionWaitQueueSize = default(Optional<int>),
218235
Optional<string> replicaSetName = default(Optional<string>),
219236
Optional<TimeSpan> serverSelectionTimeout = default(Optional<TimeSpan>),
@@ -226,6 +243,7 @@ public ClusterSettings With(
226243
connectionMode: connectionMode.WithDefault(_connectionMode),
227244
endPoints: Optional.Enumerable(endPoints.WithDefault(_endPoints)),
228245
kmsProviders: Optional.Create(kmsProviders.WithDefault(_kmsProviders)),
246+
localThreshold: localThreshold.WithDefault(_localThreshold),
229247
maxServerSelectionWaitQueueSize: maxServerSelectionWaitQueueSize.WithDefault(_maxServerSelectionWaitQueueSize),
230248
replicaSetName: replicaSetName.WithDefault(_replicaSetName),
231249
serverSelectionTimeout: serverSelectionTimeout.WithDefault(_serverSelectionTimeout),

src/MongoDB.Driver/ClusterRegistry.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ private ClusterSettings ConfigureCluster(ClusterSettings settings, ClusterKey cl
8585
connectionMode: clusterKey.ConnectionMode.ToCore(),
8686
endPoints: Optional.Enumerable(endPoints),
8787
kmsProviders: Optional.Create(clusterKey.KmsProviders),
88+
localThreshold: clusterKey.LocalThreshold,
8889
replicaSetName: clusterKey.ReplicaSetName,
8990
maxServerSelectionWaitQueueSize: clusterKey.WaitQueueSize,
9091
serverSelectionTimeout: clusterKey.ServerSelectionTimeout,
91-
postServerSelector: new LatencyLimitingServerSelector(clusterKey.LocalThreshold),
9292
schemaMap: Optional.Create(clusterKey.SchemaMap),
9393
scheme: clusterKey.Scheme);
9494
}

tests/MongoDB.Driver.Core.Tests/Core/Clusters/ClusterTests.cs

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,13 @@ namespace MongoDB.Driver.Core.Clusters
3737
{
3838
public class ClusterTests
3939
{
40-
private EventCapturer _capturedEvents;
41-
private Mock<IClusterableServerFactory> _mockServerFactory;
40+
private readonly EventCapturer _capturedEvents;
41+
private readonly Mock<IClusterableServerFactory> _mockServerFactory;
4242
private ClusterSettings _settings;
4343

4444
public ClusterTests()
4545
{
46-
_settings = new ClusterSettings(serverSelectionTimeout: TimeSpan.FromSeconds(2),
47-
postServerSelector: new LatencyLimitingServerSelector(TimeSpan.FromMinutes(2)));
46+
_settings = new ClusterSettings(serverSelectionTimeout: TimeSpan.FromSeconds(2));
4847
_mockServerFactory = new Mock<IClusterableServerFactory>();
4948
_mockServerFactory.Setup(f => f.CreateServer(It.IsAny<ClusterId>(), It.IsAny<IClusterClock>(), It.IsAny<EndPoint>()))
5049
.Returns((ClusterId clusterId, IClusterClock clusterClock, EndPoint endPoint) =>
@@ -481,6 +480,59 @@ public void SelectServer_should_apply_both_pre_and_post_server_selectors(
481480
_capturedEvents.Any().Should().BeFalse();
482481
}
483482

483+
[Theory]
484+
[ParameterAttributeData]
485+
public void SelectServer_should_call_custom_selector(
486+
[Values(true, false)] bool withEligibleServers,
487+
[Values(true, false)] bool async)
488+
{
489+
int numberOfCustomServerSelectorCalls = 0;
490+
var customServerSelector = new DelegateServerSelector((c, s) =>
491+
{
492+
numberOfCustomServerSelectorCalls++;
493+
return s.Skip(1);
494+
});
495+
496+
var settings = _settings.With(postServerSelector: customServerSelector);
497+
var subject = new StubCluster(settings, _mockServerFactory.Object, _capturedEvents);
498+
499+
subject.Initialize();
500+
subject.SetServerDescriptions(
501+
ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27019)),
502+
ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27020)));
503+
_capturedEvents.Clear();
504+
505+
if (withEligibleServers)
506+
{
507+
var selectedServer = SelectServerAttempt(
508+
subject,
509+
new DelegateServerSelector((c, s) => s), // do not filter servers
510+
async);
511+
512+
var selectedServerPort = ((DnsEndPoint)selectedServer.EndPoint).Port;
513+
selectedServerPort.Should().Be(27020);
514+
_capturedEvents.Next().Should().BeOfType<ClusterSelectingServerEvent>();
515+
_capturedEvents.Next().Should().BeOfType<ClusterSelectedServerEvent>();
516+
}
517+
else
518+
{
519+
var exception = Record.Exception(
520+
() =>
521+
SelectServerAttempt(
522+
subject,
523+
new DelegateServerSelector((c, s) => new ServerDescription[0]), // no eligible servers
524+
async));
525+
526+
exception.Should().BeOfType<TimeoutException>();
527+
_capturedEvents.Next().Should().BeOfType<ClusterSelectingServerEvent>();
528+
_capturedEvents.Next().Should().BeOfType<ClusterSelectingServerFailedEvent>();
529+
}
530+
531+
numberOfCustomServerSelectorCalls.Should().Be(1);
532+
_capturedEvents.Any().Should().BeFalse();
533+
}
534+
535+
// private methods
484536
private StubCluster CreateSubject(ClusterConnectionMode connectionMode = ClusterConnectionMode.Automatic, TimeSpan? serverSelectionTimeout = null)
485537
{
486538
_settings = _settings.With(connectionMode: connectionMode);
@@ -492,6 +544,22 @@ private StubCluster CreateSubject(ClusterConnectionMode connectionMode = Cluster
492544
return new StubCluster(_settings, _mockServerFactory.Object, _capturedEvents);
493545
}
494546

547+
private IServer SelectServerAttempt(Cluster cluster, IServerSelector operationSelector, bool async)
548+
{
549+
if (async)
550+
{
551+
return cluster
552+
.SelectServerAsync(operationSelector, CancellationToken.None)
553+
.GetAwaiter()
554+
.GetResult();
555+
}
556+
else
557+
{
558+
return cluster.SelectServer(operationSelector, CancellationToken.None);
559+
}
560+
}
561+
562+
// nested types
495563
private class StubCluster : Cluster
496564
{
497565
private Dictionary<EndPoint, IClusterableServer> _servers = new Dictionary<EndPoint, IClusterableServer>();

0 commit comments

Comments
 (0)