24
24
using MongoDB . Driver . Core . Async ;
25
25
using MongoDB . Driver . Core . Configuration ;
26
26
using MongoDB . Driver . Core . Events ;
27
+ using MongoDB . Driver . Core . Events . Diagnostics ;
27
28
using MongoDB . Driver . Core . Misc ;
28
29
using MongoDB . Driver . Core . Servers ;
29
30
@@ -51,6 +52,7 @@ internal sealed class MultiServerCluster : Cluster
51
52
private readonly Action < ClusterAddedServerEvent > _addedServerEventHandler ;
52
53
private readonly Action < ClusterRemovingServerEvent > _removingServerEventHandler ;
53
54
private readonly Action < ClusterRemovedServerEvent > _removedServerEventHandler ;
55
+ private readonly Action < SdamInformationEvent > _sdamInformationEventHandler ;
54
56
55
57
// constructors
56
58
public MultiServerCluster ( ClusterSettings settings , IClusterableServerFactory serverFactory , IEventSubscriber eventSubscriber )
@@ -80,6 +82,7 @@ public MultiServerCluster(ClusterSettings settings, IClusterableServerFactory se
80
82
eventSubscriber . TryGetEventHandler ( out _addedServerEventHandler ) ;
81
83
eventSubscriber . TryGetEventHandler ( out _removingServerEventHandler ) ;
82
84
eventSubscriber . TryGetEventHandler ( out _removedServerEventHandler ) ;
85
+ eventSubscriber . TryGetEventHandler ( out _sdamInformationEventHandler ) ;
83
86
}
84
87
85
88
// methods
@@ -327,7 +330,10 @@ private ClusterDescription ProcessReplicaSetChange(ClusterDescription clusterDes
327
330
if ( _maxElectionInfo != null )
328
331
{
329
332
isCurrentPrimaryStale = _maxElectionInfo . IsStale ( args . NewServerDescription . ReplicaSetConfig . Version . Value , args . NewServerDescription . ElectionId ) ;
330
- var isReportedPrimaryStale = ! isCurrentPrimaryStale ;
333
+ var isReportedPrimaryStale = _maxElectionInfo . IsFresher (
334
+ args . NewServerDescription . ReplicaSetConfig . Version . Value ,
335
+ args . NewServerDescription . ElectionId ) ;
336
+
331
337
332
338
if ( isReportedPrimaryStale && args . NewServerDescription . ElectionId != null )
333
339
{
@@ -336,6 +342,20 @@ private ClusterDescription ProcessReplicaSetChange(ClusterDescription clusterDes
336
342
{
337
343
var server = _servers . SingleOrDefault ( x => EndPointHelper . Equals ( args . NewServerDescription . EndPoint , x . EndPoint ) ) ;
338
344
server . Invalidate ( ) ;
345
+
346
+ _sdamInformationEventHandler ? . Invoke ( new SdamInformationEvent ( ( ) =>
347
+ string . Format (
348
+ @"Invalidating server: Setting ServerType to ""Unknown"" for {0} because it " +
349
+ @"claimed to be the replica set primary for replica set ""{1}"" but sent a " +
350
+ @"(setVersion, electionId) tuple of ({2}, {3}) that was less than than the " +
351
+ @"largest tuple seen, (maxSetVersion, maxElectionId), of ({4}, {5})." ,
352
+ args . NewServerDescription . EndPoint ,
353
+ args . NewServerDescription . ReplicaSetConfig . Name ,
354
+ args . NewServerDescription . ReplicaSetConfig . Version ,
355
+ args . NewServerDescription . ElectionId ,
356
+ _maxElectionInfo . SetVersion ,
357
+ _maxElectionInfo . ElectionId ) ) ) ;
358
+
339
359
return clusterDescription . WithServerDescription (
340
360
new ServerDescription ( server . ServerId , server . EndPoint ) ) ;
341
361
}
@@ -344,6 +364,58 @@ private ClusterDescription ProcessReplicaSetChange(ClusterDescription clusterDes
344
364
345
365
if ( isCurrentPrimaryStale )
346
366
{
367
+ if ( _maxElectionInfo == null )
368
+ {
369
+ _sdamInformationEventHandler ? . Invoke ( new SdamInformationEvent ( ( ) =>
370
+ string . Format (
371
+ @"Initializing (maxSetVersion, maxElectionId): Saving tuple " +
372
+ @"(setVersion, electionId) of ({0}, {1}) as (maxSetVersion, maxElectionId) for " +
373
+ @"replica set ""{2}"" because replica set primary {3} sent ({0}, {1}), the first " +
374
+ @"(setVersion, electionId) tuple ever seen for replica set ""{4}""." ,
375
+ args . NewServerDescription . ReplicaSetConfig . Version ,
376
+ args . NewServerDescription . ElectionId ,
377
+ args . NewServerDescription . ReplicaSetConfig . Name ,
378
+ args . NewServerDescription . EndPoint ,
379
+ args . NewServerDescription . ReplicaSetConfig . Name ) ) ) ;
380
+ }
381
+ else
382
+ {
383
+ if ( _maxElectionInfo . SetVersion < args . NewServerDescription . ReplicaSetConfig . Version . Value )
384
+ {
385
+ _sdamInformationEventHandler ? . Invoke ( new SdamInformationEvent ( ( ) =>
386
+ string . Format (
387
+ @"Updating stale setVersion: Updating the current " +
388
+ @"(maxSetVersion, maxElectionId) tuple from ({0}, {1}) to ({2}, {3}) for " +
389
+ @"replica set ""{4}"" because replica set primary {5} sent ({6}, {7})—a larger " +
390
+ @"(setVersion, electionId) tuple then the saved tuple, ({0}, {1})." ,
391
+ _maxElectionInfo . SetVersion ,
392
+ _maxElectionInfo . ElectionId ,
393
+ args . NewServerDescription . ReplicaSetConfig . Version ,
394
+ args . NewServerDescription . ElectionId ,
395
+ args . NewServerDescription . ReplicaSetConfig . Name ,
396
+ args . NewServerDescription . EndPoint ,
397
+ args . NewServerDescription . ReplicaSetConfig . Version ,
398
+ args . NewServerDescription . ElectionId ) ) ) ;
399
+ }
400
+ else // current primary is stale & setVersion is not stale ⇒ the electionId must be stale
401
+ {
402
+ _sdamInformationEventHandler ? . Invoke ( new SdamInformationEvent ( ( ) =>
403
+ string . Format (
404
+ @"Updating stale electionId: Updating the current " +
405
+ @"(maxSetVersion, maxElectionId) tuple from ({0}, {1}) to ({2}, {3}) for " +
406
+ @"replica set ""{4}"" because replica set primary {5} sent ({6}, {7})—" +
407
+ @"a larger (setVersion, electionId) tuple than the saved tuple, ({0}, {1})." ,
408
+ _maxElectionInfo . SetVersion ,
409
+ _maxElectionInfo . ElectionId ,
410
+ args . NewServerDescription . ReplicaSetConfig . Version ,
411
+ args . NewServerDescription . ElectionId ,
412
+ args . NewServerDescription . ReplicaSetConfig . Name ,
413
+ args . NewServerDescription . EndPoint ,
414
+ args . NewServerDescription . ReplicaSetConfig . Version ,
415
+ args . NewServerDescription . ElectionId ) ) ) ;
416
+ }
417
+ }
418
+
347
419
_maxElectionInfo = new ElectionInfo (
348
420
args . NewServerDescription . ReplicaSetConfig . Version . Value ,
349
421
args . NewServerDescription . ElectionId ) ;
@@ -522,6 +594,13 @@ public ElectionInfo(int setVersion, ElectionId electionId)
522
594
523
595
public ElectionId ElectionId => _electionId ;
524
596
597
+ public bool IsFresher ( int setVersion , ElectionId electionId )
598
+ {
599
+ return
600
+ _setVersion > setVersion ||
601
+ _setVersion == setVersion && _electionId != null && _electionId . CompareTo ( electionId ) > 0 ;
602
+ }
603
+
525
604
public bool IsStale ( int setVersion , ElectionId electionId )
526
605
{
527
606
if ( _setVersion < setVersion )
@@ -532,13 +611,18 @@ public bool IsStale(int setVersion, ElectionId electionId)
532
611
{
533
612
return false ;
534
613
}
535
-
614
+ // Now it must be that _setVersion == setVersion
536
615
if ( _electionId == null )
537
616
{
538
617
return true ;
539
618
}
540
-
541
- return _electionId . CompareTo ( electionId ) <= 0 ;
619
+
620
+ return _electionId . CompareTo ( electionId ) < 0 ;
621
+
622
+ /* above is equivalent to:
623
+ * return
624
+ * _setVersion < setVersion
625
+ * || _setVersion == setVersion && (_electionId == null || _electionId.CompareTo(electionId) < 0); */
542
626
}
543
627
}
544
628
}
0 commit comments