@@ -46,7 +46,7 @@ public static ReadPreferenceServerSelector Primary
46
46
#endregion
47
47
48
48
// fields
49
- private readonly TimeSpan ? _maxStaleness ; // with Zero and InfiniteTimespan converted to null
49
+ private readonly TimeSpan ? _maxStaleness ; // with InfiniteTimespan converted to null
50
50
private readonly ReadPreference _readPreference ;
51
51
52
52
// constructors
@@ -57,7 +57,7 @@ public static ReadPreferenceServerSelector Primary
57
57
public ReadPreferenceServerSelector ( ReadPreference readPreference )
58
58
{
59
59
_readPreference = Ensure . IsNotNull ( readPreference , nameof ( readPreference ) ) ;
60
- if ( readPreference . MaxStaleness == TimeSpan . Zero || readPreference . MaxStaleness == Timeout . InfiniteTimeSpan )
60
+ if ( readPreference . MaxStaleness == Timeout . InfiniteTimeSpan )
61
61
{
62
62
_maxStaleness = null ;
63
63
}
@@ -132,53 +132,42 @@ private IEnumerable<ServerDescription> SelectByTagSets(IEnumerable<ServerDescrip
132
132
133
133
private IEnumerable < ServerDescription > SelectForReplicaSet ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
134
134
{
135
- if ( _maxStaleness . HasValue )
136
- {
137
- var minHeartBeatIntervalTicks = servers . Select ( s => s . HeartbeatInterval . Ticks ) . Min ( ) ;
138
- if ( _maxStaleness . Value . Ticks < 2 * minHeartBeatIntervalTicks )
139
- {
140
- throw new MongoClientException ( "MaxStaleness must be at least twice the heartbeat interval." ) ;
141
- }
135
+ EnsureMaxStalenessIsValid ( cluster ) ;
142
136
143
- servers = new CachedEnumerable < ServerDescription > ( SelectFreshServers ( cluster , servers ) ) ; // prevent multiple enumeration
144
- }
145
- else
146
- {
147
- servers = new CachedEnumerable < ServerDescription > ( servers ) ; // prevent multiple enumeration
148
- }
137
+ servers = new CachedEnumerable < ServerDescription > ( servers ) ; // prevent multiple enumeration
149
138
150
139
switch ( _readPreference . ReadPreferenceMode )
151
140
{
152
141
case ReadPreferenceMode . Primary :
153
- return servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
142
+ return SelectPrimary ( servers ) ;
154
143
155
144
case ReadPreferenceMode . PrimaryPreferred :
156
- var primary = servers . FirstOrDefault ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
157
- if ( primary != null )
145
+ var primary = SelectPrimary ( servers ) ;
146
+ if ( primary . Count != 0 )
158
147
{
159
- return new [ ] { primary } ;
148
+ return primary ;
160
149
}
161
150
else
162
151
{
163
- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) ;
152
+ return SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) ;
164
153
}
165
154
166
155
case ReadPreferenceMode . Secondary :
167
- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) ;
156
+ return SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) ;
168
157
169
158
case ReadPreferenceMode . SecondaryPreferred :
170
- var matchingSecondaries = SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) . ToList ( ) ;
171
- if ( matchingSecondaries . Count != 0 )
159
+ var selectedSecondaries = SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) . ToList ( ) ;
160
+ if ( selectedSecondaries . Count != 0 )
172
161
{
173
- return matchingSecondaries ;
162
+ return selectedSecondaries ;
174
163
}
175
164
else
176
165
{
177
- return servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
166
+ return SelectPrimary ( servers ) ;
178
167
}
179
168
180
169
case ReadPreferenceMode . Nearest :
181
- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary || n . Type == ServerType . ReplicaSetSecondary ) ) ;
170
+ return SelectByTagSets ( SelectPrimary ( servers ) . Concat ( SelectFreshSecondaries ( cluster , servers ) ) ) ;
182
171
183
172
default :
184
173
throw new ArgumentException ( "Invalid ReadPreferenceMode." ) ;
@@ -195,44 +184,93 @@ private IEnumerable<ServerDescription> SelectForStandaloneCluster(IEnumerable<Se
195
184
return servers . Where ( n => n . Type == ServerType . Standalone ) ; // standalone servers match any ReadPreference (to facilitate testing)
196
185
}
197
186
198
- private IReadOnlyList < ServerDescription > SelectFreshServers ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
187
+ private List < ServerDescription > SelectPrimary ( IEnumerable < ServerDescription > servers )
199
188
{
200
- var primary = cluster . Servers . SingleOrDefault ( s => s . Type == ServerType . ReplicaSetPrimary ) ;
201
- if ( primary == null )
189
+ var primary = servers . Where ( s => s . Type == ServerType . ReplicaSetPrimary ) . ToList ( ) ;
190
+ if ( primary . Count > 1 )
202
191
{
203
- return SelectFreshServersWithNoPrimary ( cluster , servers ) ;
192
+ throw new MongoClientException ( $ "More than one primary found: [{ string . Join ( ", " , servers . Select ( s => s . ToString ( ) ) ) } ].") ;
193
+ }
194
+ return primary ; // returned as a list because otherwise some callers would have to create a new list
195
+ }
196
+
197
+ private IEnumerable < ServerDescription > SelectSecondaries ( IEnumerable < ServerDescription > servers )
198
+ {
199
+ return servers . Where ( s => s . Type == ServerType . ReplicaSetSecondary ) ;
200
+ }
201
+
202
+ private IEnumerable < ServerDescription > SelectFreshSecondaries ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
203
+ {
204
+ var secondaries = SelectSecondaries ( servers ) ;
205
+
206
+ if ( _maxStaleness . HasValue )
207
+ {
208
+ var primary = SelectPrimary ( cluster . Servers ) . SingleOrDefault ( ) ;
209
+ if ( primary == null )
210
+ {
211
+ return SelectFreshSecondariesWithNoPrimary ( secondaries ) ;
212
+ }
213
+ else
214
+ {
215
+
216
+ return SelectFreshSecondariesWithPrimary ( primary , secondaries ) ;
217
+ }
204
218
}
205
219
else
206
220
{
207
- return SelectFreshServersWithPrimary ( cluster , primary , servers ) ;
221
+ return secondaries ;
208
222
}
209
223
}
210
224
211
- private IReadOnlyList < ServerDescription > SelectFreshServersWithNoPrimary ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
225
+ private IEnumerable < ServerDescription > SelectFreshSecondariesWithNoPrimary ( IEnumerable < ServerDescription > secondaries )
212
226
{
213
- var smax = servers
214
- . Where ( s => s . Type == ServerType . ReplicaSetSecondary )
227
+ var smax = secondaries
215
228
. OrderByDescending ( s => s . LastWriteTimestamp )
216
229
. FirstOrDefault ( ) ;
217
- return servers
230
+ if ( smax == null )
231
+ {
232
+ return Enumerable . Empty < ServerDescription > ( ) ;
233
+ }
234
+
235
+ return secondaries
218
236
. Where ( s =>
219
237
{
220
238
var estimatedStaleness = smax . LastWriteTimestamp . Value - s . LastWriteTimestamp . Value + s . HeartbeatInterval ;
221
239
return estimatedStaleness <= _maxStaleness ;
222
- } )
223
- . ToList ( ) ;
240
+ } ) ;
224
241
}
225
242
226
- private IReadOnlyList < ServerDescription > SelectFreshServersWithPrimary ( ClusterDescription cluster , ServerDescription primary , IEnumerable < ServerDescription > servers )
243
+ private IEnumerable < ServerDescription > SelectFreshSecondariesWithPrimary ( ServerDescription primary , IEnumerable < ServerDescription > secondaries )
227
244
{
228
245
var p = primary ;
229
- return servers
246
+ return secondaries
230
247
. Where ( s =>
231
- {
248
+ {
232
249
var estimatedStaleness = ( s . LastUpdateTimestamp - s . LastWriteTimestamp . Value ) - ( p . LastUpdateTimestamp - p . LastWriteTimestamp . Value ) + s . HeartbeatInterval ;
233
250
return estimatedStaleness <= _maxStaleness ;
234
- } )
235
- . ToList ( ) ;
251
+ } ) ;
252
+ }
253
+
254
+ private void EnsureMaxStalenessIsValid ( ClusterDescription cluster )
255
+ {
256
+ if ( _maxStaleness . HasValue )
257
+ {
258
+ var primary = SelectPrimary ( cluster . Servers ) . SingleOrDefault ( ) ;
259
+ var primaryIdleWritePeriod = primary ? . IdleWritePeriod ;
260
+
261
+ foreach ( var server in cluster . Servers )
262
+ {
263
+ if ( server . Type == ServerType . ReplicaSetPrimary || server . Type == ServerType . ReplicaSetSecondary )
264
+ {
265
+ var heartbeatInterval = server . HeartbeatInterval ;
266
+ var idleWriteTime = primaryIdleWritePeriod ?? server . IdleWritePeriod ;
267
+ if ( _maxStaleness . Value < heartbeatInterval + idleWriteTime )
268
+ {
269
+ throw new Exception ( "Max staleness must greater than or equal to heartbeat interval plus idle write period." ) ;
270
+ }
271
+ }
272
+ }
273
+ }
236
274
}
237
275
}
238
276
}
0 commit comments