Skip to content

Commit 98c87ef

Browse files
authored
Re-instate minimum connections after failed connection (#612)
1 parent 7900fed commit 98c87ef

File tree

5 files changed

+239
-34
lines changed

5 files changed

+239
-34
lines changed

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,17 @@ public struct ConnectionPoolConfiguration: Sendable {
124124
/// pool before it is closed.
125125
public var idleTimeout: Duration
126126

127+
/// Maximum number of in-progress new connection requests to run at any one time
128+
public var maximumConcurrentConnectionRequests: Int
129+
127130
/// initializer
128131
public init() {
129132
self.minimumConnectionCount = 0
130133
self.maximumConnectionSoftLimit = 16
131134
self.maximumConnectionHardLimit = 16
132135
self.circuitBreakerTripAfter = .seconds(60)
133136
self.idleTimeout = .seconds(60)
137+
self.maximumConcurrentConnectionRequests = 20
134138
}
135139
}
136140

@@ -380,6 +384,15 @@ public final class ConnectionPool<
380384
self.cancelTimers(timers)
381385
self.eventContinuation.yield(.makeConnection(request))
382386

387+
case .makeConnectionsCancelAndScheduleTimers(let requests, let cancelledTimers, let scheduledTimers):
388+
self.cancelTimers(cancelledTimers)
389+
for request in requests {
390+
self.eventContinuation.yield(.makeConnection(request))
391+
}
392+
for timer in scheduledTimers {
393+
self.eventContinuation.yield(.scheduleTimer(timer))
394+
}
395+
383396
case .runKeepAlive(let connection, let cancelContinuation):
384397
cancelContinuation?.resume(returning: ())
385398
self.eventContinuation.yield(.runKeepAlive(connection))
@@ -581,6 +594,7 @@ extension PoolConfiguration {
581594
self.keepAliveDuration = keepAliveBehavior.keepAliveFrequency
582595
self.idleTimeoutDuration = configuration.idleTimeout
583596
self.circuitBreakerTripAfter = configuration.circuitBreakerTripAfter
597+
self.maximumConcurrentConnectionRequests = configuration.maximumConcurrentConnectionRequests
584598
}
585599
}
586600

Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ extension PoolStateMachine {
588588
preconditionFailure("Failing a connection we don't have a record of.")
589589
}
590590

591+
self.stats.connecting -= 1
591592
self.connections[index].destroyFailedConnection()
592593
return self.swapForDeletion(index: index)
593594
}

Sources/ConnectionPoolModule/PoolStateMachine.swift

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ struct PoolConfiguration: Sendable {
3232

3333
@usableFromInline
3434
var idleTimeoutDuration: Duration = .seconds(30)
35+
36+
@usableFromInline
37+
var maximumConcurrentConnectionRequests: Int = 20
3538
}
3639

3740
@usableFromInline
@@ -90,6 +93,7 @@ struct PoolStateMachine<
9093

9194
case scheduleTimers(Max2Sequence<Timer>)
9295
case makeConnection(ConnectionRequest, TinyFastSequence<TimerCancellationToken>)
96+
case makeConnectionsCancelAndScheduleTimers(TinyFastSequence<ConnectionRequest>, TinyFastSequence<TimerCancellationToken>, Max2Sequence<Timer>)
9397
case runKeepAlive(Connection, TimerCancellationToken?)
9498
case cancelTimers(TinyFastSequence<TimerCancellationToken>)
9599
case closeConnection(Connection, Max2Sequence<TimerCancellationToken>)
@@ -312,6 +316,12 @@ struct PoolStateMachine<
312316
request: requestAction,
313317
connection: .none
314318
)
319+
} else if self.connections.stats.connecting >= self.configuration.maximumConcurrentConnectionRequests {
320+
// We have too many connection requests, lets delay creating any new connections
321+
return .init(
322+
request: requestAction,
323+
connection: .none
324+
)
315325
} else if let request = self.connections.createNewDemandConnectionIfPossible() {
316326
// Can we create a demand connection
317327
return .init(
@@ -673,9 +683,20 @@ struct PoolStateMachine<
673683
let requests = self.requestQueue.pop(max: availableContext.info.availableStreams)
674684
if !requests.isEmpty {
675685
let leaseResult = self.connections.leaseConnection(at: index, streams: UInt16(requests.count))
686+
let connectionsRequired: Int
687+
if self.requestQueue.count <= self.connections.stats.availableStreams + self.connections.stats.leasedStreams {
688+
connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
689+
} else {
690+
connectionsRequired = 1
691+
}
692+
let connectionAction = self.createMultipleConnectionsAction(
693+
connectionsRequired,
694+
cancelledTimers: .init(leaseResult.timersToCancel),
695+
scheduledTimers: []
696+
) ?? .cancelTimers(.init(leaseResult.timersToCancel))
676697
return .init(
677698
request: .leaseConnection(requests, leaseResult.connection),
678-
connection: .cancelTimers(.init(leaseResult.timersToCancel))
699+
connection: connectionAction
679700
)
680701
}
681702

@@ -704,10 +725,13 @@ struct PoolStateMachine<
704725
}
705726
let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers)
706727

707-
return .init(
708-
request: .none,
709-
connection: .scheduleTimers(timers)
710-
)
728+
let connectionsRequired = self.configuration.minimumConnectionCount - Int(self.connections.stats.active)
729+
let connectionAction = self.createMultipleConnectionsAction(
730+
connectionsRequired,
731+
cancelledTimers: [],
732+
scheduledTimers: timers
733+
) ?? .scheduleTimers(timers)
734+
return .init(request: .none, connection: connectionAction)
711735
}
712736

713737
case .overflow:
@@ -723,6 +747,31 @@ struct PoolStateMachine<
723747

724748
}
725749

750+
@inlinable
751+
/* private */ mutating func createMultipleConnectionsAction(
752+
_ connectionCount: Int,
753+
cancelledTimers: TinyFastSequence<TimerCancellationToken>,
754+
scheduledTimers: Max2Sequence<Timer>
755+
) -> ConnectionAction? {
756+
let connectionCountLimitedByNumberOfRequests = min(
757+
connectionCount,
758+
self.configuration.maximumConcurrentConnectionRequests - Int(self.connections.stats.connecting)
759+
)
760+
let connectionCountLimitedByHardLimit = min(
761+
connectionCountLimitedByNumberOfRequests,
762+
self.configuration.maximumConnectionHardLimit - Int(self.connections.stats.active)
763+
)
764+
guard connectionCountLimitedByHardLimit > 0 else { return nil }
765+
766+
var connectionRequests = TinyFastSequence<ConnectionRequest>()
767+
connectionRequests.reserveCapacity(connectionCountLimitedByHardLimit)
768+
for _ in 0..<connectionCountLimitedByHardLimit {
769+
connectionRequests.append(self.connections.createNewConnection())
770+
}
771+
return .makeConnectionsCancelAndScheduleTimers(connectionRequests, cancelledTimers, scheduledTimers)
772+
773+
}
774+
726775
@inlinable
727776
/* private */ func mapTimers(_ connectionTimer: ConnectionTimer) -> Timer {
728777
switch connectionTimer.usecase {
@@ -796,9 +845,6 @@ extension PoolStateMachine {
796845
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
797846
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {}
798847

799-
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
800-
//extension PoolStateMachine.PoolState: Equatable {}
801-
802848
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
803849
extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable {
804850
@usableFromInline
@@ -808,6 +854,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo
808854
return lhs == rhs
809855
case (.makeConnection(let lhsRequest, let lhsToken), .makeConnection(let rhsRequest, let rhsToken)):
810856
return lhsRequest == rhsRequest && lhsToken == rhsToken
857+
case (.makeConnectionsCancelAndScheduleTimers(let lhsRequests, let lhsTokens, let lhsTimers),
858+
.makeConnectionsCancelAndScheduleTimers(let rhsRequests, let rhsTokens, let rhsTimers)):
859+
return lhsRequests == rhsRequests && lhsTokens == rhsTokens && lhsTimers == rhsTimers
811860
case (.runKeepAlive(let lhsConn, let lhsToken), .runKeepAlive(let rhsConn, let rhsToken)):
812861
return lhsConn === rhsConn && lhsToken == rhsToken
813862
case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)):

Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,7 @@ import Testing
10441044
}
10451045

10461046
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
1047-
@Test func testConnectionTimeout() async throws {
1047+
@Test func testCircuitBreaker() async throws {
10481048
struct ConnectionFailedError: Error {}
10491049
let clock = MockClock()
10501050
let factory = MockConnectionFactory<MockClock>()

0 commit comments

Comments
 (0)