Skip to content

Commit 6bda3fe

Browse files
committed
CSHARP-2565: Topology should synchronously update ServerDescriptions
1 parent 3a85356 commit 6bda3fe

File tree

3 files changed

+97
-147
lines changed

3 files changed

+97
-147
lines changed

src/MongoDB.Driver.Core/Core/Clusters/MultiServerCluster.cs

Lines changed: 73 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ internal sealed class MultiServerCluster : Cluster
3939
private readonly CancellationTokenSource _monitorServersCancellationTokenSource;
4040
private volatile ElectionInfo _maxElectionInfo;
4141
private volatile string _replicaSetName;
42-
private readonly AsyncQueue<ServerDescriptionChangedEventArgs> _serverDescriptionChangedQueue;
4342
private readonly List<IClusterableServer> _servers;
4443
private readonly object _serversLock = new object();
4544
private readonly InterlockedInt32 _state;
45+
private readonly object _updateClusterDescriptionLock = new object();
4646

4747
private readonly Action<ClusterClosingEvent> _closingEventHandler;
4848
private readonly Action<ClusterClosedEvent> _closedEventHandler;
@@ -69,7 +69,6 @@ public MultiServerCluster(ClusterSettings settings, IClusterableServerFactory se
6969
}
7070

7171
_monitorServersCancellationTokenSource = new CancellationTokenSource();
72-
_serverDescriptionChangedQueue = new AsyncQueue<ServerDescriptionChangedEventArgs>();
7372
_servers = new List<IClusterableServer>();
7473
_state = new InterlockedInt32(State.Initial);
7574
_replicaSetName = settings.ReplicaSetName;
@@ -132,24 +131,27 @@ public override void Initialize()
132131
}
133132

134133
var stopwatch = Stopwatch.StartNew();
135-
MonitorServersAsync().ConfigureAwait(false);
136-
// We lock here even though AddServer locks. Monitors
137-
// are re-entrant such that this won't cause problems,
138-
// but could prevent issues of conflicting reports
139-
// from servers that are quick to respond.
140-
var clusterDescription = Description.WithType(Settings.ConnectionMode.ToClusterType());
134+
141135
var newServers = new List<IClusterableServer>();
142-
lock (_serversLock)
136+
lock (_updateClusterDescriptionLock)
143137
{
144-
foreach (var endPoint in Settings.EndPoints)
138+
// We lock here even though AddServer locks. Monitors
139+
// are re-entrant such that this won't cause problems,
140+
// but could prevent issues of conflicting reports
141+
// from servers that are quick to respond.
142+
var clusterDescription = Description.WithType(Settings.ConnectionMode.ToClusterType());
143+
lock (_serversLock)
145144
{
146-
clusterDescription = EnsureServer(clusterDescription, endPoint, newServers);
145+
foreach (var endPoint in Settings.EndPoints)
146+
{
147+
clusterDescription = EnsureServer(clusterDescription, endPoint, newServers);
148+
}
147149
}
148-
}
149150

150-
stopwatch.Stop();
151+
stopwatch.Stop();
151152

152-
UpdateClusterDescription(clusterDescription);
153+
UpdateClusterDescription(clusterDescription);
154+
}
153155

154156
foreach (var server in newServers)
155157
{
@@ -217,95 +219,84 @@ protected override void RequestHeartbeat()
217219
}
218220
}
219221

220-
private async Task MonitorServersAsync()
222+
private void ServerDescriptionChangedHandler(object sender, ServerDescriptionChangedEventArgs args)
221223
{
222-
var monitorServersCancellationToken = _monitorServersCancellationTokenSource.Token;
223-
while (!monitorServersCancellationToken.IsCancellationRequested)
224+
try
224225
{
225-
try
226-
{
227-
var eventArgs = await _serverDescriptionChangedQueue.DequeueAsync(monitorServersCancellationToken).ConfigureAwait(false); // TODO: add timeout and cancellationToken to DequeueAsync
228-
ProcessServerDescriptionChanged(eventArgs);
229-
}
230-
catch (OperationCanceledException) when (monitorServersCancellationToken.IsCancellationRequested)
231-
{
232-
// ignore OperationCanceledException when monitor servers cancellation is requested
233-
}
234-
catch (Exception unexpectedException)
226+
ProcessServerDescriptionChanged(args);
227+
}
228+
catch (Exception unexpectedException)
229+
{
230+
// if we catch an exception here it's because of a bug in the driver
231+
var handler = _sdamInformationEventHandler;
232+
if (handler != null)
235233
{
236-
// if we catch an exception here it's because of a bug in the driver
237-
238-
var handler = _sdamInformationEventHandler;
239-
if (handler != null)
234+
try
240235
{
241-
try
242-
{
243-
handler.Invoke(new SdamInformationEvent(() =>
244-
string.Format(
245-
"Unexpected exception in MultiServerCluster.MonitorServersAsync: {0}",
246-
unexpectedException.ToString())));
247-
}
248-
catch
249-
{
250-
// ignore any exceptions thrown by the handler (note: event handlers aren't supposed to throw exceptions)
251-
}
236+
handler.Invoke(new SdamInformationEvent(() =>
237+
string.Format(
238+
"Unexpected exception in MultiServerCluster.ServerDescriptionChangedHandler: {0}",
239+
unexpectedException.ToString())));
240+
}
241+
catch
242+
{
243+
// ignore any exceptions thrown by the handler (note: event handlers aren't supposed to throw exceptions)
252244
}
253-
254-
// TODO: should we reset the cluster state in some way? (the state is undefined since an unexpected exception was thrown)
255245
}
246+
// TODO: should we reset the cluster state in some way? (the state is undefined since an unexpected exception was thrown)
256247
}
257248
}
258249

259-
private void ServerDescriptionChangedHandler(object sender, ServerDescriptionChangedEventArgs args)
260-
{
261-
_serverDescriptionChangedQueue.Enqueue(args);
262-
}
263-
264250
private void ProcessServerDescriptionChanged(ServerDescriptionChangedEventArgs args)
265251
{
266-
var newServerDescription = args.NewServerDescription;
267-
var newClusterDescription = Description;
268-
269-
if (!_servers.Any(x => EndPointHelper.Equals(x.EndPoint, newServerDescription.EndPoint)))
270-
{
271-
return;
272-
}
273-
274252
var newServers = new List<IClusterableServer>();
275-
if (newServerDescription.State == ServerState.Disconnected)
253+
lock (_updateClusterDescriptionLock)
276254
{
277-
newClusterDescription = newClusterDescription.WithServerDescription(newServerDescription);
278-
}
279-
else
280-
{
281-
if (IsServerValidForCluster(newClusterDescription.Type, Settings.ConnectionMode, newServerDescription.Type))
255+
var newServerDescription = args.NewServerDescription;
256+
var newClusterDescription = Description;
257+
258+
if (!_servers.Any(x => EndPointHelper.Equals(x.EndPoint, newServerDescription.EndPoint)))
282259
{
283-
if (newClusterDescription.Type == ClusterType.Unknown)
284-
{
285-
newClusterDescription = newClusterDescription.WithType(newServerDescription.Type.ToClusterType());
286-
}
260+
return;
261+
}
287262

288-
switch (newClusterDescription.Type)
263+
if (newServerDescription.State == ServerState.Disconnected)
264+
{
265+
newClusterDescription = newClusterDescription.WithServerDescription(newServerDescription);
266+
}
267+
else
268+
{
269+
if (IsServerValidForCluster(newClusterDescription.Type, Settings.ConnectionMode, newServerDescription.Type))
289270
{
290-
case ClusterType.ReplicaSet:
291-
newClusterDescription = ProcessReplicaSetChange(newClusterDescription, args, newServers);
292-
break;
271+
if (newClusterDescription.Type == ClusterType.Unknown)
272+
{
273+
newClusterDescription = newClusterDescription.WithType(newServerDescription.Type.ToClusterType());
274+
}
275+
276+
switch (newClusterDescription.Type)
277+
{
293278

294-
case ClusterType.Sharded:
295-
newClusterDescription = ProcessShardedChange(newClusterDescription, args);
296-
break;
297279

298-
default:
299-
throw new MongoInternalException("Unexpected cluster type.");
280+
case ClusterType.ReplicaSet:
281+
newClusterDescription = ProcessReplicaSetChange(newClusterDescription, args, newServers);
282+
break;
283+
284+
case ClusterType.Sharded:
285+
newClusterDescription = ProcessShardedChange(newClusterDescription, args);
286+
break;
287+
288+
default:
289+
throw new MongoInternalException("Unexpected cluster type.");
290+
}
291+
}
292+
else
293+
{
294+
newClusterDescription = newClusterDescription.WithoutServerDescription(newServerDescription.EndPoint);
300295
}
301296
}
302-
else
303-
{
304-
newClusterDescription = newClusterDescription.WithoutServerDescription(newServerDescription.EndPoint);
305-
}
306-
}
307297

308-
UpdateClusterDescription(newClusterDescription);
298+
UpdateClusterDescription(newClusterDescription);
299+
}
309300

310301
foreach (var server in newServers)
311302
{

src/MongoDB.Driver.Core/Core/Servers/ServerDescription.cs

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -507,57 +507,27 @@ public ServerDescription With(
507507
Optional<SemanticVersion> version = default(Optional<SemanticVersion>),
508508
Optional<Range<int>> wireVersionRange = default(Optional<Range<int>>))
509509
{
510-
if (!lastUpdateTimestamp.HasValue)
511-
{
512-
lastUpdateTimestamp = DateTime.UtcNow;
513-
}
514-
515-
if (
516-
averageRoundTripTime.Replaces(_averageRoundTripTime) ||
517-
canonicalEndPoint.Replaces(_canonicalEndPoint) ||
518-
electionId.Replaces(_electionId) ||
519-
heartbeatException.Replaces(_heartbeatException) ||
520-
heartbeatInterval.Replaces(_heartbeatInterval) ||
521-
lastUpdateTimestamp.Replaces(_lastUpdateTimestamp) ||
522-
lastWriteTimestamp.Replaces(_lastWriteTimestamp) ||
523-
logicalSessionTimeout.Replaces(_logicalSessionTimeout) ||
524-
maxBatchCount.Replaces(_maxBatchCount) ||
525-
maxDocumentSize.Replaces(_maxDocumentSize) ||
526-
maxMessageSize.Replaces(_maxMessageSize) ||
527-
maxWireDocumentSize.Replaces(_maxWireDocumentSize) ||
528-
replicaSetConfig.Replaces(_replicaSetConfig) ||
529-
state.Replaces(_state) ||
530-
tags.Replaces(_tags) ||
531-
type.Replaces(_type) ||
532-
version.Replaces(_version) ||
533-
wireVersionRange.Replaces(_wireVersionRange))
534-
{
535-
return new ServerDescription(
536-
_serverId,
537-
_endPoint,
538-
averageRoundTripTime: averageRoundTripTime.WithDefault(_averageRoundTripTime),
539-
canonicalEndPoint: canonicalEndPoint.WithDefault(_canonicalEndPoint),
540-
electionId: electionId.WithDefault(_electionId),
541-
heartbeatException: heartbeatException.WithDefault(_heartbeatException),
542-
heartbeatInterval: heartbeatInterval.WithDefault(_heartbeatInterval),
543-
lastUpdateTimestamp: lastUpdateTimestamp.WithDefault(_lastUpdateTimestamp),
544-
lastWriteTimestamp: lastWriteTimestamp.WithDefault(_lastWriteTimestamp),
545-
logicalSessionTimeout: logicalSessionTimeout.WithDefault(_logicalSessionTimeout),
546-
maxBatchCount: maxBatchCount.WithDefault(_maxBatchCount),
547-
maxDocumentSize: maxDocumentSize.WithDefault(_maxDocumentSize),
548-
maxMessageSize: maxMessageSize.WithDefault(_maxMessageSize),
549-
maxWireDocumentSize: maxWireDocumentSize.WithDefault(_maxWireDocumentSize),
550-
replicaSetConfig: replicaSetConfig.WithDefault(_replicaSetConfig),
551-
state: state.WithDefault(_state),
552-
tags: tags.WithDefault(_tags),
553-
type: type.WithDefault(_type),
554-
version: version.WithDefault(_version),
555-
wireVersionRange: wireVersionRange.WithDefault(_wireVersionRange));
556-
}
557-
else
558-
{
559-
return this;
560-
}
510+
return new ServerDescription(
511+
_serverId,
512+
_endPoint,
513+
averageRoundTripTime: averageRoundTripTime.WithDefault(_averageRoundTripTime),
514+
canonicalEndPoint: canonicalEndPoint.WithDefault(_canonicalEndPoint),
515+
electionId: electionId.WithDefault(_electionId),
516+
heartbeatException: heartbeatException.WithDefault(_heartbeatException),
517+
heartbeatInterval: heartbeatInterval.WithDefault(_heartbeatInterval),
518+
lastUpdateTimestamp: lastUpdateTimestamp.WithDefault(DateTime.UtcNow),
519+
lastWriteTimestamp: lastWriteTimestamp.WithDefault(_lastWriteTimestamp),
520+
logicalSessionTimeout: logicalSessionTimeout.WithDefault(_logicalSessionTimeout),
521+
maxBatchCount: maxBatchCount.WithDefault(_maxBatchCount),
522+
maxDocumentSize: maxDocumentSize.WithDefault(_maxDocumentSize),
523+
maxMessageSize: maxMessageSize.WithDefault(_maxMessageSize),
524+
maxWireDocumentSize: maxWireDocumentSize.WithDefault(_maxWireDocumentSize),
525+
replicaSetConfig: replicaSetConfig.WithDefault(_replicaSetConfig),
526+
state: state.WithDefault(_state),
527+
tags: tags.WithDefault(_tags),
528+
type: type.WithDefault(_type),
529+
version: version.WithDefault(_version),
530+
wireVersionRange: wireVersionRange.WithDefault(_wireVersionRange));
561531
}
562532

563533
/// <summary>

src/MongoDB.Driver.Core/Core/Servers/ServerMonitor.cs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void Initialize()
9393

9494
public void Invalidate()
9595
{
96-
SetDescriptionIfChanged(_baseDescription);
96+
SetDescription(_baseDescription.With(lastUpdateTimestamp: DateTime.UtcNow));
9797
RequestHeartbeat();
9898
}
9999

@@ -232,15 +232,15 @@ private async Task<bool> HeartbeatAsync(CancellationToken cancellationToken)
232232
}
233233
else
234234
{
235-
newDescription = _baseDescription;
235+
newDescription = _baseDescription.With(lastUpdateTimestamp: DateTime.UtcNow);
236236
}
237237

238238
if (heartbeatException != null)
239239
{
240240
newDescription = newDescription.With(heartbeatException: heartbeatException);
241241
}
242242

243-
SetDescriptionIfChanged(newDescription);
243+
SetDescription(newDescription);
244244

245245
return true;
246246
}
@@ -322,17 +322,6 @@ private void SetDescription(ServerDescription oldDescription, ServerDescription
322322
OnDescriptionChanged(oldDescription, newDescription);
323323
}
324324

325-
private void SetDescriptionIfChanged(ServerDescription newDescription)
326-
{
327-
var oldDescription = Interlocked.CompareExchange(ref _currentDescription, null, null);
328-
if (oldDescription.Equals(newDescription))
329-
{
330-
return;
331-
}
332-
333-
SetDescription(oldDescription, newDescription);
334-
}
335-
336325
private void ThrowIfDisposed()
337326
{
338327
if (_state.Value == State.Disposed)

0 commit comments

Comments
 (0)