Skip to content

Commit 933b5dc

Browse files
SWIFT-1465 Filter servers by max staleness (#735)
1 parent b6d5534 commit 933b5dc

File tree

72 files changed

+4633
-45
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+4633
-45
lines changed

Sources/MongoSwift/SDAM.swift

Lines changed: 151 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import CLibMongoC
22
import Foundation
33

4+
internal enum SDAMConstants {
5+
internal static let defaultHeartbeatFrequencyMS = 10000
6+
internal static let idleWritePeriodMS = 10000
7+
internal static let smallestMaxStalenessSeconds = 90
8+
}
9+
410
/// A struct representing a network address, consisting of a host and port.
511
public struct ServerAddress: Equatable, Hashable {
612
/// The hostname or IP address.
@@ -230,20 +236,27 @@ public struct ServerDescription {
230236
self.tags = hello?.tags ?? [:]
231237
}
232238

233-
// For testing purposes
234-
internal init(address: ServerAddress, type: ServerType, tags: [String: String]?) {
239+
// Used for server selection/max staleness tests.
240+
internal init(
241+
address: ServerAddress,
242+
type: ServerType,
243+
tags: [String: String]?,
244+
lastWriteDate: Date?,
245+
maxWireVersion: Int?,
246+
lastUpdateTime: Date?
247+
) {
235248
self.address = address
236249
self.type = type
237250
self.tags = tags ?? [:]
251+
self.lastWriteDate = lastWriteDate
252+
self.lastUpdateTime = lastUpdateTime ?? Date()
253+
self.maxWireVersion = maxWireVersion ?? 0
238254

239255
// these fields are not used by the server selection tests
240256
self.serverId = 0
241257
self.roundTripTime = 0
242258
self.averageRoundTripTimeMS = nil
243-
self.lastUpdateTime = Date()
244-
self.lastWriteDate = nil
245259
self.minWireVersion = 0
246-
self.maxWireVersion = 0
247260
self.me = self.address
248261
self.setName = nil
249262
self.setVersion = nil
@@ -418,29 +431,42 @@ public struct TopologyDescription: Equatable {
418431
}
419432

420433
extension TopologyDescription {
421-
internal func findSuitableServers(readPreference: ReadPreference? = nil) -> [ServerDescription] {
434+
internal func findSuitableServers(
435+
readPreference: ReadPreference? = nil,
436+
heartbeatFrequencyMS: Int
437+
) throws -> [ServerDescription] {
438+
try readPreference?.validateMaxStalenessSeconds(
439+
heartbeatFrequencyMS: heartbeatFrequencyMS,
440+
topologyType: self.type
441+
)
422442
switch self.type._topologyType {
423443
case .unknown:
424444
return []
425-
case .single,
426-
.loadBalanced:
445+
case .single, .loadBalanced:
427446
return self.servers
428-
case .replicaSetNoPrimary,
429-
.replicaSetWithPrimary:
447+
case .replicaSetNoPrimary, .replicaSetWithPrimary:
430448
switch readPreference?.mode {
431449
case .secondary:
432-
let secondaries = self.servers.filter { $0.type == .rsSecondary }
433-
return self.filterReplicaSetServers(readPreference: readPreference, servers: secondaries)
450+
return self.filterReplicaSetServers(
451+
readPreference: readPreference,
452+
heartbeatFrequencyMS: heartbeatFrequencyMS,
453+
includePrimary: false
454+
)
434455
case .nearest:
435-
let secondariesAndPrimary = self.servers.filter { $0.type == .rsSecondary || $0.type == .rsPrimary }
436-
return self.filterReplicaSetServers(readPreference: readPreference, servers: secondariesAndPrimary)
456+
return self.filterReplicaSetServers(
457+
readPreference: readPreference,
458+
heartbeatFrequencyMS: heartbeatFrequencyMS,
459+
includePrimary: true
460+
)
437461
case .secondaryPreferred:
438462
// If mode is 'secondaryPreferred', attempt the selection algorithm with mode 'secondary' and the
439463
// user's maxStalenessSeconds and tag_sets. If no server matches, select the primary.
440-
let secondaries = self.servers.filter { $0.type == .rsSecondary }
441-
let primaries = self.servers.filter { $0.type == .rsPrimary }
442-
let matches = self.filterReplicaSetServers(readPreference: readPreference, servers: secondaries)
443-
return matches.isEmpty ? primaries : matches
464+
let secondaryMatches = self.filterReplicaSetServers(
465+
readPreference: readPreference,
466+
heartbeatFrequencyMS: heartbeatFrequencyMS,
467+
includePrimary: false
468+
)
469+
return secondaryMatches.isEmpty ? self.servers.filter { $0.type == .rsPrimary } : secondaryMatches
444470
case .primaryPreferred:
445471
// If mode is 'primaryPreferred' or a readPreference is not provided, select the primary if it is known,
446472
// otherwise attempt the selection algorithm with mode 'secondary' and the user's
@@ -449,8 +475,11 @@ extension TopologyDescription {
449475
if !primaries.isEmpty {
450476
return primaries
451477
}
452-
let secondaries = self.servers.filter { $0.type == .rsSecondary }
453-
return self.filterReplicaSetServers(readPreference: readPreference, servers: secondaries)
478+
return self.filterReplicaSetServers(
479+
readPreference: readPreference,
480+
heartbeatFrequencyMS: heartbeatFrequencyMS,
481+
includePrimary: false
482+
)
454483
default: // or .primary
455484
// the default mode is 'primary'.
456485
return self.servers.filter { $0.type == .rsPrimary }
@@ -460,13 +489,33 @@ extension TopologyDescription {
460489
}
461490
}
462491

463-
internal func filterReplicaSetServers(
492+
/// Filters the replica set servers in this topology first by max staleness and then by tag sets.
493+
private func filterReplicaSetServers(
464494
readPreference: ReadPreference?,
465-
servers: [ServerDescription]
495+
heartbeatFrequencyMS: Int,
496+
includePrimary: Bool
466497
) -> [ServerDescription] {
467-
// TODO: Filter out servers staler than maxStalenessSeconds
498+
// The initial set of servers from which to filter. Only include the secondaries unless includePrimary is true.
499+
var servers = self.servers.filter { ($0.type == .rsPrimary && includePrimary) || $0.type == .rsSecondary }
500+
501+
// Filter by max staleness. If maxStalenessSeconds is not configured as a positive number, all servers are
502+
// eligible.
503+
if let maxStalenessSeconds = readPreference?.maxStalenessSeconds, maxStalenessSeconds > 0 {
504+
let primary = self.servers.first { $0.type == .rsPrimary }
505+
let maxLastWriteDate = self.getMaxLastWriteDate()
506+
servers.removeAll {
507+
guard let staleness = $0.calculateStalenessSeconds(
508+
primary: primary,
509+
maxLastWriteDate: maxLastWriteDate,
510+
heartbeatFrequencyMS: heartbeatFrequencyMS
511+
) else {
512+
return false
513+
}
514+
return staleness > maxStalenessSeconds
515+
}
516+
}
468517

469-
// Filter by tag_sets
518+
// Filter by tag sets.
470519
guard let tagSets = readPreference?.tagSets else {
471520
return servers
472521
}
@@ -476,8 +525,86 @@ extension TopologyDescription {
476525
return matches
477526
}
478527
}
528+
529+
// If no matches were found during tag set filtering, return an empty list.
479530
return []
480531
}
532+
533+
/// Returns a `Date` representing the latest `lastWriteDate` configured on a secondary in the topology, or `nil`
534+
/// if none is found.
535+
private func getMaxLastWriteDate() -> Date? {
536+
let secondaryLastWriteDates = self.servers.compactMap {
537+
$0.type == .rsSecondary ? $0.lastWriteDate : nil
538+
}
539+
return secondaryLastWriteDates.max()
540+
}
541+
}
542+
543+
extension ServerDescription {
544+
/// Calculates the staleness of this server. If the server is not a secondary, the staleness is 0. Otherwise,
545+
/// compare against the primary if one is present, or the maximum last write date seen in the topology if present.
546+
/// If staleness cannot be calculated due to an absence of values, `nil` is returned.
547+
fileprivate func calculateStalenessSeconds(
548+
primary: ServerDescription?,
549+
maxLastWriteDate: Date?,
550+
heartbeatFrequencyMS: Int
551+
) -> Int? {
552+
guard self.type == .rsSecondary else {
553+
return 0
554+
}
555+
guard let lastWriteDate = self.lastWriteDate else {
556+
return nil
557+
}
558+
if let primary = primary {
559+
guard let primaryLastWriteDate = primary.lastWriteDate else {
560+
return nil
561+
}
562+
let selfInterval = self.lastUpdateTime.timeIntervalSince(lastWriteDate)
563+
let primaryInterval = primary.lastUpdateTime.timeIntervalSince(primaryLastWriteDate)
564+
// timeIntervalSince returns a TimeInterval in seconds, so heartbeatFrequencyMS needs to be converted from
565+
// milliseconds to seconds.
566+
let stalenessSeconds = selfInterval - primaryInterval + Double(heartbeatFrequencyMS) / 1000.0
567+
return Int(stalenessSeconds.rounded(.up))
568+
} else {
569+
guard let maxLastWriteDate = maxLastWriteDate else {
570+
return nil
571+
}
572+
let interval = maxLastWriteDate.timeIntervalSince(lastWriteDate)
573+
let stalenessSeconds = interval + Double(heartbeatFrequencyMS) / 1000.0
574+
return Int(stalenessSeconds.rounded(.up))
575+
}
576+
}
577+
}
578+
579+
extension ReadPreference {
580+
fileprivate func validateMaxStalenessSeconds(
581+
heartbeatFrequencyMS: Int,
582+
topologyType: TopologyDescription.TopologyType
583+
) throws {
584+
if let maxStalenessSeconds = self.maxStalenessSeconds {
585+
if self.mode == .primary && maxStalenessSeconds > 0 {
586+
throw MongoError.InvalidArgumentError(
587+
message: "A positive maxStalenessSeconds cannot be specified when the read preference mode is"
588+
+ " primary"
589+
)
590+
}
591+
if topologyType == .replicaSetWithPrimary || topologyType == .replicaSetNoPrimary {
592+
if maxStalenessSeconds * 1000 < heartbeatFrequencyMS + SDAMConstants.idleWritePeriodMS {
593+
throw MongoError.InvalidArgumentError(
594+
message: "maxStalenessSeconds must be at least the sum of the heartbeatFrequencyMS configured"
595+
+ " on the client (\(heartbeatFrequencyMS)) and the idleWritePeriodMS"
596+
+ " (\(SDAMConstants.idleWritePeriodMS))"
597+
)
598+
}
599+
if maxStalenessSeconds < SDAMConstants.smallestMaxStalenessSeconds {
600+
throw MongoError.InvalidArgumentError(
601+
message: "The maxStalenessSeconds configured for a replica set must be at least"
602+
+ " \(SDAMConstants.smallestMaxStalenessSeconds)"
603+
)
604+
}
605+
}
606+
}
607+
}
481608
}
482609

483610
extension ServerDescription {

Sources/TestsCommon/CodableExtensions.swift

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import Foundation
12
@testable import MongoSwift
23

34
/// Allows a type to specify a set of known keys and check whether any unknown top-level keys are found in a decoder.
@@ -146,9 +147,15 @@ extension ReadPreference: StrictDecodable {
146147
public init(from decoder: Decoder) throws {
147148
if let container = try? decoder.container(keyedBy: CodingKeys.self) {
148149
try Self.checkKeys(using: decoder)
149-
let mode = try container.decode(Mode.self, forKey: .mode)
150-
let tagSets = try container.decodeIfPresent([BSONDocument].self, forKey: .tagSets)
151-
try self.init(mode, tagSets: tagSets)
150+
// Some tests specify a read preference with no fields to indicate the default read preference (i.e.
151+
// primary). Because this is not representable in the Swift driver due to the mode field not being
152+
// optional, this sets the mode to be primary explicitly if one is not present.
153+
let mode = try container.decodeIfPresent(Mode.self, forKey: .mode) ?? Mode.primary
154+
self.init(mode)
155+
// The init method that takes in these fields also performs validation, so these fields are set manually to
156+
// allow decoding to succeed and ensure that validation occurs during server selection.
157+
self.tagSets = try container.decodeIfPresent([BSONDocument].self, forKey: .tagSets)
158+
self.maxStalenessSeconds = try container.decodeIfPresent(Int.self, forKey: .maxStalenessSeconds)
152159
} else { // sometimes the spec tests only specify the mode as a string
153160
let container = try decoder.singleValueContainer()
154161
let mode = try container.decode(ReadPreference.Mode.self)
@@ -157,7 +164,7 @@ extension ReadPreference: StrictDecodable {
157164
}
158165

159166
internal enum CodingKeys: String, CodingKey, CaseIterable {
160-
case mode, tagSets = "tag_sets"
167+
case mode, tagSets = "tag_sets", maxStalenessSeconds
161168
}
162169
}
163170

@@ -205,12 +212,40 @@ extension ServerDescription: StrictDecodable {
205212
let address = try ServerAddress(try values.decode(String.self, forKey: .address))
206213
let type = try values.decode(ServerType.self, forKey: .type)
207214
let tags = try values.decodeIfPresent([String: String].self, forKey: .tags) ?? [:]
208-
// TODO: SWIFT-1456: decode and set averageRoundTripTimeMS
215+
let maxWireVersion = try values.decodeIfPresent(Int.self, forKey: .maxWireVersion)
216+
217+
var lastUpdateTime: Date?
218+
if let lastUpdateTimeMS = try values.decodeIfPresent(Int64.self, forKey: .lastUpdateTime) {
219+
lastUpdateTime = Date(msSinceEpoch: lastUpdateTimeMS)
220+
}
209221

210-
self.init(address: address, type: type, tags: tags)
222+
// lastWriteDate is specified in a document in the form described in the error message below
223+
var lastWriteDate: Date?
224+
if let lastWrite = try values.decodeIfPresent(BSONDocument.self, forKey: .lastWrite) {
225+
guard let lastWriteDateMS = lastWrite["lastWriteDate"]?.int64Value else {
226+
throw DecodingError.dataCorruptedError(
227+
forKey: .lastWrite,
228+
in: values,
229+
debugDescription: "lastWrite should be specified in the form"
230+
+ " lastWrite: { lastWriteDate: { \"$numberLong\": value } }"
231+
)
232+
}
233+
lastWriteDate = Date(msSinceEpoch: lastWriteDateMS)
234+
}
235+
236+
// TODO: SWIFT-1461: decode and set averageRoundTripTimeMS
237+
238+
self.init(
239+
address: address,
240+
type: type,
241+
tags: tags,
242+
lastWriteDate: lastWriteDate,
243+
maxWireVersion: maxWireVersion,
244+
lastUpdateTime: lastUpdateTime
245+
)
211246
}
212247

213248
internal enum CodingKeys: String, CodingKey, CaseIterable {
214-
case address, type, tags, averageRoundTripTimeMS = "avg_rtt_ms"
249+
case address, type, tags, averageRoundTripTimeMS = "avg_rtt_ms", lastWrite, maxWireVersion, lastUpdateTime
215250
}
216251
}

0 commit comments

Comments
 (0)